Files
cvmmap-streamer/scripts/fault_summary_helper.py
T
crosstyan 991f7ded34 feat(test): add downstream acceptance and fault harness artifacts
This commit packages the standalone task-14 acceptance and task-15 fault-suite execution toolchain for downstream validation.

It includes all runnable harness scripts, helper utilities, and generated evidence captures so downstream behavior can be reproduced and reviewed independently of the docs and core implementation.

Bundling these assets separately allows QA/automation workflows to validate runtime changes without dragging operational notes or release-gate documentation into the same review unit.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-05 20:32:12 +08:00

407 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import cast
# Matches one "key=value" token in a metrics log line: the key is a run of
# word characters, the value is any run of non-whitespace characters.
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")
@dataclass(frozen=True)
class CliArgs:
    """Immutable record of the command-line arguments for this script."""

    manifest: str  # path to the tab-separated scenario manifest
    output: str  # path where the JSON summary will be written
    run_id: str  # run identifier, echoed verbatim into the summary
    run_dir: str  # run directory path, echoed verbatim into the summary
    started_at: str  # run start timestamp (free-form string)
    finished_at: str  # run finish timestamp (free-form string)
    mode: str  # "baseline" or "degraded" (enforced by argparse choices)
def parse_args(argv: list[str] | None = None) -> CliArgs:
    """Parse command-line flags into a :class:`CliArgs` record.

    Args:
        argv: Argument list to parse.  Defaults to ``sys.argv[1:]`` so
            existing callers that pass nothing are unaffected; passing an
            explicit list makes the function testable without patching
            ``sys.argv``.

    Returns:
        A frozen ``CliArgs`` holding every flag value as a string.

    Raises:
        SystemExit: raised by argparse when a required flag is missing or
            ``--mode`` is not one of the allowed choices.
    """
    parser = argparse.ArgumentParser(
        description="Build fault suite summary with threshold checks"
    )
    _ = parser.add_argument("--manifest", required=True)
    _ = parser.add_argument("--output", required=True)
    _ = parser.add_argument("--run-id", required=True)
    _ = parser.add_argument("--run-dir", required=True)
    _ = parser.add_argument("--started-at", required=True)
    _ = parser.add_argument("--finished-at", required=True)
    _ = parser.add_argument("--mode", required=True, choices=("baseline", "degraded"))
    parsed = parser.parse_args(sys.argv[1:] if argv is None else argv)
    return CliArgs(
        manifest=cast(str, parsed.manifest),
        output=cast(str, parsed.output),
        run_id=cast(str, parsed.run_id),
        run_dir=cast(str, parsed.run_dir),
        started_at=cast(str, parsed.started_at),
        finished_at=cast(str, parsed.finished_at),
        mode=cast(str, parsed.mode),
    )
