diff options
author | Franoosh <uinarf@autistici.org> | 2025-07-25 17:13:38 +0200 |
---|---|---|
committer | Franoosh <uinarf@autistici.org> | 2025-07-25 17:13:38 +0200 |
commit | 5bf209098ff501a67e380a025dd44985453ad63c (patch) | |
tree | 933b276a72ef308b2868f9d8a1f36bdcdeeaefb4 /client.py | |
download | ZeroMQ_Video_Streaming-master.tar.gz ZeroMQ_Video_Streaming-master.tar.bz2 ZeroMQ_Video_Streaming-master.zip |
Initial commit. Proof of concept message passing between client <-> router <-> worker with rudimentary cachingHEADmaster
Diffstat (limited to 'client.py')
-rw-r--r-- | client.py | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/client.py b/client.py new file mode 100644 index 0000000..e9dce4f --- /dev/null +++ b/client.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +""" +A module containing client for streaming video to a zmq router. +""" + +__author__ = "Franoosh Corporation" + +from os import path +from threading import Thread, Event +from queue import Queue +import time +import datetime +import signal +import logging +from configparser import ConfigParser +import traceback +import zmq + +from helpers import CustomLoggingFormatter + +ROUTER_ADDRESS = "tcp://localhost" +PORT = "5569" +ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" + +CONFIG_FILE = "client.cfg" +LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG +CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} +SLEEP_TIME = 5 + + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + +def read_config(conf_file): + """Read config file and return as dictionary.""" + cfg = ConfigParser() + cfg.read(conf_file) + + return {key: dict(cfg.items(key)) for key in cfg.sections()} + + +class ClientVideo(Thread): + """Class for sending video stream""" + def __init__(self, _id, device, queue): + super().__init__(daemon=True) + self.identity = _id + self.device = device + self.queue = queue + self.live = True + + def run(self): + """Replace with actual video streaming logic.""" + ping_no = 0 + while self.live and not stop_event.is_set(): + try: + # Four parts required to start/stop video stream, three parts for ping: + if ping_no == 0: # Start video stream + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + metadata = f"start:{timestamp}" + logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) + message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] + self.queue.put(message) + text = f"ping-{ping_no}" + message = [self.identity.encode('utf-8'), text.encode('utf-8')] + elif ping_no >= 5: # Stop video stream + logger.debug("ClientVideo '%s' sending stop signal", self.identity) + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + metadata = f"stop:{timestamp}" + logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') + message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] + self.live = False + else: # Send ping + text = f"ping-{ping_no}" + message = [self.identity.encode('utf-8'), text.encode('utf-8')] + logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) + self.queue.put(message) + ping_no += 1 + except Exception as exc: + logger.error("ClientVideo: socket error: %r", exc) + time.sleep(SLEEP_TIME) + + logger.info("ClientVideo '%s' closing socket ...", self.identity) + + def stop(self): + logger.info("Client video '%s' exiting ...", self.identity) + self.live = False + + +class ClientTask(Thread): + """Main Client Task with logic, + message handling and setup.""" + + def __init__(self, _id, context=None): + super().__init__(daemon=True) + self.identity = _id + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.socket.identity = self.identity.encode("utf-8") + self.poll = zmq.Poller() + self.poll.register(self.socket, zmq.POLLIN) + self.video_threads = [] + self.running = True + self.connected = False + + def run(self): + self.socket.connect(ADDRESS) + logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) + logger.debug("starting video threads ...") + q = Queue() + for _id, device in CAMERAS.items(): + vid_cl = ClientVideo(_id, device, q) + vid_cl.start() + self.video_threads.append(vid_cl) + while self.running: + try: + sockets = dict(self.poll.poll(1000)) + if self.socket in sockets: + try: + message = self.socket.recv_multipart() + logger.debug("Client '%s' received message: %r", self.identity, message) + except Exception: + logger.error("Failed to receive message: %r", traceback.format_exc()) + if q.qsize() > 0: + frame = q.get() + if frame is not None: + logger.debug("Processing frame: %r", frame) + self.socket.send_multipart(frame) + except zmq.ZMQError as exc: + logger.error("Client '%s': socket error: %r", self.identity, exc) + + logger.info("Closing socket ...") + self.socket.close() + logger.debug("Terminating context ...") + self.context.term() + + def stop(self): + """Stop task""" + logger.info("ClientTask cleaning up ...") + for thread in self.video_threads: + logger.info("Cleaning u video thread ...") + thread.stop() + thread.join() + self.video_threads = [] + logger.info("Client task exiting ...") + self.running = False + + +def signal_handler(sig, frame): + logger.info("Received signal handler '%r', stopping client ...", sig) + client.stop() + + +if __name__ == "__main__": + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + client = ClientTask("client_task") + + client.start() + + client.join() |