d5df65927b
Switch the acceptance/fault/release scripts to project-local .sisyphus evidence roots and remove parent-repo path assumptions. Also harden deterministic behavior with run-id-derived port allocation and tuned fault thresholds so the release-gate pass and injected-failure flows stay stable in standalone execution.
407 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
"""Build a fault-suite summary JSON from a scenario manifest.

Reads the tab-separated scenario manifest, pulls the final metrics lines
out of each streamer log, applies per-mode thresholds, and writes a
machine-readable summary with a recommended exit code.
"""

from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import cast

# Matches whitespace-delimited "key=value" tokens in metrics log lines.
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")
@dataclass(frozen=True)
class CliArgs:
    """Typed view of the parsed command-line arguments."""

    manifest: str
    output: str
    run_id: str
    run_dir: str
    started_at: str
    finished_at: str
    mode: str
def parse_args() -> CliArgs:
    parser = argparse.ArgumentParser(
        description="Build fault suite summary with threshold checks"
    )
    _ = parser.add_argument("--manifest", required=True)
    _ = parser.add_argument("--output", required=True)
    _ = parser.add_argument("--run-id", required=True)
    _ = parser.add_argument("--run-dir", required=True)
    _ = parser.add_argument("--started-at", required=True)
    _ = parser.add_argument("--finished-at", required=True)
    _ = parser.add_argument("--mode", required=True, choices=("baseline", "degraded"))

    parsed = parser.parse_args(sys.argv[1:])
    return CliArgs(
        manifest=cast(str, parsed.manifest),
        output=cast(str, parsed.output),
        run_id=cast(str, parsed.run_id),
        run_dir=cast(str, parsed.run_dir),
        started_at=cast(str, parsed.started_at),
        finished_at=cast(str, parsed.finished_at),
        mode=cast(str, parsed.mode),
    )
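
# Typical invocation, for illustration only (the script filename and run
# paths below are hypothetical; real values come from the calling harness):
#
#   python3 fault_summary.py \
#       --manifest .sisyphus/runs/<run-id>/manifest.tsv \
#       --output .sisyphus/runs/<run-id>/summary.json \
#       --run-id <run-id> \
#       --run-dir .sisyphus/runs/<run-id> \
#       --started-at 2024-01-01T00:00:00Z \
#       --finished-at 2024-01-01T00:05:00Z \
#       --mode baseline
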
def read_text(path: str) -> str:
    # Missing or unreadable logs degrade to an empty string rather than
    # aborting the summary.
    p = Path(path)
    if not p.exists():
        return ""
    try:
        return p.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
def to_number(value: str) -> int | float | str:
    # Coerce integer-looking tokens, then decimal-looking tokens; anything
    # else (exponents, bare words) is preserved as a string.
    if re.fullmatch(r"-?\d+", value):
        try:
            return int(value)
        except ValueError:
            return value
    if re.fullmatch(r"-?\d+\.\d+", value):
        try:
            return float(value)
        except ValueError:
            return value
    return value
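
# Coercion sketch for to_number (inputs hypothetical):
#
#   to_number("42")   -> 42      (int)
#   to_number("-3.5") -> -3.5    (float)
#   to_number("1e6")  -> "1e6"   (no exponent support; stays a string)
#   to_number("abc")  -> "abc"
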
def parse_key_values(line: str) -> dict[str, int | float | str]:
    out: dict[str, int | float | str] = {}
    for match in KV_PATTERN.finditer(line):
        out[match.group(1)] = to_number(match.group(2))
    return out
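
# Example: parsing one metrics line (values hypothetical; the token names
# match what build_summary extracts from the streamer log):
#
#   parse_key_values("LATENCY_METRICS p50_us=1200 drop_ratio_ppm=35 tag=run")
#   -> {"p50_us": 1200, "drop_ratio_ppm": 35, "tag": "run"}
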
def last_line_with_token(text: str, token: str) -> str:
    # Keep scanning so the *last* matching line wins.
    found = ""
    for line in text.splitlines():
        if token in line:
            found = line
    return found
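
# Sketch: later occurrences overwrite earlier ones, so the final metrics
# snapshot in a log takes precedence:
#
#   last_line_with_token("a FOO x=1\nb FOO x=2", "FOO")  -> "b FOO x=2"
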
def parse_exit(value: str) -> int:
    # Missing or garbled exit codes map to -1, which fails the process check.
    try:
        return int(value)
    except (TypeError, ValueError):
        return -1
def parse_duration_ms(value: str) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0
def parse_manifest(path: str) -> list[dict[str, str]]:
    rows: list[dict[str, str]] = []
    with open(path, "r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        for raw in reader:
            # Normalize every expected column to a string; absent columns
            # become "".
            row: dict[str, str] = {}
            for key in (
                "order",
                "scenario_id",
                "name",
                "status",
                "reason",
                "duration_ms",
                "sim_rc",
                "streamer_rc",
                "tester_rc",
                "sim_log",
                "streamer_log",
                "tester_log",
                "sdp_path",
            ):
                value = raw.get(key, "")
                row[key] = "" if value is None else str(value)
            rows.append(row)
    return rows
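
# The manifest is expected to be a TSV whose header row names the columns
# above; a minimal sketch (tab-separated, values hypothetical):
#
#   order  scenario_id  name       ...  sdp_path
#   1      torn_read    Torn read  ...  .sisyphus/runs/<run-id>/torn_read.sdp
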
# A single threshold check, serialized as-is into the summary JSON.
Check = dict[str, object]
def make_check_min(metric: str, actual: int, minimum: int) -> Check:
    passed = actual >= minimum
    return {
        "metric": metric,
        "type": "min",
        "actual": actual,
        "expected": minimum,
        "passed": passed,
        "violation": "" if passed else f"{metric}={actual} < {minimum}",
    }
def make_check_max(metric: str, actual: int, maximum: int) -> Check:
    passed = actual <= maximum
    return {
        "metric": metric,
        "type": "max",
        "actual": actual,
        "expected": maximum,
        "passed": passed,
        "violation": "" if passed else f"{metric}={actual} > {maximum}",
    }
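
# Example of a violated max check (values hypothetical):
#
#   make_check_max("p99_us", 2500, 2000)
#   -> {"metric": "p99_us", "type": "max", "actual": 2500, "expected": 2000,
#       "passed": False, "violation": "p99_us=2500 > 2000"}
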
def get_thresholds(mode: str) -> dict[str, dict[str, int]]:
    if mode == "baseline":
        # Baseline-mode thresholds.
        return {
            "torn_read": {
                "torn_read_events_min": 1,
                "p50_us_max": 200_000,
                "p99_us_max": 400_000,
                "drop_ratio_ppm_max": 980_000,
                "samples_min": 10,
            },
            "sink_stall": {
                "sink_stall_events_min": 1,
                "p50_us_max": 350_000,
                "p95_us_max": 600_000,
                "drop_ratio_ppm_max": 1_000_000,
                "samples_min": 1,
            },
            "reset_storm": {
                "reset_events_min": 4,
                "p50_us_max": 1_000_000,
                "p99_us_max": 1_000_000,
                "drop_ratio_ppm_max": 1_000_000,
                "samples_min": 1,
            },
        }

    # Degraded-mode thresholds.
    return {
        "torn_read": {
            "torn_read_events_min": 200,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "sink_stall": {
            "sink_stall_events_min": 200,
            "p50_us_max": 1_000,
            "p95_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "reset_storm": {
            "reset_events_min": 20,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
    }
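
# Lookup sketch: thresholds are keyed by scenario id, then metric name,
# e.g. get_thresholds("degraded")["torn_read"]["p99_us_max"] -> 2_000.
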
def scenario_checks(
    scenario_id: str,
    fault: dict[str, int | float | str],
    latency: dict[str, int | float | str],
    thresholds: dict[str, dict[str, int]],
) -> list[Check]:
    scenario_thresholds = thresholds.get(scenario_id, {})

    # Metric values arrive pre-coerced by parse_key_values; missing fields
    # default to 0.
    torn = int(fault.get("torn_read_events", 0))
    stall = int(fault.get("sink_stall_events", 0))
    resets = int(fault.get("reset_events", 0))
    p95 = int(latency.get("p95_us", 0))
    p99 = int(latency.get("p99_us", 0))
    p50 = int(latency.get("p50_us", 0))
    samples = int(latency.get("ingest_to_emit_samples", 0))
    drop_ratio_ppm = int(latency.get("drop_ratio_ppm", 0))

    # Checks common to every scenario.
    checks: list[Check] = []
    checks.append(
        make_check_min(
            "ingest_to_emit_samples",
            samples,
            int(scenario_thresholds.get("samples_min", 1)),
        )
    )
    checks.append(
        make_check_max(
            "p50_us",
            p50,
            int(scenario_thresholds.get("p50_us_max", 500_000)),
        )
    )
    checks.append(
        make_check_max(
            "drop_ratio_ppm",
            drop_ratio_ppm,
            int(scenario_thresholds.get("drop_ratio_ppm_max", 1_000_000)),
        )
    )

    # Scenario-specific checks.
    if scenario_id == "torn_read":
        checks.append(
            make_check_min(
                "torn_read_events",
                torn,
                int(scenario_thresholds.get("torn_read_events_min", 1)),
            )
        )
        checks.append(
            make_check_max(
                "p99_us",
                p99,
                int(scenario_thresholds.get("p99_us_max", 500_000)),
            )
        )
    elif scenario_id == "sink_stall":
        checks.append(
            make_check_min(
                "sink_stall_events",
                stall,
                int(scenario_thresholds.get("sink_stall_events_min", 1)),
            )
        )
        checks.append(
            make_check_max(
                "p95_us",
                p95,
                int(scenario_thresholds.get("p95_us_max", 500_000)),
            )
        )
    elif scenario_id == "reset_storm":
        checks.append(
            make_check_min(
                "reset_events",
                resets,
                int(scenario_thresholds.get("reset_events_min", 1)),
            )
        )
        checks.append(
            make_check_max(
                "p99_us",
                p99,
                int(scenario_thresholds.get("p99_us_max", 500_000)),
            )
        )

    return checks
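
# Sketch (the empty metrics dicts are hypothetical): with nothing parsed
# from the logs, only the min-checks fail under baseline thresholds:
#
#   scenario_checks("torn_read", {}, {}, get_thresholds("baseline"))
#   -> 5 checks; violations: "ingest_to_emit_samples=0 < 10",
#      "torn_read_events=0 < 1"
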
def build_summary(args: CliArgs) -> dict[str, object]:
    thresholds = get_thresholds(args.mode)
    rows = parse_manifest(args.manifest)

    scenarios: list[dict[str, object]] = []
    for row in sorted(rows, key=lambda item: int(item["order"])):
        # Take the last emitted snapshot of each metrics family from the
        # streamer log.
        streamer_text = read_text(row["streamer_log"])
        pipeline_line = last_line_with_token(streamer_text, "PIPELINE_METRICS")
        latency_line = last_line_with_token(streamer_text, "LATENCY_METRICS")
        fault_line = last_line_with_token(streamer_text, "FAULT_COUNTERS")
        rtp_line = last_line_with_token(streamer_text, "RTP_METRICS")

        pipeline = parse_key_values(pipeline_line) if pipeline_line else {}
        latency = parse_key_values(latency_line) if latency_line else {}
        fault = parse_key_values(fault_line) if fault_line else {}
        rtp = parse_key_values(rtp_line) if rtp_line else {}

        sim_rc = parse_exit(row["sim_rc"])
        streamer_rc = parse_exit(row["streamer_rc"])
        tester_rc = parse_exit(row["tester_rc"])

        # A scenario passes only if all three processes exited cleanly and
        # every threshold check held.
        process_ok = sim_rc == 0 and streamer_rc == 0 and tester_rc == 0
        checks = scenario_checks(row["scenario_id"], fault, latency, thresholds)
        violated_checks = [
            cast(str, check["violation"])
            for check in checks
            if not cast(bool, check["passed"])
        ]

        scenario_pass = process_ok and len(violated_checks) == 0
        scenario_status = "PASS" if scenario_pass else "FAIL"
        reason = (
            "all checks passed"
            if scenario_pass
            else (
                f"process_rc(sim={sim_rc},streamer={streamer_rc},tester={tester_rc})"
                if not process_ok
                else "; ".join(violated_checks)
            )
        )

        scenarios.append(
            {
                "order": int(row["order"]),
                "id": row["scenario_id"],
                "name": row["name"],
                "status": scenario_status,
                "reason": reason,
                "duration_ms": parse_duration_ms(row["duration_ms"]),
                "process_exit": {
                    "sim": sim_rc,
                    "streamer": streamer_rc,
                    "tester": tester_rc,
                },
                "metrics": {
                    "pipeline": pipeline,
                    "latency": latency,
                    "fault": fault,
                    "rtp": rtp,
                },
                "checks": checks,
                "violations": violated_checks,
                "evidence": {
                    "sim_log": row["sim_log"],
                    "streamer_log": row["streamer_log"],
                    "tester_log": row["tester_log"],
                    "sdp": row["sdp_path"],
                },
            }
        )

    pass_count = sum(1 for item in scenarios if item["status"] == "PASS")
    fail_count = sum(1 for item in scenarios if item["status"] == "FAIL")
    # The suite must contain exactly the three known scenarios, all passing.
    all_pass = len(scenarios) == 3 and pass_count == 3 and fail_count == 0

    return {
        "task": 15,
        "mode": args.mode,
        "run_id": args.run_id,
        "run_dir": args.run_dir,
        "started_at": args.started_at,
        "finished_at": args.finished_at,
        "thresholds": thresholds,
        "counts": {
            "total": len(scenarios),
            "pass": pass_count,
            "fail": fail_count,
        },
        "all_pass": all_pass,
        "recommended_exit_code": 0 if all_pass else 1,
        "scenarios": scenarios,
    }
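
# Abridged shape of the emitted summary (values hypothetical):
#
#   {
#     "task": 15, "mode": "baseline", "run_id": "...",
#     "counts": {"total": 3, "pass": 3, "fail": 0},
#     "all_pass": true, "recommended_exit_code": 0,
#     "scenarios": [ ... per-scenario status, metrics, checks, evidence ... ]
#   }
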
def main() -> int:
    args = parse_args()
    summary = build_summary(args)
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    _ = output_path.write_text(
        json.dumps(summary, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )
    # Always exit 0 here; gating on failures is left to the caller via the
    # summary's recommended_exit_code field.
    return 0
if __name__ == "__main__":
    raise SystemExit(main())
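
# A caller can gate a CI step on the summary without re-parsing logs, e.g.
# (sketch; the output path is hypothetical):
#
#   import json, pathlib, sys
#   summary = json.loads(pathlib.Path("summary.json").read_text())
#   sys.exit(int(summary["recommended_exit_code"]))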