feat(zed): add DuckDB segment timestamp indexer

Add a new mcap_video_bounds helper binary plus a zed_segment_time_index.py CLI that builds and queries an embedded DuckDB index for bundled ZED segment recordings.

The index stores segment folders, MCAP paths, video time bounds, durations, camera labels, and dataset metadata, and reuses the existing recursive multi-camera segment discovery logic so nested kindergarten layouts are indexed correctly.

Infer a dataset default timezone from folder names versus MCAP timestamps, and make point queries precision-aware so second-level folder timestamps like 2026-03-18T12-00-23 resolve to the matching segment instead of missing due to subsecond start offsets.

Verification:
- uv add 'duckdb>=1.0'
- cmake --build build --target mcap_video_bounds
- uv run python -m unittest tests.test_zed_segment_time_index
- uv run python scripts/zed_segment_time_index.py build /workspaces/data/kindergarten --jobs 8
- uv run python scripts/zed_segment_time_index.py query /workspaces/data/kindergarten --at 2026-03-18T12-00-23
This commit is contained in:
2026-03-23 09:35:54 +00:00
parent a0b9c95d5b
commit e3a423433e
7 changed files with 1185 additions and 0 deletions
+658
View File
@@ -0,0 +1,658 @@
#!/usr/bin/env python3
from __future__ import annotations
import concurrent.futures
import datetime as dt
import json
import os
import re
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import click
import duckdb
SCRIPT_PATH = Path(__file__).resolve()
REPO_ROOT = SCRIPT_PATH.parents[1]
DEFAULT_INDEX_NAME = "segment_time_index.duckdb"
INDEX_SCHEMA_VERSION = "1"
SEGMENT_FILE_PATTERN = re.compile(r".*_zed([0-9]+)\.svo2?$", re.IGNORECASE)
FOLDER_TIMESTAMP_PATTERN = re.compile(
r"^(?P<date>\d{4}-\d{2}-\d{2})[T ](?P<hour>\d{2})-(?P<minute>\d{2})-(?P<second>\d{2})(?P<fraction>\.\d+)?(?P<timezone>Z|[+-]\d{2}:\d{2})?$"
)
@dataclass(slots=True, frozen=True)
class SegmentScan:
segment_dir: Path
matched_files: int
camera_labels: tuple[str, ...]
is_valid: bool
reason: str | None = None
@dataclass(slots=True, frozen=True)
class BoundsRow:
segment_dir: Path
relative_segment_dir: str
group_path: str
activity: str
segment_name: str
mcap_path: Path
start_ns: int
end_ns: int
duration_ns: int
start_iso_utc: str
end_iso_utc: str
camera_count: int
camera_labels: str
video_message_count: int
index_source: str
def sorted_camera_labels(labels: set[str]) -> tuple[str, ...]:
return tuple(sorted(labels, key=lambda label: int(label[3:])))
def scan_segment_dir(segment_dir: Path) -> SegmentScan:
if not segment_dir.is_dir():
return SegmentScan(
segment_dir=segment_dir,
matched_files=0,
camera_labels=(),
is_valid=False,
reason=f"segment directory does not exist: {segment_dir}",
)
matched_by_camera: dict[str, list[Path]] = {}
for child in segment_dir.iterdir():
if not child.is_file():
continue
match = SEGMENT_FILE_PATTERN.fullmatch(child.name)
if match is None:
continue
label = f"zed{int(match.group(1))}"
matched_by_camera.setdefault(label, []).append(child)
matched_files = sum(len(paths) for paths in matched_by_camera.values())
camera_labels = sorted_camera_labels(set(matched_by_camera))
duplicate_cameras = [label for label, paths in sorted(matched_by_camera.items()) if len(paths) > 1]
if duplicate_cameras:
return SegmentScan(
segment_dir=segment_dir,
matched_files=matched_files,
camera_labels=camera_labels,
is_valid=False,
reason=f"duplicate camera inputs under {segment_dir}: {', '.join(duplicate_cameras)}",
)
if len(camera_labels) < 2:
return SegmentScan(
segment_dir=segment_dir,
matched_files=matched_files,
camera_labels=camera_labels,
is_valid=False,
reason=f"expected at least 2 camera inputs under {segment_dir}, found {len(camera_labels)}",
)
return SegmentScan(
segment_dir=segment_dir,
matched_files=matched_files,
camera_labels=camera_labels,
is_valid=True,
)
def discover_segment_dirs(root: Path, recursive: bool) -> tuple[list[SegmentScan], list[SegmentScan]]:
if not root.is_dir():
raise click.ClickException(f"input directory does not exist: {root}")
candidate_dirs = {root.resolve()}
iterator = root.rglob("*") if recursive else root.iterdir()
for path in iterator:
if path.is_dir():
candidate_dirs.add(path.resolve())
valid_scans: list[SegmentScan] = []
ignored_partial_scans: list[SegmentScan] = []
for segment_dir in sorted(candidate_dirs):
scan = scan_segment_dir(segment_dir)
if scan.is_valid:
valid_scans.append(scan)
elif scan.matched_files > 0:
ignored_partial_scans.append(scan)
if not valid_scans:
raise click.ClickException(f"no multi-camera segments found under {root}")
return valid_scans, ignored_partial_scans
def locate_binary(name: str, override: Path | None) -> Path:
if override is not None:
candidate = override.expanduser().resolve()
if not candidate.is_file():
raise click.ClickException(f"binary not found: {candidate}")
return candidate
candidates = (
REPO_ROOT / "build" / "bin" / name,
REPO_ROOT / "build" / name,
)
for candidate in candidates:
if candidate.is_file():
return candidate
raise click.ClickException(f"could not find {name} under {REPO_ROOT / 'build'}")
def default_index_path(dataset_root: Path) -> Path:
return dataset_root / DEFAULT_INDEX_NAME
def find_unique_mcap(segment_dir: Path) -> Path | None:
matches = sorted(path for path in segment_dir.iterdir() if path.is_file() and path.suffix.lower() == ".mcap")
if len(matches) == 1:
return matches[0]
return None
def format_ns_iso(ns: int, tzinfo: dt.tzinfo) -> str:
seconds, nanos = divmod(ns, 1_000_000_000)
stamp = dt.datetime.fromtimestamp(seconds, tz=dt.timezone.utc).astimezone(tzinfo)
offset = stamp.strftime("%z")
offset = f"{offset[:3]}:{offset[3:]}" if offset else ""
return f"{stamp.strftime('%Y-%m-%dT%H:%M:%S')}.{nanos:09d}{offset}"
def format_ns_utc(ns: int) -> str:
return format_ns_iso(ns, dt.timezone.utc).replace("+00:00", "Z")
def resolve_timezone(name: str) -> dt.tzinfo:
if name == "local":
local = dt.datetime.now().astimezone().tzinfo
if local is None:
raise click.ClickException("could not resolve local timezone")
return local
if name == "UTC":
return dt.timezone.utc
if name.startswith("UTC") and len(name) == len("UTC+00:00"):
try:
sign = 1 if name[3] == "+" else -1
hours = int(name[4:6])
minutes = int(name[7:9])
except ValueError as exc:
raise click.ClickException(f"invalid fixed UTC offset '{name}'") from exc
return dt.timezone(sign * dt.timedelta(hours=hours, minutes=minutes))
try:
return ZoneInfo(name)
except Exception as exc: # pragma: no cover - defensive wrapper around system tzdb
raise click.ClickException(f"unknown timezone '{name}': {exc}") from exc
def normalize_timestamp_text(value: str) -> str:
match = FOLDER_TIMESTAMP_PATTERN.fullmatch(value)
if match is None:
return value
parts = match.groupdict()
fraction = parts["fraction"] or ""
timezone_text = parts["timezone"] or ""
return f"{parts['date']}T{parts['hour']}:{parts['minute']}:{parts['second']}{fraction}{timezone_text}"
def parse_folder_name_naive(value: str) -> dt.datetime | None:
normalized = normalize_timestamp_text(value)
try:
parsed = dt.datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return None
return parsed
def datetime_to_ns(value: dt.datetime) -> int:
utc_value = value.astimezone(dt.timezone.utc)
return int(utc_value.timestamp()) * 1_000_000_000 + utc_value.microsecond * 1_000
def parse_timestamp_to_ns(value: str, timezone_name: str) -> int:
stripped = value.strip()
if not stripped:
raise click.ClickException("timestamp value is empty")
digit_text = stripped.lstrip("+-")
if digit_text.isdigit():
raw_value = int(stripped)
digits = len(digit_text)
if digits <= 10:
return raw_value * 1_000_000_000
if digits <= 13:
return raw_value * 1_000_000
if digits <= 16:
return raw_value * 1_000
return raw_value
normalized = normalize_timestamp_text(stripped)
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
try:
parsed = dt.datetime.fromisoformat(normalized)
except ValueError as exc:
raise click.ClickException(f"invalid timestamp '{value}': {exc}") from exc
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=resolve_timezone(timezone_name))
return datetime_to_ns(parsed)
def parse_timestamp_window(value: str, timezone_name: str) -> tuple[int, int]:
stripped = value.strip()
if not stripped:
raise click.ClickException("timestamp value is empty")
digit_text = stripped.lstrip("+-")
if digit_text.isdigit():
base_ns = parse_timestamp_to_ns(stripped, timezone_name)
digits = len(digit_text)
if digits <= 10:
precision_ns = 1_000_000_000
elif digits <= 13:
precision_ns = 1_000_000
elif digits <= 16:
precision_ns = 1_000
else:
precision_ns = 1
return base_ns, base_ns + precision_ns - 1
normalized = normalize_timestamp_text(stripped)
base_ns = parse_timestamp_to_ns(stripped, timezone_name)
fraction_match = re.search(r"\.(\d+)", normalized)
if fraction_match is None:
precision_ns = 1_000_000_000
else:
digits = min(len(fraction_match.group(1)), 9)
precision_ns = 10 ** (9 - digits)
return base_ns, base_ns + precision_ns - 1
def probe_mcap_bounds(bounds_bin: Path, mcap_path: Path) -> dict[str, Any]:
result = subprocess.run(
[str(bounds_bin), str(mcap_path), "--json"],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode != 0:
stderr = result.stderr.strip() or result.stdout.strip() or f"exit {result.returncode}"
raise RuntimeError(f"{mcap_path}: {stderr}")
try:
return json.loads(result.stdout)
except json.JSONDecodeError as exc:
raise RuntimeError(f"{mcap_path}: failed to parse helper JSON: {exc}") from exc
def build_row(dataset_root: Path, scan: SegmentScan, bounds_bin: Path) -> BoundsRow | None:
mcap_path = find_unique_mcap(scan.segment_dir)
if mcap_path is None:
return None
bounds = probe_mcap_bounds(bounds_bin, mcap_path)
relative_segment_dir = scan.segment_dir.relative_to(dataset_root).as_posix()
parent = Path(relative_segment_dir).parent
group_path = "" if str(parent) == "." else parent.as_posix()
parts = Path(relative_segment_dir).parts
activity = parts[0] if parts else scan.segment_dir.name
start_ns = int(bounds["start_ns"])
end_ns = int(bounds["end_ns"])
return BoundsRow(
segment_dir=scan.segment_dir,
relative_segment_dir=relative_segment_dir,
group_path=group_path,
activity=activity,
segment_name=scan.segment_dir.name,
mcap_path=mcap_path,
start_ns=start_ns,
end_ns=end_ns,
duration_ns=max(0, end_ns - start_ns),
start_iso_utc=str(bounds["start_iso_utc"]),
end_iso_utc=str(bounds["end_iso_utc"]),
camera_count=len(scan.camera_labels),
camera_labels=",".join(scan.camera_labels),
video_message_count=int(bounds["video_message_count"]),
index_source="mcap_video_bounds",
)
def init_db(conn: duckdb.DuckDBPyConnection) -> None:
conn.execute(
"""
CREATE TABLE meta (
key VARCHAR PRIMARY KEY,
value VARCHAR NOT NULL
);
"""
)
conn.execute(
"""
CREATE TABLE segments (
segment_dir VARCHAR PRIMARY KEY,
relative_segment_dir VARCHAR NOT NULL,
group_path VARCHAR NOT NULL,
activity VARCHAR NOT NULL,
segment_name VARCHAR NOT NULL,
mcap_path VARCHAR NOT NULL,
start_ns BIGINT NOT NULL,
end_ns BIGINT NOT NULL,
duration_ns BIGINT NOT NULL,
start_iso_utc VARCHAR NOT NULL,
end_iso_utc VARCHAR NOT NULL,
camera_count INTEGER NOT NULL,
camera_labels VARCHAR NOT NULL,
video_message_count BIGINT NOT NULL,
index_source VARCHAR NOT NULL
);
"""
)
conn.execute("CREATE INDEX segments_start_ns_idx ON segments(start_ns);")
conn.execute("CREATE INDEX segments_end_ns_idx ON segments(end_ns);")
def write_index(index_path: Path, dataset_root: Path, rows: list[BoundsRow]) -> None:
index_path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile(prefix=f"{index_path.name}.", suffix=".tmp", dir=index_path.parent, delete=False) as handle:
temp_path = Path(handle.name)
temp_path.unlink(missing_ok=True)
inferred_timezone = infer_dataset_timezone(rows)
try:
conn = duckdb.connect(str(temp_path))
try:
init_db(conn)
conn.executemany(
"INSERT INTO meta (key, value) VALUES (?, ?)",
[
("schema_version", INDEX_SCHEMA_VERSION),
("dataset_root", str(dataset_root)),
("built_at_utc", dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")),
("default_timezone", inferred_timezone),
],
)
conn.executemany(
"""
INSERT INTO segments (
segment_dir,
relative_segment_dir,
group_path,
activity,
segment_name,
mcap_path,
start_ns,
end_ns,
duration_ns,
start_iso_utc,
end_iso_utc,
camera_count,
camera_labels,
video_message_count,
index_source
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
(
str(row.segment_dir),
row.relative_segment_dir,
row.group_path,
row.activity,
row.segment_name,
str(row.mcap_path),
row.start_ns,
row.end_ns,
row.duration_ns,
row.start_iso_utc,
row.end_iso_utc,
row.camera_count,
row.camera_labels,
row.video_message_count,
row.index_source,
)
for row in rows
],
)
finally:
conn.close()
temp_path.replace(index_path)
except Exception:
temp_path.unlink(missing_ok=True)
raise
def infer_dataset_timezone(rows: list[BoundsRow]) -> str:
offset_counts: dict[int, int] = {}
for row in rows:
folder_time = parse_folder_name_naive(row.segment_name)
if folder_time is None:
continue
actual_utc = dt.datetime.fromtimestamp(row.start_ns / 1_000_000_000, tz=dt.timezone.utc).replace(tzinfo=None)
offset_minutes = round((folder_time - actual_utc).total_seconds() / 60.0)
offset_counts[offset_minutes] = offset_counts.get(offset_minutes, 0) + 1
if not offset_counts:
return "local"
minutes = max(offset_counts.items(), key=lambda item: item[1])[0]
if minutes == 0:
return "UTC"
sign = "+" if minutes >= 0 else "-"
absolute_minutes = abs(minutes)
hours, mins = divmod(absolute_minutes, 60)
return f"UTC{sign}{hours:02d}:{mins:02d}"
def require_query_window(at: str | None, start: str | None, end: str | None, timezone_name: str) -> tuple[int, int]:
if at is not None and (start is not None or end is not None):
raise click.ClickException("use either --at or --start/--end, not both")
if at is not None:
return parse_timestamp_window(at, timezone_name)
if start is None or end is None:
raise click.ClickException("provide --at or both --start and --end")
start_ns = parse_timestamp_to_ns(start, timezone_name)
end_ns = parse_timestamp_to_ns(end, timezone_name)
if start_ns > end_ns:
raise click.ClickException("query start must be before or equal to query end")
return start_ns, end_ns
def load_meta(conn: duckdb.DuckDBPyConnection) -> dict[str, str]:
rows = conn.execute("SELECT key, value FROM meta").fetchall()
return {str(key): str(value) for key, value in rows}
def format_duration(duration_ns: int) -> str:
return f"{duration_ns / 1_000_000_000:.3f}s"
@click.group()
def cli() -> None:
"""Build and query a DuckDB index of bundled ZED segment timestamps."""
@cli.command()
@click.argument("dataset_root", type=click.Path(path_type=Path, file_okay=False))
@click.option("--index", "index_path", type=click.Path(path_type=Path, dir_okay=False))
@click.option("--recursive/--no-recursive", default=True, show_default=True)
@click.option("--jobs", type=click.IntRange(min=1), default=min(8, os.cpu_count() or 1), show_default=True)
@click.option("--bounds-bin", type=click.Path(path_type=Path, dir_okay=False))
def build(dataset_root: Path, index_path: Path | None, recursive: bool, jobs: int, bounds_bin: Path | None) -> None:
"""Build or replace the embedded DuckDB time index for DATASET_ROOT."""
dataset_root = dataset_root.expanduser().resolve()
index_path = (index_path or default_index_path(dataset_root)).expanduser().resolve()
bounds_binary = locate_binary("mcap_video_bounds", bounds_bin)
valid_scans, ignored_partial_scans = discover_segment_dirs(dataset_root, recursive)
click.echo(
f"discovered {len(valid_scans)} valid segment directories under {dataset_root}",
err=True,
)
if ignored_partial_scans:
click.echo(f"ignored {len(ignored_partial_scans)} partial segment directories", err=True)
rows: list[BoundsRow] = []
skipped_missing_mcap: list[Path] = []
errors: list[str] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
future_to_scan: dict[concurrent.futures.Future[BoundsRow | None], SegmentScan] = {
executor.submit(build_row, dataset_root, scan, bounds_binary): scan for scan in valid_scans
}
for future in concurrent.futures.as_completed(future_to_scan):
scan = future_to_scan[future]
try:
row = future.result()
except Exception as exc:
errors.append(f"{scan.segment_dir}: {exc}")
continue
if row is None:
skipped_missing_mcap.append(scan.segment_dir)
continue
rows.append(row)
rows.sort(key=lambda row: (row.start_ns, row.segment_dir.as_posix()))
if skipped_missing_mcap:
click.echo(f"skipped {len(skipped_missing_mcap)} segments with missing or ambiguous MCAP files", err=True)
if errors:
for error in errors:
click.echo(f"error: {error}", err=True)
raise click.ClickException(f"failed to probe {len(errors)} segment(s)")
if not rows:
raise click.ClickException("no indexable MCAP segments were found")
write_index(index_path, dataset_root, rows)
click.echo(
f"wrote {len(rows)} segments to {index_path} (skipped_missing_mcap={len(skipped_missing_mcap)})",
err=True,
)
@cli.command()
@click.argument("dataset_root", type=click.Path(path_type=Path, file_okay=False))
@click.option("--index", "index_path", type=click.Path(path_type=Path, dir_okay=False))
@click.option("--at")
@click.option("--start")
@click.option("--end")
@click.option("--json", "as_json", is_flag=True)
@click.option("--timezone", "timezone_name", default="dataset", show_default=True)
def query(
dataset_root: Path,
index_path: Path | None,
at: str | None,
start: str | None,
end: str | None,
as_json: bool,
timezone_name: str,
) -> None:
"""Query the embedded time index for matching segment folders."""
dataset_root = dataset_root.expanduser().resolve()
index_path = (index_path or default_index_path(dataset_root)).expanduser().resolve()
if not index_path.is_file():
raise click.ClickException(f"index not found: {index_path}")
conn = duckdb.connect(str(index_path), read_only=True)
try:
meta = load_meta(conn)
indexed_root = Path(meta.get("dataset_root", "")).expanduser().resolve()
if indexed_root != dataset_root:
raise click.ClickException(
f"index root mismatch: index was built for {indexed_root}, not {dataset_root}"
)
effective_timezone_name = meta.get("default_timezone", "local") if timezone_name == "dataset" else timezone_name
query_start_ns, query_end_ns = require_query_window(at, start, end, effective_timezone_name)
display_timezone = resolve_timezone(effective_timezone_name)
result_rows = conn.execute(
"""
SELECT
segment_dir,
relative_segment_dir,
group_path,
activity,
segment_name,
mcap_path,
start_ns,
end_ns,
duration_ns,
start_iso_utc,
end_iso_utc,
camera_count,
camera_labels,
video_message_count,
index_source
FROM segments
WHERE start_ns <= ? AND end_ns >= ?
ORDER BY start_ns, segment_dir
""",
[query_end_ns, query_start_ns],
).fetchall()
finally:
conn.close()
payload = [
{
"segment_dir": row[0],
"relative_segment_dir": row[1],
"group_path": row[2],
"activity": row[3],
"segment_name": row[4],
"mcap_path": row[5],
"start_ns": row[6],
"end_ns": row[7],
"duration_ns": row[8],
"start_iso_utc": row[9],
"end_iso_utc": row[10],
"camera_count": row[11],
"camera_labels": row[12].split(",") if row[12] else [],
"video_message_count": row[13],
"index_source": row[14],
"start_display": format_ns_iso(row[6], display_timezone),
"end_display": format_ns_iso(row[7], display_timezone),
}
for row in result_rows
]
if as_json:
click.echo(json.dumps(payload, indent=2, ensure_ascii=False))
return
if not payload:
click.echo("no matching segments")
return
click.echo(f"matched {len(payload)} segment(s)")
for row in payload:
click.echo(
" | ".join(
(
row["start_display"],
row["end_display"],
format_duration(int(row["duration_ns"])),
row["segment_dir"],
row["mcap_path"],
)
)
)
if __name__ == "__main__":
cli()