#!/usr/bin/env bash
#
# Fault-injection suite driver (task 15).
#
# Runs three scenarios (torn_read, sink_stall, reset_storm) against the sim,
# streamer and RTP tester binaries found in ../build, records one TSV row per
# scenario, asks fault_summary_helper.py to produce summary.json, and writes a
# key=value evidence file under ../.sisyphus/evidence.
#
# Usage:
#   <script> [--mode baseline|degraded] [--degraded]
#
# Exit status:
#   0  baseline mode: exactly 3 scenarios reported, 3 passed, 0 failed
#   1  baseline mode: any other outcome; degraded mode: at least one failure
#      reported; also missing binaries, summary helper failure, unreadable
#      summary, or no run directory could be allocated
#   2  invalid command line; degraded mode in which no scenario failed
#
# `set -e` is intentionally NOT enabled: the script captures non-zero exit
# codes of the processes under test and records them instead of aborting.
# `date +%N` / `date +%s%3N` below are GNU date extensions.

set -u -o pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
STREAMER_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
BUILD_DIR="${STREAMER_ROOT}/build"
EVIDENCE_ROOT="${STREAMER_ROOT}/.sisyphus/evidence"
TASK_EVIDENCE_DIR="${EVIDENCE_ROOT}/task-15-fault-suite"
SUMMARY_HELPER="${SCRIPT_DIR}/fault_summary_helper.py"

