269 lines
8.0 KiB
Python
Executable File
269 lines
8.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import cast
|
|
|
|
|
|
# Matches whitespace-delimited key=value tokens on a metrics log line:
# keys are [A-Za-z0-9_]+, values run up to the next whitespace.
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")
|
|
|
|
|
|
@dataclass(frozen=True)
class CliArgs:
    """Immutable container for the script's required command-line flags.

    All fields hold the raw strings exactly as supplied on the command
    line; no path or timestamp validation happens here.
    """

    manifest: str  # path to the TSV manifest describing each test row
    output: str  # path where the JSON summary will be written
    run_id: str  # identifier of this acceptance run
    run_dir: str  # directory holding the run's artifacts
    started_at: str  # run start timestamp (format not enforced here)
    finished_at: str  # run end timestamp (format not enforced here)
|
|
|
|
|
|
def parse_args() -> CliArgs:
    """Parse the six required CLI flags and return them as a CliArgs.

    argparse exits with an error message if any flag is missing.
    """
    parser = argparse.ArgumentParser(
        description="Build JSON summary for standalone acceptance matrix"
    )
    # All flags share the same shape, so register them in one pass.
    for flag in (
        "--manifest",
        "--output",
        "--run-id",
        "--run-dir",
        "--started-at",
        "--finished-at",
    ):
        _ = parser.add_argument(flag, required=True)
    namespace = parser.parse_args(sys.argv[1:])
    # argparse stores plain strings; cast() only informs the type checker.
    return CliArgs(
        manifest=cast(str, namespace.manifest),
        output=cast(str, namespace.output),
        run_id=cast(str, namespace.run_id),
        run_dir=cast(str, namespace.run_dir),
        started_at=cast(str, namespace.started_at),
        finished_at=cast(str, namespace.finished_at),
    )
