From 5bf209098ff501a67e380a025dd44985453ad63c Mon Sep 17 00:00:00 2001 From: Franoosh Date: Fri, 25 Jul 2025 17:13:38 +0200 Subject: Initial commit. Proof of concept message passing between client <-> router <-> worker with rudimentary caching --- .gitignore | 2 + client.py | 176 +++++++++++++++++++++++++++++++++++++++++++++ helpers.py | 44 ++++++++++++ router.py | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++++ worker.py | 235 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 664 insertions(+) create mode 100644 .gitignore create mode 100644 client.py create mode 100644 helpers.py create mode 100644 router.py create mode 100644 worker.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5b478e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.log diff --git a/client.py b/client.py new file mode 100644 index 0000000..e9dce4f --- /dev/null +++ b/client.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +""" +A module containing client for streaming video to a zmq router. +""" + +__author__ = "Franoosh Corporation" + +from os import path +from threading import Thread, Event +from queue import Queue +import time +import datetime +import signal +import logging +from configparser import ConfigParser +import traceback +import zmq + +from helpers import CustomLoggingFormatter + +ROUTER_ADDRESS = "tcp://localhost" +PORT = "5569" +ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" + +CONFIG_FILE = "client.cfg" +LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG +CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} +SLEEP_TIME = 5 + + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + +def read_config(conf_file): + """Read config file and return as dictionary.""" + cfg = ConfigParser() + cfg.read(conf_file) + + return {key: dict(cfg.items(key)) for key in cfg.sections()} + + +class ClientVideo(Thread): + """Class for sending video stream""" + def __init__(self, _id, device, queue): + super().__init__(daemon=True) + self.identity = _id + self.device = device + self.queue = queue + self.live = True + + def run(self): + """Replace with actual video streaming logic.""" + ping_no = 0 + while self.live and not stop_event.is_set(): + try: + # Four parts required to start/stop video stream, three parts for ping: + if ping_no == 0: # Start video stream + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + metadata = f"start:{timestamp}" + logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) + message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] + self.queue.put(message) + text = f"ping-{ping_no}" + message = [self.identity.encode('utf-8'), text.encode('utf-8')] + elif ping_no >= 5: # Stop video stream + logger.debug("ClientVideo '%s' sending stop signal", self.identity) + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + metadata = f"stop:{timestamp}" + logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') + message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] + self.live = False + else: # Send ping + text = f"ping-{ping_no}" + message = [self.identity.encode('utf-8'), text.encode('utf-8')] + logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) + self.queue.put(message) + ping_no += 1 + except Exception as exc: + logger.error("ClientVideo: socket error: %r", exc) + time.sleep(SLEEP_TIME) + + logger.info("ClientVideo '%s' closing socket ...", self.identity) + + def stop(self): + logger.info("Client video '%s' exiting ...", self.identity) + self.live = False + + +class ClientTask(Thread): + """Main Client Task with logic, + message handling and setup.""" + + def __init__(self, _id, context=None): + super().__init__(daemon=True) + self.identity = _id + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.socket.identity = self.identity.encode("utf-8") + self.poll = zmq.Poller() + self.poll.register(self.socket, zmq.POLLIN) + self.video_threads = [] + self.running = True + self.connected = False + + def run(self): + self.socket.connect(ADDRESS) + logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) + logger.debug("starting video threads ...") + q = Queue() + for _id, device in CAMERAS.items(): + vid_cl = ClientVideo(_id, device, q) + vid_cl.start() + self.video_threads.append(vid_cl) + while self.running: + try: + sockets = dict(self.poll.poll(1000)) + if self.socket in sockets: + try: + message = self.socket.recv_multipart() + logger.debug("Client '%s' received message: %r", self.identity, message) + except Exception: + logger.error("Failed to receive message: %r", traceback.format_exc()) + if q.qsize() > 0: + frame = q.get() + if frame is not None: + logger.debug("Processing frame: %r", frame) + self.socket.send_multipart(frame) + except zmq.ZMQError as exc: + logger.error("Client '%s': socket error: %r", self.identity, exc) + + logger.info("Closing socket ...") + self.socket.close() + logger.debug("Terminating context ...") + self.context.term() + + def stop(self): + """Stop task""" + logger.info("ClientTask cleaning up ...") + for thread in self.video_threads: + logger.info("Cleaning u video thread ...") + thread.stop() + thread.join() + self.video_threads = [] + logger.info("Client task exiting ...") + self.running = False + + +def signal_handler(sig, frame): + logger.info("Received signal handler '%r', stopping client ...", sig) + client.stop() + + +if __name__ == "__main__": + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + client = ClientTask("client_task") + + client.start() + + client.join() diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..409a253 --- /dev/null +++ b/helpers.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +""" +Helper functions for zmq video messaging. +""" + +__author__ = "Franoosh Corporation" + + +import logging + + +class CustomLoggingFormatter(logging.Formatter): + """Custom logging formatter""" + debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s' + info_fmt = 'INFO: %(asctime)s %(message)s' + warning_fmt = 'WARNING: %(asctime)s %(message)s' + error_fmt = 'ERROR: %(asctime)s %(message)s' + critical_fmt = 'CRITICAL: %(asctime)s %(message)s' + + def __init__(self): + super().__init__( + fmt="%(levelno)d: %s(asctime)s %(message)s", + datefmt=None, + ) + + def format(self, record): + orig_fmt = self._style._fmt + if record.levelno == logging.DEBUG: + self._style._fmt = CustomLoggingFormatter.debug_fmt + elif record.levelno == logging.INFO: + self._style._fmt = CustomLoggingFormatter.info_fmt + elif record.levelno == logging.WARNING: + self._style._fmt = CustomLoggingFormatter.warning_fmt + elif record.levelno == logging.ERROR: + self._style._fmt = CustomLoggingFormatter.error_fmt + elif record.levelno == logging.CRITICAL: + self._style._fmt = CustomLoggingFormatter.critical_fmt + + result = logging.Formatter.format(self, record) + self._style._fmt = orig_fmt + + return result + diff --git a/router.py b/router.py new file mode 100644 index 0000000..61b2f72 --- /dev/null +++ b/router.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python + +""" +A module containing zeromq router for video streaming +from a client and sending control messages from server. +""" + +__author__ = "Franoosh Corporation" + +import os +import signal +import logging +import random +from collections import defaultdict, deque +import zmq + +from helpers import CustomLoggingFormatter + +# TODO: add configparser + +ADDR_FRONTEND = 'tcp://localhost' +ADDR_BACKEND = 'tcp://localhost' + +PORT_FRONTEND = "5569" +PORT_BACKEND = "9979" + +INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" +INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" + +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + + +RUNNING = True + +def custom_proxy(): + context = zmq.Context.instance() + poller = zmq.Poller() + + # Set up frontend: + frontend_socket = context.socket(zmq.ROUTER) + frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) + frontend_socket.bind(INITIAL_FRONTEND_ADDR) + poller.register(frontend_socket, zmq.POLLIN) + + # Set up backend: + backend_socket = context.socket(zmq.ROUTER) + backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) + backend_socket.bind(INITIAL_BACKEND_ADDR) + poller.register(backend_socket, zmq.POLLIN) + + awailable_workers = [] + client_worker_dict = {} + # There are three pending messages dictionaries: + # 1. pending_messages_no_worker[client_id] - messages that are pending for a client + # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker + # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker + # To assure delivey of the entire video we need to store messages from start to end until they are delivered + # to the client. + # If a worker is not available for certain amount of time, we will reassign it to another worker. + # For now, there is only one worker. Remember to implement this later. + # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker. + + pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned + pending_messages_client_worker = defaultdict(lambda: defaultdict(deque)) + pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) + + while RUNNING: + events = dict(poller.poll(1000)) # Poll both frontend and backend sockets: + if frontend_socket in events: # If message from client: + msg = frontend_socket.recv_multipart() # Receive message from client: + logger.debug("Received message: %r", msg) + client_id, content = msg[0], msg[1:] + if not client_id in client_worker_dict: # If client is not in client-worker dictionary: + logger.info("Received ID from client: %r.", client_id) + # Check if client is already connected to worker: + try: + worker_id = client_worker_dict[client_id] # Get worker ID for this client from the client-worker dictionary + while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker and send them first: + logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + try: + logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e) + pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message + break # Break to avoid infinite loop + if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery + else: + try: + logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) + backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: + except Exception as e: + logger.error("Failed to send message to backend: %r, error: %s", msg, e) + pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery + except KeyError: # If client is not connected to any worker: + logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) + if awailable_workers: # If there are available workers: + worker_id = random.choice(awailable_workers) # Assign random worker to client + client_worker_dict[client_id] = worker_id # Update client-worker dictionary + if pending_messages_no_worker[client_id]: # Check if there are any pending messages for this client: + pending_messages_client_worker[client_id][worker_id].extendleft(msg) # Move messages to pending messages for this worker + logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) + while pending_messages_client_worker[client_id][worker_id]: # Check if there are any pending messages for this worker: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + try: + logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) + pending_messages_client_worker[client_id][worker_id].append(pending_msg) # Re-queue the message + break + if pending_messages_client_worker[client_id][worker_id]: # If there are still pending messages for this worker: + logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) + pending_messages_client_worker[client_id][worker_id].appendleft(content) # Store message for later delivery + else: + logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) + try: + logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) + backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: + except Exception as e: + logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) + pending_messages_client_worker[client_id][worker_id].appendleft(msg) # Store message for later delivery + + else: # If no workers are available, assign a new worker: + pending_messages_no_worker[client_id].append(content) # Store message for later delivery + logger.debug("No available workers, storing client '%s' with no worker assigned", client_id) + + if backend_socket in events: + _msg = backend_socket.recv_multipart() + logger.debug("Received from worker: %r", _msg) + if len(_msg) == 2: # Worker is sending its identity: + worker_id, msg = _msg[0], _msg[1:] + logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) + logger.info("Received ID from worker: %r.", worker_id) + awailable_workers.append(worker_id) # Add worker to available workers list + for client, worker in client_worker_dict.items(): + if worker is None: # If client has no worker assigned: + client_worker_dict[client] = worker + logger.debug("Assigning worker %r to client %r", worker, client) + if pending_messages_no_worker: + # If there are pending messages for clients with no worker assigned: + for client_id, messages in pending_messages_no_worker.items(): + if messages: + pending_messages_client_worker[client_id][worker_id].extendleft(messages) # Move messages to pending messages for this worker + pending_messages_no_worker[client_id].clear() # Clear pending messages for this client + logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) + try: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() # Get the first message for this client and worker + logger.debug( + "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) # Send the message to worker + except Exception as e: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + + else: # Worker is sending a message to client: + worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] + logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg) + # First check if there are any pending messages for this worker: + while pending_messages_worker_client[worker_id][client_id]: # Check if there are any pending messages for this worker: + pending_msg = pending_messages_worker_client[worker_id][client_id].pop() + try: + logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) + backend_socket.send_multipart([client_id, *pending_msg]) + except Exception as e: + # It is safe to log the message content as it is a short string or bytes. + logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + pending_messages_worker_client[worker_id][client_id].append(pending_msg) # Re-queue the message + break # Break to avoid infinite loop + # Now send the message to the client if no pending messages for this client: + if not pending_messages_worker_client[worker_id][client_id]: # If there are no pending messages for this worker: + logger.debug("Sending message %r to client '%s'", msg, client_id) + try: + frontend_socket.send_multipart([client_id, *msg]) + logger.debug("Sent message %r to client '%r'", msg, client_id) + except Exception as e: + pending_messages_worker_client[worker_id][client_id].appendleft(msg) + logger.error("Could not deliver message: %r from worker: %r to client: '%r'", msg, worker_id, client_id) + +def signal_handler(sig, frame): + global RUNNING + logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name) + RUNNING = False + + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + + custom_proxy() diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..ad5fe1e --- /dev/null +++ b/worker.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python + +__author__ = "Franoosh Corporation" + +""" +A module consisting of a zeromq worker receiving video stream +via router from clients. Able to send control messages to clients. +""" + +import os +from threading import Thread, Event +import time +import signal +import logging +from queue import Queue +from collections import defaultdict +import zmq + +from helpers import CustomLoggingFormatter + + +ADDR = "tcp://localhost" +PORT = "9979" +INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}" +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + +class MonitorTask(Thread): + """Monitor task""" + def __init__(self, socket): + super().__init__(daemon=True) + self.socket = socket + self.running = True + + def run(self): + """Monitor connection on the initial socket. + This is heartbeat monitoring""" + monitor = self.socket.get_monitor_socket() + monitor.setsockopt(zmq.RCVTIMEO, 5000) + logger.debug("Monitor socket started.") + while self.running: + try: + event, _ = monitor.recv_multipart() + # Resolve the received event type: + event_type = int.from_bytes(event[:2], "little") + if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED): + logger.warning("Monitor socket: closed | disconnected") + stop_event.set() + elif event_type == zmq.EVENT_CONNECT_DELAYED: + logger.debug("Monitor socket: event connect delayed") + elif event_type == zmq.EVENT_CONNECT_RETRIED: + logger.debug("Monitor socket: event connect retried") + elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED): + logger.debug("Monitor socket: client connected to router, handshake OK.") + stop_event.clear() + else: + logger.warning("Monitor socket: other event: '%r'", event_type) + except zmq.Again: + logger.debug("Timeout on monitoring socket.") + except Exception as exc: # W: Catching too general exception Exception + logger.error("Other exception on monitoring socket: %r", exc) + + # monitor.close() + + def stop(self): + """Stop thread""" + logger.info("Stopping monitor thread ...") + self.running = False + + +class ServerWorker(Thread): + """ServerWorker""" + def __init__(self, identity, context=None): + super().__init__(daemon=True) + self.id = identity + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.socket.identity = self.id.encode("utf-8") + self.monitor = MonitorTask(self.socket) + self.video_threads = defaultdict(lambda: defaultdict(dict)) + self.connected = False + self.running = True + + def run(self): + logger.debug("ServerWorker '%s' starting ...", self.id) + self.socket.connect(INITIAL_BACKEND_ADDR) + self.poller = zmq.Poller() + self.poller.register(self.socket, zmq.POLLIN) + self.monitor.start() + + while self.running: + logger.debug("ServerWorker '%s' waiting for a message ...", self.id) + if not self.connected or stop_event.is_set(): + self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this + time.sleep(1) # Wait a bit before trying to connect again + + sockets = dict(self.poller.poll(1000)) + if self.socket in sockets: + self.connected = True + msg = self.socket.recv_multipart() + logger.debug("ServerWorker '%s' received message: %r", self.id, msg) + filename = None + # At the moment we don't expect any other message than a video message: + if len(msg) == 4: # This is a message with start/stop directive and no video data. + logger.debug("Received start/stop directive: %r", msg) + client_id, camera_id, metadata = msg[0], msg[1], msg[2] + try: + directive, timestamp = metadata.decode("utf-8").split(":") + filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" + logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) + except ValueError: + logger.error("Invalid metadata format: %r", metadata) + continue + if directive == "start": + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera + q = Queue() + logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) + video_worker = VideoWorker(filename, q) + video_worker.start() + self.video_threads[client_id][camera_id] = video_worker + elif directive == "stop": + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) + self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread + del self.video_threads[client_id][camera_id] + logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) + else: + logger.error("Unknown directive: '%s'", directive) + elif len(msg) == 3: # This is a video message with data only + logger.debug("Received video message with data only: %r", msg) + client_id, camera_id, msg = msg[0], msg[1], msg[2] + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + self.video_threads[client_id][camera_id].queue.put(msg) + else: + logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) + logger.error("Available video threads keys: %r", self.video_threads.keys()) + logger.error("Available video threads values: %r", self.video_threads.values()) + elif len(msg) == 2: # This is a ping message from client + logger.debug("Received ping message: %r", msg) + client_id, camera_id = msg[0], msg[1] + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + # Respond with a pong message + try: + self.socket.send_multipart([client_id, camera_id, b'pong']) + logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) + except zmq.ZMQError as exc: + logger.error("Failed to send pong: %r", exc) + + else: + logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) + try: + self.socket.send_multipart([client_id, camera_id, b'pong']) + logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) + except zmq.ZMQError as exc: + logger.error("Failed to send pong: %r", exc) + else: + logger.debug("No message received, polling again ...") + time.sleep(5) + + self.monitor.stop() + self.monitor.join() + for camera_thread in self.video_threads.values(): + for thread in camera_thread.values(): + thread.queue.put(None) # Sentinel to unblock queue.get() + thread.stop() + thread.join() + + def send_control_message(self, client_id, camera_id, message): + """Send control message to a specific client and camera.""" + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) + logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) + else: + logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) + + def stop(self): + logger.info("ServerWorker '%s' exiting ...", self.id) + self.running = False + + +class VideoWorker(Thread): + """Class for video threads.""" + + def __init__(self, filename, queue): + super().__init__(daemon=True) + self.filename = filename + self.queue = queue + self.live = True + + def run(self): + if os.path.exists(self.filename): + logger.warning("File '%s' already exists, overwriting ...", self.filename) + with open(self.filename, 'wb') as f: + logger.info("VideoWorker started writing to file: %s", self.filename) + while self.live: + # Simulate video processing + frame = self.queue.get() # Get frame from queue + logger.debug("Processing ('writing to a file') frame for camera: %r", frame) + if frame is None: + logger.info("VideoWorker received stop signal, exiting ...") + break + f.write(frame + b'\n') # Write frame to file + + + def stop(self): + logger.info("VideoWorker '' exiting ...") + self.live = False + +def signal_handler(sig, frame): + worker.stop() + +if __name__ == "__main__": + logger.info("Starting up ...") + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + worker = ServerWorker("worker-task") + worker.start() + + worker.join() -- cgit v1.2.3-65-gdbad