diff options
Diffstat (limited to 'worker.py')
| -rw-r--r-- | worker.py | 235 | 
1 files changed, 235 insertions, 0 deletions
| diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..ad5fe1e --- /dev/null +++ b/worker.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python + +__author__ = "Franoosh Corporation" + +""" +A module consisting of a zeromq worker receiving video stream +via router from clients. Able to send control messages to clients. +""" + +import os +from threading import Thread, Event +import time +import signal +import logging +from queue import Queue +from collections import defaultdict +import zmq + +from helpers import CustomLoggingFormatter + + +ADDR = "tcp://localhost" +PORT = "9979" +INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}" +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + +class MonitorTask(Thread): +    """Monitor task""" +    def __init__(self, socket): +        super().__init__(daemon=True) +        self.socket = socket +        self.running = True + +    def run(self): +        """Monitor connection on the initial socket. +        This is heartbeat monitoring""" +        monitor = self.socket.get_monitor_socket() +        monitor.setsockopt(zmq.RCVTIMEO, 5000) +        logger.debug("Monitor socket started.") +        while self.running: +            try: +                event, _ = monitor.recv_multipart() +                # Resolve the received event type: +                event_type = int.from_bytes(event[:2], "little") +                if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED): +                    logger.warning("Monitor socket: closed | disconnected") +                    stop_event.set() +                elif event_type == zmq.EVENT_CONNECT_DELAYED: +                    logger.debug("Monitor socket: event connect delayed") +                elif event_type == zmq.EVENT_CONNECT_RETRIED: +                    logger.debug("Monitor socket: event connect retried") +                elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED): +                    logger.debug("Monitor socket: client connected to router, handshake OK.") +                    stop_event.clear() +                else: +                    logger.warning("Monitor socket: other event: '%r'", event_type) +            except zmq.Again: +                logger.debug("Timeout on monitoring socket.") +            except Exception as exc: # W: Catching too general exception Exception +                logger.error("Other exception on monitoring socket: %r", exc) + +        # monitor.close() + +    def stop(self): +        """Stop thread""" +        logger.info("Stopping monitor thread ...") +        self.running = False + + +class ServerWorker(Thread): +    """ServerWorker""" +    def __init__(self, identity, context=None): +        super().__init__(daemon=True) +        self.id = identity +        self.context = context or zmq.Context.instance() +        self.socket = self.context.socket(zmq.DEALER) +        self.socket.identity = self.id.encode("utf-8") +        self.monitor = MonitorTask(self.socket) +        self.video_threads = defaultdict(lambda: defaultdict(dict)) +        self.connected = False +        self.running = True + +    def run(self): +        logger.debug("ServerWorker '%s' starting ...", self.id) +        self.socket.connect(INITIAL_BACKEND_ADDR) +        self.poller = zmq.Poller() +        self.poller.register(self.socket, zmq.POLLIN) +        self.monitor.start() + +        while self.running: +            logger.debug("ServerWorker '%s' waiting for a message ...", self.id) +            if not self.connected or stop_event.is_set(): +                self.socket.send_multipart([b"READY"])  # Router needs worker identity, hence this +                time.sleep(1)  # Wait a bit before trying to connect again + +            sockets = dict(self.poller.poll(1000)) +            if self.socket in sockets: +                self.connected = True +                msg = self.socket.recv_multipart() +                logger.debug("ServerWorker '%s' received message: %r", self.id, msg) +                filename = None +                # At the moment we don't expect any other message than a video message: +                if len(msg) == 4:  # This is a message with start/stop directive and no video data. +                    logger.debug("Received start/stop directive: %r", msg) +                    client_id, camera_id, metadata = msg[0], msg[1], msg[2] +                    try: +                        directive, timestamp = metadata.decode("utf-8").split(":") +                        filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" +                        logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) +                    except ValueError: +                        logger.error("Invalid metadata format: %r", metadata) +                        continue +                    if directive == "start": +                        if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):  # New client or new camera +                            q = Queue() +                            logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) +                            video_worker = VideoWorker(filename, q) +                            video_worker.start() +                            self.video_threads[client_id][camera_id] = video_worker +                    elif directive == "stop": +                        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +                            logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) +                            self.video_threads[client_id][camera_id].queue.put(None)  # Sentinel to stop the thread +                            del self.video_threads[client_id][camera_id] +                            logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) +                    else: +                        logger.error("Unknown directive: '%s'", directive) +                elif len(msg) == 3:  # This is a video message with data only +                    logger.debug("Received video message with data only: %r", msg) +                    client_id, camera_id, msg = msg[0], msg[1], msg[2] +                    if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +                        self.video_threads[client_id][camera_id].queue.put(msg) +                    else: +                        logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) +                        logger.error("Available video threads keys: %r", self.video_threads.keys()) +                        logger.error("Available video threads values: %r", self.video_threads.values()) +                elif len(msg) == 2:  # This is a ping message from client +                    logger.debug("Received ping message: %r", msg) +                    client_id, camera_id = msg[0], msg[1] +                    if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +                        # Respond with a pong message +                        try: +                            self.socket.send_multipart([client_id, camera_id, b'pong']) +                            logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) +                        except zmq.ZMQError as exc: +                            logger.error("Failed to send pong: %r", exc) + +                else: +                    logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) +                try: +                    self.socket.send_multipart([client_id, camera_id, b'pong']) +                    logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) +                except zmq.ZMQError as exc: +                    logger.error("Failed to send pong: %r", exc) +            else: +                logger.debug("No message received, polling again ...") +                time.sleep(5) + +        self.monitor.stop() +        self.monitor.join() +        for camera_thread in self.video_threads.values(): +            for thread in camera_thread.values(): +                thread.queue.put(None)  # Sentinel to unblock queue.get() +                thread.stop() +                thread.join() + +    def send_control_message(self, client_id, camera_id, message): +        """Send control message to a specific client and camera.""" +        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +            self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) +            logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) +        else: +            logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) + +    def stop(self): +        logger.info("ServerWorker '%s' exiting ...", self.id) +        self.running = False + + +class VideoWorker(Thread): +    """Class for video threads.""" + +    def __init__(self, filename, queue): +        super().__init__(daemon=True) +        self.filename = filename +        self.queue = queue +        self.live = True + +    def run(self): +        if os.path.exists(self.filename): +            logger.warning("File '%s' already exists, overwriting ...", self.filename) +        with open(self.filename, 'wb') as f: +            logger.info("VideoWorker started writing to file: %s", self.filename) +            while self.live: +                # Simulate video processing +                frame = self.queue.get()  # Get frame from queue +                logger.debug("Processing ('writing to a file') frame for camera: %r", frame) +                if frame is None: +                    logger.info("VideoWorker received stop signal, exiting ...") +                    break +                f.write(frame + b'\n')  # Write frame to file + + +    def stop(self): +        logger.info("VideoWorker '' exiting ...") +        self.live = False + +def signal_handler(sig, frame): +    worker.stop() + +if __name__ == "__main__": +    logger.info("Starting up ...") + +    signal.signal(signal.SIGINT, signal_handler) +    signal.signal(signal.SIGTERM, signal_handler) + +    worker = ServerWorker("worker-task") +    worker.start() + +    worker.join() | 
