#!/usr/bin/env python3
"""Build a fault-suite summary JSON with per-scenario threshold checks.

The script reads a tab-separated manifest (one row per scenario), pulls the
last ``PIPELINE_METRICS`` / ``LATENCY_METRICS`` / ``FAULT_COUNTERS`` /
``RTP_METRICS`` line out of each scenario's streamer log, evaluates the
metrics against the thresholds selected by ``--mode`` and writes the result
as JSON to ``--output``.
"""

from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import cast

# ``key=value`` tokens; the value extends up to the next whitespace character.
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")

# Shapes recognised by ``to_number``; anything else stays a string.
_INT_PATTERN = re.compile(r"-?\d+")
_FLOAT_PATTERN = re.compile(r"-?\d+\.\d+")

# Manifest columns copied into every row; absent columns become "".
MANIFEST_COLUMNS = (
    "order",
    "scenario_id",
    "name",
    "status",
    "reason",
    "duration_ms",
    "sim_rc",
    "streamer_rc",
    "tester_rc",
    "sim_log",
    "streamer_log",
    "tester_log",
    "sdp_path",
)

# ``all_pass`` requires exactly this many scenarios, all of them passing.
EXPECTED_SCENARIO_COUNT = 3

# Scenario id -> (fault counter checked with a minimum,
#                 latency percentile checked with a maximum).
# The threshold keys are derived as "<counter>_min" and "<percentile>_max".
_SCENARIO_SPECIFIC_METRICS = {
    "torn_read": ("torn_read_events", "p99_us"),
    "sink_stall": ("sink_stall_events", "p95_us"),
    "reset_storm": ("reset_events", "p99_us"),
}


@dataclass(frozen=True)
class CliArgs:
    """Command-line arguments, all carried as plain strings."""

    manifest: str
    output: str
    run_id: str
    run_dir: str
    started_at: str
    finished_at: str
    mode: str


def parse_args() -> CliArgs:
    """Parse ``sys.argv[1:]`` into a :class:`CliArgs`.

    Every option is required; ``--mode`` must be ``baseline`` or ``degraded``.
    """
    parser = argparse.ArgumentParser(
        description="Build fault suite summary with threshold checks"
    )
    _ = parser.add_argument("--manifest", required=True)
    _ = parser.add_argument("--output", required=True)
    _ = parser.add_argument("--run-id", required=True)
    _ = parser.add_argument("--run-dir", required=True)
    _ = parser.add_argument("--started-at", required=True)
    _ = parser.add_argument("--finished-at", required=True)
    _ = parser.add_argument("--mode", required=True, choices=("baseline", "degraded"))
    parsed = parser.parse_args(sys.argv[1:])
    return CliArgs(
        manifest=cast(str, parsed.manifest),
        output=cast(str, parsed.output),
        run_id=cast(str, parsed.run_id),
        run_dir=cast(str, parsed.run_dir),
        started_at=cast(str, parsed.started_at),
        finished_at=cast(str, parsed.finished_at),
        mode=cast(str, parsed.mode),
    )


def read_text(path: str) -> str:
    """Return the UTF-8 content of *path*, or ``""`` when it cannot be read.

    Undecodable bytes are replaced rather than raising. A missing file, a
    directory, a permission problem or an invalid path all yield ``""``.
    """
    # EAFP: a single attempt avoids the exists()/read race and the second
    # filesystem lookup. FileNotFoundError is an OSError; ValueError covers
    # paths the OS layer rejects outright (e.g. an embedded NUL byte).
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except (OSError, ValueError):
        return ""


def to_number(value: str) -> int | float | str:
    """Convert *value* to ``int`` or ``float`` when it looks like one.

    Only plain decimal integers (``-?digits``) and plain decimals
    (``-?digits.digits``) are converted; everything else, including values
    whose conversion fails, is returned unchanged as a string.
    """
    if _INT_PATTERN.fullmatch(value):
        try:
            return int(value)
        except ValueError:
            return value
    if _FLOAT_PATTERN.fullmatch(value):
        try:
            return float(value)
        except ValueError:
            return value
    return value


def parse_key_values(line: str) -> dict[str, int | float | str]:
    """Extract every ``key=value`` token of *line* into a dict.

    Values go through :func:`to_number`. When a key repeats, the last
    occurrence wins.
    """
    return {
        match.group(1): to_number(match.group(2))
        for match in KV_PATTERN.finditer(line)
    }


def last_line_with_token(text: str, token: str) -> str:
    """Return the last line of *text* containing *token*, or ``""`` if none."""
    for line in reversed(text.splitlines()):
        if token in line:
            return line
    return ""


