refactor(streamer): remove gstreamer and legacy rtmp paths

This commit is contained in:
2026-03-11 16:43:29 +08:00
parent ed3f32ff6e
commit 782af9481c
22 changed files with 817 additions and 3339 deletions
+209 -210
View File
@@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -u -o pipefail
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
STREAMER_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
@@ -11,11 +11,10 @@ SUMMARY_HELPER="${SCRIPT_DIR}/acceptance_summary_helper.py"
RUN_ID=""
RUN_DIR=""
MANIFEST_TSV="${RUN_DIR}/rows.tsv"
SUMMARY_JSON="${RUN_DIR}/summary.json"
MANIFEST_TSV=""
SUMMARY_JSON=""
LATEST_SUMMARY_JSON="${EVIDENCE_ROOT}/task-14-acceptance-summary.json"
EVIDENCE_TEXT="${EVIDENCE_ROOT}/task-14-acceptance.txt"
STARTED_AT_UTC="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
mkdir -p "${TASK_EVIDENCE_DIR}"
@@ -34,7 +33,7 @@ allocate_run_dir() {
return 0
fi
attempts=$((attempts + 1))
sleep 0.01
sleep 0.01
done
echo "failed to allocate unique acceptance run directory" >&2
return 1
@@ -47,7 +46,7 @@ PORT_OFFSET="$((RUN_HASH % 1000))"
RTP_PORT_BASE="$((51040 + PORT_OFFSET))"
RTMP_PORT_BASE="$((19360 + PORT_OFFSET))"
echo -e "order\trow_id\tname\tprotocol\tcodec\trtmp_mode\tstatus\treason\tduration_ms\tsim_rc\tstreamer_rc\ttester_rc\tsim_log\tstreamer_log\ttester_log\tsdp_path" > "${MANIFEST_TSV}"
echo -e "order\trow_id\tname\tprotocol\tcodec\ttransport\tstatus\treason\tduration_ms\temitter_rc\treceiver_rc\temitter_log\treceiver_log\tsdp_path" > "${MANIFEST_TSV}"
cleanup_pids=()
@@ -55,6 +54,7 @@ cleanup_all() {
for pid in "${cleanup_pids[@]:-}"; do
if [[ -n "${pid}" ]] && kill -0 "${pid}" 2>/dev/null; then
kill "${pid}" 2>/dev/null || true
wait "${pid}" 2>/dev/null || true
fi
done
}
@@ -90,179 +90,193 @@ append_manifest_row() {
local name="$3"
local protocol="$4"
local codec="$5"
local rtmp_mode="$6"
local transport="$6"
local status="$7"
local reason="$8"
local duration_ms="$9"
local sim_rc="${10}"
local streamer_rc="${11}"
local tester_rc="${12}"
local sim_log="${13}"
local streamer_log="${14}"
local tester_log="${15}"
local sdp_path="${16}"
local emitter_rc="${10}"
local receiver_rc="${11}"
local emitter_log="${12}"
local receiver_log="${13}"
local sdp_path="${14}"
echo -e "${order}\t${row_id}\t${name}\t${protocol}\t${codec}\t${rtmp_mode}\t${status}\t${reason}\t${duration_ms}\t${sim_rc}\t${streamer_rc}\t${tester_rc}\t${sim_log}\t${streamer_log}\t${tester_log}\t${sdp_path}" >> "${MANIFEST_TSV}"
echo -e "${order}\t${row_id}\t${name}\t${protocol}\t${codec}\t${transport}\t${status}\t${reason}\t${duration_ms}\t${emitter_rc}\t${receiver_rc}\t${emitter_log}\t${receiver_log}\t${sdp_path}" >> "${MANIFEST_TSV}"
}
run_matrix_row() {
run_rtp_row() {
local order="$1"
local row_id="$2"
local name="$3"
local protocol="$4"
local codec="$5"
local rtmp_mode="$6"
local codec="$3"
local row_dir="${RUN_DIR}/${order}-${row_id}"
mkdir -p "${row_dir}"
local sim_log="${row_dir}/sim.log"
local streamer_log="${row_dir}/streamer.log"
local tester_log="${row_dir}/tester.log"
local sdp_path=""
local shm_name="cvmmap_accept_${row_id}_${RUN_ID}"
local zmq_endpoint="ipc:///tmp/cvmmap_accept_${row_id}_${RUN_ID}.ipc"
local sim_label="acc_${order}_${protocol}_${codec}"
local streamer_cmd=(
"${BUILD_DIR}/cvmmap_streamer"
--run-mode pipeline
--codec "${codec}"
--shm-name "${shm_name}"
--zmq-endpoint "${zmq_endpoint}"
--input-mode dummy
--dummy-label "${sim_label}"
--dummy-frames 320
--dummy-fps 200
--dummy-width 640
--dummy-height 360
--dummy-startup-delay-ms 0
--queue-size 1
--gop 30
--b-frames 0
--ingest-max-frames 120
--ingest-idle-timeout-ms 6000
)
local tester_cmd=()
if [[ "${protocol}" == "rtp" ]]; then
local rtp_port
local payload_type
if [[ "${codec}" == "h264" ]]; then
rtp_port="${RTP_PORT_BASE}"
payload_type=96
else
rtp_port="$((RTP_PORT_BASE + 2))"
payload_type=98
fi
sdp_path="${row_dir}/stream.sdp"
streamer_cmd+=(
--rtp
--rtp-endpoint "127.0.0.1:${rtp_port}"
--rtp-payload-type "${payload_type}"
--rtp-sdp "${sdp_path}"
)
tester_cmd=(
"${BUILD_DIR}/rtp_receiver_tester"
--port "${rtp_port}"
--expect-pt "${payload_type}"
--packet-threshold 1
--timeout-ms 10000
)
else
local rtmp_port
local tester_mode
case "${row_id}" in
rtmp_h264)
rtmp_port="${RTMP_PORT_BASE}"
tester_mode="h264"
;;
rtmp_h265_enhanced)
rtmp_port="$((RTMP_PORT_BASE + 2))"
tester_mode="h265-enhanced"
;;
rtmp_h265_domestic)
rtmp_port="$((RTMP_PORT_BASE + 4))"
tester_mode="h265-domestic"
;;
*)
rtmp_port="$((RTMP_PORT_BASE + 6))"
tester_mode="h264"
;;
esac
streamer_cmd+=(
--rtmp
--rtmp-url "rtmp://127.0.0.1:${rtmp_port}/live/${row_id}"
--rtmp-mode "${rtmp_mode}"
)
tester_cmd=(
"${BUILD_DIR}/rtmp_stub_tester"
--mode "${tester_mode}"
--listen-host 127.0.0.1
--listen-port "${rtmp_port}"
--video-threshold 1
--timeout-ms 10000
)
local emitter_log="${row_dir}/rtp_output.log"
local receiver_log="${row_dir}/rtp_receiver.log"
local sdp_path="${row_dir}/stream.sdp"
local port
port="$((RTP_PORT_BASE + (order - 1) * 2))"
local payload_type=96
if [[ "${codec}" == "h265" ]]; then
payload_type=98
fi
local row_start_ms row_end_ms duration_ms
local row_start_ms
row_start_ms="$(date +%s%3N)"
"${tester_cmd[@]}" > "${tester_log}" 2>&1 &
local tester_pid=$!
cleanup_pids+=("${tester_pid}")
"${BUILD_DIR}/rtp_receiver_tester" \
--port "${port}" \
--expect-pt "${payload_type}" \
--packet-threshold 1 \
--timeout-ms 12000 >"${receiver_log}" 2>&1 &
local receiver_pid=$!
cleanup_pids+=("${receiver_pid}")
sleep 1
: > "${sim_log}"
"${streamer_cmd[@]}" > "${streamer_log}" 2>&1
local streamer_rc=$?
set +e
"${BUILD_DIR}/rtp_output_tester" \
--host 127.0.0.1 \
--port "${port}" \
--payload-type "${payload_type}" \
--codec "${codec}" \
--encoder-device software \
--sdp-path "${sdp_path}" \
--frames 48 \
--width 320 \
--height 240 \
--frame-interval-ms 20 >"${emitter_log}" 2>&1
local emitter_rc=$?
set -e
wait_pid "${tester_pid}" 15
local tester_rc=$?
local sim_rc=0
set +e
wait_pid "${receiver_pid}" 20
local receiver_rc=$?
set -e
local row_end_ms
row_end_ms="$(date +%s%3N)"
duration_ms=$((row_end_ms - row_start_ms))
local duration_ms=$((row_end_ms - row_start_ms))
local status="PASS"
local reason="all-processes-ok"
if (( sim_rc != 0 || streamer_rc != 0 || tester_rc != 0 )); then
if (( emitter_rc != 0 || receiver_rc != 0 )); then
status="FAIL"
reason="sim_rc=${sim_rc},streamer_rc=${streamer_rc},tester_rc=${tester_rc}"
reason="emitter_rc=${emitter_rc},receiver_rc=${receiver_rc}"
if (( receiver_rc == 0 )) && grep -Eq "Broken pipe|Connection reset by peer" "${emitter_log}"; then
status="PASS"
reason="receiver exited cleanly after threshold; emitter observed peer close"
fi
fi
append_manifest_row \
"${order}" \
"${row_id}" \
"${name}" \
"${protocol}" \
"RTP + ${codec}" \
"rtp" \
"${codec}" \
"${rtmp_mode}" \
"udp" \
"${status}" \
"${reason}" \
"${duration_ms}" \
"${sim_rc}" \
"${streamer_rc}" \
"${tester_rc}" \
"${sim_log}" \
"${streamer_log}" \
"${tester_log}" \
"${emitter_rc}" \
"${receiver_rc}" \
"${emitter_log}" \
"${receiver_log}" \
"${sdp_path}"
printf "[%s] %s => %s (%s)\n" "${row_id}" "${name}" "${status}" "${reason}"
printf "[%s] RTP + %s => %s (%s)\n" "${row_id}" "${codec}" "${status}" "${reason}"
}
run_rtmp_row() {
local order="$1"
local row_id="$2"
local codec="$3"
local transport="$4"
local row_dir="${RUN_DIR}/${order}-${row_id}"
mkdir -p "${row_dir}"
local emitter_log="${row_dir}/rtmp_output.log"
local receiver_log="${row_dir}/rtmp_stub.log"
local port
port="$((RTMP_PORT_BASE + (order - 3) * 2))"
local mode="h264"
if [[ "${codec}" == "h265" ]]; then
mode="h265-enhanced"
fi
local row_start_ms
row_start_ms="$(date +%s%3N)"
"${BUILD_DIR}/rtmp_stub_tester" \
--mode "${mode}" \
--listen-host 127.0.0.1 \
--listen-port "${port}" \
--video-threshold 4 \
--timeout-ms 12000 >"${receiver_log}" 2>&1 &
local receiver_pid=$!
cleanup_pids+=("${receiver_pid}")
sleep 1
set +e
"${BUILD_DIR}/rtmp_output_tester" \
--rtmp-url "rtmp://127.0.0.1:${port}/live/${row_id}" \
--transport "${transport}" \
--codec "${codec}" \
--encoder-device software \
--frames 32 \
--width 320 \
--height 240 \
--frame-interval-ms 20 \
--linger-ms 200 >"${emitter_log}" 2>&1
local emitter_rc=$?
set -e
set +e
wait_pid "${receiver_pid}" 20
local receiver_rc=$?
set -e
local row_end_ms
row_end_ms="$(date +%s%3N)"
local duration_ms=$((row_end_ms - row_start_ms))
local status="PASS"
local reason="all-processes-ok"
if (( emitter_rc != 0 || receiver_rc != 0 )); then
status="FAIL"
reason="emitter_rc=${emitter_rc},receiver_rc=${receiver_rc}"
if (( receiver_rc == 0 )) && grep -Eq "Broken pipe|Connection reset by peer" "${emitter_log}"; then
status="PASS"
reason="receiver exited cleanly after threshold; emitter observed peer close"
fi
fi
append_manifest_row \
"${order}" \
"${row_id}" \
"RTMP + ${codec} + ${transport}" \
"rtmp" \
"${codec}" \
"${transport}" \
"${status}" \
"${reason}" \
"${duration_ms}" \
"${emitter_rc}" \
"${receiver_rc}" \
"${emitter_log}" \
"${receiver_log}" \
""
printf "[%s] RTMP + %s + %s => %s (%s)\n" "${row_id}" "${codec}" "${transport}" "${status}" "${reason}"
}
main() {
local required=(
"${BUILD_DIR}/cvmmap_streamer"
"${BUILD_DIR}/rtp_receiver_tester"
"${BUILD_DIR}/rtmp_stub_tester"
local required=(
"${BUILD_DIR}/rtp_output_tester"
"${BUILD_DIR}/rtp_receiver_tester"
"${BUILD_DIR}/rtmp_output_tester"
"${BUILD_DIR}/rtmp_stub_tester"
)
local missing=()
@@ -273,33 +287,24 @@ local required=(
done
if (( ${#missing[@]} > 0 )); then
for idx in 1 2 3 4 5; do
append_manifest_row \
"${idx}" \
"preflight_${idx}" \
"preflight missing binary" \
"preflight" \
"n/a" \
"" \
"SKIP" \
"missing binaries: ${missing[*]}" \
"0" \
"-1" \
"-1" \
"-1" \
"" \
"" \
"" \
""
done
else
run_matrix_row 1 "rtp_h264" "RTP + H.264" "rtp" "h264" ""
run_matrix_row 2 "rtp_h265" "RTP + H.265" "rtp" "h265" ""
run_matrix_row 3 "rtmp_h264" "RTMP + H.264" "rtmp" "h264" "enhanced"
run_matrix_row 4 "rtmp_h265_enhanced" "RTMP + H.265 enhanced" "rtmp" "h265" "enhanced"
run_matrix_row 5 "rtmp_h265_domestic" "RTMP + H.265 domestic" "rtmp" "h265" "domestic"
{
echo "task=14"
echo "run_id=${RUN_ID}"
echo "run_dir=${RUN_DIR}"
echo "manifest=${MANIFEST_TSV}"
echo "missing_binaries=${missing[*]}"
} > "${EVIDENCE_TEXT}"
echo "missing binaries: ${missing[*]}" >&2
return 1
fi
run_rtp_row 1 "rtp_h264" "h264"
run_rtp_row 2 "rtp_h265" "h265"
run_rtmp_row 3 "rtmp_h264_libavformat" "h264" "libavformat"
run_rtmp_row 4 "rtmp_h265_libavformat" "h265" "libavformat"
run_rtmp_row 5 "rtmp_h264_ffmpeg_process" "h264" "ffmpeg_process"
run_rtmp_row 6 "rtmp_h265_ffmpeg_process" "h265" "ffmpeg_process"
local finished_at_utc
finished_at_utc="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
@@ -310,67 +315,61 @@ local required=(
--run-dir "${RUN_DIR}" \
--started-at "${STARTED_AT_UTC}" \
--finished-at "${finished_at_utc}"
local summary_rc=$?
cp -f "${SUMMARY_JSON}" "${LATEST_SUMMARY_JSON}" 2>/dev/null || true
local total_count pass_count fail_count all_pass
total_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("total", 0))
PY
)"
pass_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("pass", 0))
PY
)"
fail_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("fail", 0))
PY
)"
all_pass="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
print("true" if data.get("all_pass", False) else "false")
PY
)"
{
echo "task=14"
echo "run_id=${RUN_ID}"
echo "run_dir=${RUN_DIR}"
echo "manifest=${MANIFEST_TSV}"
echo "summary_json=${SUMMARY_JSON}"
echo "latest_summary_json=${LATEST_SUMMARY_JSON}"
echo "started_at=${STARTED_AT_UTC}"
echo "finished_at=${finished_at_utc}"
echo "counts_total=${total_count}"
echo "counts_pass=${pass_count}"
echo "counts_fail=${fail_count}"
echo "all_pass=${all_pass}"
echo "matrix_rows=rtp_h264,rtp_h265,rtmp_h264_libavformat,rtmp_h265_libavformat,rtmp_h264_ffmpeg_process,rtmp_h265_ffmpeg_process"
} > "${EVIDENCE_TEXT}"
if (( summary_rc != 0 )); then
echo "summary helper failed with rc=${summary_rc}" >&2
return 1
fi
local pass_count fail_count skip_count total_count
pass_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("pass", 0))
PY
)"
fail_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("fail", 0))
PY
)"
skip_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("skip", 0))
PY
)"
total_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("total", 0))
PY
)"
echo "summary: total=${total_count} pass=${pass_count} fail=${fail_count} skip=${skip_count}"
echo "json: ${SUMMARY_JSON}"
if [[ "${total_count}" == "5" && "${pass_count}" == "5" && "${fail_count}" == "0" && "${skip_count}" == "0" ]]; then
if [[ "${all_pass}" == "true" ]]; then
echo "acceptance matrix PASS (${pass_count}/${total_count})"
echo "summary: ${SUMMARY_JSON}"
return 0
fi
echo "acceptance matrix FAIL (${pass_count}/${total_count})" >&2
echo "summary: ${SUMMARY_JSON}" >&2
return 1
}
+60 -126
View File
@@ -12,10 +12,7 @@ from pathlib import Path
from typing import cast
MetricValue = int | float | str | bool
KV_PATTERN = re.compile(r"([a-zA-Z_]+)=([^\s]+)")
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")
@dataclass(frozen=True)
@@ -32,31 +29,20 @@ def parse_args() -> CliArgs:
parser = argparse.ArgumentParser(
description="Build JSON summary for standalone acceptance matrix"
)
_ = parser.add_argument(
"--manifest", required=True, help="TSV manifest produced by acceptance runner"
)
_ = parser.add_argument("--output", required=True, help="Output JSON summary path")
_ = parser.add_argument("--manifest", required=True)
_ = parser.add_argument("--output", required=True)
_ = parser.add_argument("--run-id", required=True)
_ = parser.add_argument("--run-dir", required=True)
_ = parser.add_argument("--started-at", required=True)
_ = parser.add_argument("--finished-at", required=True)
parsed = parser.parse_args(sys.argv[1:])
manifest = cast(str, parsed.manifest)
output = cast(str, parsed.output)
run_id = cast(str, parsed.run_id)
run_dir = cast(str, parsed.run_dir)
started_at = cast(str, parsed.started_at)
finished_at = cast(str, parsed.finished_at)
return CliArgs(
manifest=manifest,
output=output,
run_id=run_id,
run_dir=run_dir,
started_at=started_at,
finished_at=finished_at,
manifest=cast(str, parsed.manifest),
output=cast(str, parsed.output),
run_id=cast(str, parsed.run_id),
run_dir=cast(str, parsed.run_dir),
started_at=cast(str, parsed.started_at),
finished_at=cast(str, parsed.finished_at),
)
@@ -70,7 +56,7 @@ def read_text(path: str) -> str:
return ""
def to_number(value: str) -> MetricValue:
def to_number(value: str) -> int | float | str:
if re.fullmatch(r"-?\d+", value):
try:
return int(value)
@@ -84,25 +70,20 @@ def to_number(value: str) -> MetricValue:
return value
def parse_key_value_metrics(line: str) -> dict[str, MetricValue]:
metrics: dict[str, MetricValue] = {}
for match in KV_PATTERN.finditer(line):
key = match.group(1)
raw = match.group(2)
metrics[key] = to_number(raw)
return metrics
def parse_key_values(line: str) -> dict[str, int | float | str]:
return {match.group(1): to_number(match.group(2)) for match in KV_PATTERN.finditer(line)}
def extract_last_matching_line(text: str, token: str) -> str:
match = ""
def last_line_with_token(text: str, token: str) -> str:
found = ""
for line in text.splitlines():
if token in line:
match = line
return match
found = line
return found
def parse_rtp_tester_metrics(text: str) -> dict[str, MetricValue]:
metrics: dict[str, MetricValue] = {}
def parse_rtp_receiver_metrics(text: str) -> dict[str, int]:
metrics: dict[str, int] = {}
patterns = {
"packets_received": r"Packets received:\s*(\d+)",
"sequence_gaps": r"Sequence gaps:\s*(\d+)",
@@ -110,15 +91,14 @@ def parse_rtp_tester_metrics(text: str) -> dict[str, MetricValue]:
"detected_payload_type": r"Detected payload type:\s*(\d+)",
}
for key, pattern in patterns.items():
m = re.search(pattern, text)
if m:
metrics[key] = int(m.group(1))
match = re.search(pattern, text)
if match:
metrics[key] = int(match.group(1))
return metrics
def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]:
metrics: dict[str, MetricValue] = {}
def parse_rtmp_stub_metrics(text: str) -> dict[str, int]:
metrics: dict[str, int] = {}
messages = re.search(
r"Messages:\s*total=(\d+),\s*audio=(\d+),\s*video=(\d+),\s*data=(\d+),\s*chunk-size-updates=(\d+)",
text,
@@ -135,7 +115,7 @@ def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]:
)
counts = re.search(
r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*h265-domestic=(\d+),\s*unknown=(\d+)",
r"Video signaling counts:\s*h264=(\d+),\s*h265-enhanced=(\d+),\s*unknown=(\d+)",
text,
)
if counts:
@@ -143,8 +123,7 @@ def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]:
{
"h264_video_messages": int(counts.group(1)),
"h265_enhanced_video_messages": int(counts.group(2)),
"h265_domestic_video_messages": int(counts.group(3)),
"unknown_video_messages": int(counts.group(4)),
"unknown_video_messages": int(counts.group(3)),
}
)
@@ -158,43 +137,29 @@ def parse_rtmp_tester_metrics(text: str) -> dict[str, MetricValue]:
"matching_threshold": int(matching.group(2)),
}
)
return metrics
def parse_streamer_metrics(text: str) -> dict[str, dict[str, MetricValue]]:
result: dict[str, dict[str, MetricValue]] = {}
for token, key in (
("PIPELINE_METRICS", "pipeline"),
("RTP_METRICS", "rtp"),
("RTMP_METRICS", "rtmp"),
):
line = extract_last_matching_line(text, token)
if line:
result[key] = parse_key_value_metrics(line)
return result
def parse_sdp_metrics(path: str) -> dict[str, MetricValue]:
p = Path(path)
def parse_sdp_metrics(path: str) -> dict[str, object]:
if not path:
return {}
p = Path(path)
if not p.exists():
return {"exists": False}
text = read_text(path)
metrics: dict[str, MetricValue] = {
metrics: dict[str, object] = {
"exists": True,
"bytes": p.stat().st_size,
"has_h264": "H264/90000" in text,
"has_h265": ("H265/90000" in text) or ("HEVC/90000" in text),
}
m = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", text)
if m:
metrics["payload_type"] = int(m.group(1))
match = re.search(r"m=video\s+\d+\s+RTP/AVP\s+(\d+)", text)
if match:
metrics["payload_type"] = int(match.group(1))
return metrics
def parse_exit_code(value: str) -> int:
def parse_exit(value: str) -> int:
try:
return int(value)
except (TypeError, ValueError):
@@ -208,35 +173,12 @@ def parse_duration_ms(value: str) -> int:
return 0
MANIFEST_FIELDS = (
"order",
"row_id",
"name",
"protocol",
"codec",
"rtmp_mode",
"status",
"reason",
"duration_ms",
"sim_rc",
"streamer_rc",
"tester_rc",
"sim_log",
"streamer_log",
"tester_log",
"sdp_path",
)
def parse_manifest(path: str) -> list[dict[str, str]]:
rows: list[dict[str, str]] = []
with open(path, "r", encoding="utf-8", newline="") as handle:
reader = csv.DictReader(handle, delimiter="\t")
for raw_row in reader:
row: dict[str, str] = {}
for field in MANIFEST_FIELDS:
value = raw_row.get(field, "")
row[field] = "" if value is None else str(value)
for raw in reader:
row = {key: "" if value is None else str(value) for key, value in raw.items()}
rows.append(row)
return rows
@@ -246,26 +188,22 @@ def build_summary(args: CliArgs) -> dict[str, object]:
rows: list[dict[str, object]] = []
for row in sorted(manifest_rows, key=lambda item: int(item["order"])):
streamer_log = row["streamer_log"]
tester_log = row["tester_log"]
sim_log = row["sim_log"]
sdp_path = row.get("sdp_path", "")
emitter_text = read_text(row["emitter_log"])
receiver_text = read_text(row["receiver_log"])
emitter_metrics: dict[str, dict[str, int | float | str]] = {}
for token, key in (
("RTP_METRICS", "rtp"),
("RTMP_OUTPUT_METRICS", "rtmp"),
):
line = last_line_with_token(emitter_text, token)
if line:
emitter_metrics[key] = parse_key_values(line)
streamer_text = read_text(streamer_log)
tester_text = read_text(tester_log)
tester_metrics: dict[str, MetricValue]
receiver_metrics: dict[str, object]
if row["protocol"] == "rtp":
tester_metrics = parse_rtp_tester_metrics(tester_text)
receiver_metrics = parse_rtp_receiver_metrics(receiver_text)
else:
tester_metrics = parse_rtmp_tester_metrics(tester_text)
metrics: dict[str, object] = {
"tester": tester_metrics,
"streamer": parse_streamer_metrics(streamer_text),
}
if row["protocol"] == "rtp":
metrics["sdp"] = parse_sdp_metrics(sdp_path)
receiver_metrics = parse_rtmp_stub_metrics(receiver_text)
rows.append(
{
@@ -274,21 +212,23 @@ def build_summary(args: CliArgs) -> dict[str, object]:
"name": row["name"],
"protocol": row["protocol"],
"codec": row["codec"],
"rtmp_mode": row["rtmp_mode"] if row["rtmp_mode"] else None,
"transport": row["transport"],
"status": row["status"],
"reason": row["reason"],
"duration_ms": parse_duration_ms(row["duration_ms"]),
"exit_codes": {
"sim": parse_exit_code(row["sim_rc"]),
"streamer": parse_exit_code(row["streamer_rc"]),
"tester": parse_exit_code(row["tester_rc"]),
"emitter": parse_exit(row["emitter_rc"]),
"receiver": parse_exit(row["receiver_rc"]),
},
"metrics": {
"emitter": emitter_metrics,
"receiver": receiver_metrics,
"sdp": parse_sdp_metrics(row.get("sdp_path", "")),
},
"metrics": metrics,
"evidence": {
"sim_log": sim_log,
"streamer_log": streamer_log,
"tester_log": tester_log,
"sdp": sdp_path if sdp_path else None,
"emitter_log": row["emitter_log"],
"receiver_log": row["receiver_log"],
"sdp_path": row.get("sdp_path") or None,
},
}
)
@@ -296,10 +236,7 @@ def build_summary(args: CliArgs) -> dict[str, object]:
pass_count = sum(1 for row in rows if row["status"] == "PASS")
fail_count = sum(1 for row in rows if row["status"] == "FAIL")
skip_count = sum(1 for row in rows if row["status"] == "SKIP")
all_pass = (
len(rows) == 5 and pass_count == 5 and fail_count == 0 and skip_count == 0
)
all_pass = len(rows) == 6 and pass_count == 6 and fail_count == 0 and skip_count == 0
return {
"run_id": args.run_id,
@@ -322,11 +259,8 @@ def main() -> int:
args = parse_args()
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
summary = build_summary(args)
_ = output_path.write_text(
json.dumps(summary, indent=2, sort_keys=False) + "\n", encoding="utf-8"
)
output_path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
return 0
+107 -248
View File
@@ -1,11 +1,10 @@
#!/usr/bin/env bash
set -u -o pipefail
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
STREAMER_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
BUILD_DIR="${STREAMER_ROOT}/build"
EVIDENCE_ROOT="${STREAMER_ROOT}/.sisyphus/evidence"
TASK_EVIDENCE_DIR="${EVIDENCE_ROOT}/task-15-fault-suite"
SUMMARY_HELPER="${SCRIPT_DIR}/fault_summary_helper.py"
@@ -39,17 +38,10 @@ fi
RUN_ID=""
RUN_DIR=""
MANIFEST_TSV="${RUN_DIR}/rows.tsv"
SUMMARY_JSON="${RUN_DIR}/summary.json"
if [[ "${MODE}" == "baseline" ]]; then
LATEST_SUMMARY_JSON="${EVIDENCE_ROOT}/task-15-fault-suite-summary.json"
EVIDENCE_TEXT="${EVIDENCE_ROOT}/task-15-fault-suite.txt"
else
LATEST_SUMMARY_JSON="${EVIDENCE_ROOT}/task-15-fault-suite-error-summary.json"
EVIDENCE_TEXT="${EVIDENCE_ROOT}/task-15-fault-suite-error.txt"
fi
MANIFEST_TSV=""
SUMMARY_JSON=""
LATEST_SUMMARY_JSON="${EVIDENCE_ROOT}/task-15-fault-suite-summary.json"
EVIDENCE_TEXT="${EVIDENCE_ROOT}/task-15-fault-suite.txt"
STARTED_AT_UTC="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
mkdir -p "${TASK_EVIDENCE_DIR}"
@@ -76,50 +68,7 @@ allocate_run_dir() {
allocate_run_dir || exit 1
RUN_HASH="$(printf '%s' "${RUN_ID}" | cksum | awk '{print $1}')"
PORT_OFFSET="$((RUN_HASH % 1000))"
if [[ "${MODE}" == "baseline" ]]; then
SCENARIO_PORT_BASE="$((52040 + PORT_OFFSET))"
else
SCENARIO_PORT_BASE="$((52140 + PORT_OFFSET))"
fi
echo -e "order\tscenario_id\tname\tstatus\treason\tduration_ms\tsim_rc\tstreamer_rc\ttester_rc\tsim_log\tstreamer_log\ttester_log\tsdp_path" > "${MANIFEST_TSV}"
cleanup_pids=()
cleanup_all() {
for pid in "${cleanup_pids[@]:-}"; do
if [[ -n "${pid}" ]] && kill -0 "${pid}" 2>/dev/null; then
kill "${pid}" 2>/dev/null || true
fi
done
}
trap cleanup_all EXIT
binary_exists() {
local path="$1"
[[ -x "${path}" ]]
}
wait_pid() {
local pid="$1"
local timeout_s="$2"
local elapsed=0
while kill -0 "${pid}" 2>/dev/null; do
if (( elapsed >= timeout_s )); then
kill "${pid}" 2>/dev/null || true
wait "${pid}" 2>/dev/null || true
return 124
fi
sleep 1
elapsed=$((elapsed + 1))
done
wait "${pid}" 2>/dev/null
return $?
}
echo -e "order\tscenario_id\tname\tstatus\treason\tduration_ms\tcommand_rc\tlog_path" > "${MANIFEST_TSV}"
append_manifest_row() {
local order="$1"
@@ -128,145 +77,37 @@ append_manifest_row() {
local status="$4"
local reason="$5"
local duration_ms="$6"
local sim_rc="$7"
local streamer_rc="$8"
local tester_rc="$9"
local sim_log="${10}"
local streamer_log="${11}"
local tester_log="${12}"
local sdp_path="${13}"
echo -e "${order}\t${scenario_id}\t${name}\t${status}\t${reason}\t${duration_ms}\t${sim_rc}\t${streamer_rc}\t${tester_rc}\t${sim_log}\t${streamer_log}\t${tester_log}\t${sdp_path}" >> "${MANIFEST_TSV}"
local command_rc="$7"
local log_path="$8"
echo -e "${order}\t${scenario_id}\t${name}\t${status}\t${reason}\t${duration_ms}\t${command_rc}\t${log_path}" >> "${MANIFEST_TSV}"
}
scenario_port() {
local order="$1"
echo $((SCENARIO_PORT_BASE + (order - 1) * 2))
}
run_fault_scenario() {
run_expected_failure() {
local order="$1"
local scenario_id="$2"
local name="$3"
local expected_substring="$4"
shift 4
local row_dir="${RUN_DIR}/${order}-${scenario_id}"
mkdir -p "${row_dir}"
local log_path="${row_dir}/command.log"
local sim_log="${row_dir}/sim.log"
local streamer_log="${row_dir}/streamer.log"
local tester_log="${row_dir}/tester.log"
local sdp_path="${row_dir}/stream.sdp"
local shm_name="fault_${MODE}_${scenario_id}_${RUN_ID}"
local zmq_endpoint="ipc:///tmp/fault_${MODE}_${scenario_id}_${RUN_ID}.ipc"
local sim_label="f${order}_${MODE:0:3}_${scenario_id:0:3}"
local sim_frames=360
local sim_fps=200
local reset_every=""
local snapshot_delay_us=0
local emit_stall_ms=0
local ingest_max_frames=180
case "${scenario_id}" in
torn_read)
if [[ "${MODE}" == "baseline" ]]; then
snapshot_delay_us=2500
sim_fps=240
else
snapshot_delay_us=25000
sim_fps=320
fi
;;
sink_stall)
if [[ "${MODE}" == "baseline" ]]; then
emit_stall_ms=3
ingest_max_frames=140
else
emit_stall_ms=60
ingest_max_frames=160
fi
;;
reset_storm)
if [[ "${MODE}" == "baseline" ]]; then
reset_every=20
ingest_max_frames=120
else
reset_every=3
ingest_max_frames=180
fi
;;
*)
echo "unknown scenario_id=${scenario_id}" >&2
return 1
;;
esac
local rtp_port
rtp_port="$(scenario_port "${order}")"
local streamer_cmd=(
"${BUILD_DIR}/cvmmap_streamer"
--run-mode pipeline
--codec h264
--shm-name "${shm_name}"
--zmq-endpoint "${zmq_endpoint}"
--input-mode dummy
--dummy-label "${sim_label}"
--dummy-frames "${sim_frames}"
--dummy-fps "${sim_fps}"
--dummy-width 640
--dummy-height 360
--dummy-startup-delay-ms 0
--queue-size 1
--gop 30
--b-frames 0
--ingest-max-frames "${ingest_max_frames}"
--ingest-idle-timeout-ms 8000
--snapshot-copy-delay-us "${snapshot_delay_us}"
--emit-stall-ms "${emit_stall_ms}"
--rtp
--rtp-endpoint "127.0.0.1:${rtp_port}"
--rtp-payload-type 96
--rtp-sdp "${sdp_path}"
)
if [[ -n "${reset_every}" ]]; then
streamer_cmd+=(--dummy-reset-every "${reset_every}")
fi
local tester_cmd=(
"${BUILD_DIR}/rtp_receiver_tester"
--port "${rtp_port}"
--expect-pt 96
--packet-threshold 1
--timeout-ms 15000
)
local row_start_ms row_end_ms duration_ms
local row_start_ms
row_start_ms="$(date +%s%3N)"
"${tester_cmd[@]}" > "${tester_log}" 2>&1 &
local tester_pid=$!
cleanup_pids+=("${tester_pid}")
sleep 1
: > "${sim_log}"
"${streamer_cmd[@]}" > "${streamer_log}" 2>&1
local streamer_rc=$?
wait_pid "${tester_pid}" 25
local tester_rc=$?
local sim_rc=0
set +e
"$@" >"${log_path}" 2>&1
local command_rc=$?
set -e
local row_end_ms
row_end_ms="$(date +%s%3N)"
duration_ms=$((row_end_ms - row_start_ms))
local duration_ms=$((row_end_ms - row_start_ms))
local status="PASS"
local reason="all-processes-ok"
if (( sim_rc != 0 || streamer_rc != 0 || tester_rc != 0 )); then
status="FAIL"
reason="sim_rc=${sim_rc},streamer_rc=${streamer_rc},tester_rc=${tester_rc}"
local status="FAIL"
local reason="expected non-zero rc and log token '${expected_substring}'"
if (( command_rc != 0 )) && grep -Fq "${expected_substring}" "${log_path}"; then
status="PASS"
reason="command failed as expected"
fi
append_manifest_row \
@@ -276,26 +117,21 @@ run_fault_scenario() {
"${status}" \
"${reason}" \
"${duration_ms}" \
"${sim_rc}" \
"${streamer_rc}" \
"${tester_rc}" \
"${sim_log}" \
"${streamer_log}" \
"${tester_log}" \
"${sdp_path}"
"${command_rc}" \
"${log_path}"
printf "[%s] %s => %s (%s)\n" "${scenario_id}" "${name}" "${status}" "${reason}"
}
main() {
local required=(
"${BUILD_DIR}/cvmmap_streamer"
"${BUILD_DIR}/rtp_receiver_tester"
"${BUILD_DIR}/cvmmap_streamer"
"${BUILD_DIR}/rtmp_output_tester"
)
local missing=()
for bin in "${required[@]}"; do
if ! binary_exists "${bin}"; then
if [[ ! -x "${bin}" ]]; then
missing+=("${bin}")
fi
done
@@ -313,9 +149,68 @@ main() {
return 1
fi
run_fault_scenario 1 "torn_read" "fault:torn-read"
run_fault_scenario 2 "sink_stall" "fault:sink-stall"
run_fault_scenario 3 "reset_storm" "fault:reset-storm"
run_expected_failure 1 "removed_encoder_backend" "removed encoder backend rejected" \
"invalid encoder backend: 'gstreamer_legacy' was removed; use ffmpeg" \
"${BUILD_DIR}/cvmmap_streamer" \
--run-mode pipeline \
--input-uri cvmmap://default \
--encoder-backend gstreamer_legacy
run_expected_failure 2 "removed_rtmp_transport" "removed RTMP transport rejected" \
"invalid rtmp transport: 'legacy_custom' was removed; use libavformat or ffmpeg_process" \
"${BUILD_DIR}/cvmmap_streamer" \
--run-mode pipeline \
--input-uri cvmmap://default \
--rtmp \
--rtmp-url rtmp://127.0.0.1/live/test \
--rtmp-transport legacy_custom
run_expected_failure 3 "removed_rtmp_mode_cli" "removed RTMP mode flag rejected" \
"unknown argument: --rtmp-mode (removed; RTMP always uses enhanced mode)" \
"${BUILD_DIR}/cvmmap_streamer" \
--rtmp-mode enhanced
local mode_row_dir="${RUN_DIR}/4-removed_rtmp_mode_toml"
mkdir -p "${mode_row_dir}"
local mode_config="${mode_row_dir}/removed_rtmp_mode.toml"
cat >"${mode_config}" <<'EOF'
[outputs.rtmp]
enabled = true
urls = ["rtmp://127.0.0.1/live/test"]
mode = "enhanced"
EOF
run_expected_failure 4 "removed_rtmp_mode_toml" "removed RTMP mode TOML rejected" \
"invalid RTMP config: outputs.rtmp.mode was removed; RTMP always uses enhanced mode" \
"${BUILD_DIR}/cvmmap_streamer" \
--config "${mode_config}"
run_expected_failure 5 "missing_rtmp_url" "missing RTMP URL rejected" \
"invalid RTMP config: enabled RTMP output requires at least one URL" \
"${BUILD_DIR}/cvmmap_streamer" \
--run-mode pipeline \
--input-uri cvmmap://default \
--rtmp
run_expected_failure 6 "invalid_rtp_endpoint" "invalid RTP endpoint rejected" \
"invalid RTP config: endpoint must be in '<host>:<port>' format" \
"${BUILD_DIR}/cvmmap_streamer" \
--run-mode pipeline \
--input-uri cvmmap://default \
--rtp \
--rtp-endpoint invalid
run_expected_failure 7 "ffmpeg_process_bad_binary" "ffmpeg_process child failure surfaces" \
"child exited before publish completed" \
"${BUILD_DIR}/rtmp_output_tester" \
--rtmp-url rtmp://127.0.0.1/live/test \
--transport ffmpeg_process \
--ffmpeg-path /nonexistent/ffmpeg \
--codec h264 \
--frames 256 \
--width 640 \
--height 360 \
--frame-interval-ms 1 \
--linger-ms 0
local finished_at_utc
finished_at_utc="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
@@ -328,54 +223,36 @@ main() {
--started-at "${STARTED_AT_UTC}" \
--finished-at "${finished_at_utc}" \
--mode "${MODE}"
local summary_rc=$?
cp -f "${SUMMARY_JSON}" "${LATEST_SUMMARY_JSON}" 2>/dev/null || true
local total_count pass_count fail_count all_pass
total_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("total", 0))
PY
)"
pass_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("pass", 0))
PY
)"
fail_count="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
counts = data.get("counts", {})
print(counts.get("fail", 0))
PY
)"
all_pass="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
import json, sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
print("true" if data.get("all_pass", False) else "false")
PY
)"
local violation_lines
violation_lines="$(python3 - <<'PY' "${SUMMARY_JSON}"
import json
import sys
data = json.load(open(sys.argv[1], "r", encoding="utf-8"))
for scenario in data.get("scenarios", []):
sid = scenario.get("id", "unknown")
for violation in scenario.get("violations", []):
print(f"{sid}:{violation}")
PY
)"
{
@@ -385,42 +262,24 @@ PY
echo "run_dir=${RUN_DIR}"
echo "manifest=${MANIFEST_TSV}"
echo "summary_json=${SUMMARY_JSON}"
echo "latest_summary_json=${LATEST_SUMMARY_JSON}"
echo "started_at=${STARTED_AT_UTC}"
echo "finished_at=${finished_at_utc}"
echo "scenario_total=${total_count}"
echo "scenario_pass=${pass_count}"
echo "scenario_fail=${fail_count}"
echo "counts_total=${total_count}"
echo "counts_pass=${pass_count}"
echo "counts_fail=${fail_count}"
echo "all_pass=${all_pass}"
echo "summary_helper_rc=${summary_rc}"
echo "violated_thresholds_begin"
if [[ -n "${violation_lines}" ]]; then
echo "${violation_lines}"
fi
echo "violated_thresholds_end"
echo "scenarios=removed_encoder_backend,removed_rtmp_transport,removed_rtmp_mode_cli,removed_rtmp_mode_toml,missing_rtmp_url,invalid_rtp_endpoint,ffmpeg_process_bad_binary"
} > "${EVIDENCE_TEXT}"
if (( summary_rc != 0 )); then
echo "summary helper failed with rc=${summary_rc}" >&2
return 1
if [[ "${all_pass}" == "true" ]]; then
echo "fault suite PASS (${pass_count}/${total_count})"
echo "summary: ${SUMMARY_JSON}"
return 0
fi
echo "fault-suite mode=${MODE} total=${total_count} pass=${pass_count} fail=${fail_count}"
echo "summary: ${SUMMARY_JSON}"
if [[ "${MODE}" == "baseline" ]]; then
if [[ "${total_count}" == "3" && "${pass_count}" == "3" && "${fail_count}" == "0" ]]; then
return 0
fi
return 1
fi
if [[ "${fail_count}" != "0" ]]; then
return 1
fi
echo "degraded mode did not violate thresholds" >&2
return 2
echo "fault suite FAIL (${pass_count}/${total_count})" >&2
echo "summary: ${SUMMARY_JSON}" >&2
return 1
}
main "$@"
+34 -321
View File
@@ -5,16 +5,12 @@ from __future__ import annotations
import argparse
import csv
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import cast
KV_PATTERN = re.compile(r"([a-zA-Z0-9_]+)=([^\s]+)")
@dataclass(frozen=True)
class CliArgs:
manifest: str
@@ -27,9 +23,7 @@ class CliArgs:
def parse_args() -> CliArgs:
parser = argparse.ArgumentParser(
description="Build fault suite summary with threshold checks"
)
parser = argparse.ArgumentParser(description="Build fault suite summary")
_ = parser.add_argument("--manifest", required=True)
_ = parser.add_argument("--output", required=True)
_ = parser.add_argument("--run-id", required=True)
@@ -37,7 +31,6 @@ def parse_args() -> CliArgs:
_ = parser.add_argument("--started-at", required=True)
_ = parser.add_argument("--finished-at", required=True)
_ = parser.add_argument("--mode", required=True, choices=("baseline", "degraded"))
parsed = parser.parse_args(sys.argv[1:])
return CliArgs(
manifest=cast(str, parsed.manifest),
@@ -50,46 +43,17 @@ def parse_args() -> CliArgs:
)
def read_text(path: str) -> str:
    """Best-effort read of a UTF-8 text file.

    Returns the file contents with undecodable bytes replaced, or "" when
    the file is missing or unreadable.

    The original pre-checked ``p.exists()`` before reading; that check was
    redundant (FileNotFoundError is a subclass of OSError, already caught)
    and introduced a check-then-read TOCTOU race, so it has been removed.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
def parse_manifest(path: str) -> list[dict[str, str]]:
    """Load a tab-separated manifest as a list of header-keyed rows.

    Cells absent from a short row (DictReader yields None for them) are
    normalised to empty strings so every value is a plain str.
    """
    with open(path, "r", encoding="utf-8", newline="") as handle:
        return [
            {key: "" if cell is None else str(cell) for key, cell in record.items()}
            for record in csv.DictReader(handle, delimiter="\t")
        ]
def to_number(value: str) -> int | float | str:
if re.fullmatch(r"-?\d+", value):
try:
return int(value)
except ValueError:
return value
if re.fullmatch(r"-?\d+\.\d+", value):
try:
return float(value)
except ValueError:
return value
return value
def parse_key_values(line: str) -> dict[str, int | float | str]:
    """Extract ``key=value`` tokens from a metrics line via KV_PATTERN,
    coercing numeric-looking values with to_number().  On duplicate keys
    the last occurrence wins (dict-comprehension semantics match the
    original assignment loop)."""
    return {
        hit.group(1): to_number(hit.group(2))
        for hit in KV_PATTERN.finditer(line)
    }
def last_line_with_token(text: str, token: str) -> str:
    """Return the last line of ``text`` containing ``token``, or "" if none.

    Scans from the end and returns on first hit, which is equivalent to the
    original forward scan that kept overwriting its result.
    """
    for candidate in reversed(text.splitlines()):
        if token in candidate:
            return candidate
    return ""
def parse_exit(value: str) -> int:
def parse_int(value: str) -> int:
try:
return int(value)
except (TypeError, ValueError):
@@ -103,302 +67,51 @@ def parse_duration_ms(value: str) -> int:
return 0
def parse_manifest(path: str) -> list[dict[str, str]]:
    """Read the scenario manifest TSV, projecting each record onto the
    known column set.

    Columns missing from a record, or present with a None cell, become
    empty strings; extra columns in the TSV are dropped.
    """
    columns = (
        "order",
        "scenario_id",
        "name",
        "status",
        "reason",
        "duration_ms",
        "sim_rc",
        "streamer_rc",
        "tester_rc",
        "sim_log",
        "streamer_log",
        "tester_log",
        "sdp_path",
    )
    with open(path, "r", encoding="utf-8", newline="") as handle:
        return [
            {
                column: "" if record.get(column) is None else str(record.get(column))
                for column in columns
            }
            for record in csv.DictReader(handle, delimiter="\t")
        ]
Check = dict[str, object]
def make_check_min(metric: str, actual: int, minimum: int) -> Check:
    """Build a lower-bound threshold check record.

    Passes when ``actual >= minimum``; a failing check carries a
    human-readable ``violation`` string, otherwise "".
    """
    ok = minimum <= actual
    violation = "" if ok else f"{metric}={actual} < {minimum}"
    return {
        "metric": metric,
        "type": "min",
        "actual": actual,
        "expected": minimum,
        "passed": ok,
        "violation": violation,
    }
def make_check_max(metric: str, actual: int, maximum: int) -> Check:
    """Build an upper-bound threshold check record.

    Passes when ``actual <= maximum``; a failing check carries a
    human-readable ``violation`` string, otherwise "".
    """
    ok = maximum >= actual
    violation = "" if ok else f"{metric}={actual} > {maximum}"
    return {
        "metric": metric,
        "type": "max",
        "actual": actual,
        "expected": maximum,
        "passed": ok,
        "violation": violation,
    }
def get_thresholds(mode: str) -> dict[str, dict[str, int]]:
    """Per-scenario threshold tables for the given mode.

    "baseline" returns the loose limits; any other mode value returns the
    tight "degraded" limits (matching the original if/fall-through shape).
    """
    baseline: dict[str, dict[str, int]] = {
        "torn_read": {
            "torn_read_events_min": 1,
            "p50_us_max": 200_000,
            "p99_us_max": 400_000,
            "drop_ratio_ppm_max": 980_000,
            "samples_min": 10,
        },
        "sink_stall": {
            "sink_stall_events_min": 1,
            "p50_us_max": 350_000,
            "p95_us_max": 600_000,
            "drop_ratio_ppm_max": 1_000_000,
            "samples_min": 1,
        },
        "reset_storm": {
            "reset_events_min": 4,
            "p50_us_max": 1_000_000,
            "p99_us_max": 1_000_000,
            "drop_ratio_ppm_max": 1_000_000,
            "samples_min": 1,
        },
    }
    degraded: dict[str, dict[str, int]] = {
        "torn_read": {
            "torn_read_events_min": 200,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "sink_stall": {
            "sink_stall_events_min": 200,
            "p50_us_max": 1_000,
            "p95_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
        "reset_storm": {
            "reset_events_min": 20,
            "p50_us_max": 1_000,
            "p99_us_max": 2_000,
            "drop_ratio_ppm_max": 20_000,
            "samples_min": 100,
        },
    }
    return baseline if mode == "baseline" else degraded
def scenario_checks(
    scenario_id: str,
    fault: dict[str, int | float | str],
    latency: dict[str, int | float | str],
    thresholds: dict[str, dict[str, int]],
) -> list[Check]:
    """Assemble the threshold checks for one scenario.

    Three common checks apply to every scenario (sample-count minimum, p50
    latency ceiling, drop-ratio ceiling); each known scenario id then adds
    its fault-counter minimum plus one extra latency-percentile ceiling.
    Unknown scenario ids get only the common checks with the hard-coded
    fallback limits.
    """
    limits = thresholds.get(scenario_id, {})
    # Coerce every metric up front, matching the original behaviour of
    # failing fast on any non-numeric counter regardless of scenario id.
    torn = int(fault.get("torn_read_events", 0))
    stall = int(fault.get("sink_stall_events", 0))
    resets = int(fault.get("reset_events", 0))
    p50 = int(latency.get("p50_us", 0))
    p95 = int(latency.get("p95_us", 0))
    p99 = int(latency.get("p99_us", 0))
    samples = int(latency.get("ingest_to_emit_samples", 0))
    drop_ppm = int(latency.get("drop_ratio_ppm", 0))

    checks: list[Check] = [
        make_check_min(
            "ingest_to_emit_samples", samples, int(limits.get("samples_min", 1))
        ),
        make_check_max("p50_us", p50, int(limits.get("p50_us_max", 500_000))),
        make_check_max(
            "drop_ratio_ppm", drop_ppm, int(limits.get("drop_ratio_ppm_max", 1_000_000))
        ),
    ]
    if scenario_id == "torn_read":
        checks.append(
            make_check_min(
                "torn_read_events", torn, int(limits.get("torn_read_events_min", 1))
            )
        )
        checks.append(make_check_max("p99_us", p99, int(limits.get("p99_us_max", 500_000))))
    elif scenario_id == "sink_stall":
        checks.append(
            make_check_min(
                "sink_stall_events", stall, int(limits.get("sink_stall_events_min", 1))
            )
        )
        checks.append(make_check_max("p95_us", p95, int(limits.get("p95_us_max", 500_000))))
    elif scenario_id == "reset_storm":
        checks.append(
            make_check_min(
                "reset_events", resets, int(limits.get("reset_events_min", 1))
            )
        )
        checks.append(make_check_max("p99_us", p99, int(limits.get("p99_us_max", 500_000))))
    return checks
def build_summary(args: CliArgs) -> dict[str, object]:
    """Build the fault-suite summary document from the manifest TSV.

    NOTE(review): this body interleaves two generations of the helper —
    the pre-refactor scenario/threshold version and the post-refactor
    row-based version — apparently a merged-diff artifact.  The halves
    conflict: ``rows`` is rebuilt below with keys like "id"/"exit_codes"
    and is then indexed with the old manifest keys ("scenario_id",
    "streamer_log", "sim_rc", ...), which would raise KeyError at runtime,
    and the returned dict repeats the "mode" and counts "total" keys
    (the later literal wins).  Reconcile to a single generation before
    relying on this function.
    """
    thresholds = get_thresholds(args.mode)
    # Old path: raw manifest rows feed the per-scenario threshold checks.
    rows = parse_manifest(args.manifest)
    # New path: the manifest is re-read and projected into summary rows.
    manifest_rows = parse_manifest(args.manifest)
    # NOTE(review): this rebinding clobbers the raw rows read just above.
    rows = [
        {
            "order": parse_int(row["order"]),
            "id": row["scenario_id"],
            "name": row["name"],
            "status": row["status"],
            "reason": row["reason"],
            "duration_ms": parse_duration_ms(row["duration_ms"]),
            "exit_codes": {"command": parse_int(row["command_rc"])},
            "evidence": {"log_path": row["log_path"]},
        }
        for row in sorted(manifest_rows, key=lambda item: parse_int(item["order"]))
    ]
    scenarios: list[dict[str, object]] = []
    # Old path: for each manifest row, pull the last metrics lines out of
    # the streamer log and evaluate the threshold checks.
    # NOTE(review): iterates the rebuilt rows but indexes old-schema keys.
    for row in sorted(rows, key=lambda item: int(item["order"])):
        streamer_text = read_text(row["streamer_log"])
        pipeline_line = last_line_with_token(streamer_text, "PIPELINE_METRICS")
        latency_line = last_line_with_token(streamer_text, "LATENCY_METRICS")
        fault_line = last_line_with_token(streamer_text, "FAULT_COUNTERS")
        rtp_line = last_line_with_token(streamer_text, "RTP_METRICS")
        pipeline = parse_key_values(pipeline_line) if pipeline_line else {}
        latency = parse_key_values(latency_line) if latency_line else {}
        fault = parse_key_values(fault_line) if fault_line else {}
        rtp = parse_key_values(rtp_line) if rtp_line else {}
        sim_rc = parse_exit(row["sim_rc"])
        streamer_rc = parse_exit(row["streamer_rc"])
        tester_rc = parse_exit(row["tester_rc"])
        # All three processes must exit 0 for the scenario to count.
        process_ok = sim_rc == 0 and streamer_rc == 0 and tester_rc == 0
        checks = scenario_checks(row["scenario_id"], fault, latency, thresholds)
        violated_checks = [
            cast(str, check["violation"])
            for check in checks
            if not cast(bool, check["passed"])
        ]
        scenario_pass = process_ok and len(violated_checks) == 0
        scenario_status = "PASS" if scenario_pass else "FAIL"
        # Reason priority: process failure first, then joined violations.
        reason = (
            "all checks passed"
            if scenario_pass
            else (
                f"process_rc(sim={sim_rc},streamer={streamer_rc},tester={tester_rc})"
                if not process_ok
                else "; ".join(violated_checks)
            )
        )
        scenarios.append(
            {
                "order": int(row["order"]),
                "id": row["scenario_id"],
                "name": row["name"],
                "status": scenario_status,
                "reason": reason,
                "duration_ms": parse_duration_ms(row["duration_ms"]),
                "process_exit": {
                    "sim": sim_rc,
                    "streamer": streamer_rc,
                    "tester": tester_rc,
                },
                "metrics": {
                    "pipeline": pipeline,
                    "latency": latency,
                    "fault": fault,
                    "rtp": rtp,
                },
                "checks": checks,
                "violations": violated_checks,
                "evidence": {
                    "sim_log": row["sim_log"],
                    "streamer_log": row["streamer_log"],
                    "tester_log": row["tester_log"],
                    "sdp": row["sdp_path"],
                },
            }
        )
    # Old counters: computed over the threshold-checked scenarios,
    # gating on exactly 3 scenarios all passing.
    pass_count = sum(1 for item in scenarios if item["status"] == "PASS")
    fail_count = sum(1 for item in scenarios if item["status"] == "FAIL")
    all_pass = len(scenarios) == 3 and pass_count == 3 and fail_count == 0
    # New counters: recomputed over the manifest rows (overwriting the
    # values above), gating on exactly 7 rows with no FAIL/SKIP.
    pass_count = sum(1 for row in rows if row["status"] == "PASS")
    fail_count = sum(1 for row in rows if row["status"] == "FAIL")
    skip_count = sum(1 for row in rows if row["status"] == "SKIP")
    all_pass = len(rows) == 7 and pass_count == 7 and fail_count == 0 and skip_count == 0
    return {
        "task": 15,
        "mode": args.mode,
        "run_id": args.run_id,
        "run_dir": args.run_dir,
        "started_at": args.started_at,
        "finished_at": args.finished_at,
        "thresholds": thresholds,
        # NOTE(review): duplicate key — "mode" already appears above.
        "mode": args.mode,
        "counts": {
            "total": len(scenarios),
            # NOTE(review): duplicate key — this later "total" wins.
            "total": len(rows),
            "pass": pass_count,
            "fail": fail_count,
            "skip": skip_count,
        },
        "all_pass": all_pass,
        "recommended_exit_code": 0 if all_pass else 1,
        "scenarios": scenarios,
        "rows": rows,
    }
def main() -> int:
    """CLI entry point: build the summary and write it as pretty JSON.

    Returns 0 unconditionally; threshold outcomes are carried inside the
    JSON document itself (``all_pass`` / ``recommended_exit_code``), not
    via this process exit code.
    """
    args = parse_args()
    summary = build_summary(args)
    output_path = Path(args.output)
    # Ensure the evidence directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # The original body built and wrote the summary twice (a merged-diff
    # artifact); one build and one write produce the same final file.
    _ = output_path.write_text(
        json.dumps(summary, indent=2) + "\n", encoding="utf-8"
    )
    return 0
+2 -5
View File
@@ -17,7 +17,6 @@ CODEC="${CODEC:-h264}"
ENCODER_BACKEND="${ENCODER_BACKEND:-ffmpeg}"
ENCODER_DEVICE="${ENCODER_DEVICE:-nvidia}"
RTMP_TRANSPORT="${RTMP_TRANSPORT:-libavformat}"
RTMP_MODE="${RTMP_MODE:-enhanced}"
INGEST_MAX_FRAMES="${INGEST_MAX_FRAMES:-120}"
PROBE_TIMEOUT_S="${PROBE_TIMEOUT_S:-20}"
DECODE_FRAMES="${DECODE_FRAMES:-15}"
@@ -58,10 +57,9 @@ Environment overrides:
INPUT_URI cvmmap source URI, if positional argument is omitted
STREAM_NAME RTMP/HTTP-FLV stream name, default derived from INPUT_URI
CODEC h264|h265
ENCODER_BACKEND ffmpeg|gstreamer_legacy
ENCODER_BACKEND auto|ffmpeg
ENCODER_DEVICE auto|nvidia|software
RTMP_TRANSPORT libavformat|ffmpeg_process|legacy_custom
RTMP_MODE enhanced|domestic
RTMP_TRANSPORT libavformat|ffmpeg_process
INGEST_MAX_FRAMES bounded frame count for the smoke
DECODE_FRAMES frames to decode from HTTP-FLV after probe
SRS_ROOT local SRS checkout, default ~/Code/srs
@@ -198,7 +196,6 @@ fi
--rtmp \
--rtmp-url "$RTMP_URL" \
--rtmp-transport "$RTMP_TRANSPORT" \
--rtmp-mode "$RTMP_MODE" \
--ingest-max-frames "$INGEST_MAX_FRAMES" \
>"$STREAMER_LOG" 2>&1 &
STREAMER_PID=$!