aboutsummaryrefslogtreecommitdiff
path: root/client.py
diff options
context:
space:
mode:
authorFranoosh <uinarf@autistici.org>2025-07-25 17:13:38 +0200
committerFranoosh <uinarf@autistici.org>2025-07-25 17:13:38 +0200
commit5bf209098ff501a67e380a025dd44985453ad63c (patch)
tree933b276a72ef308b2868f9d8a1f36bdcdeeaefb4 /client.py
downloadZeroMQ_Video_Streaming-5bf209098ff501a67e380a025dd44985453ad63c.tar.gz
ZeroMQ_Video_Streaming-5bf209098ff501a67e380a025dd44985453ad63c.tar.bz2
ZeroMQ_Video_Streaming-5bf209098ff501a67e380a025dd44985453ad63c.zip
Initial commit. Proof of concept message passing between client <-> router <-> worker with rudimentary cachingHEADmaster
Diffstat (limited to 'client.py')
-rw-r--r--client.py176
1 files changed, 176 insertions, 0 deletions
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..e9dce4f
--- /dev/null
+++ b/client.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+
+"""
+A module containing client for streaming video to a zmq router.
+"""
+
+__author__ = "Franoosh Corporation"
+
+from os import path
+from threading import Thread, Event
+from queue import Queue
+import time
+import datetime
+import signal
+import logging
+from configparser import ConfigParser
+import traceback
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+ROUTER_ADDRESS = "tcp://localhost"
+PORT = "5569"
+ADDRESS = f"{ROUTER_ADDRESS}:{PORT}"
+
+CONFIG_FILE = "client.cfg"
+LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log"
+LOGLEVEL = logging.DEBUG
+CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'}
+SLEEP_TIME = 5
+
+
+stop_event = Event()
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+def read_config(conf_file):
+ """Read config file and return as dictionary."""
+ cfg = ConfigParser()
+ cfg.read(conf_file)
+
+ return {key: dict(cfg.items(key)) for key in cfg.sections()}
+
+
+class ClientVideo(Thread):
+ """Class for sending video stream"""
+ def __init__(self, _id, device, queue):
+ super().__init__(daemon=True)
+ self.identity = _id
+ self.device = device
+ self.queue = queue
+ self.live = True
+
+ def run(self):
+ """Replace with actual video streaming logic."""
+ ping_no = 0
+ while self.live and not stop_event.is_set():
+ try:
+ # Four parts required to start/stop video stream, three parts for ping:
+ if ping_no == 0: # Start video stream
+ timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+ metadata = f"start:{timestamp}"
+ logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata)
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
+ self.queue.put(message)
+ text = f"ping-{ping_no}"
+ message = [self.identity.encode('utf-8'), text.encode('utf-8')]
+ elif ping_no >= 5: # Stop video stream
+ logger.debug("ClientVideo '%s' sending stop signal", self.identity)
+ timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+ metadata = f"stop:{timestamp}"
+ logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'')
+ message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
+ self.live = False
+ else: # Send ping
+ text = f"ping-{ping_no}"
+ message = [self.identity.encode('utf-8'), text.encode('utf-8')]
+ logger.debug("ClientVideo '%s' sending message: %r", self.identity, message)
+ self.queue.put(message)
+ ping_no += 1
+ except Exception as exc:
+ logger.error("ClientVideo: socket error: %r", exc)
+ time.sleep(SLEEP_TIME)
+
+ logger.info("ClientVideo '%s' closing socket ...", self.identity)
+
+ def stop(self):
+ logger.info("Client video '%s' exiting ...", self.identity)
+ self.live = False
+
+
+class ClientTask(Thread):
+ """Main Client Task with logic,
+ message handling and setup."""
+
+ def __init__(self, _id, context=None):
+ super().__init__(daemon=True)
+ self.identity = _id
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.socket.identity = self.identity.encode("utf-8")
+ self.poll = zmq.Poller()
+ self.poll.register(self.socket, zmq.POLLIN)
+ self.video_threads = []
+ self.running = True
+ self.connected = False
+
+ def run(self):
+ self.socket.connect(ADDRESS)
+ logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
+ logger.debug("starting video threads ...")
+ q = Queue()
+ for _id, device in CAMERAS.items():
+ vid_cl = ClientVideo(_id, device, q)
+ vid_cl.start()
+ self.video_threads.append(vid_cl)
+ while self.running:
+ try:
+ sockets = dict(self.poll.poll(1000))
+ if self.socket in sockets:
+ try:
+ message = self.socket.recv_multipart()
+ logger.debug("Client '%s' received message: %r", self.identity, message)
+ except Exception:
+ logger.error("Failed to receive message: %r", traceback.format_exc())
+ if q.qsize() > 0:
+ frame = q.get()
+ if frame is not None:
+ logger.debug("Processing frame: %r", frame)
+ self.socket.send_multipart(frame)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%s': socket error: %r", self.identity, exc)
+
+ logger.info("Closing socket ...")
+ self.socket.close()
+ logger.debug("Terminating context ...")
+ self.context.term()
+
+ def stop(self):
+ """Stop task"""
+ logger.info("ClientTask cleaning up ...")
+ for thread in self.video_threads:
+ logger.info("Cleaning u video thread ...")
+ thread.stop()
+ thread.join()
+ self.video_threads = []
+ logger.info("Client task exiting ...")
+ self.running = False
+
+
+def signal_handler(sig, frame):
+ logger.info("Received signal handler '%r', stopping client ...", sig)
+ client.stop()
+
+
+if __name__ == "__main__":
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+ client = ClientTask("client_task")
+
+ client.start()
+
+ client.join()