diff options
Diffstat (limited to 'worker.py')
-rw-r--r-- | worker.py | 235 |
1 files changed, 235 insertions, 0 deletions
diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..ad5fe1e --- /dev/null +++ b/worker.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python + +__author__ = "Franoosh Corporation" + +""" +A module consisting of a zeromq worker receiving video stream +via router from clients. Able to send control messages to clients. +""" + +import os +from threading import Thread, Event +import time +import signal +import logging +from queue import Queue +from collections import defaultdict +import zmq + +from helpers import CustomLoggingFormatter + + +ADDR = "tcp://localhost" +PORT = "9979" +INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}" +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + +class MonitorTask(Thread): + """Monitor task""" + def __init__(self, socket): + super().__init__(daemon=True) + self.socket = socket + self.running = True + + def run(self): + """Monitor connection on the initial socket. + This is heartbeat monitoring""" + monitor = self.socket.get_monitor_socket() + monitor.setsockopt(zmq.RCVTIMEO, 5000) + logger.debug("Monitor socket started.") + while self.running: + try: + event, _ = monitor.recv_multipart() + # Resolve the received event type: + event_type = int.from_bytes(event[:2], "little") + if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED): + logger.warning("Monitor socket: closed | disconnected") + stop_event.set() + elif event_type == zmq.EVENT_CONNECT_DELAYED: + logger.debug("Monitor socket: event connect delayed") + elif event_type == zmq.EVENT_CONNECT_RETRIED: + logger.debug("Monitor socket: event connect retried") + elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED): + logger.debug("Monitor socket: client connected to router, handshake OK.") + stop_event.clear() + else: + logger.warning("Monitor socket: other event: '%r'", event_type) + except zmq.Again: + logger.debug("Timeout on monitoring socket.") + except Exception as exc: # W: Catching too general exception Exception + logger.error("Other exception on monitoring socket: %r", exc) + + # monitor.close() + + def stop(self): + """Stop thread""" + logger.info("Stopping monitor thread ...") + self.running = False + + +class ServerWorker(Thread): + """ServerWorker""" + def __init__(self, identity, context=None): + super().__init__(daemon=True) + self.id = identity + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.socket.identity = self.id.encode("utf-8") + self.monitor = MonitorTask(self.socket) + self.video_threads = defaultdict(lambda: defaultdict(dict)) + self.connected = False + self.running = True + + def run(self): + logger.debug("ServerWorker '%s' starting ...", self.id) + self.socket.connect(INITIAL_BACKEND_ADDR) + self.poller = zmq.Poller() + self.poller.register(self.socket, zmq.POLLIN) + self.monitor.start() + + while self.running: + logger.debug("ServerWorker '%s' waiting for a message ...", self.id) + if not self.connected or stop_event.is_set(): + self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this + time.sleep(1) # Wait a bit before trying to connect again + + sockets = dict(self.poller.poll(1000)) + if self.socket in sockets: + self.connected = True + msg = self.socket.recv_multipart() + logger.debug("ServerWorker '%s' received message: %r", self.id, msg) + filename = None + # At the moment we don't expect any other message than a video message: + if len(msg) == 4: # This is a message with start/stop directive and no video data. + logger.debug("Received start/stop directive: %r", msg) + client_id, camera_id, metadata = msg[0], msg[1], msg[2] + try: + directive, timestamp = metadata.decode("utf-8").split(":") + filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" + logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) + except ValueError: + logger.error("Invalid metadata format: %r", metadata) + continue + if directive == "start": + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera + q = Queue() + logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) + video_worker = VideoWorker(filename, q) + video_worker.start() + self.video_threads[client_id][camera_id] = video_worker + elif directive == "stop": + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) + self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread + del self.video_threads[client_id][camera_id] + logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) + else: + logger.error("Unknown directive: '%s'", directive) + elif len(msg) == 3: # This is a video message with data only + logger.debug("Received video message with data only: %r", msg) + client_id, camera_id, msg = msg[0], msg[1], msg[2] + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + self.video_threads[client_id][camera_id].queue.put(msg) + else: + logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) + logger.error("Available video threads keys: %r", self.video_threads.keys()) + logger.error("Available video threads values: %r", self.video_threads.values()) + elif len(msg) == 2: # This is a ping message from client + logger.debug("Received ping message: %r", msg) + client_id, camera_id = msg[0], msg[1] + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + # Respond with a pong message + try: + self.socket.send_multipart([client_id, camera_id, b'pong']) + logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) + except zmq.ZMQError as exc: + logger.error("Failed to send pong: %r", exc) + + else: + logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) + try: + self.socket.send_multipart([client_id, camera_id, b'pong']) + logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) + except zmq.ZMQError as exc: + logger.error("Failed to send pong: %r", exc) + else: + logger.debug("No message received, polling again ...") + time.sleep(5) + + self.monitor.stop() + self.monitor.join() + for camera_thread in self.video_threads.values(): + for thread in camera_thread.values(): + thread.queue.put(None) # Sentinel to unblock queue.get() + thread.stop() + thread.join() + + def send_control_message(self, client_id, camera_id, message): + """Send control message to a specific client and camera.""" + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) + logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) + else: + logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) + + def stop(self): + logger.info("ServerWorker '%s' exiting ...", self.id) + self.running = False + + +class VideoWorker(Thread): + """Class for video threads.""" + + def __init__(self, filename, queue): + super().__init__(daemon=True) + self.filename = filename + self.queue = queue + self.live = True + + def run(self): + if os.path.exists(self.filename): + logger.warning("File '%s' already exists, overwriting ...", self.filename) + with open(self.filename, 'wb') as f: + logger.info("VideoWorker started writing to file: %s", self.filename) + while self.live: + # Simulate video processing + frame = self.queue.get() # Get frame from queue + logger.debug("Processing ('writing to a file') frame for camera: %r", frame) + if frame is None: + logger.info("VideoWorker received stop signal, exiting ...") + break + f.write(frame + b'\n') # Write frame to file + + + def stop(self): + logger.info("VideoWorker '' exiting ...") + self.live = False + +def signal_handler(sig, frame): + worker.stop() + +if __name__ == "__main__": + logger.info("Starting up ...") + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + worker = ServerWorker("worker-task") + worker.start() + + worker.join() |