#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import json import re import sys from dataclasses import dataclass from pathlib import Path from typing import cast MetricValue = int | float | str | bool KV_PATTERN = re.compile(r"([a-zA-Z_]+)=([^\s]+)") @dataclass(frozen=True) class CliArgs: manifest: str output: str run_id: str run_dir: str started_at: str finished_at: str def parse_args() -> CliArgs: parser = argparse.ArgumentParser( description="Build JSON summary for standalone acceptance matrix" ) _ = parser.add_argument( "--manifest", required=True, help="TSV manifest produced by acceptance runner" ) _ = parser.add_argument("--output", required=True, help="Output JSON summary path") _ = parser.add_argument("--run-id", required=True) _ = parser.add_argument("--run-dir", required=True) _ = parser.add_argument("--started-at", required=True) _ = parser.add_argument("--finished-at", required=True) parsed = parser.parse_args(sys.argv[1:]) manifest = cast(str, parsed.manifest) output = cast(str, parsed.output) run_id = cast(str, parsed.run_id) run_dir = cast(str, parsed.run_dir) started_at = cast(str, parsed.started_at) finished_at = cast(str, parsed.finished_at) return CliArgs( manifest=manifest, output=output, run_id=run_id, run_dir=run_dir, started_at=started_at, finished_at=finished_at, ) def read_text(path: str) -> str: p = Path(path) if not p.exists(): return "" try: return p.read_text(encoding="utf-8", errors="replace") except OSError: return "" def to_number(value: str) -> MetricValue: if re.fullmatch(r"-?\d+", value): try: return int(value) except ValueError: return value if re.fullmatch(r"-?\d+\.\d+", value): try: return float(value) except ValueError: return value return value def parse_key_value_metrics(line: str) -> dict[str, MetricValue]: metrics: dict[str, MetricValue] = {} for match in KV_PATTERN.finditer(line): key = match.group(1) raw = match.group(2) metrics[key] = to_number(raw) return metrics def extract_last_matching_line(text: str, token: str) -> str: match = "" for line in text.splitlines(): if token in line: match = line return match def parse_rtp_tester_metrics(text: str) -> dict[str, MetricValue]: metrics: dict[str, MetricValue] = {} patterns = { "packets_received": r"Packets received:\s*(\d+)", "sequence_gaps": r"Sequence gaps:\s*(\d+)", "invalid_packets": r"Invalid packets:\s*(\d+)", "detected_payload_type": r"Detected payload type:\s*(\d+)", } for key, pattern in patterns.items(): m = re.search(pattern, text) if m: metrics[key] = int(m.group(1)) return metrics def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]: metrics: dict[str, MetricValue] = {} messages = re.search( r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)", text, ) if messages: metrics.update( { "messages_total": int(messages.group(1)), "messages_audio": int(messages.group(2)), "messages_video": int(messages.group(3)), "messages_data": int(messages.group(4)), "chunk_size_updates": int(messages.group(5)), } ) counts = re.search( r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*h265-domestic=(\d+),\s*unknown=(\d+)", text, ) if counts: metrics.update( { "h264_video_messages": int(counts.group(1)), "h265_enhanced_video_messages": int(counts.group(2)), "h265_domestic_video_messages": int(counts.group(3)), "unknown_video_messages": int(counts.group(4)), } ) matching = re.search( r"Matching count for expected mode:\s*(\d+)\s*\(threshold=(\d+)\)", text ) if matching: metrics.update( { "matching_count": int(matching.group(1)), "matching_threshold": int(matching.group(2)), } ) return metrics def parse_streamer_metrics(text: str) -> dict[str, dict[str, MetricValue]]: result: dict[str, dict[str, MetricValue]] = {} for token, key in ( ("PIPELINE_METRICS", "pipeline"), ("RTP_METRICS", "rtp"), ("RTMP_METRICS", "rtmp"), ): line = extract_last_matching_line(text, token) if line: result[key] = parse_key_value_metrics(line) return result def parse_sdp_metrics(path: str) -> dict[str, MetricValue]: p = Path(path) if not path: return {} if not p.exists(): return {"exists": False} text = read_text(path) metrics: dict[str, MetricValue] = { "exists": True, "bytes": p.stat().st_size, "has_h264": "H264/90000" in text, "has_h265": ("H265/90000" in text) or ("HEVC/90000" in text), } m = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", text) if m: metrics["payload_type"] = int(m.group(1)) return metrics def parse_exit_code(value: str) -> int: try: return int(value) except (TypeError, ValueError): return -1 def parse_duration_ms(value: str) -> int: try: return int(value) except (TypeError, ValueError): return 0 MANIFEST_FIELDS = ( "order", "row_id", "name", "protocol", "codec", "rtmp_mode", "status", "reason", "duration_ms", "sim_rc", "streamer_rc", "tester_rc", "sim_log", "streamer_log", "tester_log", "sdp_path", ) def parse_manifest(path: str) -> list[dict[str, str]]: rows: list[dict[str, str]] = [] with open(path, "r", encoding="utf-8", newline="") as handle: reader = csv.DictReader(handle, delimiter="\t") for raw_row in reader: row: dict[str, str] = {} for field in MANIFEST_FIELDS: value = raw_row.get(field, "") row[field] = "" if value is None else str(value) rows.append(row) return rows def build_summary(args: CliArgs) -> dict[str, object]: manifest_rows = parse_manifest(args.manifest) rows: list[dict[str, object]] = [] for row in sorted(manifest_rows, key=lambda item: int(item["order"])): streamer_log = row["streamer_log"] tester_log = row["tester_log"] sim_log = row["sim_log"] sdp_path = row.get("sdp_path", "") streamer_text = read_text(streamer_log) tester_text = read_text(tester_log) tester_metrics: dict[str, MetricValue] if row["protocol"] == "rtp": tester_metrics = parse_rtp_tester_metrics(tester_text) else: tester_metrics = parse_rtmp_tester_metrics(tester_text) metrics: dict[str, object] = { "tester": tester_metrics, "streamer": parse_streamer_metrics(streamer_text), } if row["protocol"] == "rtp": metrics["sdp"] = parse_sdp_metrics(sdp_path) rows.append( { "order": int(row["order"]), "id": row["row_id"], "name": row["name"], "protocol": row["protocol"], "codec": row["codec"], "rtmp_mode": row["rtmp_mode"] if row["rtmp_mode"] else None, "status": row["status"], "reason": row["reason"], "duration_ms": parse_duration_ms(row["duration_ms"]), "exit_codes": { "sim": parse_exit_code(row["sim_rc"]), "streamer": parse_exit_code(row["streamer_rc"]), "tester": parse_exit_code(row["tester_rc"]), }, "metrics": metrics, "evidence": { "sim_log": sim_log, "streamer_log": streamer_log, "tester_log": tester_log, "sdp": sdp_path if sdp_path else None, }, } ) pass_count = sum(1 for row in rows if row["status"] == "PASS") fail_count = sum(1 for row in rows if row["status"] == "FAIL") skip_count = sum(1 for row in rows if row["status"] == "SKIP") all_pass = ( len(rows) == 5 and pass_count == 5 and fail_count == 0 and skip_count == 0 ) return { "run_id": args.run_id, "run_dir": args.run_dir, "started_at": args.started_at, "finished_at": args.finished_at, "counts": { "total": len(rows), "pass": pass_count, "fail": fail_count, "skip": skip_count, }, "all_pass": all_pass, "recommended_exit_code": 0 if all_pass else 1, "rows": rows, } def main() -> int: args = parse_args() output_path = Path(args.output) output_path.parent.mkdir(parents=True, exist_ok=True) summary = build_summary(args) _ = output_path.write_text( json.dumps(summary, indent=2, sort_keys=False) + "\n", encoding="utf-8" ) return 0 if __name__ == "__main__": raise SystemExit(main())