feat(test): add downstream acceptance and fault harness artifacts

This commit packages the standalone task-14 acceptance and task-15 fault-suite execution toolchain for downstream validation.

It includes all runnable harness scripts, helper utilities, and generated evidence captures so downstream behavior can be reproduced and reviewed independently from docs and core implementation.

Bundling these assets separately allows QA/automation workflows to validate runtime changes without dragging operational notes or release-gate documentation into the same review unit.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
2026-03-05 20:32:12 +08:00
parent 56e874ab6d
commit 991f7ded34
6 changed files with 1894 additions and 0 deletions
+334
View File
@@ -0,0 +1,334 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import cast
MetricValue = int | float | str | bool
KV_PATTERN = re.compile(r"([a-zA-Z_]+)=([^\s]+)")
@dataclass(frozen=True)
class CliArgs:
manifest: str
output: str
run_id: str
run_dir: str
started_at: str
finished_at: str
def parse_args() -> CliArgs:
parser = argparse.ArgumentParser(
description="Build JSON summary for standalone acceptance matrix"
)
_ = parser.add_argument(
"--manifest", required=True, help="TSV manifest produced by acceptance runner"
)
_ = parser.add_argument("--output", required=True, help="Output JSON summary path")
_ = parser.add_argument("--run-id", required=True)
_ = parser.add_argument("--run-dir", required=True)
_ = parser.add_argument("--started-at", required=True)
_ = parser.add_argument("--finished-at", required=True)
parsed = parser.parse_args(sys.argv[1:])
manifest = cast(str, parsed.manifest)
output = cast(str, parsed.output)
run_id = cast(str, parsed.run_id)
run_dir = cast(str, parsed.run_dir)
started_at = cast(str, parsed.started_at)
finished_at = cast(str, parsed.finished_at)
return CliArgs(
manifest=manifest,
output=output,
run_id=run_id,
run_dir=run_dir,
started_at=started_at,
finished_at=finished_at,
)
def read_text(path: str) -> str:
p = Path(path)
if not p.exists():
return ""
try:
return p.read_text(encoding="utf-8", errors="replace")
except OSError:
return ""
def to_number(value: str) -> MetricValue:
if re.fullmatch(r"-?\d+", value):
try:
return int(value)
except ValueError:
return value
if re.fullmatch(r"-?\d+\.\d+", value):
try:
return float(value)
except ValueError:
return value
return value
def parse_key_value_metrics(line: str) -> dict[str, MetricValue]:
metrics: dict[str, MetricValue] = {}
for match in KV_PATTERN.finditer(line):
key = match.group(1)
raw = match.group(2)
metrics[key] = to_number(raw)
return metrics
def extract_last_matching_line(text: str, token: str) -> str:
match = ""
for line in text.splitlines():
if token in line:
match = line
return match
def parse_rtp_tester_metrics(text: str) -> dict[str, MetricValue]:
metrics: dict[str, MetricValue] = {}
patterns = {
"packets_received": r"Packets received:\s*(\d+)",
"sequence_gaps": r"Sequence gaps:\s*(\d+)",
"invalid_packets": r"Invalid packets:\s*(\d+)",
"detected_payload_type": r"Detected payload type:\s*(\d+)",
}
for key, pattern in patterns.items():
m = re.search(pattern, text)
if m:
metrics[key] = int(m.group(1))
return metrics
def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]:
metrics: dict[str, MetricValue] = {}
messages = re.search(
r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)",
text,
)
if messages:
metrics.update(
{
"messages_total": int(messages.group(1)),
"messages_audio": int(messages.group(2)),
"messages_video": int(messages.group(3)),
"messages_data": int(messages.group(4)),
"chunk_size_updates": int(messages.group(5)),
}
)
counts = re.search(
r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*h265-domestic=(\d+),\s*unknown=(\d+)",
text,
)
if counts:
metrics.update(
{
"h264_video_messages": int(counts.group(1)),
"h265_enhanced_video_messages": int(counts.group(2)),
"h265_domestic_video_messages": int(counts.group(3)),
"unknown_video_messages": int(counts.group(4)),
}
)
matching = re.search(
r"Matching count for expected mode:\s*(\d+)\s*\(threshold=(\d+)\)", text
)
if matching:
metrics.update(
{
"matching_count": int(matching.group(1)),
"matching_threshold": int(matching.group(2)),
}
)
return metrics
def parse_streamer_metrics(text: str) -> dict[str, dict[str, MetricValue]]:
result: dict[str, dict[str, MetricValue]] = {}
for token, key in (
("PIPELINE_METRICS", "pipeline"),
("RTP_METRICS", "rtp"),
("RTMP_METRICS", "rtmp"),
):
line = extract_last_matching_line(text, token)
if line:
result[key] = parse_key_value_metrics(line)
return result
def parse_sdp_metrics(path: str) -> dict[str, MetricValue]:
p = Path(path)
if not path:
return {}
if not p.exists():
return {"exists": False}
text = read_text(path)
metrics: dict[str, MetricValue] = {
"exists": True,
"bytes": p.stat().st_size,
"has_h264": "H264/90000" in text,
"has_h265": ("H265/90000" in text) or ("HEVC/90000" in text),
}
m = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", text)
if m:
metrics["payload_type"] = int(m.group(1))
return metrics
def parse_exit_code(value: str) -> int:
try:
return int(value)
except (TypeError, ValueError):
return -1
def parse_duration_ms(value: str) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
MANIFEST_FIELDS = (
"order",
"row_id",
"name",
"protocol",
"codec",
"rtmp_mode",
"status",
"reason",
"duration_ms",
"sim_rc",
"streamer_rc",
"tester_rc",
"sim_log",
"streamer_log",
"tester_log",
"sdp_path",
)
def parse_manifest(path: str) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
with open(path, "r", encoding="utf-8", newline="") as handle:
reader = csv.DictReader(handle, delimiter="\t")
for raw_row in reader:
row: dict[str, str] = {}
for field in MANIFEST_FIELDS:
value = raw_row.get(field, "")
row[field] = "" if value is None else str(value)
rows.append(row)
return rows
def build_summary(args: CliArgs) -> dict[str, object]:
manifest_rows = parse_manifest(args.manifest)
rows: list[dict[str, object]] = []
for row in sorted(manifest_rows, key=lambda item: int(item["order"])):
streamer_log = row["streamer_log"]
tester_log = row["tester_log"]
sim_log = row["sim_log"]
sdp_path = row.get("sdp_path", "")
streamer_text = read_text(streamer_log)
tester_text = read_text(tester_log)
tester_metrics: dict[str, MetricValue]
if row["protocol"] == "rtp":
tester_metrics = parse_rtp_tester_metrics(tester_text)
else:
tester_metrics = parse_rtmp_tester_metrics(tester_text)
metrics: dict[str, object] = {
"tester": tester_metrics,
"streamer": parse_streamer_metrics(streamer_text),
}
if row["protocol"] == "rtp":
metrics["sdp"] = parse_sdp_metrics(sdp_path)
rows.append(
{
"order": int(row["order"]),
"id": row["row_id"],
"name": row["name"],
"protocol": row["protocol"],
"codec": row["codec"],
"rtmp_mode": row["rtmp_mode"] if row["rtmp_mode"] else None,
"status": row["status"],
"reason": row["reason"],
"duration_ms": parse_duration_ms(row["duration_ms"]),
"exit_codes": {
"sim": parse_exit_code(row["sim_rc"]),
"streamer": parse_exit_code(row["streamer_rc"]),
"tester": parse_exit_code(row["tester_rc"]),
},
"metrics": metrics,
"evidence": {
"sim_log": sim_log,
"streamer_log": streamer_log,
"tester_log": tester_log,
"sdp": sdp_path if sdp_path else None,
},
}
)
pass_count = sum(1 for row in rows if row["status"] == "PASS")
fail_count = sum(1 for row in rows if row["status"] == "FAIL")
skip_count = sum(1 for row in rows if row["status"] == "SKIP")
all_pass = (
len(rows) == 5 and pass_count == 5 and fail_count == 0 and skip_count == 0
)
return {
"run_id": args.run_id,
"run_dir": args.run_dir,
"started_at": args.started_at,
"finished_at": args.finished_at,
"counts": {
"total": len(rows),
"pass": pass_count,
"fail": fail_count,
"skip": skip_count,
},
"all_pass": all_pass,
"recommended_exit_code": 0 if all_pass else 1,
"rows": rows,
}
def main() -> int:
args = parse_args()
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
summary = build_summary(args)
_ = output_path.write_text(
json.dumps(summary, indent=2, sort_keys=False) + "\n", encoding="utf-8"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())