Files
cvmmap-streamer/scripts/test_rtp_sender.py
T
crosstyan 991f7ded34 feat(test): add downstream acceptance and fault harness artifacts
This commit packages the standalone task-14 acceptance and task-15 fault-suite execution toolchain for downstream validation.

It includes all runnable harness scripts, helper utilities, and generated evidence captures so downstream behavior can be reproduced and reviewed independently from docs and core implementation.

Bundling these assets separately allows QA/automation workflows to validate runtime changes without dragging operational notes or release-gate documentation into the same review unit.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-05 20:32:12 +08:00

83 lines
2.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""Simple RTP test sender for validating rtp_receiver_tester."""
import socket
import struct
import sys
import time
import argparse
def create_rtp_packet(sequence: int, timestamp: int, payload_type: int, payload: bytes) -> bytes:
"""Create an RTP packet (RFC3550)."""
# RTP header format:
# Byte 0: V(2) P(1) X(1) CC(4)
# Byte 1: M(1) PT(7)
# Bytes 2-3: Sequence
# Bytes 4-7: Timestamp
# Bytes 8-11: SSRC
version = 2
padding = 0
extension = 0
csrc_count = 0
marker = 0
byte0 = (version << 6) | (padding << 5) | (extension << 4) | csrc_count
byte1 = (marker << 7) | payload_type
ssrc = 0x12345678
header = struct.pack('!BBHII', byte0, byte1, sequence, timestamp, ssrc)
return header + payload
def send_rtp_packets(target_host: str, target_port: int, payload_type: int,
packet_count: int, interval_ms: float):
"""Send RTP packets to target."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
for i in range(packet_count):
# Create dummy payload (could be NAL unit for video)
payload = bytes([0x00, 0x00, 0x00, 0x01, 0x09, 0x10]) # Simple dummy data
packet = create_rtp_packet(
sequence=i % 65536,
timestamp=i * 3000, # 3000 ticks per packet (90kHz / 30fps)
payload_type=payload_type,
payload=payload
)
sock.sendto(packet, (target_host, target_port))
if interval_ms > 0:
time.sleep(interval_ms / 1000.0)
print(f"Sent {packet_count} RTP packets with PT={payload_type}")
finally:
sock.close()
def main():
parser = argparse.ArgumentParser(description='Send test RTP packets')
parser.add_argument('--host', default='127.0.0.1', help='Target host')
parser.add_argument('--port', type=int, default=5004, help='Target port')
parser.add_argument('--pt', type=int, default=96, help='Payload type')
parser.add_argument('--count', type=int, default=20, help='Number of packets')
parser.add_argument('--interval-ms', type=float, default=50, help='Interval between packets')
args = parser.parse_args()
send_rtp_packets(
args.host,
args.port,
args.pt,
args.count,
args.interval_ms
)
if __name__ == '__main__':
main()