diff options
| author | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 | 
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 | 
| commit | 70beed73465ab27449a59c62043d94c16efe00c5 (patch) | |
| tree | 9f4b5d6a72711af4641b01169da5d20a2228ef64 /worker.py | |
| parent | 68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (diff) | |
| download | ZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.tar.gz ZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.tar.bz2 ZeroMQ_Video_Streaming-70beed73465ab27449a59c62043d94c16efe00c5.zip | |
Added camera support. Movement recognition and video streaming. Web server and frontend. Work in progress. To be fixed: frontend reloading information about client and a page after major changes like camera name or address change, camera removal. Add certificates before testing on actual distributed hardware. Add user login logic.added_video
Diffstat (limited to 'worker.py')
| -rw-r--r-- | worker.py | 247 | 
1 files changed, 183 insertions, 64 deletions
| @@ -1,29 +1,51 @@  #!/usr/bin/env python -__author__ = "Franoosh Corporation"  """  A module consisting of a zeromq worker receiving video stream  via router from clients. Able to send control messages to clients. +Needs libzmq and pyzmq with 'drafts' support.  """  import os +import sys  from threading import Thread, Event  import time  import signal  import logging  from queue import Queue  from collections import defaultdict +import tempfile +import json  import zmq +import cv2 +import numpy  from helpers import CustomLoggingFormatter +__author__ = "Franoosh Corporation" + +# Various constants:  # TODO: put them in a config + +HOST = "127.0.0.1" +ZMQPORT = "9979" +# WSPORT = "8000" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +# LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO + +#   File paths: -ADDR = "tcp://localhost" -PORT = "9979" -INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}"  LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +CWD = os.getcwd() +TMP_DIR = os.path.join(CWD, "tmp") +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') + +# Other: + +CLIENTS_DICT = {}  stop_event = Event() @@ -39,6 +61,14 @@ logging.basicConfig(      level=LOGLEVEL,  ) + +if not os.path.exists(TMP_DIR): +    try: +        os.makedirs(TMP_DIR) +    except Exception as exc: +        logger.error("Could not create temporary directory: %r", exc) +        sys.exit() +  class MonitorTask(Thread):      """Monitor task"""      def __init__(self, socket): @@ -92,14 +122,45 @@ class ServerWorker(Thread):          self.socket.identity = self.id.encode("utf-8")          self.monitor = MonitorTask(self.socket)          self.video_threads = defaultdict(lambda: defaultdict(dict)) +        self.poller = zmq.Poller() +        self.web_sock = self.context.socket(zmq.DEALER)          self.connected = False          self.running = True +    def start_client(self, client_id, camera_id, filename): +        """ +        Start a video thread for new client_id and camera_id. +        """ +        if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):  # New client or new camera +            q = Queue() +            logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) +            video_worker = VideoWorker(client_id, camera_id, filename, q) +            video_worker.start() +            self.video_threads[client_id][camera_id] = video_worker + +    def stop_client(self, client_id, camera_id): +        """ +        Stop video thread for a client_id and camera_id. +        """ +        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +            logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) +            self.video_threads[client_id][camera_id].stop() # Stop the thread +            del self.video_threads[client_id][camera_id] +            logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) +      def run(self): +        """ +        Main loop of the worker. +        Full of wonders. +        """          logger.debug("ServerWorker '%s' starting ...", self.id) -        self.socket.connect(INITIAL_BACKEND_ADDR) -        self.poller = zmq.Poller() +        self.socket.connect(ZMQ_BACKEND_ADDR) +        try: +           self.web_sock.bind(WEB_BACKEND_ADDR) +        except Exception as exc: +            logger.error("Connection to zmq websocket failed: %r", exc)          self.poller.register(self.socket, zmq.POLLIN) +        self.poller.register(self.web_sock, zmq.POLLIN)          self.monitor.start()          while self.running: @@ -112,67 +173,93 @@ class ServerWorker(Thread):              if self.socket in sockets:                  self.connected = True                  msg = self.socket.recv_multipart() -                logger.debug("ServerWorker '%s' received message: %r", self.id, msg) +                logger.debug("ServerWorker '%s' received message of length: %d.", self.id, len(msg))                  filename = None -                # At the moment we don't expect any other message than a video message: +                # At the moment we don't expect any other message than a start/stop message (of length 4) and a video message:                  if len(msg) == 4:  # This is a message with start/stop directive and no video data. -                    logger.debug("Received start/stop directive: %r", msg) +                    logger.debug("Received start/stop directive: (?)")                      client_id, camera_id, metadata = msg[0], msg[1], msg[2] +                    # Convert bytes to str once and for all: +                    client_id = client_id.decode('utf-8') +                    camera_id = camera_id.decode('utf-8') +                    update_clients(client_id, camera_id)                      try: -                        directive, timestamp = metadata.decode("utf-8").split(":") -                        filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" -                        logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) +                        # Directive and data are fron now on converted to strings: +                        directive, data = metadata.decode("utf-8").split(":") +                        logger.info( +                            "Received directive '%s' with data: %r for client '%s', camera '%s'", +                            directive, +                            data, +                            client_id, +                            camera_id, +                        )                      except ValueError:                          logger.error("Invalid metadata format: %r", metadata) +                        directive = None                          continue -                    if directive == "start": -                        if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):  # New client or new camera -                            q = Queue() -                            logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) -                            video_worker = VideoWorker(filename, q) -                            video_worker.start() -                            self.video_threads[client_id][camera_id] = video_worker -                    elif directive == "stop": -                        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: -                            logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) -                            self.video_threads[client_id][camera_id].queue.put(None)  # Sentinel to stop the thread -                            del self.video_threads[client_id][camera_id] -                            logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) +                    if directive == 'rename': +                        old_name, new_name, timestamp = None, None, None +                        try: +                            old_name, new_name, timestamp = data.split(":") +                            logger.info("Renamed video thread from '%s' to '%s'.", old_name, new_name) +                        except ValueError: +                            logger.error("Invalid rename data format: %r", data) +                            continue +                        if old_name and new_name and timestamp: +                            # I think it's better to stop the old thread and start a new one, +                            # rather than reuse the old one as it's less mucking about. +                            self.stop_client(old_name, camera_id) +                            self.start_client(new_name, camera_id, f"{new_name}_{camera_id}-{timestamp}.mkv") +                            to_remove= b':'.join([client_id, camera_id]) +                            try: +                                self.web_sock.send_multipart([to_remove, b'', b''])  # Notify webserver of rename +                            except Exception as exc: +                                logger.error("Sending rename notification to websocket failed: %r", exc)                      else: -                        logger.error("Unknown directive: '%s'", directive) -                elif len(msg) == 3:  # This is a video message with data only -                    logger.debug("Received video message with data only: %r", msg) -                    client_id, camera_id, msg = msg[0], msg[1], msg[2] +                        timestamp = data +                        filename = f"{client_id}_{camera_id}-{timestamp}.mkv" +                        logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) +                        if directive == "start": +                            self.start_client(client_id, camera_id, filename) +                        elif directive == "stop": +                            self.stop_client(client_id, camera_id) +                        else: +                            logger.error("Unknown directive: %r", directive) + +                elif len(msg) == 3:  # This is a video message with data +                    logger.debug("Received video message with data only.") +                    client_id, camera_id, content = msg[0], msg[1], msg[2] +                    client_id = client_id.decode('utf-8') +                    camera_id = camera_id.decode('utf-8')                      if client_id in self.video_threads and camera_id in self.video_threads[client_id]: -                        self.video_threads[client_id][camera_id].queue.put(msg) +                        self.video_threads[client_id][camera_id].queue.put(content) +                        # Send only [client_id, camera_id, jpeg_bytes] to the webserver: +                        # zmq subsciber can subscribe to a topic defined by the first +                        # part of the multipart message, so in order to allow for a  +                        # per camera subscription, we need to join client_id and camera_id +                        topic = ':'.join([client_id, camera_id]).encode('utf-8') +                        try: +                            self.web_sock.send_multipart([topic, content], flags=zmq.NOBLOCK) +                        except Exception as exc: +                            logger.error("Sending message to websocket failed: %r", exc)                      else:                          logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)                          logger.error("Available video threads keys: %r", self.video_threads.keys())                          logger.error("Available video threads values: %r", self.video_threads.values()) -                elif len(msg) == 2:  # This is a ping message from client -                    logger.debug("Received ping message: %r", msg) -                    client_id, camera_id = msg[0], msg[1] -                    if client_id in self.video_threads and camera_id in self.video_threads[client_id]: -                        # Respond with a pong message -                        try: -                            self.socket.send_multipart([client_id, camera_id, b'pong']) -                            logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) -                        except zmq.ZMQError as exc: -                            logger.error("Failed to send pong: %r", exc) -                  else: -                    logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) -                try: -                    self.socket.send_multipart([client_id, camera_id, b'pong']) -                    logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) -                except zmq.ZMQError as exc: -                    logger.error("Failed to send pong: %r", exc) +                    logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg))              else:                  logger.debug("No message received, polling again ...")                  time.sleep(5) +            if self.web_sock in sockets: +                frontend_msg = self.web_sock.recv_multipart() +                logger.info("Received message from frontend: %r", frontend_msg) +                self.socket.send_multipart(frontend_msg) +                logger.info("Forwarded message to backend: %r", frontend_msg)          self.monitor.stop()          self.monitor.join() +          for camera_thread in self.video_threads.values():              for thread in camera_thread.values():                  thread.queue.put(None)  # Sentinel to unblock queue.get() @@ -183,7 +270,7 @@ class ServerWorker(Thread):          """Send control message to a specific client and camera."""          if client_id in self.video_threads and camera_id in self.video_threads[client_id]:              self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) -            logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) +            logger.info("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)          else:              logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) @@ -195,34 +282,62 @@ class ServerWorker(Thread):  class VideoWorker(Thread):      """Class for video threads.""" -    def __init__(self, filename, queue): +    def __init__(self, client_id, camera_id, filename, queue):          super().__init__(daemon=True) +        self.context = zmq.Context() +        self.context.setsockopt(zmq.LINGER, 0) +        self.client_id = client_id +        self.camera_id = camera_id          self.filename = filename          self.queue = queue          self.live = True +    def stop(self): +        logger.info("VideoWorker %r exiting ...", self.camera_id) +        self.live = False +      def run(self):          if os.path.exists(self.filename):              logger.warning("File '%s' already exists, overwriting ...", self.filename) -        with open(self.filename, 'wb') as f: -            logger.info("VideoWorker started writing to file: %s", self.filename) -            while self.live: -                # Simulate video processing -                frame = self.queue.get()  # Get frame from queue -                logger.debug("Processing ('writing to a file') frame for camera: %r", frame) -                if frame is None: -                    logger.info("VideoWorker received stop signal, exiting ...") -                    break -                f.write(frame + b'\n')  # Write frame to file - -    def stop(self): -        logger.info("VideoWorker '' exiting ...") -        self.live = False +        fourcc = cv2.VideoWriter_fourcc(*'VP80') +        out = cv2.VideoWriter(self.filename, fourcc, 30.0, (640, 480))  # Assuming 640x480 resolution +        logger.info("This is the first run, binding websocket ...") +        while self.live: +            logger.debug("VideoWorker writing to file: %s", self.filename) +            frame_bytes = self.queue.get() +            if frame_bytes is None: +                logger.debug("Received None, stopping video worker for camera: '%s'", self.camera_id) +                break +            frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR) +            logger.debug("Processing ('writing to a file') frame for camera: '%s'", self.camera_id) +            # Write frame to file: +            out.write(frame) + +        # Release video writer +        out.release() +        logger.info("VideoWorker finished writing to file: %s", self.filename)  def signal_handler(sig, frame):      worker.stop() +def update_clients(client_id, camera_id): +    """Update client and camera dictionary and write to a shared file.""" +    global CLIENTS_DICT +    if client_id not in CLIENTS_DICT: +        logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.") +        CLIENTS_DICT[client_id] = [] +    if camera_id not in CLIENTS_DICT[client_id]: +        logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id) +        CLIENTS_DICT[client_id].append(camera_id) +    # Atomic write using tempfile. Works only when both files on the same filesystem +    with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf: +        logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT) +        json.dump(CLIENTS_DICT, tf) +        tempname = tf.name +    os.replace(tempname, CLIENTS_JSON_FILE) + +  if __name__ == "__main__":      logger.info("Starting up ...") @@ -233,3 +348,7 @@ if __name__ == "__main__":      worker.start()      worker.join() +    try: +        os.remove(CLIENTS_JSON_FILE) +    except FileNotFoundError: +        pass
\ No newline at end of file | 
