991f7ded34
This commit packages the standalone task-14 acceptance and task-15 fault-suite execution toolchain for downstream validation. It includes all runnable harness scripts, helper utilities, and generated evidence captures so downstream behavior can be reproduced and reviewed independently from docs and core implementation. Bundling these assets separately allows QA/automation workflows to validate runtime changes without dragging operational notes or release-gate documentation into the same review unit. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
335 lines
9.3 KiB
Python
Executable File
335 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import cast
|
|
|
|
|
|
# Scalar value types a parsed metric may carry in the summary JSON.
MetricValue = int | float | str | bool


# Matches whitespace-delimited key=value tokens on a metrics log line,
# e.g. "packets=120 rate=1.5" -> ("packets", "120"), ("rate", "1.5").
KV_PATTERN = re.compile(r"([a-zA-Z_]+)=([^\s]+)")
|
|
|
|
|
|
@dataclass(frozen=True)
class CliArgs:
    """Validated command-line arguments for the summary builder.

    All fields are required strings copied verbatim from argparse.
    """

    manifest: str  # path to the TSV manifest produced by the acceptance runner
    output: str  # path where the JSON summary is written
    run_id: str  # identifier of this acceptance run
    run_dir: str  # directory holding the run's evidence artifacts
    started_at: str  # run start timestamp (format chosen by the caller)
    finished_at: str  # run finish timestamp (format chosen by the caller)
|
|
|
|
|
|
def parse_args() -> CliArgs:
    """Parse command-line options into an immutable CliArgs record."""
    parser = argparse.ArgumentParser(
        description="Build JSON summary for standalone acceptance matrix"
    )

    _ = parser.add_argument(
        "--manifest", required=True, help="TSV manifest produced by acceptance runner"
    )
    _ = parser.add_argument("--output", required=True, help="Output JSON summary path")
    _ = parser.add_argument("--run-id", required=True)
    _ = parser.add_argument("--run-dir", required=True)
    _ = parser.add_argument("--started-at", required=True)
    _ = parser.add_argument("--finished-at", required=True)

    ns = parser.parse_args(sys.argv[1:])

    # argparse returns an untyped Namespace; cast each attribute so the
    # frozen dataclass carries checked str fields.
    return CliArgs(
        manifest=cast(str, ns.manifest),
        output=cast(str, ns.output),
        run_id=cast(str, ns.run_id),
        run_dir=cast(str, ns.run_dir),
        started_at=cast(str, ns.started_at),
        finished_at=cast(str, ns.finished_at),
    )
|
|
|
|
|
|
def read_text(path: str) -> str:
    """Return the file's text, or "" when it is missing or unreadable.

    Undecodable bytes are replaced rather than raising, so partially
    corrupted log captures can still be scanned for metrics.
    """
    try:
        # EAFP: FileNotFoundError is a subclass of OSError, so one handler
        # covers both missing and unreadable files without the original
        # racy exists() pre-check (TOCTOU between check and read).
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
|
|
|
|
|
|
def to_number(value: str) -> MetricValue:
|
|
if re.fullmatch(r"-?\d+", value):
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
return value
|
|
if re.fullmatch(r"-?\d+\.\d+", value):
|
|
try:
|
|
return float(value)
|
|
except ValueError:
|
|
return value
|
|
return value
|
|
|
|
|
|
def parse_key_value_metrics(line: str) -> dict[str, MetricValue]:
    """Extract every key=value token from *line*, coercing numeric values."""
    return {
        found.group(1): to_number(found.group(2))
        for found in KV_PATTERN.finditer(line)
    }
|
|
|
|
|
|
def extract_last_matching_line(text: str, token: str) -> str:
    """Return the last line of *text* containing *token*, or "" if none.

    Scans from the end so the search stops at the first hit instead of
    walking the entire log; the result is identical to keeping the last
    match of a forward scan.
    """
    for line in reversed(text.splitlines()):
        if token in line:
            return line
    return ""
