"""Command-line entry point that aligns several videos by frame timestamp.

The command scans every input video, groups frames into aligned bundles,
writes the aligned videos into the output directory, and records what was
done in an ``alignment.json`` file next to them.
"""

from __future__ import annotations

import json
from pathlib import Path

import click
from loguru import logger

from pose_tracking_exp.detection import (
    align_timestamp_sequences,
    parse_video_input_specs,
    scan_video,
    write_aligned_videos,
)
from pose_tracking_exp.schema import TrackerConfig


@click.command()
@click.argument("inputs", nargs=-1, type=str, required=True)
@click.option("--output-dir", type=click.Path(path_type=Path, file_okay=False), required=True)
@click.option("--reference", "reference_name", type=str)
@click.option("--max-skew-ms", type=float, default=None, help="Max timestamp skew in milliseconds.")
@click.option("--min-views", type=click.IntRange(min=1), default=None)
@click.option("--codec", type=str, default="mp4v", show_default=True)
def main(
    inputs: tuple[str, ...],
    output_dir: Path,
    reference_name: str | None,
    max_skew_ms: float | None,
    min_views: int | None,
    codec: str,
) -> None:
    """Align INPUTS by timestamp and write the aligned videos.

    The aligned videos and an ``alignment.json`` summary are written to the
    output directory. When omitted, the reference defaults to the first
    input, the minimum view count defaults to the number of inputs, and the
    maximum skew defaults to the tracker configuration value.
    """
    # Route log output to stderr only, replacing loguru's default sink.
    logger.remove()
    logger.add(
        click.get_text_stream("stderr"),
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    )

    # Reject a meaningless tolerance before spending time scanning videos.
    if max_skew_ms is not None and max_skew_ms < 0:
        raise click.BadParameter("must be non-negative", param_hint="--max-skew-ms")

    parsed_inputs = parse_video_input_specs(inputs)
    tracker_defaults = TrackerConfig()
    scans = tuple(
        scan_video(path, source_name=source_name)
        for source_name, path in parsed_inputs
    )

    source_names = [scan.source_name for scan in scans]
    if reference_name is None:
        reference_name = scans[0].source_name
    elif reference_name not in source_names:
        # Fail with a usable message instead of whatever the aligner would
        # do when handed a name that matches no scanned source.
        raise click.BadParameter(
            f"unknown source {reference_name!r}; available sources: "
            f"{', '.join(source_names)}",
            param_hint="--reference",
        )

    if min_views is None:
        min_views = len(scans)

    # The CLI takes milliseconds; the aligner works in nanoseconds.
    if max_skew_ms is not None:
        max_skew_ns = round(max_skew_ms * 1_000_000.0)
    else:
        max_skew_ns = tracker_defaults.max_sync_skew_ns

    bundles = align_timestamp_sequences(
        scans,
        reference_name=reference_name,
        max_skew_ns=max_skew_ns,
        min_views=min_views,
    )
    if not bundles:
        raise click.ClickException("No aligned frame bundles were found.")

    # NOTE(review): the output frame rate is taken from the first input, not
    # from the reference source - confirm this is intended.
    outputs = write_aligned_videos(
        scans,
        bundles,
        output_dir=output_dir,
        output_fps=scans[0].fps,
        codec=codec,
    )

    metadata = {
        "reference_name": reference_name,
        "max_skew_ns": max_skew_ns,
        "min_views": min_views,
        "bundle_count": len(bundles),
        "sources": {
            scan.source_name: {
                "input_path": str(scan.path),
                "output_path": str(outputs[scan.source_name]),
                "input_fps": scan.fps,
                "input_frame_count": len(scan.timestamps_unix_ns),
                # Count only the bundles this source contributes a frame to.
                "output_frame_count": sum(
                    1
                    for bundle in bundles
                    if scan.source_name in bundle.frame_indices_by_source
                ),
            }
            for scan in scans
        },
        "bundles": [
            {
                "bundle_index": bundle.bundle_index,
                "timestamp_unix_ns": bundle.timestamp_unix_ns,
                "frame_indices_by_source": bundle.frame_indices_by_source,
            }
            for bundle in bundles
        ],
    }

    # Do not rely on the video writer having created the directory; creating
    # it here is a no-op when it already exists.
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "alignment.json").write_text(
        json.dumps(metadata, indent=2), encoding="utf-8"
    )

    logger.info(
        "aligned {} bundles across {} sources into {}",
        len(bundles),
        len(scans),
        output_dir,
    )


if __name__ == "__main__":
    main()