def parse_exit(value: str) -> int:
    """Parse a process exit code; unparseable input yields ``-1``."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return -1


def parse_duration_ms(value: str) -> int:
    """Parse a duration in milliseconds; unparseable input yields ``0``."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def parse_manifest(path: str) -> list[dict[str, str]]:
    """Read the tab-separated manifest at *path*.

    Returns one dict per data row holding exactly the columns listed in
    ``MANIFEST_COLUMNS``; columns that are absent or empty in the file are
    stored as ``""``.

    Raises:
        OSError: if the manifest cannot be opened.
    """
    rows: list[dict[str, str]] = []
    with open(path, "r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        for raw in reader:
            row: dict[str, str] = {}
            for key in MANIFEST_COLUMNS:
                # DictReader fills short rows with None; normalise to "".
                value = raw.get(key, "")
                row[key] = "" if value is None else str(value)
            rows.append(row)
    return rows


Check = dict[str, object]


def make_check_min(metric: str, actual: int, minimum: int) -> Check:
    """Build a check record that passes when ``actual >= minimum``."""
    passed = actual >= minimum
    return {
        "metric": metric,
        "type": "min",
        "actual": actual,
        "expected": minimum,
        "passed": passed,
        "violation": "" if passed else f"{metric}={actual} < {minimum}",
    }


def make_check_max(metric: str, actual: int, maximum: int) -> Check:
    """Build a check record that passes when ``actual <= maximum``."""
    passed = actual <= maximum
    return {
        "metric": metric,
        "type": "max",
        "actual": actual,
        "expected": maximum,
        "passed": passed,
        "violation": "" if passed else f"{metric}={actual} > {maximum}",
    }


def get_thresholds(mode: str) -> dict[str, dict[str, int]]:
    """Return the per-scenario threshold table for *mode*.

    ``"baseline"`` selects the first table; any other value selects the
    second one (the CLI only admits ``"degraded"`` besides ``"baseline"``).
    """
    if mode == "baseline":
        return {
            "torn_read": {
                "torn_read_events_min": 1,
                "p50_us_max": 200_000,
                "p99_us_max": 400_000,
                "drop_ratio_ppm_max": 980_000,
                "samples_min": 10,
            },
            "sink_stall": {
                "sink_stall_events_min": 1,
                "p50_us_max": 350_000,
                "p95_us_max": 600_000,
                "drop_ratio_ppm_max": 1_000_000,
                "samples_min": 1,
            },
            "reset_storm": {
                "reset_events_min": 4,
                "p50_us_max": 1_000_000,
                "p99_us_max": 1_000_000,
                "drop_ratio_ppm_max": 1_000_000,
                "samples_min": 1,
            },
        }
    return {
        "torn_read": {
            "torn_read_events_min": 200,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "sink_stall": {
            "sink_stall_events_min": 200,
            "p50_us_max": 1_000,
            "p95_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "reset_storm": {
            "reset_events_min": 20,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
    }


def _metric_int(metrics: dict[str, int | float | str], key: str) -> int:
    """Return ``metrics[key]`` as an int, or 0 when missing or not convertible.

    ``to_number`` leaves anything that is not a plain decimal as a string
    (e.g. ``"n/a"`` or ``"1e3"``); calling ``int`` on such a value raises, which
    used to abort the whole summary. Such values are now treated exactly like
    a missing metric.
    """
    try:
        return int(metrics.get(key, 0))
    except (TypeError, ValueError, OverflowError):
        return 0


def scenario_checks(
    scenario_id: str,
    fault: dict[str, int | float | str],
    latency: dict[str, int | float | str],
    thresholds: dict[str, dict[str, int]],
) -> list[Check]:
    """Evaluate the threshold checks that apply to one scenario.

    Every scenario gets, in this order, a minimum check on
    ``ingest_to_emit_samples`` and maximum checks on ``p50_us`` and
    ``drop_ratio_ppm``. The known scenarios (``torn_read``, ``sink_stall``,
    ``reset_storm``) additionally get a minimum check on their fault counter
    followed by a maximum check on their latency percentile.

    Args:
        scenario_id: Scenario identifier used to select thresholds and checks.
        fault: Parsed ``FAULT_COUNTERS`` key/values.
        latency: Parsed ``LATENCY_METRICS`` key/values.
        thresholds: Table as returned by :func:`get_thresholds`. Thresholds
            missing from it fall back to built-in defaults.

    Returns:
        The check records in evaluation order. Metrics that are missing or
        not convertible to an integer are evaluated as ``0``.
    """
    limits = thresholds.get(scenario_id, {})

    checks: list[Check] = [
        make_check_min(
            "ingest_to_emit_samples",
            _metric_int(latency, "ingest_to_emit_samples"),
            int(limits.get("samples_min", 1)),
        ),
        make_check_max(
            "p50_us",
            _metric_int(latency, "p50_us"),
            int(limits.get("p50_us_max", 500_000)),
        ),
        make_check_max(
            "drop_ratio_ppm",
            _metric_int(latency, "drop_ratio_ppm"),
            int(limits.get("drop_ratio_ppm_max", 1_000_000)),
        ),
    ]

    specific = _SCENARIO_SPECIFIC_METRICS.get(scenario_id)
    if specific is not None:
        counter_metric, percentile_metric = specific
        checks.append(
            make_check_min(
                counter_metric,
                _metric_int(fault, counter_metric),
                int(limits.get(f"{counter_metric}_min", 1)),
            )
        )
        checks.append(
            make_check_max(
                percentile_metric,
                _metric_int(latency, percentile_metric),
                int(limits.get(f"{percentile_metric}_max", 500_000)),
            )
        )
    return checks


def _metrics_from_log(text: str, token: str) -> dict[str, int | float | str]:
    """Parse the key/values of the last line of *text* that contains *token*."""
    line = last_line_with_token(text, token)
    return parse_key_values(line) if line else {}


def _summarize_scenario(
    row: dict[str, str], thresholds: dict[str, dict[str, int]]
) -> dict[str, object]:
    """Build the summary entry for one manifest row."""
    streamer_text = read_text(row["streamer_log"])
    pipeline = _metrics_from_log(streamer_text, "PIPELINE_METRICS")
    latency = _metrics_from_log(streamer_text, "LATENCY_METRICS")
    fault = _metrics_from_log(streamer_text, "FAULT_COUNTERS")
    rtp = _metrics_from_log(streamer_text, "RTP_METRICS")

    sim_rc = parse_exit(row["sim_rc"])
    streamer_rc = parse_exit(row["streamer_rc"])
    tester_rc = parse_exit(row["tester_rc"])
    process_ok = sim_rc == 0 and streamer_rc == 0 and tester_rc == 0

    checks = scenario_checks(row["scenario_id"], fault, latency, thresholds)
    violated_checks = [
        cast(str, check["violation"])
        for check in checks
        if not cast(bool, check["passed"])
    ]
    scenario_pass = process_ok and not violated_checks

    # A non-zero exit code takes precedence over threshold violations in the
    # human-readable reason; the full list is still reported in "violations".
    if scenario_pass:
        reason = "all checks passed"
    elif not process_ok:
        reason = f"process_rc(sim={sim_rc},streamer={streamer_rc},tester={tester_rc})"
    else:
        reason = "; ".join(violated_checks)

    return {
        "order": int(row["order"]),
        "id": row["scenario_id"],
        "name": row["name"],
        "status": "PASS" if scenario_pass else "FAIL",
        "reason": reason,
        "duration_ms": parse_duration_ms(row["duration_ms"]),
        "process_exit": {
            "sim": sim_rc,
            "streamer": streamer_rc,
            "tester": tester_rc,
        },
        "metrics": {
            "pipeline": pipeline,
            "latency": latency,
            "fault": fault,
            "rtp": rtp,
        },
        "checks": checks,
        "violations": violated_checks,
        "evidence": {
            "sim_log": row["sim_log"],
            "streamer_log": row["streamer_log"],
            "tester_log": row["tester_log"],
            "sdp": row["sdp_path"],
        },
    }


def build_summary(args: CliArgs) -> dict[str, object]:
    """Assemble the full run summary from the manifest and streamer logs.

    Scenarios are reported in ascending ``order``. ``all_pass`` is true only
    when exactly ``EXPECTED_SCENARIO_COUNT`` scenarios were found and every
    one of them passed; ``recommended_exit_code`` is ``0`` in that case and
    ``1`` otherwise.

    Raises:
        OSError: if the manifest cannot be opened.
        ValueError: if a manifest row has a non-integer ``order``.
    """
    thresholds = get_thresholds(args.mode)
    rows = parse_manifest(args.manifest)

    scenarios = [
        _summarize_scenario(row, thresholds)
        for row in sorted(rows, key=lambda item: int(item["order"]))
    ]

    pass_count = sum(1 for item in scenarios if item["status"] == "PASS")
    fail_count = sum(1 for item in scenarios if item["status"] == "FAIL")
    all_pass = (
        len(scenarios) == EXPECTED_SCENARIO_COUNT
        and pass_count == EXPECTED_SCENARIO_COUNT
        and fail_count == 0
    )

    return {
        "task": 15,
        "mode": args.mode,
        "run_id": args.run_id,
        "run_dir": args.run_dir,
        "started_at": args.started_at,
        "finished_at": args.finished_at,
        "thresholds": thresholds,
        "counts": {
            "total": len(scenarios),
            "pass": pass_count,
            "fail": fail_count,
        },
        "all_pass": all_pass,
        "recommended_exit_code": 0 if all_pass else 1,
        "scenarios": scenarios,
    }


def main() -> int:
    """Write the summary JSON to ``--output`` and return ``0``.

    The output's parent directories are created when missing. The pass/fail
    verdict is conveyed inside the JSON (``recommended_exit_code``), not via
    this function's return value.
    """
    args = parse_args()
    summary = build_summary(args)

    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    _ = output_path.write_text(
        json.dumps(summary, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())