|
|
|
|
|
|
def parse_rtp_tester_metrics(text: str) -> dict[str, MetricValue]:
|
|
metrics: dict[str, MetricValue] = {}
|
|
patterns = {
|
|
"packets_received": r"Packets received:\s*(\d+)",
|
|
"sequence_gaps": r"Sequence gaps:\s*(\d+)",
|
|
"invalid_packets": r"Invalid packets:\s*(\d+)",
|
|
"detected_payload_type": r"Detected payload type:\s*(\d+)",
|
|
}
|
|
for key, pattern in patterns.items():
|
|
m = re.search(pattern, text)
|
|
if m:
|
|
metrics[key] = int(m.group(1))
|
|
return metrics
|
|
|
|
|
|
def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]:
|
|
metrics: dict[str, MetricValue] = {}
|
|
|
|
messages = re.search(
|
|
r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)",
|
|
text,
|
|
)
|
|
if messages:
|
|
metrics.update(
|
|
{
|
|
"messages_total": int(messages.group(1)),
|
|
"messages_audio": int(messages.group(2)),
|
|
"messages_video": int(messages.group(3)),
|
|
"messages_data": int(messages.group(4)),
|
|
"chunk_size_updates": int(messages.group(5)),
|
|
}
|
|
)
|
|
|
|
counts = re.search(
|
|
r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*h265-domestic=(\d+),\s*unknown=(\d+)",
|
|
text,
|
|
)
|
|
if counts:
|
|
metrics.update(
|
|
{
|
|
"h264_video_messages": int(counts.group(1)),
|
|
"h265_enhanced_video_messages": int(counts.group(2)),
|
|
"h265_domestic_video_messages": int(counts.group(3)),
|
|
"unknown_video_messages": int(counts.group(4)),
|
|
}
|
|
)
|
|
|
|
matching = re.search(
|
|
r"Matching count for expected mode:\s*(\d+)\s*\(threshold=(\d+)\)", text
|
|
)
|
|
if matching:
|
|
metrics.update(
|
|
{
|
|
"matching_count": int(matching.group(1)),
|
|
"matching_threshold": int(matching.group(2)),
|
|
}
|
|
)
|
|
|
|
return metrics
|
|
|
|
|
|
def parse_streamer_metrics(text: str) -> dict[str, dict[str, MetricValue]]:
    """Collect the latest PIPELINE/RTP/RTMP metric lines from a streamer log."""
    token_to_section = {
        "PIPELINE_METRICS": "pipeline",
        "RTP_METRICS": "rtp",
        "RTMP_METRICS": "rtmp",
    }
    parsed: dict[str, dict[str, MetricValue]] = {}
    for token, section in token_to_section.items():
        # Use the last occurrence so repeated periodic dumps yield the
        # final (most complete) counters.
        matched_line = extract_last_matching_line(text, token)
        if matched_line:
            parsed[section] = parse_key_value_metrics(matched_line)
    return parsed
|
|
|
|
|
|
def parse_sdp_metrics(path: str) -> dict[str, MetricValue]:
    """Summarize an SDP file: presence, size, codec hints, payload type."""
    if not path:
        return {}
    sdp_file = Path(path)
    if not sdp_file.exists():
        return {"exists": False}

    body = read_text(path)
    summary: dict[str, MetricValue] = {
        "exists": True,
        "bytes": sdp_file.stat().st_size,
        "has_h264": "H264/90000" in body,
        "has_h265": ("H265/90000" in body) or ("HEVC/90000" in body),
    }
    # First m=video line's static/dynamic payload type, when present.
    video_line = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", body)
    if video_line:
        summary["payload_type"] = int(video_line.group(1))
    return summary
|
|
|
|
|
|
def parse_exit_code(value: str) -> int:
    """Parse an exit-code field; -1 marks a missing or malformed value."""
    try:
        code = int(value)
    except (TypeError, ValueError):
        # Empty cells or non-numeric junk in the manifest map to -1.
        code = -1
    return code
|
|
|
|
|
|
def parse_duration_ms(value: str) -> int:
    """Parse a duration-in-milliseconds field; malformed values become 0."""
    try:
        millis = int(value)
    except (TypeError, ValueError):
        # Empty cells or non-numeric junk in the manifest map to 0.
        millis = 0
    return millis
|
|
|
|
|
|
# Column order of the TSV manifest emitted by the acceptance runner.
# parse_manifest() projects every row onto exactly these fields, filling
# missing columns with "".
MANIFEST_FIELDS = (
    "order",
    "row_id",
    "name",
    "protocol",
    "codec",
    "rtmp_mode",
    "status",
    "reason",
    "duration_ms",
    "sim_rc",
    "streamer_rc",
    "tester_rc",
    "sim_log",
    "streamer_log",
    "tester_log",
    "sdp_path",
)
|
|
|
|
|
|
def parse_manifest(path: str) -> list[dict[str, str]]:
    """Read the runner's TSV manifest into rows keyed by MANIFEST_FIELDS.

    Columns absent from the file (or carrying None) become "" so callers
    can index every field unconditionally.
    """
    with open(path, "r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        return [
            {
                field: "" if (cell := raw_row.get(field)) is None else str(cell)
                for field in MANIFEST_FIELDS
            }
            for raw_row in reader
        ]
