diff options
| author | Franoosh <uinarf@autistici.org> | 2026-01-09 15:14:04 +0100 |
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2026-01-09 15:14:04 +0100 |
| commit | 9b506a7094c6e09097e4c2d16f12c81706cb0644 (patch) | |
| tree | 7e0215acfaff5192a3c7449a1f0ea44dc444bb56 | |
| parent | ef10ae72ec6b4f9fedace24edf0f931f7077e628 (diff) | |
| download | ZeroMQ_Video_Streaming-9b506a7094c6e09097e4c2d16f12c81706cb0644.tar.gz ZeroMQ_Video_Streaming-9b506a7094c6e09097e4c2d16f12c81706cb0644.tar.bz2 ZeroMQ_Video_Streaming-9b506a7094c6e09097e4c2d16f12c81706cb0644.zip | |
Minor fixes and refactoring in worker.py
| -rw-r--r-- | worker.py | 95 |
1 files changed, 51 insertions, 44 deletions
@@ -5,6 +5,8 @@ via router from clients. Able to send control messages to clients. Needs libzmq and pyzmq with 'drafts' support. """ +__author__ = "Franoosh Corporation" + import os import sys import struct @@ -27,13 +29,11 @@ from helpers import ( TIME_FORMAT_STRING, ) -__author__ = "Franoosh Corporation" # Various constants: # TODO: put them in a config HOST = "127.0.0.1" ZMQPORT = "9979" -# WSPORT = "8000" WSPORT = "8008" ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" @@ -92,7 +92,7 @@ class MonitorTask(Thread): ---------- socket : zmq.Socket The zmq socket to monitor. - + Methods ------- run() @@ -192,7 +192,7 @@ class ServerWorker(Thread): def __init__(self, identity, context=None) -> None: """ Docstring for __init__ - + Parameters ---------- identity : bytes @@ -211,7 +211,7 @@ class ServerWorker(Thread): self.connected = False self.running = True - def start_client(self, client_id, camera_id, filename) -> None: + def start_client(self, client_id: bytes, camera_id: bytes, timestamp: bytes) -> None: """ Start a video thread for new client_id and camera_id. @@ -221,14 +221,23 @@ class ServerWorker(Thread): Client ID camera_id : bytes Camera ID - filename : str - Filename to write video to + timestamp : bytes + Timestamp of the action Returns ------- None """ - # New client or new camera for existing client: + update_clients(client_id, camera_id) + + timestamp_string = datetime.datetime.fromtimestamp( + struct.unpack('d', timestamp)[0] + ).strftime(TIME_FORMAT_STRING) + logger.debug("Timestamp string: %r", timestamp_string) + + camera_id_string = camera_id.decode('utf-8') + filename = f"{camera_id_string}-{timestamp_string}.mp4" + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): q = Queue() logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id) @@ -286,20 +295,12 @@ class ServerWorker(Thread): "Received metadata directive: %r for client: %r, camera: %r, timestamp: %r", directive, client_id, camera_id, timestamp ) - update_clients(client_id, camera_id) - if directive in (b'start', b'stop'): - camera_id_string = camera_id.decode('utf-8') - timestamp_string = datetime.datetime.fromtimestamp( - struct.unpack('d', timestamp)[0] - ).strftime(TIME_FORMAT_STRING) - filename = f"{camera_id_string}-{timestamp_string}.mp4" - logger.debug("Directive: %r, Timestamp: %r, Filename: %r", directive, timestamp, filename) - if directive == b"start": - self.start_client(client_id, camera_id, filename) - elif directive == b"stop": - self.stop_client(client_id, camera_id) - else: - logger.error("Unknown directive: %r", directive) + if directive == b"start": + self.start_client(client_id, camera_id, timestamp) + elif directive == b"stop": + self.stop_client(client_id, camera_id) + else: + logger.error("Unknown directive: %r", directive) # elif directive == b'rename': # old_name = new_name = timestamp = None # try: @@ -336,29 +337,27 @@ class ServerWorker(Thread): # self.web_sock.send_multipart([client_id, camera_id, directive, data], flags=zmq.NOBLOCK) # except Exception as exc: # logger.error("Sending camera metadata update to websocket failed: %r", exc) - else: - logger.error("Unknown directive: %r", directive) return False def handle_video_message(self, msg: list) -> None: logger.debug("Received video message with data only.") client_id, camera_id, *content = msg - if client_id in self.video_threads and camera_id in self.video_threads[client_id]: - self.video_threads[client_id][camera_id].queue.put(content[0]) - # Send only [client_id, camera_id, jpeg_bytes] to the webserver: - # zmq subsciber can subscribe to a topic defined by the first - # part of the multipart message, so in order to allow for a - # per camera subscription, we need to join client_id and camera_id - topic = b':'.join([client_id, camera_id]) - try: - self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK) - except Exception as exc: - logger.error("Sending message to websocket failed: %r", exc) - else: - logger.error("No video thread found for client %r, camera %r", client_id, camera_id) - logger.debug("Available video threads keys: %r", self.video_threads.keys()) - logger.debug("Available video threads values: %r", self.video_threads.values()) + if client_id not in self.video_threads or camera_id not in self.video_threads[client_id]: + # After crashing worker will not receive start message again. + logger.error("No video thread found for client %r, camera %r, creating", client_id, camera_id) + self.start_client(client_id, camera_id, struct.pack('d', time.time())) + + self.video_threads[client_id][camera_id].queue.put(content[0]) + # Send only [client_id, camera_id, jpeg_bytes] to the webserver: + # zmq subsciber can subscribe to a topic defined by the first + # part of the multipart message, so in order to allow for a + # per camera subscription, we need to join client_id and camera_id + topic = b':'.join([client_id, camera_id]) + try: + self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK) + except Exception as exc: + logger.error("Sending message to websocket failed: %r", exc) def run(self) -> None: """ @@ -394,7 +393,6 @@ class ServerWorker(Thread): self.connected = True msg = self.socket.recv_multipart() logger.debug("ServerWorker %r received message of length: %d.", self.id, len(msg)) - filename = None # At the moment we don't expect any other message than a # start/stop message (of length 4) and a video message: if len(msg) == 4: # This is a message with start/stop/rename directive and no video data. @@ -407,10 +405,18 @@ class ServerWorker(Thread): logger.debug("No message received, polling again ...") time.sleep(0.1) if self.web_sock in sockets: - frontend_msg = self.web_sock.recv_multipart() - logger.info("Received message from frontend: %r", frontend_msg) - self.socket.send_multipart(frontend_msg) - logger.info("Forwarded message to backend: %r", frontend_msg) + try: + frontend_msg = self.web_sock.recv_multipart() + logger.info("Received message from frontend: %r", frontend_msg) + except Exception as exc: + logger.error("Receiving message from frontend failed: %r", exc) + continue + try: + self.socket.send_multipart(frontend_msg) + except Exception as exc: + logger.error("Forwarding message to client failed: %r", exc) + continue + logger.info("Forwarded message to client: %r", frontend_msg) self.monitor.stop() self.monitor.join() @@ -571,6 +577,7 @@ class VideoWorker(Thread): out.release() logger.info("VideoWorker finished writing to file: %s", self.filename) + def signal_handler(sig, frame) -> None: """ Docstring for signal_handler |