# ---------------------------------------------------------------------------
# Command line. Every argument is validated (not just the first one), so a
# stray trailing argument is reported instead of being silently ignored.
# When the mode is given more than once, the last occurrence wins.
# ---------------------------------------------------------------------------
MODE="baseline"
while (( $# > 0 )); do
  case "$1" in
    --mode)
      if (( $# < 2 )); then
        echo "missing value for --mode" >&2
        exit 2
      fi
      MODE="$2"
      shift 2
      ;;
    --degraded)
      MODE="degraded"
      shift
      ;;
    *)
      echo "unknown argument: $1" >&2
      exit 2
      ;;
  esac
done

if [[ "${MODE}" != "baseline" && "${MODE}" != "degraded" ]]; then
  echo "invalid --mode '${MODE}' (expected: baseline|degraded)" >&2
  exit 2
fi

# Filled in by allocate_run_dir; kept empty until then so that no code can
# accidentally write to "/rows.tsv" or "/summary.json".
RUN_ID=""
RUN_DIR=""
MANIFEST_TSV=""
SUMMARY_JSON=""

# Each mode has its own "latest" summary copy and evidence text file.
if [[ "${MODE}" == "baseline" ]]; then
  LATEST_SUMMARY_JSON="${EVIDENCE_ROOT}/task-15-fault-suite-summary.json"
  EVIDENCE_TEXT="${EVIDENCE_ROOT}/task-15-fault-suite.txt"
else
  LATEST_SUMMARY_JSON="${EVIDENCE_ROOT}/task-15-fault-suite-error-summary.json"
  EVIDENCE_TEXT="${EVIDENCE_ROOT}/task-15-fault-suite-error.txt"
fi

STARTED_AT_UTC="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"

if ! mkdir -p "${TASK_EVIDENCE_DIR}"; then
  echo "cannot create evidence directory: ${TASK_EVIDENCE_DIR}" >&2
  exit 1
fi

#######################################
# Create a uniquely named directory for this run.
# The name combines a timestamp, nanoseconds, the shell PID and $RANDOM;
# plain `mkdir` (no -p) is used so that an already existing directory counts
# as a collision and triggers a retry.
# Globals:  TASK_EVIDENCE_DIR, MODE (read)
#           RUN_ID, RUN_DIR, MANIFEST_TSV, SUMMARY_JSON (written)
# Returns:  0 on success, 1 after 50 failed attempts
#######################################
allocate_run_dir() {
  local attempts=0
  local candidate_id candidate_dir
  while (( attempts < 50 )); do
    candidate_id="$(date +"%Y%m%dT%H%M%S")-$(date +"%N")-p$$-$RANDOM"
    candidate_dir="${TASK_EVIDENCE_DIR}/${candidate_id}-${MODE}"
    if mkdir "${candidate_dir}" 2>/dev/null; then
      RUN_ID="${candidate_id}"
      RUN_DIR="${candidate_dir}"
      MANIFEST_TSV="${RUN_DIR}/rows.tsv"
      SUMMARY_JSON="${RUN_DIR}/summary.json"
      return 0
    fi
    attempts=$((attempts + 1))
    sleep 0.01
  done
  echo "failed to allocate unique fault-suite run directory" >&2
  return 1
}

allocate_run_dir || exit 1

# Derive a port offset (0..999) from a checksum of the run id so that
# different runs pick different RTP port ranges.
RUN_HASH="$(printf '%s' "${RUN_ID}" | cksum | awk '{print $1}')"
PORT_OFFSET="$((RUN_HASH % 1000))"
if [[ "${MODE}" == "baseline" ]]; then
  SCENARIO_PORT_BASE="$((52040 + PORT_OFFSET))"
else
  SCENARIO_PORT_BASE="$((52140 + PORT_OFFSET))"
fi

# Manifest header; append_manifest_row writes the rows in the same order.
printf 'order\tscenario_id\tname\tstatus\treason\tduration_ms\tsim_rc\tstreamer_rc\ttester_rc\tsim_log\tstreamer_log\ttester_log\tsdp_path\n' \
  > "${MANIFEST_TSV}"

# PIDs of background children that have been started but not yet reaped.
cleanup_pids=()

#######################################
# EXIT trap: send SIGTERM to every tracked child that is still alive.
# Globals:  cleanup_pids (read)
#######################################
cleanup_all() {
  local pid
  (( ${#cleanup_pids[@]} > 0 )) || return 0
  for pid in "${cleanup_pids[@]}"; do
    if [[ -n "${pid}" ]] && kill -0 "${pid}" 2>/dev/null; then
      kill "${pid}" 2>/dev/null || true
    fi
  done
}
trap cleanup_all EXIT

#######################################
# Test whether a path is an executable file.
# Arguments: $1 - path to check
# Returns:   0 if executable, non-zero otherwise
#######################################
binary_exists() {
  local path="$1"
  [[ -x "${path}" ]]
}

#######################################
# Wait for a child process, enforcing a timeout.
# Polls once per second. On timeout the child receives SIGTERM; if it is
# still alive roughly 5 seconds later it receives SIGKILL, so the final
# `wait` cannot block forever on a child that ignores SIGTERM.
# Arguments: $1 - PID of a child of this shell
#            $2 - timeout in whole seconds
# Returns:   the child's exit status, or 124 if the timeout was hit
#######################################
wait_pid() {
  local pid="$1"
  local timeout_s="$2"
  local elapsed=0
  local grace_ticks=50 # 50 x 0.1 s grace period after SIGTERM

  while kill -0 "${pid}" 2>/dev/null; do
    if (( elapsed >= timeout_s )); then
      kill "${pid}" 2>/dev/null || true
      while kill -0 "${pid}" 2>/dev/null; do
        if (( grace_ticks <= 0 )); then
          kill -KILL "${pid}" 2>/dev/null || true
          break
        fi
        sleep 0.1
        grace_ticks=$((grace_ticks - 1))
      done
      wait "${pid}" 2>/dev/null || true
      return 124
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done

  wait "${pid}" 2>/dev/null
}

#######################################
# Append one tab-separated row to the manifest.
# Field values are written verbatim (printf '%s'), so backslashes in paths or
# reasons are not interpreted as escape sequences.
# Globals:   MANIFEST_TSV (read)
# Arguments: exactly 13 fields, in header order:
#            order scenario_id name status reason duration_ms sim_rc
#            streamer_rc tester_rc sim_log streamer_log tester_log sdp_path
# Returns:   0 on success, 1 if the field count is wrong
#######################################
append_manifest_row() {
  if (( $# != 13 )); then
    echo "append_manifest_row: expected 13 fields, got $#" >&2
    return 1
  fi
  # "$*" joins the positional parameters with the first character of IFS.
  local IFS=$'\t'
  printf '%s\n' "$*" >> "${MANIFEST_TSV}"
}

#######################################
# Compute the RTP port for a scenario: consecutive scenarios are spaced two
# ports apart, starting at SCENARIO_PORT_BASE.
# Globals:   SCENARIO_PORT_BASE (read)
# Arguments: $1 - 1-based scenario order
# Outputs:   the port number on stdout
#######################################
scenario_port() {
  local order="$1"
  printf '%s\n' "$((SCENARIO_PORT_BASE + (order - 1) * 2))"
}

#######################################
# Run one fault scenario and record its outcome in the manifest.
# Start order: tester (background), sim (background), streamer (foreground),
# with a one second pause after each background start.
# Globals:   RUN_DIR, RUN_ID, MODE, BUILD_DIR (read); cleanup_pids (written)
# Arguments: $1 - 1-based order, $2 - scenario id
#            (torn_read|sink_stall|reset_storm), $3 - display name
# Outputs:   one status line on stdout; logs and SDP path under
#            ${RUN_DIR}/<order>-<scenario_id>/
# Returns:   1 for an unknown scenario id, otherwise the status of the final
#            printf (the scenario verdict is recorded in the manifest, not in
#            the return code)
#######################################
run_fault_scenario() {
  local order="$1"
  local scenario_id="$2"
  local name="$3"

  local row_dir="${RUN_DIR}/${order}-${scenario_id}"
  mkdir -p "${row_dir}"

  local sim_log="${row_dir}/sim.log"
  local streamer_log="${row_dir}/streamer.log"
  local tester_log="${row_dir}/tester.log"
  local sdp_path="${row_dir}/stream.sdp"

  local shm_name="fault_${MODE}_${scenario_id}_${RUN_ID}"
  local zmq_endpoint="ipc:///tmp/fault_${MODE}_${scenario_id}_${RUN_ID}.ipc"
  local sim_label="f${order}_${MODE:0:3}_${scenario_id:0:3}"

  # Defaults; each scenario overrides only the knobs it exercises, with
  # different values for baseline and degraded mode.
  local sim_frames=360
  local sim_fps=200
  local reset_every=""
  local snapshot_delay_us=0
  local emit_stall_ms=0
  local ingest_max_frames=180

  case "${scenario_id}" in
    torn_read)
      if [[ "${MODE}" == "baseline" ]]; then
        snapshot_delay_us=2500
        sim_fps=240
      else
        snapshot_delay_us=25000
        sim_fps=320
      fi
      ;;
    sink_stall)
      if [[ "${MODE}" == "baseline" ]]; then
        emit_stall_ms=3
        ingest_max_frames=140
      else
        emit_stall_ms=60
        ingest_max_frames=160
      fi
      ;;
    reset_storm)
      if [[ "${MODE}" == "baseline" ]]; then
        reset_every=20
        ingest_max_frames=120
      else
        reset_every=3
        ingest_max_frames=180
      fi
      ;;
    *)
      echo "unknown scenario_id=${scenario_id}" >&2
      return 1
      ;;
  esac

  local rtp_port
  rtp_port="$(scenario_port "${order}")"

  local sim_cmd=(
    "${BUILD_DIR}/cvmmap_sim"
    --shm-name "${shm_name}"
    --zmq-endpoint "${zmq_endpoint}"
    --label "${sim_label}"
    --frames "${sim_frames}"
    --fps "${sim_fps}"
    --width 640
    --height 360
  )
  if [[ -n "${reset_every}" ]]; then
    sim_cmd+=(--emit-reset-every "${reset_every}")
  fi

  local streamer_cmd=(
    "${BUILD_DIR}/cvmmap_streamer"
    --run-mode pipeline
    --codec h264
    --shm-name "${shm_name}"
    --zmq-endpoint "${zmq_endpoint}"
    --queue-size 1
    --gop 30
    --b-frames 0
    --ingest-max-frames "${ingest_max_frames}"
    --ingest-idle-timeout-ms 8000
    --snapshot-copy-delay-us "${snapshot_delay_us}"
    --emit-stall-ms "${emit_stall_ms}"
    --rtp
    --rtp-endpoint "127.0.0.1:${rtp_port}"
    --rtp-payload-type 96
    --rtp-sdp "${sdp_path}"
  )

  local tester_cmd=(
    "${BUILD_DIR}/rtp_receiver_tester"
    --port "${rtp_port}"
    --expect-pt 96
    --packet-threshold 1
    --timeout-ms 15000
  )

  local row_start_ms row_end_ms duration_ms
  row_start_ms="$(date +%s%3N)"

  "${tester_cmd[@]}" > "${tester_log}" 2>&1 &
  local tester_pid=$!
  cleanup_pids+=("${tester_pid}")

  sleep 1

  "${sim_cmd[@]}" > "${sim_log}" 2>&1 &
  local sim_pid=$!
  cleanup_pids+=("${sim_pid}")

  sleep 1

  # NOTE(review): the streamer runs in the foreground without an external
  # timeout; the script relies on the streamer terminating by itself.
  local streamer_rc tester_rc sim_rc
  "${streamer_cmd[@]}" > "${streamer_log}" 2>&1
  streamer_rc=$?

  wait_pid "${tester_pid}" 25
  tester_rc=$?

  wait_pid "${sim_pid}" 25
  sim_rc=$?

  # Both children have been reaped by wait_pid; forget their PIDs so the EXIT
  # trap can never signal an unrelated process that reused one of them.
  cleanup_pids=()

  row_end_ms="$(date +%s%3N)"
  duration_ms=$((row_end_ms - row_start_ms))

  local status="PASS"
  local reason="all-processes-ok"
  if (( sim_rc != 0 || streamer_rc != 0 || tester_rc != 0 )); then
    status="FAIL"
    reason="sim_rc=${sim_rc},streamer_rc=${streamer_rc},tester_rc=${tester_rc}"
  fi

  append_manifest_row \
    "${order}" \
    "${scenario_id}" \
    "${name}" \
    "${status}" \
    "${reason}" \
    "${duration_ms}" \
    "${sim_rc}" \
    "${streamer_rc}" \
    "${tester_rc}" \
    "${sim_log}" \
    "${streamer_log}" \
    "${tester_log}" \
    "${sdp_path}"

  printf "[%s] %s => %s (%s)\n" "${scenario_id}" "${name}" "${status}" "${reason}"
}

#######################################
# Extract the fields main needs from summary.json in a single python3 run.
# Arguments: $1 - path to the summary JSON file
# Outputs:   stdout, one value per line: counts.total, counts.pass,
#            counts.fail, all_pass ("true"/"false"), followed by zero or more
#            "<scenario id>:<violation>" lines
# Returns:   non-zero (with a one-line message on stderr) if the file cannot
#            be opened or is not valid JSON
#######################################
read_summary_fields() {
  local summary_path="$1"
  python3 - "${summary_path}" <<'PY'
import json
import sys

try:
    with open(sys.argv[1], "r", encoding="utf-8") as handle:
        data = json.load(handle)
except (OSError, ValueError) as exc:
    print(f"cannot read summary {sys.argv[1]}: {exc}", file=sys.stderr)
    sys.exit(1)

counts = data.get("counts", {})
print(counts.get("total", 0))
print(counts.get("pass", 0))
print(counts.get("fail", 0))
print("true" if data.get("all_pass", False) else "false")
for scenario in data.get("scenarios", []):
    sid = scenario.get("id", "unknown")
    for violation in scenario.get("violations", []):
        print(f"{sid}:{violation}")
PY
}

#######################################
# Entry point: verify binaries, run the scenarios, summarise, write evidence.
# Globals:   BUILD_DIR, MODE, RUN_ID, RUN_DIR, MANIFEST_TSV, SUMMARY_JSON,
#            LATEST_SUMMARY_JSON, EVIDENCE_TEXT, SUMMARY_HELPER,
#            STARTED_AT_UTC (read)
# Returns:   see "Exit status" in the file header
#######################################
main() {
  local required=(
    "${BUILD_DIR}/cvmmap_sim"
    "${BUILD_DIR}/cvmmap_streamer"
    "${BUILD_DIR}/rtp_receiver_tester"
  )

  local missing=()
  local bin
  for bin in "${required[@]}"; do
    if ! binary_exists "${bin}"; then
      missing+=("${bin}")
    fi
  done

  if (( ${#missing[@]} > 0 )); then
    {
      echo "task=15"
      echo "mode=${MODE}"
      echo "run_id=${RUN_ID}"
      echo "run_dir=${RUN_DIR}"
      echo "manifest=${MANIFEST_TSV}"
      echo "missing_binaries=${missing[*]}"
    } > "${EVIDENCE_TEXT}"
    echo "missing binaries: ${missing[*]}" >&2
    return 1
  fi

  # Scenario return codes are not checked here: each outcome is recorded as a
  # manifest row and judged by the summary helper.
  run_fault_scenario 1 "torn_read" "fault:torn-read"
  run_fault_scenario 2 "sink_stall" "fault:sink-stall"
  run_fault_scenario 3 "reset_storm" "fault:reset-storm"

  local finished_at_utc
  finished_at_utc="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"

  local summary_rc
  python3 "${SUMMARY_HELPER}" \
    --manifest "${MANIFEST_TSV}" \
    --output "${SUMMARY_JSON}" \
    --run-id "${RUN_ID}" \
    --run-dir "${RUN_DIR}" \
    --started-at "${STARTED_AT_UTC}" \
    --finished-at "${finished_at_utc}" \
    --mode "${MODE}"
  summary_rc=$?

  # Best effort: refresh the per-mode "latest" copy. A failure is reported
  # but does not change the verdict.
  if [[ -f "${SUMMARY_JSON}" ]]; then
    cp -f "${SUMMARY_JSON}" "${LATEST_SUMMARY_JSON}" \
      || echo "warning: could not refresh ${LATEST_SUMMARY_JSON}" >&2
  fi

  local summary_dump=""
  local parse_rc=0
  summary_dump="$(read_summary_fields "${SUMMARY_JSON}")" || parse_rc=$?

  # The first four lines are scalar fields, the remainder are violations.
  # When the summary could not be read every field stays empty.
  local total_count="" pass_count="" fail_count="" all_pass=""
  local violation_lines=""
  {
    IFS= read -r total_count
    IFS= read -r pass_count
    IFS= read -r fail_count
    IFS= read -r all_pass
    violation_lines="$(cat)"
  } <<< "${summary_dump}"

  {
    echo "task=15"
    echo "mode=${MODE}"
    echo "run_id=${RUN_ID}"
    echo "run_dir=${RUN_DIR}"
    echo "manifest=${MANIFEST_TSV}"
    echo "summary_json=${SUMMARY_JSON}"
    echo "latest_summary_json=${LATEST_SUMMARY_JSON}"
    echo "started_at=${STARTED_AT_UTC}"
    echo "finished_at=${finished_at_utc}"
    echo "scenario_total=${total_count}"
    echo "scenario_pass=${pass_count}"
    echo "scenario_fail=${fail_count}"
    echo "all_pass=${all_pass}"
    echo "summary_helper_rc=${summary_rc}"
    echo "violated_thresholds_begin"
    if [[ -n "${violation_lines}" ]]; then
      echo "${violation_lines}"
    fi
    echo "violated_thresholds_end"
  } > "${EVIDENCE_TEXT}"

  if (( summary_rc != 0 )); then
    echo "summary helper failed with rc=${summary_rc}" >&2
    return 1
  fi

  if (( parse_rc != 0 )); then
    echo "summary could not be parsed: ${SUMMARY_JSON}" >&2
    return 1
  fi

  echo "fault-suite mode=${MODE} total=${total_count} pass=${pass_count} fail=${fail_count}"
  echo "summary: ${SUMMARY_JSON}"

  if [[ "${MODE}" == "baseline" ]]; then
    if [[ "${total_count}" == "3" && "${pass_count}" == "3" && "${fail_count}" == "0" ]]; then
      return 0
    fi
    return 1
  fi

  # Degraded mode: a reported failure yields 1; a run without any failure
  # yields 2 together with the message below.
  if [[ "${fail_count}" != "0" ]]; then
    return 1
  fi

  echo "degraded mode did not violate thresholds" >&2
  return 2
}

main "$@"