Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions nemo/collections/common/data/lhotse/cutset.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,79 @@ def read_lhotse_manifest(config) -> tuple[CutSet, bool]:
return cuts, is_tarred


@data_type_parser("lhotse_concat")
def read_lhotse_concat(config: DictConfig) -> tuple[CutSet, bool]:
"""
Lazily concatenate cuts from an inner data source up to a specified max duration.

Uses a greedy sequential algorithm: iterates through source cuts and appends
each one to the current accumulator. When adding the next cut would exceed
``max_duration``, yields the accumulated cut and starts a new one.

Config options:

- ``max_duration``: Maximum duration (in seconds) for concatenated cuts.
- ``gap``: Duration of silence (in seconds) inserted between concatenated cuts (default: 0.0).
- ``input_cfg``: Inner data source configuration (same format as top-level ``input_cfg``).

Example config::

input_cfg:
- type: lhotse_concat
max_duration: 30.0
gap: 0.5
input_cfg:
- type: lhotse
cuts_path: /path/to/cuts.jsonl.gz
"""
max_duration = config.max_duration
gap = config.get("gap", 0.0)

cuts, is_tarred = read_cutset_from_config(config)

cuts = CutSet(LazyConcatCuts(source=cuts, max_duration=max_duration, gap=gap))
return cuts, is_tarred


class LazyConcatCuts:
"""
Lazily concatenates consecutive cuts from a source up to a maximum duration.

Greedy sequential algorithm: iterate through source cuts and append each one
to the current accumulator. When adding the next cut would exceed ``max_duration``,
yield the accumulator and start fresh. Cuts that individually exceed ``max_duration``
are yielded as-is (never dropped).
"""

def __init__(self, source, max_duration: float, gap: float = 0.0):
self.source = source
self.max_duration = max_duration
self.gap = gap

def __iter__(self):
acc = None
acc_duration = 0.0
for cut in self.source:
cut_dur = cut.duration
gap_dur = self.gap if acc is not None else 0.0
new_duration = acc_duration + gap_dur + cut_dur
if acc is not None and new_duration > self.max_duration:
yield acc
acc = cut
acc_duration = cut_dur
elif acc is None:
acc = cut
acc_duration = cut_dur
else:
if self.gap > 0:
acc = acc.pad(acc.duration + self.gap).append(cut)
else:
acc = acc.append(cut)
acc_duration = new_duration
if acc is not None:
yield acc


@data_type_parser(["parquet"])
def read_parquet_manifest(config: DictConfig) -> tuple[CutSet, bool]:
"""
Expand Down
Loading
Loading