def read_text(path: str) -> str:
    """Best-effort read of a text file.

    Returns the file contents decoded as UTF-8 (undecodable bytes replaced),
    or "" when the file is missing or unreadable.  Reading directly and
    catching OSError removes the check-then-read race of a separate
    ``exists()`` test; ``FileNotFoundError`` is an ``OSError`` subclass, so a
    missing file still yields "".
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
def to_number(value: str) -> int | float | str:
    """Coerce *value* to int or float when it is a plain decimal literal.

    Integers are tried before floats; anything that does not fully match
    either pattern (or fails conversion) is returned unchanged.
    """
    attempts = (
        (r"-?\d+", int),
        (r"-?\d+\.\d+", float),
    )
    for pattern, convert in attempts:
        if re.fullmatch(pattern, value):
            try:
                return convert(value)
            except ValueError:
                return value
    return value
def parse_key_values(line: str) -> dict[str, int | float | str]:
    """Extract every key=value token from *line*, coercing numeric values.

    Later duplicates of a key overwrite earlier ones, matching left-to-right
    scan order.
    """
    return {
        token.group(1): to_number(token.group(2))
        for token in KV_PATTERN.finditer(line)
    }
def last_line_with_token(text: str, token: str) -> str:
    """Return the last line of *text* that contains *token*, or ""."""
    for candidate in reversed(text.splitlines()):
        if token in candidate:
            return candidate
    return ""
def parse_exit(value: str) -> int:
    """Parse a recorded process exit code; -1 marks a missing/bad value."""
    try:
        code = int(value)
    except (TypeError, ValueError):
        code = -1
    return code
def parse_duration_ms(value: str) -> int:
    """Parse a duration field in milliseconds; 0 marks a missing/bad value."""
    try:
        millis = int(value)
    except (TypeError, ValueError):
        millis = 0
    return millis
def parse_manifest(path: str) -> list[dict[str, str]]:
    """Read the tab-separated scenario manifest into string-valued rows.

    Only the known columns are kept; a missing cell (DictReader fills those
    with None) becomes "".  Extra columns in the file are ignored.
    """
    columns = (
        "order",
        "scenario_id",
        "name",
        "status",
        "reason",
        "duration_ms",
        "sim_rc",
        "streamer_rc",
        "tester_rc",
        "sim_log",
        "streamer_log",
        "tester_log",
        "sdp_path",
    )
    with open(path, "r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        raw_rows = [{column: raw.get(column, "") for column in columns} for raw in reader]
    return [
        {key: "" if cell is None else str(cell) for key, cell in row.items()}
        for row in raw_rows
    ]
# One threshold-check result as a JSON-serializable mapping with keys:
# metric, type ("min"/"max"), actual, expected, passed, violation.
Check = dict[str, object]
def make_check_min(metric: str, actual: int, minimum: int) -> Check:
    """Build a "min" threshold check: passes when actual >= minimum."""
    ok = minimum <= actual
    violation = "" if ok else f"{metric}={actual} < {minimum}"
    return {
        "metric": metric,
        "type": "min",
        "actual": actual,
        "expected": minimum,
        "passed": ok,
        "violation": violation,
    }
def make_check_max(metric: str, actual: int, maximum: int) -> Check:
    """Build a "max" threshold check: passes when actual <= maximum."""
    ok = maximum >= actual
    violation = "" if ok else f"{metric}={actual} > {maximum}"
    return {
        "metric": metric,
        "type": "max",
        "actual": actual,
        "expected": maximum,
        "passed": ok,
        "violation": violation,
    }
def get_thresholds(mode: str) -> dict[str, dict[str, int]]:
    """Return the per-scenario threshold table for *mode*.

    "baseline" gets loose limits (the suite is expected to surface faults);
    every other mode gets the tight "degraded" limits.  Units: *_us are
    microseconds, drop ratios are parts-per-million.
    """
    baseline = {
        "torn_read": {
            "torn_read_events_min": 1,
            "p50_us_max": 150_000,
            "p99_us_max": 250_000,
            "drop_ratio_ppm_max": 980_000,
            "samples_min": 10,
        },
        "sink_stall": {
            "sink_stall_events_min": 1,
            "p50_us_max": 350_000,
            "p95_us_max": 600_000,
            "drop_ratio_ppm_max": 980_000,
            "samples_min": 10,
        },
        "reset_storm": {
            "reset_events_min": 4,
            "p50_us_max": 800_000,
            "p99_us_max": 1_000_000,
            "drop_ratio_ppm_max": 1_000_000,
            "samples_min": 1,
        },
    }
    degraded = {
        "torn_read": {
            "torn_read_events_min": 200,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "sink_stall": {
            "sink_stall_events_min": 200,
            "p50_us_max": 1_000,
            "p95_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "reset_storm": {
            "reset_events_min": 20,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
    }
    return baseline if mode == "baseline" else degraded
def scenario_checks(
    scenario_id: str,
    fault: dict[str, int | float | str],
    latency: dict[str, int | float | str],
    thresholds: dict[str, dict[str, int]],
) -> list[Check]:
    """Build the ordered list of threshold checks for one scenario.

    Three checks are common to every scenario (sample count, p50 latency,
    drop ratio); a known scenario_id additionally gets its fault-event
    minimum and a tail-percentile maximum.  Unknown scenario ids fall back
    to the common checks with default limits.
    """
    limits = thresholds.get(scenario_id, {})

    def latency_value(key: str) -> int:
        return int(latency.get(key, 0))

    checks: list[Check] = [
        make_check_min(
            "ingest_to_emit_samples",
            latency_value("ingest_to_emit_samples"),
            int(limits.get("samples_min", 1)),
        ),
        make_check_max(
            "p50_us",
            latency_value("p50_us"),
            int(limits.get("p50_us_max", 500_000)),
        ),
        make_check_max(
            "drop_ratio_ppm",
            latency_value("drop_ratio_ppm"),
            int(limits.get("drop_ratio_ppm_max", 1_000_000)),
        ),
    ]

    # Per-scenario extras: (fault counter / its min key, tail percentile /
    # its max key).  Replaces the original if/elif chain.
    extras = {
        "torn_read": ("torn_read_events", "torn_read_events_min", "p99_us", "p99_us_max"),
        "sink_stall": ("sink_stall_events", "sink_stall_events_min", "p95_us", "p95_us_max"),
        "reset_storm": ("reset_events", "reset_events_min", "p99_us", "p99_us_max"),
    }.get(scenario_id)
    if extras is not None:
        event_metric, event_min_key, pct_metric, pct_max_key = extras
        checks.append(
            make_check_min(
                event_metric,
                int(fault.get(event_metric, 0)),
                int(limits.get(event_min_key, 1)),
            )
        )
        checks.append(
            make_check_max(
                pct_metric,
                latency_value(pct_metric),
                int(limits.get(pct_max_key, 500_000)),
            )
        )
    return checks
def build_summary(args: CliArgs) -> dict[str, object]:
    """Assemble the full fault-suite summary dict (later serialized to JSON).

    For each manifest row (processed in ascending "order"): parse the last
    metrics lines out of the streamer log, evaluate threshold checks, and
    combine them with the recorded process exit codes into a per-scenario
    PASS/FAIL verdict.  Key order of the returned dict is the order the
    JSON report is written in (the caller dumps with sort_keys=False).
    """
    thresholds = get_thresholds(args.mode)
    rows = parse_manifest(args.manifest)
    scenarios: list[dict[str, object]] = []
    for row in sorted(rows, key=lambda item: int(item["order"])):
        # Metrics are emitted repeatedly; only the final occurrence of each
        # marker line in the streamer log is authoritative.
        streamer_text = read_text(row["streamer_log"])
        pipeline_line = last_line_with_token(streamer_text, "PIPELINE_METRICS")
        latency_line = last_line_with_token(streamer_text, "LATENCY_METRICS")
        fault_line = last_line_with_token(streamer_text, "FAULT_COUNTERS")
        rtp_line = last_line_with_token(streamer_text, "RTP_METRICS")
        pipeline = parse_key_values(pipeline_line) if pipeline_line else {}
        latency = parse_key_values(latency_line) if latency_line else {}
        fault = parse_key_values(fault_line) if fault_line else {}
        rtp = parse_key_values(rtp_line) if rtp_line else {}
        sim_rc = parse_exit(row["sim_rc"])
        streamer_rc = parse_exit(row["streamer_rc"])
        tester_rc = parse_exit(row["tester_rc"])
        # All three harness processes must exit cleanly for the scenario to
        # be eligible to pass.
        process_ok = sim_rc == 0 and streamer_rc == 0 and tester_rc == 0
        checks = scenario_checks(row["scenario_id"], fault, latency, thresholds)
        violated_checks = [
            cast(str, check["violation"])
            for check in checks
            if not cast(bool, check["passed"])
        ]
        scenario_pass = process_ok and len(violated_checks) == 0
        scenario_status = "PASS" if scenario_pass else "FAIL"
        # Failure reason prefers the process-exit summary; threshold
        # violations are only reported when all processes exited cleanly.
        reason = (
            "all checks passed"
            if scenario_pass
            else (
                f"process_rc(sim={sim_rc},streamer={streamer_rc},tester={tester_rc})"
                if not process_ok
                else "; ".join(violated_checks)
            )
        )
        scenarios.append(
            {
                "order": int(row["order"]),
                "id": row["scenario_id"],
                "name": row["name"],
                "status": scenario_status,
                "reason": reason,
                "duration_ms": parse_duration_ms(row["duration_ms"]),
                "process_exit": {
                    "sim": sim_rc,
                    "streamer": streamer_rc,
                    "tester": tester_rc,
                },
                "metrics": {
                    "pipeline": pipeline,
                    "latency": latency,
                    "fault": fault,
                    "rtp": rtp,
                },
                "checks": checks,
                "violations": violated_checks,
                # Paths to the raw artifacts backing this verdict.
                "evidence": {
                    "sim_log": row["sim_log"],
                    "streamer_log": row["streamer_log"],
                    "tester_log": row["tester_log"],
                    "sdp": row["sdp_path"],
                },
            }
        )
    pass_count = sum(1 for item in scenarios if item["status"] == "PASS")
    fail_count = sum(1 for item in scenarios if item["status"] == "FAIL")
    # Exactly three scenarios are expected (torn_read, sink_stall,
    # reset_storm) — a short manifest fails the suite even with no FAILs.
    all_pass = len(scenarios) == 3 and pass_count == 3 and fail_count == 0
    return {
        "task": 15,
        "mode": args.mode,
        "run_id": args.run_id,
        "run_dir": args.run_dir,
        "started_at": args.started_at,
        "finished_at": args.finished_at,
        "thresholds": thresholds,
        "counts": {
            "total": len(scenarios),
            "pass": pass_count,
            "fail": fail_count,
        },
        "all_pass": all_pass,
        # The caller decides whether to use this; build_summary itself has
        # no side effects.
        "recommended_exit_code": 0 if all_pass else 1,
        "scenarios": scenarios,
    }
def main() -> int:
    """Entry point: parse flags, build the summary, write it out as JSON.

    Always returns 0 — the pass/fail verdict lives inside the report
    ("recommended_exit_code"), not in this process's exit status.
    """
    cli = parse_args()
    report = build_summary(cli)
    destination = Path(cli.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(report, indent=2, sort_keys=False) + "\n"
    _ = destination.write_text(payload, encoding="utf-8")
    return 0
if __name__ == "__main__":
    # SystemExit propagates main()'s return value as the process exit code.
    raise SystemExit(main())