diff options
Diffstat (limited to 'worker.py')
| -rw-r--r-- | worker.py | 628 |
1 files changed, 628 insertions, 0 deletions
diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..26ea8aa --- /dev/null +++ b/worker.py @@ -0,0 +1,628 @@ +#!/usr/bin/env python +""" +A module consisting of a zeromq worker receiving video stream +via router from clients. Able to send control messages to clients. +Needs libzmq and pyzmq with 'drafts' support. +""" + +import os +import sys +import struct +import datetime +from threading import Thread, Event +import time +import signal +import logging +from queue import Queue +from collections import defaultdict +import tempfile +import json +import zmq +import cv2 +import numpy + +from helpers import ( + CustomLoggingFormatter, + bytes_to_str, + TIME_FORMAT_STRING, +) + +__author__ = "Franoosh Corporation" + +# Various constants: # TODO: put them in a config + +HOST = "127.0.0.1" +ZMQPORT = "9979" +# WSPORT = "8000" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +# LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO + +# File paths: + +LOGDIR = 'logs' +if not os.path.exists(LOGDIR): + try: + os.makedirs(LOGDIR) + except Exception as exc: + print(f"Could not create log directory: {exc}") + sys.exit() +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") + +CWD = os.getcwd() +TMP_DIR = os.path.join(CWD, "tmp") +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') +# This should come from the config file: +VIDEO_DIR = os.path.join(CWD, "videos") + +# Other: + +CLIENTS_DICT = {} + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + + +if not os.path.exists(TMP_DIR): + try: + os.makedirs(TMP_DIR) + except Exception as exc: + logger.error("Could not create temporary directory: %r", exc) + sys.exit() + +class MonitorTask(Thread): + """ + Monitor the connection status of the zmq socket. + + Parameters + ---------- + socket : zmq.Socket + The zmq socket to monitor. + + Methods + ------- + run() + Monitor connection on the initial socket. + stop() + Stop thread + """ + def __init__(self, socket) -> None: + """ + Docstring for __init__ + + Parameters + ---------- + socket : zmq.Socket + The zmq socket to monitor. + """ + super().__init__(daemon=True) + self.socket = socket + self.running = True + + def run(self) -> None: + """ + Monitor connection on the initial socket. + This is heartbeat monitoring + + Parameters + ---------- + None + + Returns + ------- + None + """ + monitor = self.socket.get_monitor_socket() + monitor.setsockopt(zmq.RCVTIMEO, 5000) + logger.debug("Monitor socket started.") + while self.running: + try: + event, _ = monitor.recv_multipart() + # Resolve the received event type: + event_type = int.from_bytes(event[:2], "little") + if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED): + logger.warning("Monitor socket: closed | disconnected") + stop_event.set() + elif event_type == zmq.EVENT_CONNECT_DELAYED: + logger.debug("Monitor socket: event connect delayed") + elif event_type == zmq.EVENT_CONNECT_RETRIED: + logger.debug("Monitor socket: event connect retried") + elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED): + logger.debug("Monitor socket: client connected to router, handshake OK.") + stop_event.clear() + else: + logger.warning("Monitor socket: other event: %r", event_type) + except zmq.Again: + logger.debug("Timeout on monitoring socket.") + except Exception as exc: # W: Catching too general exception Exception + logger.error("Other exception on monitoring socket: %r", exc) + + # monitor.close() + + def stop(self) -> None: + """ + Stop thread + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("Stopping monitor thread ...") + self.running = False + + +class ServerWorker(Thread): + """ + ServerWorker class for zmq video messaging. + + Parameters + ---------- + identity : bytes + Worker identity + context : zmq.Context, optional + ZMQ context (default is None, which creates a new context) + + Methods + ------- + start_client(client_id, camera_id, filename) + Start a video thread for new client_id and camera_id. + stop_client(client_id, camera_id) + Stop video thread for a client_id and camera_id. + run() + Main loop of the worker. + """ + def __init__(self, identity, context=None) -> None: + """ + Docstring for __init__ + + Parameters + ---------- + identity : bytes + Worker identity + context : zmq.Context, optional + ZMQ context (default is None, which creates a new context) + """ + super().__init__(daemon=True) + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.id = self.socket.identity = identity + self.monitor = MonitorTask(self.socket) + self.video_threads = defaultdict(lambda: defaultdict(dict)) + self.poller = zmq.Poller() + self.web_sock = self.context.socket(zmq.DEALER) + self.connected = False + self.running = True + + def start_client(self, client_id, camera_id, filename) -> None: + """ + Start a video thread for new client_id and camera_id. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + filename : str + Filename to write video to + + Returns + ------- + None + """ + # New client or new camera for existing client: + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): + q = Queue() + logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id) + video_worker = VideoWorker(client_id, camera_id, filename, q) + video_worker.start() + self.video_threads[client_id][camera_id] = video_worker + + def stop_client(self, client_id, camera_id) -> None: + """ + Stop video thread for a client_id and camera_id. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + + Returns + ------- + None + """ + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + logger.debug("Stopping video thread for client %r, camera %r", client_id, camera_id) + self.video_threads[client_id][camera_id].stop() # Stop the thread + del self.video_threads[client_id][camera_id] + logger.info("Stopped video thread for client %r, camera %r", client_id, camera_id) + + def handle_metadata( + self, + client_id: bytes, + camera_id: bytes, + directive: bytes, + timestamp: bytes + ) -> None: + """ + Handle metadata messages. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + directive : bytes + Action to perform (e.g., 'start', 'stop') + timestamp : bytes + Timestamp of the action + + Returns + ------- + None + """ + logger.info( + "Received metadata directive: %r for client: %r, camera: %r, timestamp: %r", + directive, client_id, camera_id, timestamp + ) + update_clients(client_id, camera_id) + if directive in (b'start', b'stop'): + camera_id_string = camera_id.decode('utf-8') + timestamp_string = datetime.datetime.fromtimestamp( + struct.unpack('d', timestamp)[0] + ).strftime(TIME_FORMAT_STRING) + filename = f"{camera_id_string}-{timestamp_string}.mp4" + logger.debug("Directive: %r, Timestamp: %r, Filename: %r", directive, timestamp, filename) + if directive == b"start": + self.start_client(client_id, camera_id, filename) + elif directive == b"stop": + self.stop_client(client_id, camera_id) + else: + logger.error("Unknown directive: %r", directive) + # elif directive == b'rename': + # old_name = new_name = timestamp = None + # try: + # old_name, new_name, timestamp = data.split(b":") + # logger.info("Renamed video thread from %r to %r.", old_name, new_name) + # except ValueError: + # logger.error("Invalid rename data format: %r", data) + # if old_name and new_name and timestamp: + # # I think it's better to stop the old thread and start a new one, + # # rather than reuse the old one as it's less mucking about. + # self.stop_client(old_name, camera_id) + # new_name_string = new_name.decode('utf-8') + # camera_id_string = camera_id.decode('utf-8') + # timestamp_string = timestamp.decode('utf-8') + # self.start_client( + # new_name, + # camera_id, + # f"{new_name_string}_{camera_id_string}-{timestamp_string}.mp4" + # ) + # to_remove= b':'.join([client_id, camera_id]) + # try: + # self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename + # except Exception as exc: + # logger.error("Sending rename notification to websocket failed: %r", exc) + + # elif directive in (b'new_camera', b'removed_camera'): + # logger.info("Received camera metadata update: %r for client %r, camera %r", + # directive, + # client_id, + # camera_id, + # ) + # # Just forward the message to the webserver: + # try: + # self.web_sock.send_multipart([client_id, camera_id, directive, data], flags=zmq.NOBLOCK) + # except Exception as exc: + # logger.error("Sending camera metadata update to websocket failed: %r", exc) + else: + logger.error("Unknown directive: %r", directive) + + return False + + def handle_video_message(self, msg: list) -> None: + logger.debug("Received video message with data only.") + client_id, camera_id, *content = msg + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + self.video_threads[client_id][camera_id].queue.put(content[0]) + # Send only [client_id, camera_id, jpeg_bytes] to the webserver: + # zmq subsciber can subscribe to a topic defined by the first + # part of the multipart message, so in order to allow for a + # per camera subscription, we need to join client_id and camera_id + topic = b':'.join([client_id, camera_id]) + try: + self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK) + except Exception as exc: + logger.error("Sending message to websocket failed: %r", exc) + else: + logger.error("No video thread found for client %r, camera %r", client_id, camera_id) + logger.debug("Available video threads keys: %r", self.video_threads.keys()) + logger.debug("Available video threads values: %r", self.video_threads.values()) + + def run(self) -> None: + """ + Main loop of the worker. + Full of wonders. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.debug("ServerWorker %r starting ...", self.id) + self.socket.connect(ZMQ_BACKEND_ADDR) + try: + self.web_sock.bind(WEB_BACKEND_ADDR) + except Exception as exc: + logger.error("Connection to zmq websocket failed: %r", exc) + self.poller.register(self.socket, zmq.POLLIN) + self.poller.register(self.web_sock, zmq.POLLIN) + self.monitor.start() + + while self.running: + logger.debug("ServerWorker %r waiting for a message ...", self.id) + if not self.connected or stop_event.is_set(): + self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this + time.sleep(0.1) # Wait a bit before trying to connect again + + sockets = dict(self.poller.poll(10)) + if self.socket in sockets: + self.connected = True + msg = self.socket.recv_multipart() + logger.debug("ServerWorker %r received message of length: %d.", self.id, len(msg)) + filename = None + # At the moment we don't expect any other message than a + # start/stop message (of length 4) and a video message: + if len(msg) == 4: # This is a message with start/stop/rename directive and no video data. + self.handle_metadata(*msg) + elif len(msg) == 3: # This is a video message with data + self.handle_video_message(msg) + else: + logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg)) + else: + logger.debug("No message received, polling again ...") + time.sleep(0.1) + if self.web_sock in sockets: + frontend_msg = self.web_sock.recv_multipart() + logger.info("Received message from frontend: %r", frontend_msg) + self.socket.send_multipart(frontend_msg) + logger.info("Forwarded message to backend: %r", frontend_msg) + + self.monitor.stop() + self.monitor.join() + + for camera_thread in self.video_threads.values(): + for thread in camera_thread.values(): + thread.queue.put(None) # Sentinel to unblock queue.get() + thread.stop() + thread.join() + + def send_control_message(self, client_id, camera_id, message) -> None: + """ + Send control message to a specific client and camera. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + message : bytes + Control message to send + + Returns + ------- + None + """ + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + # self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) + self.socket.send_multipart([client_id, camera_id, message]) + logger.info("Sent control message to client %r, camera %r: %r", client_id, camera_id, message) + else: + logger.error("No video thread found for client %r, camera %r", client_id, camera_id) + + def stop(self) -> None: + """ + Stop the worker thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("ServerWorker %r exiting ...", self.id) + self.running = False + + +class VideoWorker(Thread): + """ + Class for video threads writing video stream to files. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + filename : str + Filename to write video to + queue : Queue + Queue to receive video frames + + Methods + ------- + run() + Main loop of the video worker thread. + stop() + Stop thread + """ + + def __init__(self, client_id, camera_id, filename, queue) -> None: + """ + Docstring for __init__ + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + filename : str + Filename to write video to + queue : Queue + Queue to receive video frames + """ + super().__init__(daemon=True) + self.context = zmq.Context() + self.context.setsockopt(zmq.LINGER, 0) + self.client_id = client_id + self.camera_id = camera_id + self.filename = filename + self.queue = queue + self.live = True + + def stop(self) -> None: + """ + Docstring for stop + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("VideoWorker %r exiting ...", self.camera_id) + self.live = False + + def run(self) -> None: + """ + Main loop of the video worker thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + # Create a VIDEO_DIR/client_id/camera_id/ directory if it doesn't exist + dirname = os.path.join( + VIDEO_DIR, + self.client_id.decode('utf-8'), + self.camera_id.decode('utf-8'), + ) + filename = os.path.join( + dirname, + self.filename + ) + try: + os.makedirs(dirname, exist_ok=True) + except PermissionError as exc: + logger.error("Could not create directory '%s': %r", dirname, exc) + return + if os.path.exists(filename): + logger.warning("File '%s' already exists, overwriting ...", filename) + + fourcc = cv2.VideoWriter_fourcc(*'avc1') + out = cv2.VideoWriter(filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution + logger.info("This is the first run, binding websocket ...") + while self.live: + logger.debug("VideoWorker writing to file: %s", filename) + frame_bytes = self.queue.get() + if frame_bytes is None: + logger.debug("Received None, stopping video worker for camera: '%r'", self.camera_id) + break + frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR) + logger.debug("Processing ('writing to a file') frame for camera: '%r'", self.camera_id) + # Write frame to file: + out.write(frame) + + # Release video writer + out.release() + logger.info("VideoWorker finished writing to file: %s", self.filename) + +def signal_handler(sig, frame) -> None: + """ + Docstring for signal_handler + + sig : Signal + Signal number + frame : Frame + Current stack frame + """ + worker.stop() + +def update_clients(client_id, camera_id) -> None: + """ + Update client and camera dictionary and write to a shared file. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + + Returns + ------- + None + """ + global CLIENTS_DICT + if client_id not in CLIENTS_DICT: + logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.") + CLIENTS_DICT[client_id] = [] + if camera_id not in CLIENTS_DICT[client_id]: + logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id) + CLIENTS_DICT[client_id].append(camera_id) + # Atomic write using tempfile. Works only when both files on the same filesystem + with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf: + logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT) + json.dump(bytes_to_str(CLIENTS_DICT), tf) + tempname = tf.name + os.replace(tempname, CLIENTS_JSON_FILE) + + +if __name__ == "__main__": + logger.info("Starting up ...") + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + worker = ServerWorker(b"worker-task") + worker.start() + + worker.join() + try: + os.remove(CLIENTS_JSON_FILE) + except FileNotFoundError: + pass |
