133 lines
4.0 KiB
Python
133 lines
4.0 KiB
Python
import shlex
import signal
import time
from datetime import datetime
from os import PathLike
from pathlib import Path
from subprocess import Popen, TimeoutExpired
from typing import Any, Literal

import click
import loguru
from loguru import logger
|
|
|
|
# pacman -S python-loguru
|
|
# pacman -S python-click
|
|
|
|
# Operating modes accepted by `DumpCommand.get_pipeline_from_mode`.
Mode = Literal["preview", "save", "save_preview"]
# The same values as a runtime list (fed to `click.Choice`); keep in sync
# with `Mode` by hand.
MODE_LIST: list[Mode] = ["preview", "save", "save_preview"]
# Multicast group joined by the multicast pipeline.
MULTICAST_ADDR = "224.0.0.123"
|
|
|
|
|
|
class DumpCommand:
|
|
port: int
|
|
output_path: str
|
|
|
|
def __init__(self, port: int, output_path: PathLike | str):
|
|
self.port = port
|
|
self.output_path = str(output_path)
|
|
|
|
def save_and_decode_nv_pipeline(self):
|
|
# note that capabilties SHOULD NOT have spaces in between
|
|
# `gst-launch-1.0` could tolerate that, but not the API itself
|
|
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
|
|
! 'application/x-rtp,encoding-name=H265,payload=96' \
|
|
! rtph265depay \
|
|
! h265parse \
|
|
! tee name=t \
|
|
t. ! queue ! nvh265dec ! videoconvert ! autovideosink \
|
|
t. ! queue ! mp4mux ! filesink location={self.output_path}
|
|
"""
|
|
|
|
def save_and_decode_nv_pipeline_multicast(self):
|
|
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
|
|
auto-multicast=true \
|
|
multicast-group={MULTICAST_ADDR} \
|
|
! 'application/x-rtp,encoding-name=H265,payload=96' \
|
|
! rtph265depay \
|
|
! h265parse \
|
|
! tee name=t \
|
|
t. ! queue ! vtdec_hw ! videoconvert ! autovideosink \
|
|
t. ! queue ! mp4mux ! filesink location={self.output_path}
|
|
"""
|
|
# `vtdec_hw` for macos
|
|
# `nvh265dec` for nv
|
|
|
|
def save_pipeline(self):
|
|
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
|
|
! 'application/x-rtp, encoding-name=H265, payload=96' \
|
|
! rtph265depay \
|
|
! queue ! h265parse ! mp4mux ! filesink location={self.output_path}
|
|
"""
|
|
|
|
def decode_cv_only(self):
|
|
return f"""gst-launch-1.0 -e udpsrc port={self.port} \
|
|
! 'application/x-rtp,encoding-name=H265,payload=96' \
|
|
! rtph265depay \
|
|
! h265parse \
|
|
! nvh265dec \
|
|
! videoconvert \
|
|
! autovideosink
|
|
"""
|
|
|
|
def get_pipeline_from_mode(self, mode: Mode):
|
|
if mode == "save":
|
|
return self.save_pipeline()
|
|
elif mode == "save_preview":
|
|
return self.save_and_decode_nv_pipeline_multicast()
|
|
elif mode == "preview":
|
|
return self.decode_cv_only()
|
|
raise ValueError(f"Unknown mode: {mode}")
|
|
|
|
|
|
def test_filename(
|
|
port: int,
|
|
output_dir: PathLike | str,
|
|
date: datetime,
|
|
prefix="video_",
|
|
suffix=".mp4",
|
|
):
|
|
date_str = date.strftime("%Y-%m-%d_%H-%M-%S")
|
|
assert suffix.startswith("."), "suffix should start with a dot"
|
|
file_name = f"{prefix}{date_str}_{port}{suffix}"
|
|
return Path(output_dir) / file_name
|
|
|
|
|
|
# nmap -sS --open -p 22 192.168.2.0/24
|
|
|
|
|
|
@click.command()
@click.option(
    "-o",
    "--output",
    # The value is used as a directory, so reject regular files up front.
    type=click.Path(exists=True, file_okay=False),
    default="output",
)
@click.option("-m", "--mode", type=click.Choice(MODE_LIST), default="save_preview")
def main(output: str, mode: Mode):
    """Record and/or preview the RTP/H.265 streams on ports 5601-5606.

    One gst-launch-1.0 process is started per port.  Press Ctrl-C to stop:
    every pipeline is sent SIGINT and given up to 3 seconds to exit.
    """
    ports = [5601, 5602, 5603, 5604, 5605, 5606]
    output_dir = Path(output)
    # A single timestamp so all files of one run share the same date part.
    now = datetime.now()
    commands = [
        DumpCommand(port, test_filename(port, output_dir, now)) for port in ports
    ]
    ps: list[Popen] = []
    run_flag: bool = True

    def handle_sigint(signum: int, frame: Any):
        # Only flip the flag here; logging happens in the main flow so the
        # handler never re-enters the logger.
        nonlocal run_flag
        run_flag = False

    # Install the handler before spawning so a Ctrl-C during start-up is not
    # turned into a KeyboardInterrupt that leaves children unmanaged.
    signal.signal(signal.SIGINT, handle_sigint)

    for command in commands:
        if not run_flag:
            break
        # shell=True: the command strings rely on shell quoting.
        ps.append(Popen(command.get_pipeline_from_mode(mode), shell=True))

    # Sleep between checks instead of spinning, and stop waiting once every
    # child has exited by itself.
    while run_flag and any(p.poll() is None for p in ps):
        time.sleep(0.2)

    if run_flag:
        logger.info("All pipelines exited on their own")
    else:
        logger.info("Received SIGINT, stopping all processes")

    # `gst-launch-1.0 -e` finalises its output on SIGINT; already-exited
    # children are skipped by `send_signal`.
    for p in ps:
        p.send_signal(signal.SIGINT)
    for p in ps:
        try:
            p.wait(3)
        except TimeoutExpired:
            # Deliberately not killed: a hard kill could truncate the MP4.
            logger.warning("Command `{}` timeout", p.args)
|
|
|
|
|
|
if __name__ == "__main__":
    # click fills in `output` and `mode` from the command line, hence the
    # pylint waiver for the argument-less call.
    main()  # pylint: disable=no-value-for-parameter
|