|
|
|
|
|
|
def _summarize_row(row: dict[str, str]) -> dict[str, object]:
    """Build the JSON summary entry for one manifest row.

    Reads the per-scenario logs referenced by the row and attaches the
    metrics parsed from them alongside status and exit-code bookkeeping.
    """
    streamer_log = row["streamer_log"]
    tester_log = row["tester_log"]
    sdp_path = row.get("sdp_path", "")

    streamer_text = read_text(streamer_log)
    tester_text = read_text(tester_log)

    is_rtp = row["protocol"] == "rtp"
    # The two protocols emit different tester summary formats.
    tester_metrics: dict[str, MetricValue] = (
        parse_rtp_tester_metrics(tester_text)
        if is_rtp
        else parse_rtmp_tester_metrics(tester_text)
    )

    metrics: dict[str, object] = {
        "tester": tester_metrics,
        "streamer": parse_streamer_metrics(streamer_text),
    }
    if is_rtp:
        # Only RTP scenarios produce an SDP file worth inspecting.
        metrics["sdp"] = parse_sdp_metrics(sdp_path)

    return {
        "order": int(row["order"]),
        "id": row["row_id"],
        "name": row["name"],
        "protocol": row["protocol"],
        "codec": row["codec"],
        "rtmp_mode": row["rtmp_mode"] if row["rtmp_mode"] else None,
        "status": row["status"],
        "reason": row["reason"],
        "duration_ms": parse_duration_ms(row["duration_ms"]),
        "exit_codes": {
            "sim": parse_exit_code(row["sim_rc"]),
            "streamer": parse_exit_code(row["streamer_rc"]),
            "tester": parse_exit_code(row["tester_rc"]),
        },
        "metrics": metrics,
        "evidence": {
            "sim_log": row["sim_log"],
            "streamer_log": streamer_log,
            "tester_log": tester_log,
            "sdp": sdp_path if sdp_path else None,
        },
    }


def build_summary(args: CliArgs) -> dict[str, object]:
    """Assemble the full acceptance-run summary document.

    Rows are ordered by the manifest's numeric ``order`` column.  The run
    only counts as fully green (``all_pass``) when exactly the expected
    number of scenarios exist and every one of them PASSed.
    """
    manifest_rows = parse_manifest(args.manifest)
    rows = [
        _summarize_row(row)
        for row in sorted(manifest_rows, key=lambda item: int(item["order"]))
    ]

    # Tally the three statuses in a single pass instead of three scans.
    status_counts = {"PASS": 0, "FAIL": 0, "SKIP": 0}
    for entry in rows:
        status = str(entry["status"])
        if status in status_counts:
            status_counts[status] += 1

    # The acceptance matrix is defined as exactly five scenarios; a
    # missing or extra row must not report a green run.
    expected_total = 5
    all_pass = (
        len(rows) == expected_total
        and status_counts["PASS"] == expected_total
        and status_counts["FAIL"] == 0
        and status_counts["SKIP"] == 0
    )

    return {
        "run_id": args.run_id,
        "run_dir": args.run_dir,
        "started_at": args.started_at,
        "finished_at": args.finished_at,
        "counts": {
            "total": len(rows),
            "pass": status_counts["PASS"],
            "fail": status_counts["FAIL"],
            "skip": status_counts["SKIP"],
        },
        "all_pass": all_pass,
        "recommended_exit_code": 0 if all_pass else 1,
        "rows": rows,
    }
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: parse args, build the summary, write it as JSON."""
    cli = parse_args()

    destination = Path(cli.output)
    # Create parent directories so a fresh run directory "just works".
    destination.parent.mkdir(parents=True, exist_ok=True)

    payload = json.dumps(build_summary(cli), indent=2, sort_keys=False) + "\n"
    _ = destination.write_text(payload, encoding="utf-8")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value to the shell as the exit status.
    sys.exit(main())
|