Files
cvmmap-streamer/scripts/acceptance_summary_helper.py

269 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import cast
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")
@dataclass(frozen=True)
class CliArgs:
manifest: str
output: str
run_id: str
run_dir: str
started_at: str
finished_at: str
def parse_args() -> CliArgs:
parser = argparse.ArgumentParser(
description="Build JSON summary for standalone acceptance matrix"
)
_ = parser.add_argument("--manifest", required=True)
_ = parser.add_argument("--output", required=True)
_ = parser.add_argument("--run-id", required=True)
_ = parser.add_argument("--run-dir", required=True)
_ = parser.add_argument("--started-at", required=True)
_ = parser.add_argument("--finished-at", required=True)
parsed = parser.parse_args(sys.argv[1:])
return CliArgs(
manifest=cast(str, parsed.manifest),
output=cast(str, parsed.output),
run_id=cast(str, parsed.run_id),
run_dir=cast(str, parsed.run_dir),
started_at=cast(str, parsed.started_at),
finished_at=cast(str, parsed.finished_at),
)
def read_text(path: str) -> str:
p = Path(path)
if not p.exists():
return ""
try:
return p.read_text(encoding="utf-8", errors="replace")
except OSError:
return ""
def to_number(value: str) -> int | float | str:
if re.fullmatch(r"-?\d+", value):
try:
return int(value)
except ValueError:
return value
if re.fullmatch(r"-?\d+\.\d+", value):
try:
return float(value)
except ValueError:
return value
return value
def parse_key_values(line: str) -> dict[str, int | float | str]:
return {match.group(1): to_number(match.group(2)) for match in KV_PATTERN.finditer(line)}
def last_line_with_token(text: str, token: str) -> str:
found = ""
for line in text.splitlines():
if token in line:
found = line
return found
def parse_rtp_receiver_metrics(text: str) -> dict[str, int]:
metrics: dict[str, int] = {}
patterns = {
"packets_received": r"Packets received:\s*(\d+)",
"sequence_gaps": r"Sequence gaps:\s*(\d+)",
"invalid_packets": r"Invalid packets:\s*(\d+)",
"detected_payload_type": r"Detected payload type:\s*(\d+)",
}
for key, pattern in patterns.items():
match = re.search(pattern, text)
if match:
metrics[key] = int(match.group(1))
return metrics
def parse_rtmp_stub_metrics(text: str) -> dict[str, int]:
metrics: dict[str, int] = {}
messages = re.search(
r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)",
text,
)
if messages:
metrics.update(
{
"messages_total": int(messages.group(1)),
"messages_audio": int(messages.group(2)),
"messages_video": int(messages.group(3)),
"messages_data": int(messages.group(4)),
"chunk_size_updates": int(messages.group(5)),
}
)
counts = re.search(
r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*unknown=(\d+)",
text,
)
if counts:
metrics.update(
{
"h264_video_messages": int(counts.group(1)),
"h265_enhanced_video_messages": int(counts.group(2)),
"unknown_video_messages": int(counts.group(3)),
}
)
matching = re.search(
r"Matching count for expected mode:\s*(\d+)\s*\(threshold=(\d+)\)", text
)
if matching:
metrics.update(
{
"matching_count": int(matching.group(1)),
"matching_threshold": int(matching.group(2)),
}
)
return metrics
def parse_sdp_metrics(path: str) -> dict[str, object]:
if not path:
return {}
p = Path(path)
if not p.exists():
return {"exists": False}
text = read_text(path)
metrics: dict[str, object] = {
"exists": True,
"bytes": p.stat().st_size,
"has_h264": "H264/90000" in text,
"has_h265": ("H265/90000" in text) or ("HEVC/90000" in text),
}
match = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", text)
if match:
metrics["payload_type"] = int(match.group(1))
return metrics
def parse_exit(value: str) -> int:
try:
return int(value)
except (TypeError, ValueError):
return -1
def parse_duration_ms(value: str) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
def parse_manifest(path: str) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
with open(path, "r", encoding="utf-8", newline="") as handle:
reader = csv.DictReader(handle, delimiter="\t")
for raw in reader:
row = {key: "" if value is None else str(value) for key, value in raw.items()}
rows.append(row)
return rows
def build_summary(args: CliArgs) -> dict[str, object]:
manifest_rows = parse_manifest(args.manifest)
rows: list[dict[str, object]] = []
for row in sorted(manifest_rows, key=lambda item: int(item["order"])):
emitter_text = read_text(row["emitter_log"])
receiver_text = read_text(row["receiver_log"])
emitter_metrics: dict[str, dict[str, int | float | str]] = {}
for token, key in (
("RTP_METRICS", "rtp"),
("RTMP_OUTPUT_METRICS", "rtmp"),
):
line = last_line_with_token(emitter_text, token)
if line:
emitter_metrics[key] = parse_key_values(line)
receiver_metrics: dict[str, object]
if row["protocol"] == "rtp":
receiver_metrics = parse_rtp_receiver_metrics(receiver_text)
else:
receiver_metrics = parse_rtmp_stub_metrics(receiver_text)
rows.append(
{
"order": int(row["order"]),
"id": row["row_id"],
"name": row["name"],
"protocol": row["protocol"],
"codec": row["codec"],
"transport": row["transport"],
"status": row["status"],
"reason": row["reason"],
"duration_ms": parse_duration_ms(row["duration_ms"]),
"exit_codes": {
"emitter": parse_exit(row["emitter_rc"]),
"receiver": parse_exit(row["receiver_rc"]),
},
"metrics": {
"emitter": emitter_metrics,
"receiver": receiver_metrics,
"sdp": parse_sdp_metrics(row.get("sdp_path", "")),
},
"evidence": {
"emitter_log": row["emitter_log"],
"receiver_log": row["receiver_log"],
"sdp_path": row.get("sdp_path") or None,
},
}
)
pass_count = sum(1 for row in rows if row["status"] == "PASS")
fail_count = sum(1 for row in rows if row["status"] == "FAIL")
skip_count = sum(1 for row in rows if row["status"] == "SKIP")
all_pass = len(rows) == 6 and pass_count == 6 and fail_count == 0 and skip_count == 0
return {
"run_id": args.run_id,
"run_dir": args.run_dir,
"started_at": args.started_at,
"finished_at": args.finished_at,
"counts": {
"total": len(rows),
"pass": pass_count,
"fail": fail_count,
"skip": skip_count,
},
"all_pass": all_pass,
"recommended_exit_code": 0 if all_pass else 1,
"rows": rows,
}
def main() -> int:
args = parse_args()
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
summary = build_summary(args)
output_path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
return 0
if __name__ == "__main__":
raise SystemExit(main())