#!/usr/bin/env python3 """Simple RTP test sender for validating rtp_receiver_tester.""" import socket import struct import sys import time import argparse def create_rtp_packet(sequence: int, timestamp: int, payload_type: int, payload: bytes) -> bytes: """Create an RTP packet (RFC3550).""" # RTP header format: # Byte 0: V(2) P(1) X(1) CC(4) # Byte 1: M(1) PT(7) # Bytes 2-3: Sequence # Bytes 4-7: Timestamp # Bytes 8-11: SSRC version = 2 padding = 0 extension = 0 csrc_count = 0 marker = 0 byte0 = (version << 6) | (padding << 5) | (extension << 4) | csrc_count byte1 = (marker << 7) | payload_type ssrc = 0x12345678 header = struct.pack('!BBHII', byte0, byte1, sequence, timestamp, ssrc) return header + payload def send_rtp_packets(target_host: str, target_port: int, payload_type: int, packet_count: int, interval_ms: float): """Send RTP packets to target.""" sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: for i in range(packet_count): # Create dummy payload (could be NAL unit for video) payload = bytes([0x00, 0x00, 0x00, 0x01, 0x09, 0x10]) # Simple dummy data packet = create_rtp_packet( sequence=i % 65536, timestamp=i * 3000, # 3000 ticks per packet (90kHz / 30fps) payload_type=payload_type, payload=payload ) sock.sendto(packet, (target_host, target_port)) if interval_ms > 0: time.sleep(interval_ms / 1000.0) print(f"Sent {packet_count} RTP packets with PT={payload_type}") finally: sock.close() def main(): parser = argparse.ArgumentParser(description='Send test RTP packets') parser.add_argument('--host', default='127.0.0.1', help='Target host') parser.add_argument('--port', type=int, default=5004, help='Target port') parser.add_argument('--pt', type=int, default=96, help='Payload type') parser.add_argument('--count', type=int, default=20, help='Number of packets') parser.add_argument('--interval-ms', type=float, default=50, help='Interval between packets') args = parser.parse_args() send_rtp_packets( args.host, args.port, args.pt, args.count, args.interval_ms ) if __name__ == '__main__': main()