aboutsummaryrefslogtreecommitdiff
path: root/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'worker.py')
-rw-r--r--worker.py628
1 files changed, 628 insertions, 0 deletions
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..26ea8aa
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,628 @@
+#!/usr/bin/env python
+"""
+A module consisting of a zeromq worker receiving video stream
+via router from clients. Able to send control messages to clients.
+Needs libzmq and pyzmq with 'drafts' support.
+"""
+
+import os
+import sys
+import struct
+import datetime
+from threading import Thread, Event
+import time
+import signal
+import logging
+from queue import Queue
+from collections import defaultdict
+import tempfile
+import json
+import zmq
+import cv2
+import numpy
+
+from helpers import (
+ CustomLoggingFormatter,
+ bytes_to_str,
+ TIME_FORMAT_STRING,
+)
+
+__author__ = "Franoosh Corporation"
+
+# Various constants: # TODO: put them in a config
+
+HOST = "127.0.0.1"
+ZMQPORT = "9979"
+# WSPORT = "8000"
+WSPORT = "8008"
+ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
+# LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
+
+# File paths:
+
+LOGDIR = 'logs'
+if not os.path.exists(LOGDIR):
+ try:
+ os.makedirs(LOGDIR)
+ except Exception as exc:
+ print(f"Could not create log directory: {exc}")
+ sys.exit()
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+
+CWD = os.getcwd()
+TMP_DIR = os.path.join(CWD, "tmp")
+CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
+# This should come from the config file:
+VIDEO_DIR = os.path.join(CWD, "videos")
+
+# Other:
+
+CLIENTS_DICT = {}
+
+stop_event = Event()
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+
+if not os.path.exists(TMP_DIR):
+ try:
+ os.makedirs(TMP_DIR)
+ except Exception as exc:
+ logger.error("Could not create temporary directory: %r", exc)
+ sys.exit()
+
+class MonitorTask(Thread):
+ """
+ Monitor the connection status of the zmq socket.
+
+ Parameters
+ ----------
+ socket : zmq.Socket
+ The zmq socket to monitor.
+
+ Methods
+ -------
+ run()
+ Monitor connection on the initial socket.
+ stop()
+ Stop thread
+ """
+ def __init__(self, socket) -> None:
+ """
+ Docstring for __init__
+
+ Parameters
+ ----------
+ socket : zmq.Socket
+ The zmq socket to monitor.
+ """
+ super().__init__(daemon=True)
+ self.socket = socket
+ self.running = True
+
+ def run(self) -> None:
+ """
+ Monitor connection on the initial socket.
+ This is heartbeat monitoring
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ monitor = self.socket.get_monitor_socket()
+ monitor.setsockopt(zmq.RCVTIMEO, 5000)
+ logger.debug("Monitor socket started.")
+ while self.running:
+ try:
+ event, _ = monitor.recv_multipart()
+ # Resolve the received event type:
+ event_type = int.from_bytes(event[:2], "little")
+ if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED):
+ logger.warning("Monitor socket: closed | disconnected")
+ stop_event.set()
+ elif event_type == zmq.EVENT_CONNECT_DELAYED:
+ logger.debug("Monitor socket: event connect delayed")
+ elif event_type == zmq.EVENT_CONNECT_RETRIED:
+ logger.debug("Monitor socket: event connect retried")
+ elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED):
+ logger.debug("Monitor socket: client connected to router, handshake OK.")
+ stop_event.clear()
+ else:
+ logger.warning("Monitor socket: other event: %r", event_type)
+ except zmq.Again:
+ logger.debug("Timeout on monitoring socket.")
+ except Exception as exc: # W: Catching too general exception Exception
+ logger.error("Other exception on monitoring socket: %r", exc)
+
+ # monitor.close()
+
+ def stop(self) -> None:
+ """
+ Stop thread
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("Stopping monitor thread ...")
+ self.running = False
+
+
+class ServerWorker(Thread):
+ """
+ ServerWorker class for zmq video messaging.
+
+ Parameters
+ ----------
+ identity : bytes
+ Worker identity
+ context : zmq.Context, optional
+ ZMQ context (default is None, which creates a new context)
+
+ Methods
+ -------
+ start_client(client_id, camera_id, filename)
+ Start a video thread for new client_id and camera_id.
+ stop_client(client_id, camera_id)
+ Stop video thread for a client_id and camera_id.
+ run()
+ Main loop of the worker.
+ """
+ def __init__(self, identity, context=None) -> None:
+ """
+ Docstring for __init__
+
+ Parameters
+ ----------
+ identity : bytes
+ Worker identity
+ context : zmq.Context, optional
+ ZMQ context (default is None, which creates a new context)
+ """
+ super().__init__(daemon=True)
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.id = self.socket.identity = identity
+ self.monitor = MonitorTask(self.socket)
+ self.video_threads = defaultdict(lambda: defaultdict(dict))
+ self.poller = zmq.Poller()
+ self.web_sock = self.context.socket(zmq.DEALER)
+ self.connected = False
+ self.running = True
+
+ def start_client(self, client_id, camera_id, filename) -> None:
+ """
+ Start a video thread for new client_id and camera_id.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ filename : str
+ Filename to write video to
+
+ Returns
+ -------
+ None
+ """
+ # New client or new camera for existing client:
+ if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):
+ q = Queue()
+ logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id)
+ video_worker = VideoWorker(client_id, camera_id, filename, q)
+ video_worker.start()
+ self.video_threads[client_id][camera_id] = video_worker
+
+ def stop_client(self, client_id, camera_id) -> None:
+ """
+ Stop video thread for a client_id and camera_id.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+
+ Returns
+ -------
+ None
+ """
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ logger.debug("Stopping video thread for client %r, camera %r", client_id, camera_id)
+ self.video_threads[client_id][camera_id].stop() # Stop the thread
+ del self.video_threads[client_id][camera_id]
+ logger.info("Stopped video thread for client %r, camera %r", client_id, camera_id)
+
+ def handle_metadata(
+ self,
+ client_id: bytes,
+ camera_id: bytes,
+ directive: bytes,
+ timestamp: bytes
+ ) -> None:
+ """
+ Handle metadata messages.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ directive : bytes
+ Action to perform (e.g., 'start', 'stop')
+ timestamp : bytes
+ Timestamp of the action
+
+ Returns
+ -------
+ None
+ """
+ logger.info(
+ "Received metadata directive: %r for client: %r, camera: %r, timestamp: %r",
+ directive, client_id, camera_id, timestamp
+ )
+ update_clients(client_id, camera_id)
+ if directive in (b'start', b'stop'):
+ camera_id_string = camera_id.decode('utf-8')
+ timestamp_string = datetime.datetime.fromtimestamp(
+ struct.unpack('d', timestamp)[0]
+ ).strftime(TIME_FORMAT_STRING)
+ filename = f"{camera_id_string}-{timestamp_string}.mp4"
+ logger.debug("Directive: %r, Timestamp: %r, Filename: %r", directive, timestamp, filename)
+ if directive == b"start":
+ self.start_client(client_id, camera_id, filename)
+ elif directive == b"stop":
+ self.stop_client(client_id, camera_id)
+ else:
+ logger.error("Unknown directive: %r", directive)
+ # elif directive == b'rename':
+ # old_name = new_name = timestamp = None
+ # try:
+ # old_name, new_name, timestamp = data.split(b":")
+ # logger.info("Renamed video thread from %r to %r.", old_name, new_name)
+ # except ValueError:
+ # logger.error("Invalid rename data format: %r", data)
+ # if old_name and new_name and timestamp:
+ # # I think it's better to stop the old thread and start a new one,
+ # # rather than reuse the old one as it's less mucking about.
+ # self.stop_client(old_name, camera_id)
+ # new_name_string = new_name.decode('utf-8')
+ # camera_id_string = camera_id.decode('utf-8')
+ # timestamp_string = timestamp.decode('utf-8')
+ # self.start_client(
+ # new_name,
+ # camera_id,
+ # f"{new_name_string}_{camera_id_string}-{timestamp_string}.mp4"
+ # )
+ # to_remove= b':'.join([client_id, camera_id])
+ # try:
+ # self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename
+ # except Exception as exc:
+ # logger.error("Sending rename notification to websocket failed: %r", exc)
+
+ # elif directive in (b'new_camera', b'removed_camera'):
+ # logger.info("Received camera metadata update: %r for client %r, camera %r",
+ # directive,
+ # client_id,
+ # camera_id,
+ # )
+ # # Just forward the message to the webserver:
+ # try:
+ # self.web_sock.send_multipart([client_id, camera_id, directive, data], flags=zmq.NOBLOCK)
+ # except Exception as exc:
+ # logger.error("Sending camera metadata update to websocket failed: %r", exc)
+ else:
+ logger.error("Unknown directive: %r", directive)
+
+ return False
+
+ def handle_video_message(self, msg: list) -> None:
+ logger.debug("Received video message with data only.")
+ client_id, camera_id, *content = msg
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ self.video_threads[client_id][camera_id].queue.put(content[0])
+ # Send only [client_id, camera_id, jpeg_bytes] to the webserver:
+ # zmq subsciber can subscribe to a topic defined by the first
+ # part of the multipart message, so in order to allow for a
+ # per camera subscription, we need to join client_id and camera_id
+ topic = b':'.join([client_id, camera_id])
+ try:
+ self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK)
+ except Exception as exc:
+ logger.error("Sending message to websocket failed: %r", exc)
+ else:
+ logger.error("No video thread found for client %r, camera %r", client_id, camera_id)
+ logger.debug("Available video threads keys: %r", self.video_threads.keys())
+ logger.debug("Available video threads values: %r", self.video_threads.values())
+
+ def run(self) -> None:
+ """
+ Main loop of the worker.
+ Full of wonders.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.debug("ServerWorker %r starting ...", self.id)
+ self.socket.connect(ZMQ_BACKEND_ADDR)
+ try:
+ self.web_sock.bind(WEB_BACKEND_ADDR)
+ except Exception as exc:
+ logger.error("Connection to zmq websocket failed: %r", exc)
+ self.poller.register(self.socket, zmq.POLLIN)
+ self.poller.register(self.web_sock, zmq.POLLIN)
+ self.monitor.start()
+
+ while self.running:
+ logger.debug("ServerWorker %r waiting for a message ...", self.id)
+ if not self.connected or stop_event.is_set():
+ self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this
+ time.sleep(0.1) # Wait a bit before trying to connect again
+
+ sockets = dict(self.poller.poll(10))
+ if self.socket in sockets:
+ self.connected = True
+ msg = self.socket.recv_multipart()
+ logger.debug("ServerWorker %r received message of length: %d.", self.id, len(msg))
+ filename = None
+ # At the moment we don't expect any other message than a
+ # start/stop message (of length 4) and a video message:
+ if len(msg) == 4: # This is a message with start/stop/rename directive and no video data.
+ self.handle_metadata(*msg)
+ elif len(msg) == 3: # This is a video message with data
+ self.handle_video_message(msg)
+ else:
+ logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg))
+ else:
+ logger.debug("No message received, polling again ...")
+ time.sleep(0.1)
+ if self.web_sock in sockets:
+ frontend_msg = self.web_sock.recv_multipart()
+ logger.info("Received message from frontend: %r", frontend_msg)
+ self.socket.send_multipart(frontend_msg)
+ logger.info("Forwarded message to backend: %r", frontend_msg)
+
+ self.monitor.stop()
+ self.monitor.join()
+
+ for camera_thread in self.video_threads.values():
+ for thread in camera_thread.values():
+ thread.queue.put(None) # Sentinel to unblock queue.get()
+ thread.stop()
+ thread.join()
+
+ def send_control_message(self, client_id, camera_id, message) -> None:
+ """
+ Send control message to a specific client and camera.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ message : bytes
+ Control message to send
+
+ Returns
+ -------
+ None
+ """
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ # self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message])
+ self.socket.send_multipart([client_id, camera_id, message])
+ logger.info("Sent control message to client %r, camera %r: %r", client_id, camera_id, message)
+ else:
+ logger.error("No video thread found for client %r, camera %r", client_id, camera_id)
+
+ def stop(self) -> None:
+ """
+ Stop the worker thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("ServerWorker %r exiting ...", self.id)
+ self.running = False
+
+
+class VideoWorker(Thread):
+ """
+ Class for video threads writing video stream to files.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ filename : str
+ Filename to write video to
+ queue : Queue
+ Queue to receive video frames
+
+ Methods
+ -------
+ run()
+ Main loop of the video worker thread.
+ stop()
+ Stop thread
+ """
+
+ def __init__(self, client_id, camera_id, filename, queue) -> None:
+ """
+ Docstring for __init__
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ filename : str
+ Filename to write video to
+ queue : Queue
+ Queue to receive video frames
+ """
+ super().__init__(daemon=True)
+ self.context = zmq.Context()
+ self.context.setsockopt(zmq.LINGER, 0)
+ self.client_id = client_id
+ self.camera_id = camera_id
+ self.filename = filename
+ self.queue = queue
+ self.live = True
+
+ def stop(self) -> None:
+ """
+ Docstring for stop
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("VideoWorker %r exiting ...", self.camera_id)
+ self.live = False
+
+ def run(self) -> None:
+ """
+ Main loop of the video worker thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ # Create a VIDEO_DIR/client_id/camera_id/ directory if it doesn't exist
+ dirname = os.path.join(
+ VIDEO_DIR,
+ self.client_id.decode('utf-8'),
+ self.camera_id.decode('utf-8'),
+ )
+ filename = os.path.join(
+ dirname,
+ self.filename
+ )
+ try:
+ os.makedirs(dirname, exist_ok=True)
+ except PermissionError as exc:
+ logger.error("Could not create directory '%s': %r", dirname, exc)
+ return
+ if os.path.exists(filename):
+ logger.warning("File '%s' already exists, overwriting ...", filename)
+
+ fourcc = cv2.VideoWriter_fourcc(*'avc1')
+ out = cv2.VideoWriter(filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution
+ logger.info("This is the first run, binding websocket ...")
+ while self.live:
+ logger.debug("VideoWorker writing to file: %s", filename)
+ frame_bytes = self.queue.get()
+ if frame_bytes is None:
+ logger.debug("Received None, stopping video worker for camera: '%r'", self.camera_id)
+ break
+ frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR)
+ logger.debug("Processing ('writing to a file') frame for camera: '%r'", self.camera_id)
+ # Write frame to file:
+ out.write(frame)
+
+ # Release video writer
+ out.release()
+ logger.info("VideoWorker finished writing to file: %s", self.filename)
+
+def signal_handler(sig, frame) -> None:
+ """
+ Docstring for signal_handler
+
+ sig : Signal
+ Signal number
+ frame : Frame
+ Current stack frame
+ """
+ worker.stop()
+
+def update_clients(client_id, camera_id) -> None:
+ """
+ Update client and camera dictionary and write to a shared file.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+
+ Returns
+ -------
+ None
+ """
+ global CLIENTS_DICT
+ if client_id not in CLIENTS_DICT:
+ logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.")
+ CLIENTS_DICT[client_id] = []
+ if camera_id not in CLIENTS_DICT[client_id]:
+ logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id)
+ CLIENTS_DICT[client_id].append(camera_id)
+ # Atomic write using tempfile. Works only when both files on the same filesystem
+ with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf:
+ logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT)
+ json.dump(bytes_to_str(CLIENTS_DICT), tf)
+ tempname = tf.name
+ os.replace(tempname, CLIENTS_JSON_FILE)
+
+
+if __name__ == "__main__":
+ logger.info("Starting up ...")
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ worker = ServerWorker(b"worker-task")
+ worker.start()
+
+ worker.join()
+ try:
+ os.remove(CLIENTS_JSON_FILE)
+ except FileNotFoundError:
+ pass