|
|
|
|
|
|
def read_text(path: str) -> str:
    """Return the UTF-8 text of *path*, or "" if it cannot be read.

    Errors are deliberately swallowed: a missing or unreadable log file
    is reported as empty output instead of aborting summary generation.
    Undecodable bytes are substituted (errors="replace") rather than
    raising.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except OSError:
        # Covers missing files (FileNotFoundError is an OSError subclass),
        # permission errors, directories, etc. The previous exists()
        # pre-check was a racy duplicate of this handler and is removed.
        return ""
|
|
|
|
|
|
def to_number(value: str) -> int | float | str:
|
|
if re.fullmatch(r"-?\d+", value):
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
return value
|
|
if re.fullmatch(r"-?\d+\.\d+", value):
|
|
try:
|
|
return float(value)
|
|
except ValueError:
|
|
return value
|
|
return value
|
|
|
|
|
|
def parse_key_values(line: str) -> dict[str, int | float | str]:
    """Extract every key=value token from *line*, numerifying the values.

    Later duplicates of a key overwrite earlier ones.
    """
    pairs: dict[str, int | float | str] = {}
    for match in KV_PATTERN.finditer(line):
        pairs[match.group(1)] = to_number(match.group(2))
    return pairs
|
|
|
|
|
|
def last_line_with_token(text: str, token: str) -> str:
    """Return the last line of *text* containing *token*, or "" if none."""
    # Scan from the bottom so the first hit is the last occurrence.
    for candidate in reversed(text.splitlines()):
        if token in candidate:
            return candidate
    return ""
|
|
|
|
|
|
def parse_rtp_receiver_metrics(text: str) -> dict[str, int]:
    """Scrape integer counters from an RTP receiver log.

    Only counters actually present in *text* appear in the result; a
    first match wins for each field.
    """
    field_patterns = (
        ("packets_received", r"Packets received:\s*(\d+)"),
        ("sequence_gaps", r"Sequence gaps:\s*(\d+)"),
        ("invalid_packets", r"Invalid packets:\s*(\d+)"),
        ("detected_payload_type", r"Detected payload type:\s*(\d+)"),
    )
    scraped: dict[str, int] = {}
    for name, pattern in field_patterns:
        hit = re.search(pattern, text)
        if hit is not None:
            scraped[name] = int(hit.group(1))
    return scraped
|
|
|
|
|
|
def parse_rtmp_stub_metrics(text: str) -> dict[str, int]:
    """Scrape message/signaling counters from an RTMP stub receiver log.

    Three summary lines are recognized; for each line found, its capture
    groups are mapped positionally onto the listed metric keys. Missing
    lines simply contribute no keys.
    """
    specs: tuple[tuple[str, tuple[str, ...]], ...] = (
        (
            r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)",
            (
                "messages_total",
                "messages_audio",
                "messages_video",
                "messages_data",
                "chunk_size_updates",
            ),
        ),
        (
            r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*unknown=(\d+)",
            (
                "h264_video_messages",
                "h265_enhanced_video_messages",
                "unknown_video_messages",
            ),
        ),
        (
            r"Matching count for expected mode:\s*(\d+)\s*\(threshold=(\d+)\)",
            ("matching_count", "matching_threshold"),
        ),
    )
    metrics: dict[str, int] = {}
    for pattern, keys in specs:
        found = re.search(pattern, text)
        if found is None:
            continue
        for key, raw in zip(keys, found.groups()):
            metrics[key] = int(raw)
    return metrics
|
|
|
|
|
|
def parse_sdp_metrics(path: str) -> dict[str, object]:
    """Summarize an SDP file: existence, size, codec hints, payload type.

    An empty *path* yields {}; a nonexistent file yields {"exists": False}.
    The payload_type key is present only when an m=video line matches.
    """
    if not path:
        return {}
    sdp_file = Path(path)
    if not sdp_file.exists():
        return {"exists": False}
    body = read_text(path)
    result: dict[str, object] = {
        "exists": True,
        "bytes": sdp_file.stat().st_size,
        "has_h264": "H264/90000" in body,
        "has_h265": ("H265/90000" in body) or ("HEVC/90000" in body),
    }
    payload = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", body)
    if payload is not None:
        result["payload_type"] = int(payload.group(1))
    return result
|
|
|
|
|
|
def parse_exit(value: str) -> int:
    """Parse a recorded exit-code string; -1 marks missing/garbled values."""
    try:
        code = int(value)
    except (TypeError, ValueError):
        # TypeError covers None (column absent), ValueError covers junk.
        return -1
    return code
|
|
|
|
|
|
def parse_duration_ms(value: str) -> int:
    """Parse a duration-in-milliseconds string; unparsable input becomes 0."""
    try:
        millis = int(value)
    except (TypeError, ValueError):
        # TypeError covers None (column absent), ValueError covers junk.
        return 0
    return millis
|
|
|
|
|
|
def parse_manifest(path: str) -> list[dict[str, str]]:
    """Load the tab-separated manifest into a list of string-valued dicts.

    Missing cells (DictReader yields None for them) are normalized to "";
    every other cell is coerced to str.
    """
    with open(path, "r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        return [
            {key: "" if value is None else str(value) for key, value in raw.items()}
            for raw in reader
        ]
|
|
|
|
|
|
def build_summary(args: CliArgs) -> dict[str, object]:
    """Assemble the JSON-serializable summary for one acceptance run.

    Reads the TSV manifest, enriches each row with metrics scraped from
    its emitter/receiver logs (and optional SDP file), and wraps the rows
    with run metadata plus aggregate PASS/FAIL/SKIP counts.
    """
    manifest_rows = parse_manifest(args.manifest)
    rows: list[dict[str, object]] = []

    # Sort numerically on the "order" column so "10" does not precede "2".
    for row in sorted(manifest_rows, key=lambda item: int(item["order"])):
        emitter_text = read_text(row["emitter_log"])
        receiver_text = read_text(row["receiver_log"])
        emitter_metrics: dict[str, dict[str, int | float | str]] = {}
        # The emitter may log a metrics line several times; only the last
        # occurrence per token (presumably the final totals) is kept.
        for token, key in (
            ("RTP_METRICS", "rtp"),
            ("RTMP_OUTPUT_METRICS", "rtmp"),
        ):
            line = last_line_with_token(emitter_text, token)
            if line:
                emitter_metrics[key] = parse_key_values(line)

        receiver_metrics: dict[str, object]
        # Receiver log format depends on the protocol column: "rtp" uses
        # the RTP receiver format, everything else the RTMP stub format.
        if row["protocol"] == "rtp":
            receiver_metrics = parse_rtp_receiver_metrics(receiver_text)
        else:
            receiver_metrics = parse_rtmp_stub_metrics(receiver_text)

        rows.append(
            {
                "order": int(row["order"]),
                "id": row["row_id"],
                "name": row["name"],
                "protocol": row["protocol"],
                "codec": row["codec"],
                "transport": row["transport"],
                "status": row["status"],
                "reason": row["reason"],
                "duration_ms": parse_duration_ms(row["duration_ms"]),
                "exit_codes": {
                    "emitter": parse_exit(row["emitter_rc"]),
                    "receiver": parse_exit(row["receiver_rc"]),
                },
                "metrics": {
                    "emitter": emitter_metrics,
                    "receiver": receiver_metrics,
                    # sdp_path is an optional manifest column; "" yields {}.
                    "sdp": parse_sdp_metrics(row.get("sdp_path", "")),
                },
                "evidence": {
                    "emitter_log": row["emitter_log"],
                    "receiver_log": row["receiver_log"],
                    # Empty string collapses to None (JSON null).
                    "sdp_path": row.get("sdp_path") or None,
                },
            }
        )

    pass_count = sum(1 for row in rows if row["status"] == "PASS")
    fail_count = sum(1 for row in rows if row["status"] == "FAIL")
    skip_count = sum(1 for row in rows if row["status"] == "SKIP")
    # NOTE(review): the acceptance matrix is assumed to contain exactly 6
    # rows; all_pass is deliberately False for any other count — confirm
    # this invariant if rows are ever added to the matrix.
    all_pass = len(rows) == 6 and pass_count == 6 and fail_count == 0 and skip_count == 0

    return {
        "run_id": args.run_id,
        "run_dir": args.run_dir,
        "started_at": args.started_at,
        "finished_at": args.finished_at,
        "counts": {
            "total": len(rows),
            "pass": pass_count,
            "fail": fail_count,
            "skip": skip_count,
        },
        "all_pass": all_pass,
        # 0 only when every row passed; 1 otherwise.
        "recommended_exit_code": 0 if all_pass else 1,
        "rows": rows,
    }
|
|
|
|
|
|
def main() -> int:
    """Entry point: parse CLI flags, build the summary, write it as JSON.

    Creates the output's parent directory if needed and always returns 0;
    callers inspect the summary's recommended_exit_code themselves.
    """
    args = parse_args()
    destination = Path(args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = build_summary(args)
    rendered = json.dumps(payload, indent=2) + "\n"
    destination.write_text(rendered, encoding="utf-8")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
|