#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import json import re import sys from dataclasses import dataclass from pathlib import Path from typing import cast KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)") @dataclass(frozen=True) class CliArgs: manifest: str output: str run_id: str run_dir: str started_at: str finished_at: str def parse_args() -> CliArgs: parser = argparse.ArgumentParser( description="Build JSON summary for standalone acceptance matrix" ) _ = parser.add_argument("--manifest", required=True) _ = parser.add_argument("--output", required=True) _ = parser.add_argument("--run-id", required=True) _ = parser.add_argument("--run-dir", required=True) _ = parser.add_argument("--started-at", required=True) _ = parser.add_argument("--finished-at", required=True) parsed = parser.parse_args(sys.argv[1:]) return CliArgs( manifest=cast(str, parsed.manifest), output=cast(str, parsed.output), run_id=cast(str, parsed.run_id), run_dir=cast(str, parsed.run_dir), started_at=cast(str, parsed.started_at), finished_at=cast(str, parsed.finished_at), ) def read_text(path: str) -> str: p = Path(path) if not p.exists(): return "" try: return p.read_text(encoding="utf-8", errors="replace") except OSError: return "" def to_number(value: str) -> int | float | str: if re.fullmatch(r"-?\d+", value): try: return int(value) except ValueError: return value if re.fullmatch(r"-?\d+\.\d+", value): try: return float(value) except ValueError: return value return value def parse_key_values(line: str) -> dict[str, int | float | str]: return {match.group(1): to_number(match.group(2)) for match in KV_PATTERN.finditer(line)} def last_line_with_token(text: str, token: str) -> str: found = "" for line in text.splitlines(): if token in line: found = line return found def parse_rtp_receiver_metrics(text: str) -> dict[str, int]: metrics: dict[str, int] = {} patterns = { "packets_received": r"Packets received:\s*(\d+)", "sequence_gaps": r"Sequence gaps:\s*(\d+)", "invalid_packets": r"Invalid packets:\s*(\d+)", "detected_payload_type": r"Detected payload type:\s*(\d+)", } for key, pattern in patterns.items(): match = re.search(pattern, text) if match: metrics[key] = int(match.group(1)) return metrics def parse_rtmp_stub_metrics(text: str) -> dict[str, int]: metrics: dict[str, int] = {} messages = re.search( r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)", text, ) if messages: metrics.update( { "messages_total": int(messages.group(1)), "messages_audio": int(messages.group(2)), "messages_video": int(messages.group(3)), "messages_data": int(messages.group(4)), "chunk_size_updates": int(messages.group(5)), } ) counts = re.search( r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*unknown=(\d+)", text, ) if counts: metrics.update( { "h264_video_messages": int(counts.group(1)), "h265_enhanced_video_messages": int(counts.group(2)), "unknown_video_messages": int(counts.group(3)), } ) matching = re.search( r"Matching count for expected mode:\s*(\d+)\s*\(threshold=(\d+)\)", text ) if matching: metrics.update( { "matching_count": int(matching.group(1)), "matching_threshold": int(matching.group(2)), } ) return metrics def parse_sdp_metrics(path: str) -> dict[str, object]: if not path: return {} p = Path(path) if not p.exists(): return {"exists": False} text = read_text(path) metrics: dict[str, object] = { "exists": True, "bytes": p.stat().st_size, "has_h264": "H264/90000" in text, "has_h265": ("H265/90000" in text) or ("HEVC/90000" in text), } match = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", text) if match: metrics["payload_type"] = int(match.group(1)) return metrics def parse_exit(value: str) -> int: try: return int(value) except (TypeError, ValueError): return -1 def parse_duration_ms(value: str) -> int: try: return int(value) except (TypeError, ValueError): return 0 def parse_manifest(path: str) -> list[dict[str, str]]: rows: list[dict[str, str]] = [] with open(path, "r", encoding="utf-8", newline="") as handle: reader = csv.DictReader(handle, delimiter="\t") for raw in reader: row = {key: "" if value is None else str(value) for key, value in raw.items()} rows.append(row) return rows def build_summary(args: CliArgs) -> dict[str, object]: manifest_rows = parse_manifest(args.manifest) rows: list[dict[str, object]] = [] for row in sorted(manifest_rows, key=lambda item: int(item["order"])): emitter_text = read_text(row["emitter_log"]) receiver_text = read_text(row["receiver_log"]) emitter_metrics: dict[str, dict[str, int | float | str]] = {} for token, key in ( ("RTP_METRICS", "rtp"), ("RTMP_OUTPUT_METRICS", "rtmp"), ): line = last_line_with_token(emitter_text, token) if line: emitter_metrics[key] = parse_key_values(line) receiver_metrics: dict[str, object] if row["protocol"] == "rtp": receiver_metrics = parse_rtp_receiver_metrics(receiver_text) else: receiver_metrics = parse_rtmp_stub_metrics(receiver_text) rows.append( { "order": int(row["order"]), "id": row["row_id"], "name": row["name"], "protocol": row["protocol"], "codec": row["codec"], "transport": row["transport"], "status": row["status"], "reason": row["reason"], "duration_ms": parse_duration_ms(row["duration_ms"]), "exit_codes": { "emitter": parse_exit(row["emitter_rc"]), "receiver": parse_exit(row["receiver_rc"]), }, "metrics": { "emitter": emitter_metrics, "receiver": receiver_metrics, "sdp": parse_sdp_metrics(row.get("sdp_path", "")), }, "evidence": { "emitter_log": row["emitter_log"], "receiver_log": row["receiver_log"], "sdp_path": row.get("sdp_path") or None, }, } ) pass_count = sum(1 for row in rows if row["status"] == "PASS") fail_count = sum(1 for row in rows if row["status"] == "FAIL") skip_count = sum(1 for row in rows if row["status"] == "SKIP") all_pass = len(rows) == 6 and pass_count == 6 and fail_count == 0 and skip_count == 0 return { "run_id": args.run_id, "run_dir": args.run_dir, "started_at": args.started_at, "finished_at": args.finished_at, "counts": { "total": len(rows), "pass": pass_count, "fail": fail_count, "skip": skip_count, }, "all_pass": all_pass, "recommended_exit_code": 0 if all_pass else 1, "rows": rows, } def main() -> int: args = parse_args() output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) summary = build_summary(args) output_path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8") return 0 if __name__ == "__main__": raise SystemExit(main())