aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--README.md25
-rw-r--r--client.cfg17
-rw-r--r--client.py1006
-rw-r--r--helpers.py256
-rw-r--r--router.py266
-rw-r--r--static/css/main.css36
-rw-r--r--templates/client.html173
-rw-r--r--templates/main.html16
-rw-r--r--webserver.py524
-rw-r--r--worker.py628
11 files changed, 2950 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c70b96d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+__pycache__/
+*/logs
+*/videos
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f0ba6a6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,25 @@
+Movement detection application.
+Once movement is detected, application starts streaming via router to one of available servers.
+Server stores video stream and makes them avaiable via a web gui, also live.
+
+This repo includes only the application layer.
+Although this is written for Gentoo Linux, it should be trivial to run on any other nix.
+
+There shall be another git repo for the OS bits necessary for deployment.
+
+Technologies used:
+
+- Gentoo Linux;
+- V4l for handling of camera stream;
+- OpenCV (Python bindings) for movement detection;
+- ZeroMQ (Python bindings) for messaging;
+- FastAPI for web part.
+
+
+Necessary Gentoo packages (may not be complete):
+
+media-libs/opencv[python,v4l] - USE list may be incomplete
+net-libs/zeromq[drafts, sodium]
+
+Necessary python modules (may not be complete):
+dev-python/inotify
diff --git a/client.cfg b/client.cfg
new file mode 100644
index 0000000..c407c4e
--- /dev/null
+++ b/client.cfg
@@ -0,0 +1,17 @@
+[main]
+testing = True
+
+[network]
+router_address = 127.0.0.1
+router_port = 5569
+
+[logging]
+logdir = logs
+logfile = client.log
+loglevel = INFO
+
+[cameras]
+front.address = /dev/video0
+front.contour_size_threshold = 4000
+front.movement_grace_period = 10
+
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..fc0d3f7
--- /dev/null
+++ b/client.py
@@ -0,0 +1,1006 @@
+#!/usr/bin/env python
+
+"""
+A module containing client for streaming video to a zmq router.
+"""
+
+__author__ = "Franoosh Corporation"
+
+import sys
+import os
+from threading import Thread, Event
+from queue import Queue, Empty
+from collections import deque, defaultdict
+import time
+import datetime
+import signal
+import logging
+from configparser import ConfigParser
+import zmq
+import cv2
+import inotify.adapters
+import yappi
+from concurrent.futures import ThreadPoolExecutor
+
+from helpers import (
+ CustomLoggingFormatter,
+ process_frame,
+ compute_contours,
+ detect_movement,
+ scale_contours,
+ draw_contours,
+ str_to_bytes,
+ timestamp_to_bytes,
+ bytes_to_timestamp,
+ write_yappi_stats,
+)
+
+###################################################
+# Defaults and constants #
+###################################################
+
+TESTING = False # For now this turns on yappi profiling
+CONFIG_FILE = "client.cfg"
+# Those will be updated from config file if present:
+LOGDIR = 'logs'
+if not os.path.exists(LOGDIR):
+ try:
+ os.makedirs(LOGDIR)
+ except Exception as exc:
+ print(f"Could not create log directory: {exc}")
+ sys.exit()
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+YAAPI_LOG_FILE_BASE = "client_yaapi"
+# LOGLEVEL = logging.INFO
+LOGLEVEL = logging.DEBUG
+
+ROUTER_ADDRESS = "localhost"
+PORT = "5569"
+ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+# This is populated at startup with all cameras present in /dev
+# (like /dev/video0, /dev/video1, etc.).
+# This will include dummy devices!
+# If cameras are already defined in config file, those will override present devicesl:
+KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period')
+CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider
+MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops
+TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+START_STOP_MESSAGE_FMT = "{action}:{data}"
+MAX_CAMERA_NAME_LENGTH = 256
+FPS = 30
+# This should be unique, each box different.
+# If client ID is wrong, should server reject connection?
+# TODO: Devise and implement schema for client ID assignment.
+# Maybe serial number from /proc/cpuinfo.
+# In bytes.
+CLIENT_ID = b"client_001"
+
+#####################################################
+# End defaults and constants #
+#####################################################
+
+log_formatter = CustomLoggingFormatter()
+# Attempt to create logging directory if it does not exist:
+if not os.path.exists(LOGDIR):
+ try:
+ os.makedirs(LOGDIR)
+ except Exception as exc:
+ print("Could not create log directory: %r", exc)
+try:
+ handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+except Exception as exc:
+ handler = logging.StreamHandler()
+# Prevent log pollution from inotify when LOG_LEVEL set to debug:
+logging.getLogger("inotify").setLevel(logging.WARNING)
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt=TIME_FORMAT,
+ level=LOGLEVEL,
+)
+
+CAMERAS = {}
+stop_event = Event()
+
+
+def _encoder_worker(
+ encode_queue: Queue,
+ output_queue: Queue,
+ jpeg_quality=60,
+ ) -> None:
+ while True:
+ try:
+ identity, frame = encode_queue.get(timeout=1)
+ except Empty:
+ logger.debug("Encoder worker: encode queue empty, continuing ...")
+ continue
+ # Encode frame as JPEG with lower quality:
+ encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality]
+ try:
+ _, buffer = cv2.imencode('.jpg', frame, encode_param)
+ except Exception as exc:
+ logger.error("Encoder worker: Error encoding frame: %r", exc)
+ try:
+ message = [identity, buffer.tobytes()]
+ output_queue.put_nowait(message)
+ except Exception as exc:
+ logger.debug("Send queue full, dropping encoded frame: %r", exc)
+ time.sleep(0.1)
+
+class ClientVideo(Thread):
+ """
+ Class for sending video stream
+
+ Attributes:
+ ----------
+ _id: (bytes)
+ Camera ID
+ device_dict: dict
+ Dictionary with device parameters (bytes for strings, integers for ints)
+ queue: Queue
+ Queue for sending messages to main client task
+
+ Methods:
+ --------
+ run
+ Main video streaming logic
+ stop
+ Stop video streaming
+ """
+
+ def __init__(self, _id, device_dict, queue) -> None:
+ """
+ Parameters
+ ----------
+ _id : bytes
+ Camera ID
+ device_dict : dict
+ Dictionary with device parameters (bytes for strings, integers for ints)
+ queue : Queue
+ Queue for sending messages to main client task
+ """
+ super().__init__(daemon=True)
+ self.identity = _id
+ self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints.
+ self.device_addr = self.device_dict.get(b'address') # No default.
+ self.device_threshold = device_dict[b'contour_size_threshold']
+ self.device_grace_period = device_dict[b'movement_grace_period']
+ self.queue = queue
+ # Store only FPS number of frames, compare first and last:
+ self.frame_deque = deque(maxlen=FPS)
+ self.encode_queue = Queue(maxsize=3)
+ self._enc_executor = ThreadPoolExecutor(max_workers=1)
+ self.live = True
+
+ def send_frame(self, contours: list) -> None:
+ """
+ Prepare and send frame with drawn contours.
+
+ Parameters
+ ----------
+ contours: list
+ list of contours to draw on a frame
+ """
+
+ raw_frame = self.frame_deque[-1][2]
+ scaling_factor = self.frame_deque[-1][1]
+ scaled_contours = scale_contours(contours, scaling_factor)
+ try:
+ frame_out = draw_contours(
+ raw_frame,
+ scaled_contours,
+ min_contour_area=self.device_threshold,
+ )
+ except Exception as exc:
+ logger.error("ClientVideo %r: Error drawing contours: %r", self.identity, exc)
+ frame_out = raw_frame
+
+ # Send video message with contour and frame:
+ try:
+ logger.debug("ClientVideo %r: Queueing frame for encoding ...", self.identity)
+ self.encode_queue.put_nowait((self.identity, frame_out))
+ except Exception as exc:
+ try:
+ logger.debug("ClientVideo %r: Encode queue full, dropping oldest frame: %r", self.identity, exc)
+ _ = self.encode_queue.get_nowait() # Remove oldest frame
+ self.encode_queue.put_nowait((self.identity, frame_out))
+ except Exception:
+ pass
+
+ def run(self) -> None:
+ """
+ Video streaming logic.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.debug("ClientVideo '%s' starting ...", self.identity)
+ # The cv2.CAP_V4l2 below forces reading using v4l backend rather than gstreamer.
+ # If opencv was compiled with gstreamer support, it will default to gstreamer.
+ # Then we need to make sure that appropriate gstreamer plugins are installed.
+ # Although I haven't profiled it, it seems that using v4l uses less juice.
+ # Some devices can be opened with address (/dev/video0) and some only with integer index (0).
+ cap = cv2.VideoCapture(
+ self.device_addr.decode('utf-8'),
+ cv2.CAP_V4L2
+ )
+ if not cap.isOpened():
+ logger.error("ClientVideo %r: Unable to open video device %r", self.identity, self.device_addr)
+ cap.release()
+ return
+
+ # Flag to track movement state. Expires after grace period:
+ movement_detected = False
+ movement_detected_start = movement_now = None
+ while self.live and not stop_event.is_set():
+ ret, frame = cap.read()
+ if not ret:
+ logger.error("ClientVideo %r: Failed to read frame from device %r", self.identity, self.device_addr)
+ break
+ try:
+ # Produce grayscale blurred frame for contour detection:
+ processed_frame_tuple = process_frame(frame)
+ except RuntimeError as exc:
+ logger.error("ClientVideo %r: Error processing frame: %r", self.identity, exc)
+ continue
+ # Append both original and processed frame to deque:
+ # processed_frame_tuple = (processed_frame, scaling_factor, raw_frame)
+ self.frame_deque.append(processed_frame_tuple + (frame,))
+ # Skip on the first frame:
+ # To detect movement, compare only first and last frame from the queue of length FPS
+ # in order to reduce load:
+ if len(self.frame_deque) >= 2:
+ sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0]
+ try:
+ contours = compute_contours(sample_frames)
+ logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours))
+ logger.debug("ClientVideo %r: Contours: %r", self.identity, contours)
+ except Exception as exc:
+ logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc)
+ continue
+ try:
+ movement_now = detect_movement(contours, min_area=self.device_threshold)
+ logger.debug("ClientVideo %r: Movement detected in frame.", self.identity)
+ except Exception as exc:
+ logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc)
+ continue
+
+ # Only update movement start time and send start message
+ # if movement is detected and was not detected before:
+ if movement_now:
+ if not movement_detected:
+ # Update movement detected start time:
+ movement_detected_start = time.time()
+ queue_metadata_message(
+ identity=self.identity,
+ queue=self.queue,
+ action=b'start',
+ timestamp=timestamp_to_bytes(movement_detected_start),
+ )
+ movement_detected = True
+
+ # Prepare and send frame with drawn contours:
+ self.send_frame(contours)
+
+ else:
+ if movement_detected:
+ # Movement was detected before, but not anymore.
+ # We wait for grace period before sending stop message.
+ now = time.time()
+ delta_seconds = now - movement_detected_start
+ logger.debug(
+ "ClientVideo %r: delta seconds since movement detected: %d, grace remaining: %d.",
+ self.identity,
+ delta_seconds,
+ self.device_grace_period - delta_seconds
+ )
+ if delta_seconds >= self.device_grace_period:
+ queue_metadata_message(
+ identity=self.identity,
+ queue=self.queue,
+ action=b'stop',
+ timestamp=timestamp_to_bytes(now),
+ )
+ movement_detected = False
+ movement_detected_start = None
+ else:
+ # Still in grace period, send frame with contours:
+ self.send_frame(contours)
+ else:
+ pass # No movement detected currently or before, do nothing.
+ logger.debug("Calling cap.release() ...")
+ cap.release()
+ logger.info("ClientVideo %r exiting ...", self.identity)
+
+ def stop(self) -> None:
+ """
+ Stop video streaming thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("Client video %r sending stop message ...", self.identity)
+ self.live = False
+ global stop_event
+ stop_event.set()
+ # self.queue.put([]) # Put a dummy message to unblock queue if needed
+
+
+class ClientTask(Thread):
+ """
+ Main Client Task with logic, message handling and setup.
+
+ Attributes:
+ ----------
+ context: zmq.Context
+ ZMQ context
+ socket: zmq.Socket
+ ZMQ DEALER socket
+ identity: bytes
+ Client identity
+ poll: zmq.Poller
+ ZMQ poller for socket
+ video_threads: dict
+ Dictionary of ClientVideo threads
+ running: bool
+ Flag indicating if client is running
+ queue: Queue
+ Queue for sending messages to server
+
+ Methods:
+ --------
+ run
+ Main client logic
+ stop
+ Stop client task
+ receive_messages
+ Receive messages from the server
+ send_messages
+ Send messages to the server
+ update_config(message)
+ Update configuration based on message from server
+ watch_for_new_cameras
+ Watch for new camera devices using inotify
+ """
+
+ def __init__(self, _id, context: zmq.Context=None) -> None:
+ """
+ Parameters
+ ----------
+ _id : bytes
+ Client ID
+ context : zmq.Context, optional
+ ZMQ context, by default None
+ """
+ super().__init__(daemon=True)
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.identity = self.socket.identity = _id # <-- bytes
+ self.poll = zmq.Poller()
+ self.poll.register(self.socket, zmq.POLLIN)
+ self.video_threads = {}
+ self.running = True
+ self.connected = False
+ self.queue = Queue()
+
+ def receive_messages(self) -> None:
+ """
+ Receive messages from the server.
+ Messages are expected to be multipart with:
+ [camera_id (bytes), directive (bytes), value (bytes)]
+ Update configuration based on received messages running update_config(message).
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ while self.running:
+ try:
+ sockets = dict(self.poll.poll(1000))
+ if self.socket in sockets:
+ logger.debug("Self socket in sockets, receiving message ...")
+ try:
+ message = self.socket.recv_multipart()
+ logger.info("Client '%r' received message: %r.", self.identity, message)
+ self.update_config(message)
+ except Exception as exc:
+ logger.error("Failed to receive message: %r", exc)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%r': socket error: %r", self.identity, exc)
+ time.sleep(0.01) # Sleep to avoid busy waiting
+
+ def send_messages(self) -> None:
+ """
+ Send messages from queue to the server.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ while self.running:
+ try:
+ logger.debug("Client '%r': Waiting for message to send ...", self.identity)
+ frame = self.queue.get(timeout=0.1)
+ logger.debug("Sending message of length %r ...", len(frame))
+ try:
+ if len(frame) > 1:
+ # TODO: This blocks?
+ self.socket.send_multipart(frame)
+ else:
+ logger.debug("Client '%r': Dummy message received, not sending.", self.identity)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%r': socket error: %r", self.identity, exc)
+ except Empty:
+ continue # No message to send, continue waiting
+
+ def update_config(self, message: list) -> None:
+ """
+ Update configuration based on message from server.
+ Only 'cameras' section of the config file can be updated.
+ Message format:
+ [camera_id (bytes), directive (bytes), value (bytes)]
+ Defined directives are:
+ modify_camera_name,
+ modify_camera_threshold,
+ modify_camera_grace_pd,
+ add_camera,
+ remove_camera,
+
+ Parameters
+ ----------
+ message : list
+ Message received from server
+
+ Returns
+ -------
+ None
+ """
+ try:
+ logger.debug("Client received config message: %r", message)
+ try:
+ camera_id, directive, value = message # <-- bytes
+ except ValueError as exc:
+ logger.error("Invalid config message format: %r", exc)
+ return
+ # Parameter validation happens on the server side before sending the message
+ if directive in (
+ b'modify_camera_name',
+ b'modify_camera_threshold',
+ b'modify_camera_grace_pd',
+ ):
+ # Ok, just validate that camera exists:
+ if camera_id not in CAMERAS: # <-- bytes
+ logger.error("Cannot update unknown camera: %r", camera_id)
+ logger.debug("Known cameras: %r", list(CAMERAS.keys()))
+ else:
+ if directive == b'modify_camera_name':
+ old_name, new_name = camera_id, value # <-- bytes
+ # Update global CAMERAS dict:
+ CAMERAS[new_name] = CAMERAS.pop(old_name)
+ # Update config file accordingly:
+ for key, value in cfg.items('cameras'): # <-- strings
+ if key.startswith(old_name.encode('utf-8') + '.'):
+ # Take the second part of a key like 'camera1.address' -> 'address':
+ param = key.split('.', 1)[1]
+ cfg.set('cameras', f"{new_name.encode('utf-8')}.{param}", value.encode('utf-8'))
+ cfg.remove_option('cameras', key)
+ # Update video threads dict:
+ self.video_threads[new_name] = self.video_threads.pop(old_name)
+ # Send message to server to update routing:
+ # FIXME: This is a mess. Settle on a format for data field.
+ queue_metadata_message(
+ identity=self.identity,
+ queue=self.queue,
+ action=b'rename',
+ # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) # FIXME: this should be datetime object
+ data=timestamp_to_bytes(time.time()),
+ )
+ self.send_messages()
+ logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name)
+ elif directive == b'modify_camera_threshold':
+ try:
+ _value = int(value)
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold value: %r, %r", value, exc)
+ return
+ CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer!
+ self.video_threads[camera_id].device_threshold = _value # <-- integer!
+ # configparser requires strings:
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value))
+ logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value)
+ elif directive == b'modify_camera_grace_pd':
+ try:
+ _value = int(value)
+ except ValueError as exc:
+ logger.error("Invalid movement grace period value: %r, %r", value, exc)
+ return
+ CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer!
+ self.video_threads[camera_id].device_grace_period = _value # <-- integer!
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value))
+ logger.info("Modified movement grace period of camera %r to %d.", camera_id, value)
+ elif directive == b'add_camera':
+ CAMERAS[camera_id] = {b'address': value}
+ # vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
+ # vid_cl.start()
+ # self.video_threads[camera_id] = vid_cl
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8'))
+ logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value)
+ elif directive == b'remove_camera':
+ if camera_id in self.video_threads:
+ self.video_threads[camera_id].stop()
+ # self.video_threads[camera_id].join() # TODO: Check this is needed and wanted!
+ # del self.video_threads[camera_id]
+ if camera_id in CAMERAS:
+ del CAMERAS[camera_id]
+ # Remove from config file:
+ for key in list(cfg.options('cameras')): # <-- strings
+ if key.startswith(camera_id.decode('utf-8') + '.'):
+ cfg.remove_option('cameras', key)
+ logger.info("Removed and stopped thread for camera %r.", camera_id)
+ else:
+ logger.warning("Unknown config directive: %r", directive)
+
+ except (ValueError, IndexError) as exc:
+ logger.error("Failed to handle config message: %r", exc)
+ with open(CONFIG_FILE, 'w') as f:
+ cfg.write(f)
+
+ def watch_for_new_cameras(self) -> None:
+ """
+ Watch for new camera devices using inotify.
+ When a new camera device is detected in /dev (e.g., /dev/video2),
+ send a message to the server to start receiving video from it.
+ When a camera device is removed, send a message to stop receiving video from it.
+ Update config file accordingly.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ i = inotify.adapters.Inotify()
+ i.add_watch('/dev')
+
+ while self.running:
+ for event in i.event_gen(timeout_s=1, yield_nones=False):
+ (_, type_names, path, filename) = event
+ if filename.startswith('video'):
+ camera_pathname = os.path.join(path, filename)
+ if 'IN_CREATE' in type_names:
+ logger.info("Detected new camera device: %s", camera_pathname)
+ self.update_config([filename.encode('utf-8'), b'add_camera', camera_pathname.encode('utf-8')])
+ video_dict = {b'address': camera_pathname.encode('utf-8')}
+ self.start_video_thread(filename.encode('utf-8'), video_dict)
+ if 'IN_DELETE' in type_names:
+ logger.info("Detected removed camera device: %s", camera_pathname)
+ self.update_config([filename.encode('utf-8'), b'remove_camera', b''])
+ self.video_threads[filename.encode('utf-8')].stop()
+
+ def start_video_thread(self, camera_id: bytes, device_dict: dict) -> None:
+ """
+ Start a video thread for a given camera.
+
+ Parameters
+ ----------
+ camera_id : bytes
+ Camera ID
+ device_dict : dict
+ Dictionary with device parameters
+
+ Returns
+ -------
+ None
+ """
+ vid_cl = ClientVideo(camera_id, device_dict, self.queue)
+ vid_cl.start()
+ self.video_threads[camera_id] = vid_cl
+ logger.info("Started video thread for camera %r.", camera_id)
+ Thread(
+ target=_encoder_worker,
+ args=(vid_cl.encode_queue, self.queue,),
+ daemon=True,
+ ).start()
+
+ def run(self) -> None:
+ """
+ Run main client logic thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ self.socket.connect(ADDRESS)
+ logger.debug("Client '%r' connected to %r", self.identity, ADDRESS)
+ logger.debug("Starting video threads ...")
+ for _id, device_dict in CAMERAS.items():
+ self.start_video_thread(_id, device_dict)
+
+ recv_thread = Thread(target=self.receive_messages) # Not a daemon
+ send_thread = Thread(target=self.send_messages) # Not a daemon
+ watchdog_thread = Thread(target=self.watch_for_new_cameras, daemon=True)
+
+ recv_thread.start()
+ logger.debug("Client '%s' started receiving thread.", self.identity)
+ send_thread.start()
+ logger.debug("Client '%s' started sending thread.", self.identity)
+ watchdog_thread.start()
+ logger.debug("Client '%s' started watchdog thread.", self.identity)
+
+ logger.debug("Client '%s' waiting for threads to finish ...", self.identity)
+ recv_thread.join()
+ send_thread.join()
+ watchdog_thread.join(timeout=5)
+
+ logger.info("Closing socket ...")
+ try:
+ self.socket.close()
+ except Exception as exc:
+ logger.warning("Failed to close socket: %r", exc)
+ logger.debug("Terminating context ...")
+ try:
+ self.context.term()
+ except Exception as exc:
+ logger.warning("Failed to terminate context: %r", exc)
+
+ def stop(self) -> None:
+ """
+ Stop task
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("ClientTask cleaning up ...")
+ for _id in list(self.video_threads.keys()):
+ logger.info("Cleaning video thread %s ...", _id)
+ _thread = self.video_threads[_id]
+ _thread.stop()
+ _thread.join(timeout=3)
+ if _thread.is_alive():
+ logger.warning("Video thread %r did not stop in time.", _id)
+ self.video_threads.pop(_id, None)
+
+ self.running = False
+ self.queue.put([]) # Put a dummy message to unblock send_messages if waiting
+ try:
+ self.socket.close(linger=0)
+ except Exception as exc:
+ logger.warning("Error closing socket: %r", exc)
+ logger.info("Client task exiting ...")
+
+
+def queue_video_message(identity: bytes, queue: Queue, frame) -> None:
+ """
+ Docstring for queue_video_message
+
+ Parameters
+ ----------
+ identity : bytes
+ Camera identity
+ queue : Queue
+ Queue to put message to
+ frame : ndarray
+ Video frame to send
+
+ Returns
+ -------
+ None
+ """
+ # Encode frame as JPEG:
+ _, buffer = cv2.imencode('.jpg', frame)
+ # Add identity to message:
+ message = (identity, buffer.tobytes())
+ # Put message to queue:
+ queue.put(message)
+
+def queue_metadata_message(
+ identity: bytes,
+ queue: Queue,
+ action: bytes,
+ timestamp: float,
+ ) -> None:
+ """
+ Queue metadata message to be sent to server.
+
+ Parameters
+ ----------
+ identity : bytes
+ Camera ID
+ queue : Queue
+ Queue to put message to
+ action : bytes
+ Action as bytes ('start' or 'stop')
+ data : float
+ Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp
+
+ Returns
+ -------
+ None
+ """
+ logger.debug("ClientVideo: %r at: %r", action, datetime.datetime.fromtimestamp(
+ bytes_to_timestamp(timestamp)).strftime(TIME_FORMAT))
+ message = (identity, action, timestamp)
+ queue.put(message)
+
+def find_present_cameras() -> dict:
+ """
+ Find currently present camera devices in /dev.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ dict
+ Dictionary of present cameras with device names as keys and paths as values
+ """
+ present_cameras = {}
+ for entry in os.listdir('/dev'):
+ if entry.startswith('video'):
+ present_cameras[entry] = os.path.join('/dev', entry)
+ return present_cameras
+
+def read_config(conf_file) -> ConfigParser:
+ """
+ Read config file and return as dictionary.
+ The only required section is 'cameras' with at least one camera.
+ Do basic sanity checks and update global variables.
+ Log warnings if config file or sections/options are missing.
+
+ Parameters
+ ----------
+ conf_file : str
+ Path to config file
+
+ Returns
+ -------
+ ConfigParser
+ ConfigParser object with configuration
+ """
+ cfg = ConfigParser()
+ cfg.read(conf_file)
+ cameras_dict = None
+ # Need to update logging info:
+ if 'main' in cfg:
+ global TESTING
+ try:
+ TESTING = cfg.getboolean('main', 'testing')
+ logger.info("Testing mode is set to: %r.", TESTING)
+ except ValueError as exc:
+ logger.error("Invalid value for 'testing' option in 'main' section: %r, using default: %r.", exc, TESTING)
+
+ if 'logging' in cfg:
+ global LOGDIR, LOGFILE, LOGLEVEL
+
+ LOGDIR = cfg.get('logging', 'logdir', fallback='.')
+ LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE)
+ LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL)
+ logger.setLevel(LOGLEVEL)
+ for _handler in logger.handlers:
+ _handler.setLevel(LOGLEVEL)
+ if 'network' in cfg:
+ global ROUTER_ADDRESS, PORT, ADDRESS
+ if cfg.has_option('network', 'router_address'):
+ ROUTER_ADDRESS = cfg.get('network', 'router_address')
+ else:
+ logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS)
+ if cfg.has_option('network', 'router_port'):
+ PORT = cfg.get('network', 'router_port')
+ else:
+ logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT)
+ ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+ else:
+ logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS)
+
+ if 'cameras' in cfg:
+ # Need to take care: camera names are stored as strings but comunicated as bytes:
+ cameras_dict = set_up_cameras(cfg.items('cameras')) # <-- dict with strings
+ if cameras_dict:
+ # Only update global CAMERAS if we have at least one valid camera with an address:
+ CAMERAS = str_to_bytes(cameras_dict) # <-- now bytes
+ logger.info("Using camera configuration updated from config file: %r", CAMERAS)
+
+ if 'cameras' not in cfg or not cameras_dict:
+ # No cameras section or no valid cameras found:
+ logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS)
+ found_cameras = find_present_cameras()
+ for cam_name, cam_addr in found_cameras.items():
+ CAMERAS[cam_name.encode('utf-8')] = {'address': cam_addr.encode('utf-8')} # <-- bytes
+ # Use default parameters:
+ cfg.set('cameras', f"{cam_name}.address", cam_addr)
+ cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD))
+ cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD))
+ logger.info("CAMERAS configuration: %r", CAMERAS)
+ with open(conf_file, 'w') as configfile:
+ cfg.write(configfile)
+
+ return cfg
+
+def validate_camera_address(address: str) -> bool:
+ """
+ Validate camera address.
+
+ Parameters
+ ----------
+ address : str
+ Camera address
+
+ Returns
+ -------
+ bool
+ True if address is valid, False otherwise
+ """
+ # Check if address is an integer (device index):
+ if address.startswith('/dev/video'):
+ return address
+ raise ValueError("Invalid camera address: %r" % address)
+
+def validate_camera_threshold(threshold: str) -> bool:
+ """
+ Validate camera contour size threshold.
+
+ Parameters
+ ----------
+ threshold : str
+ Contour size threshold
+
+ Returns
+ -------
+ bool
+ True if threshold is valid, False otherwise
+ """
+ try:
+ if int(threshold) > 0:
+ return threshold
+ except ValueError:
+ raise ValueError("Invalid contour size threshold: %r" % threshold)
+ raise ValueError("Invalid contour size threshold: %r" % threshold)
+
+def validate_camera_grace_pd(grace_pd: str) -> bool:
+ """
+ Validate camera movement grace period.
+
+ Parameters
+ ----------
+ grace_pd : str
+ Movement grace period
+
+ Returns
+ -------
+ bool
+ True if grace period is valid, False otherwise
+ """
+ try:
+ if int(grace_pd) >= 0:
+ return grace_pd
+ except ValueError:
+ raise ValueError("Invalid movement grace period: %r" % grace_pd)
+ raise ValueError("Invalid movement grace period: %r" % grace_pd)
+
+def set_up_cameras(cameras_cfg_section: dict) -> None:
+ """
+ Set up camera configuration from config file section.
+ Validate camera parameters and apply defaults if necessary.
+ Parameters
+ ----------
+ cameras_section : dict
+ Dictionary of camera configuration from config file
+ Returns
+ -------
+ cameras_dict : dict
+ Dictionary to store validated camera configuration
+ """
+ # ConfigParser does not support nested sections.
+ # These shenanigans below allow for a nested config simulation with configparser
+ # and so per camera settings:
+ cameras_dict = defaultdict(dict)
+ for key, val in cameras_cfg_section: # <-- strings
+ try:
+ cam, param = key.split('.', 1)
+ except ValueError as exc:
+ logger.error(
+ "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc)
+ match param:
+ case 'address':
+ try:
+ cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings
+ except ValueError as exc:
+ logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', ''))
+ continue
+ case 'contour_size_threshold':
+ try:
+ cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', ''))
+ cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD)
+ continue
+ case 'movement_grace_period':
+ try:
+ cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings
+ except ValueError as exc:
+ logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', ''))
+ cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD)
+ continue
+ case _:
+ logger.warning("Unknown camera %r parameter: %r", cam, param)
+
+ # Camera section needs to contain at least an address:
+ for key in list(cameras_dict.keys()):
+ if 'address' not in cameras_dict[key].keys():
+ logger.error("Camera %r has no address configured, removing from configuration.", key)
+ del cameras_dict[key]
+
+ return cameras_dict
+
+def signal_handler(sig, frame) -> None:
+ """
+ Signal handler for graceful shutdown.
+ """
+ logger.info("Received signal handler '%r', stopping client ...", sig)
+ # Set flag to stop VideoClient thread(s):
+ stop_event.set()
+ if client.is_alive():
+ client.stop()
+
+
+if __name__ == "__main__":
+ logger.info("Client starting ...")
+
+ # Read configuration file before starting logging as it may contain logging settings:
+ cfg = read_config(CONFIG_FILE)
+
+ if TESTING:
+ logger.info("Starting in testing mode with yappi profiling ...")
+ yappi.set_clock_type("cpu")
+ yappi.start(builtins=True, profile_threads=True)
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+
+ client = ClientTask(CLIENT_ID)
+
+ client.start()
+ client.join()
+
+ if TESTING:
+ yappi.stop()
+ if write_yappi_stats(yappi):
+ logger.error("Error writing yappi stats.")
+
+ logger.info("Terminating ...")
diff --git a/helpers.py b/helpers.py
new file mode 100644
index 0000000..a40b060
--- /dev/null
+++ b/helpers.py
@@ -0,0 +1,256 @@
+#!/usr/bin/env python
+
+"""
+Helper functions for zmq video messaging.
+"""
+
+__author__ = "Franoosh Corporation"
+
+
+import os
+import io
+import pstats
+import logging
+import subprocess
+import cv2
+import struct
+import datetime
+
+# Known directives for camera configuration:
+DIRECTIVES = (
+ 'modify_camera_name',
+ 'modify_camera_threshold',
+ 'modify_camera_grace_pd',
+ 'add_camera',
+ 'remove_camera',
+)
+MAX_CAMERA_NAME_LENGTH = 256
+TIME_FORMAT_STRING = '%Y-%m-%d %H:%M:%S.%f'
+LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
+
+class CustomLoggingFormatter(logging.Formatter):
+ """Custom logging formatter"""
+ debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s'
+ info_fmt = 'INFO: %(asctime)s %(message)s'
+ warning_fmt = 'WARNING: %(asctime)s %(message)s'
+ error_fmt = 'ERROR: line: %(lineno)d, %(asctime)s %(message)s'
+ critical_fmt = 'CRITICAL: line: %(lineno)d, %(asctime)s %(message)s'
+
+ def __init__(self):
+ super().__init__(
+ fmt="%(levelno)d: %s(asctime)s %(message)s",
+ datefmt=None,
+ )
+
+ def format(self, record):
+ orig_fmt = self._style._fmt
+ if record.levelno == logging.DEBUG:
+ self._style._fmt = CustomLoggingFormatter.debug_fmt
+ elif record.levelno == logging.INFO:
+ self._style._fmt = CustomLoggingFormatter.info_fmt
+ elif record.levelno == logging.WARNING:
+ self._style._fmt = CustomLoggingFormatter.warning_fmt
+ elif record.levelno == logging.ERROR:
+ self._style._fmt = CustomLoggingFormatter.error_fmt
+ elif record.levelno == logging.CRITICAL:
+ self._style._fmt = CustomLoggingFormatter.critical_fmt
+
+ result = logging.Formatter.format(self, record)
+ self._style._fmt = orig_fmt
+
+ return result
+
+def process_frame(frame, detect_width=320):
+ """Process frame for contour detection."""
+ try:
+ height, width = frame.shape[:2]
+ if width > detect_width:
+ scaling_factor = detect_width / float(width)
+ small_frame = cv2.resize(frame, (detect_width, int(height * scaling_factor)))
+ else:
+ scaling_factor = 1.0
+ small_frame = frame
+ # Convert to grayscale:
+ gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
+ # Apply Gaussian blur:
+ blurred = cv2.GaussianBlur(gray, (21, 21), 0)
+ except Exception as exc:
+ raise RuntimeError(f"Error processing frame: {exc}")
+
+ return blurred, scaling_factor
+
+
+def timestamp_to_bytes(timestamp):
+ """Convert timestamp to bytes."""
+ return struct.pack('d', timestamp)
+
+def bytes_to_timestamp(byte_data):
+ """Convert bytes to timestamp."""
+ return struct.unpack('d', byte_data)[0]
+
+def compute_contours(sample_frames):
+ """Compute contours between two frames"""
+ all_contours = []
+ frame_0, frame_1 = sample_frames
+ frame_delta = cv2.absdiff(frame_0, frame_1)
+ threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
+ threshold = cv2.dilate(threshold, None, iterations=2)
+ # contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+ contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+ all_contours.extend(contours)
+
+ return all_contours
+
+def scale_contours(contours, scaling_factor):
+ """Scale contours by the given scaling factor."""
+ if scaling_factor == 1.0:
+ return contours
+
+ scaled_contours = []
+ for contour in contours:
+ scaled_contour = (contour * (1.0 / scaling_factor)).astype(int)
+ scaled_contours.append(scaled_contour)
+
+ return scaled_contours
+
+def draw_contours(frame, contours, min_contour_area=500):
+ """Draw contours on the frame."""
+ for contour in contours:
+ if cv2.contourArea(contour) > min_contour_area:
+ (x, y, w, h) = cv2.boundingRect(contour)
+ cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+
+ return frame
+
+def detect_movement(contours, min_area=500):
+ """Detect movement based on contours found from frame diff."""
+ for contour in contours:
+ if cv2.contourArea(contour) >= min_area:
+ return True
+ return False
+
+def get_available_cameras():
+ """
+ Get list of available camera devices.
+ At the moment it does not work. At all. It is useless.
+ """
+ proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]]
+ verified_devices = []
+ for device in candidate_devices:
+ cap = cv2.VideoCapture(device)
+ if cap.isOpened():
+ verified_devices.append(device)
+ cap.release()
+ return verified_devices
+
+def bytes_to_str(obj):
+ """Recursively convert bytes to strings in dicts and lists."""
+ if isinstance(obj, bytes):
+ return obj.decode('utf-8')
+ elif isinstance(obj, dict):
+ return {bytes_to_str(k): bytes_to_str(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [bytes_to_str(item) for item in obj]
+ else:
+ return obj
+
+def str_to_bytes(obj):
+ """Recursively convert strings to bytes in dicts and lists."""
+ if isinstance(obj, str):
+ return obj.encode('utf-8')
+ elif isinstance(obj, dict):
+ return {str_to_bytes(k): str_to_bytes(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [str_to_bytes(item) for item in obj]
+ else:
+ return obj
+
+def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool:
+ """
+ Function writing yaapi stats to .pstat files and
+ converting stats to readable .txt files
+
+ Parameters
+ ----------
+ yappi_instance : yappi
+ Yappi instance
+
+ Returns
+ -------
+ bool
+ True on error, False on success
+ """
+ logger = logging.getLogger(__name__)
+ threads = yappi_instance.get_thread_stats()
+ # combined text output:
+ combined_out = []
+
+ time_now = datetime.datetime.strftime(datetime.datetime.now(), TIME_FORMAT_STRING)
+ subdir = f"yaapi_{time_now}"
+ yaapi_dir = os.path.join(logdir, 'yaapi', subdir)
+ try:
+ os.makedirs(yaapi_dir)
+ except Exception as exc:
+ print("Couldn't create directory for yaapi stats: %r", exc)
+ return True
+
+ logfile = os.path.join(yaapi_dir, "yaapi.log")
+ main_yaapi_png = os.path.join(yaapi_dir, "yaapi.png")
+ gprof2dot_cmd = ["gprof2dot", "-f", "pstats"]
+ dot_cmd = ["dot", "-Tpng"]
+ try:
+ for thread in threads:
+ func_stats = yappi_instance.get_func_stats(ctx_id=thread.id)
+ yappi_instance.get_func_stats().save('profile.callgrind', type="callgrind") # For possible future use
+ thread_filename_base = f"{logfile}.thread{thread.id}"
+ pstat_filename = f"{thread_filename_base}.pstat"
+ txt_filename = f"{thread_filename_base}.txt"
+ dot_filename = f"{thread_filename_base}.dot"
+ png_filename = f"{thread_filename_base}.png"
+
+ # save pstat (can be opened with pstats or profiling tools)
+ func_stats.save(pstat_filename, type="pstat")
+
+ # convert pstat to readable text using pstats
+ sio = io.StringIO()
+ ps = pstats.Stats(pstat_filename, stream=sio)
+ ps.sort_stats("tottime")
+ ps.print_stats()
+
+ text = sio.getvalue()
+ combined_out.append(f"--- Thread {thread.id} ({thread.name}) ---\n{text}")
+
+ # also write per-thread text file
+ with open(txt_filename, "w", encoding='utf-8') as f:
+ f.write(text)
+
+ # generate call graph png using gprof2dot and dot
+ _gprof2dot_cmd = gprof2dot_cmd + [pstat_filename, "-o", dot_filename]
+ _dot_cmd = dot_cmd + [dot_filename, "-o", png_filename]
+ try:
+ gprof2dot_proc = subprocess.Popen(_gprof2dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gprof2dot_stdout, gprof2dot_stderr = gprof2dot_proc.communicate()
+ if gprof2dot_proc.returncode != 0:
+ logger.error("Error generating gprof2dot for thread %r: %r", thread.id, gprof2dot_stderr.decode('utf-8'))
+ continue
+ except Exception as exc:
+ logger.error("Exception generating gprof2dot for thread %r: %r", thread.id, exc)
+ continue
+ try:
+ dot_proc = subprocess.Popen(_dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gprof2dot_proc.stdout.close() # Allow gprof2dot_proc to receive a SIGPIPE if dot_proc exits.
+ out, err = dot_proc.communicate()
+ if dot_proc.returncode != 0:
+ logger.error("Error generating call graph PNG for thread %r: %r", thread.id, err.decode('utf-8'))
+ except Exception as exc:
+ logger.error("Exception generating call graph PNG for thread %r: %r", thread.id, exc)
+
+ # write combined text file
+ with open('.'.join((logfile, "txt")), "w", encoding='utf-8') as f:
+ f.write("\n\n".join(combined_out))
+ except Exception as exc:
+ return True
+
+ return False \ No newline at end of file
diff --git a/router.py b/router.py
new file mode 100644
index 0000000..ce733b9
--- /dev/null
+++ b/router.py
@@ -0,0 +1,266 @@
+#!/usr/bin/env python
+
+"""
+A module containing zeromq router for video streaming
+from a client and sending control messages from server.
+"""
+
+__author__ = "Franoosh Corporation"
+
+import sys
+import os
+import signal
+import logging
+import random
+from collections import defaultdict, deque
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+# TODO: add configparser
+
+IP_FRONTEND = '127.0.0.1'
+IP_BACKEND = '127.0.0.1'
+
+PORT_FRONTEND = "5569" # This is a client (producer)
+PORT_BACKEND = "9979"
+
+FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" # This is a client (producer)
+BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"
+
+LOGDID = 'logs'
+if not os.path.exists(LOGDID):
+ try:
+ os.makedirs(LOGDID)
+ except Exception as exc:
+ print(f"Could not create log directory: {exc}")
+ sys.exit()
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGFILE = os.path.join(LOGDID, f"{BASENAME}.log")
+LOGLEVEL = logging.INFO
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+
+RUNNING = True
+
+def custom_proxy() -> None:
+ """
+ A custom zmq proxy to route messages between clients and workers.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ context = zmq.Context.instance()
+ poller = zmq.Poller()
+
+ # Set up frontend:
+ frontend_socket = context.socket(zmq.ROUTER)
+ frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
+ frontend_socket.bind(FRONTEND_ADDR)
+ poller.register(frontend_socket, zmq.POLLIN)
+
+ # Set up backend:
+ backend_socket = context.socket(zmq.ROUTER)
+ backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
+ backend_socket.bind(BACKEND_ADDR)
+ poller.register(backend_socket, zmq.POLLIN)
+
+ awailable_workers = []
+ client_worker_dict = {}
+ # There are three pending messages dictionaries:
+ # 1. pending_messages_no_worker[client_id] - messages that are pending for a client
+ # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker
+ # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker
+ # To assure delivey of the entire video we need to store messages from start to end until they are delivered
+ # to the client.
+ # If a worker is not available for certain amount of time, we will reassign it to another worker.
+ # For now, there is only one worker. Remember to implement this later.
+ # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker.
+
+ pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned
+ pending_messages_client_worker = defaultdict(lambda: defaultdict(deque))
+ pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))
+
+ while RUNNING:
+ # Poll both frontend and backend sockets:
+ events = dict(poller.poll(10))
+ # If message from client:
+ if frontend_socket in events:
+ # Receive message from client:
+ msg = frontend_socket.recv_multipart()
+ logger.debug("Received message.")
+ client_id, content = msg[0], msg[1:]
+ # Check if client is already connected to worker:
+ try:
+ # Get worker ID for this client from the client-worker dictionary:
+ worker_id = client_worker_dict[client_id]
+ # Check if there are any pending messages for this worker and send them first:
+ while pending_messages_client_worker[client_id][worker_id]:
+ # In order to take a peek at the size of all messages, perhaps check out:
+ # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/
+ logger.debug(
+ "There are '%d' pending messages from client %r for worker %r",
+ len(pending_messages_client_worker[client_id][worker_id]),
+ client_id,
+ worker_id,
+ )
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ try:
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # At last send new message to worker:
+ else:
+ try:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *content])
+ except Exception as e:
+ logger.error("Failed to send message to backend, error: %s", e)
+ # Store message for later delivery:
+ # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no?
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # If client is not connected to any worker:
+ except KeyError:
+ logger.info("Received ID from client: %r.", client_id)
+ # Check if there are available workers.
+ if awailable_workers:
+ # Assign random worker to client:
+ worker_id = random.choice(awailable_workers)
+ # Update client-worker dictionary:
+ client_worker_dict[client_id] = worker_id
+ # Check if there are any pending messages for this client:
+ if pending_messages_no_worker[client_id]:
+ # Move messages to pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].extendleft(msg)
+ logger.debug("Moved pending messages for client '%r' to worker '%r'", client_id, worker_id)
+ # Check if there are any pending messages for this worker:
+ while pending_messages_client_worker[client_id][worker_id]:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ try:
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ break
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ logger.debug("There are still pending messages for client %r and worker %r, storing message for later delivery.", client_id, worker_id)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ else:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+ try:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+ # At last, send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *content])
+ except Exception as e:
+ logger.error("Failed to send message for client %r to backend, requeing. Error: %r", client_id, e)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(msg)
+
+ else:
+ # Store message for later delivery:
+ pending_messages_no_worker[client_id].append(content)
+ logger.debug("No available workers, storing client %r with no worker assigned", client_id)
+
+ if backend_socket in events:
+ _msg = backend_socket.recv_multipart()
+ # These messages should be safe to log as they are short strings or bytes.
+ logger.debug("Received from worker: %r", _msg)
+ if len(_msg) == 2: # Worker is sending its identity:
+ worker_id, msg = _msg[0], _msg[1:]
+ logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
+ logger.info("Received ID from worker: %r.", worker_id)
+ # Add worker to available workers list:
+ awailable_workers.append(worker_id)
+ for client, worker in client_worker_dict.items():
+ # If client has no worker assigned, assign it a new worker:
+ if worker is None:
+ client_worker_dict[client] = worker
+ logger.debug("Assigning worker %r to client %r", worker, client)
+ if pending_messages_no_worker:
+ # If there are pending messages for clients with no worker assigned:
+ for client_id, messages in pending_messages_no_worker.items():
+ if messages:
+ # Move messages to pending messages for this worker:
+ pending_messages_client_worker[client_id][worker_id].extendleft(messages)
+ # Clear pending messages for this client:
+ pending_messages_no_worker[client_id].clear()
+ logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
+ try:
+ # Get the first message for this client and worker:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id)
+ # Send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
+
+ else: # Worker is sending a message to client:
+ # worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]
+ worker_id, client_id, *msg = _msg
+ logger.debug("Received message from worker %r for client %r: %r", worker_id, client_id, msg)
+ # First check if there are any pending messages for this worker:
+ while pending_messages_worker_client[worker_id][client_id]:
+ pending_msg = pending_messages_worker_client[worker_id][client_id].pop()
+ try:
+ logger.debug("Sending pending message %r from %r to client %r", pending_msg, client_id, worker_id)
+ backend_socket.send_multipart([client_id, *pending_msg])
+ except Exception as e:
+ # It is safe to log the message content as it is a short string or bytes.
+ logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e)
+ # Re-queue the message:
+ pending_messages_worker_client[worker_id][client_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If no pending messages for this client, send new message to the client :
+ if not pending_messages_worker_client[worker_id][client_id]:
+ logger.debug("Sending message %r to client %r", msg, client_id)
+ try:
+ frontend_socket.send_multipart([client_id, *msg])
+ logger.debug("Sent message %r to client %r", msg, client_id)
+ except Exception as e:
+ pending_messages_worker_client[worker_id][client_id].appendleft(msg)
+ logger.error("Could not deliver message: %r from worker: %r to client: %r", msg, worker_id, client_id)
+
+def signal_handler(sig, frame) -> None:
+ global RUNNING
+ logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name)
+ RUNNING = False
+
+
+if __name__ == "__main__":
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+
+ custom_proxy()
diff --git a/static/css/main.css b/static/css/main.css
new file mode 100644
index 0000000..af1f40f
--- /dev/null
+++ b/static/css/main.css
@@ -0,0 +1,36 @@
+.streams-container {
+ display: flex;
+ flex-wrap: wrap;
+ gap: 16px;
+ /* justify-content: center; */
+}
+.camera-stream {
+ flex: 0 1 320px;
+ margin: 10px;
+ /* text-align: center; */
+}
+.scroll-box {
+ flex: 0 1 300px;
+ max-height: 200px;
+ overflow-y: auto;
+ border: 1px solid #ccc;
+ padding: 5px;
+}
+
+@media (max-width: 600px) {
+.streams-container {
+ flex-direction: column;
+ align-items: center;
+}
+.camera-stream {
+ width: 100%;
+ max-width: 100vw;
+}
+.mob-scroll-box {
+ width: 100px;
+ height: 150px;
+ overflow: scroll;
+ border: 1px solid #ccc;
+ padding: 10px;
+}
+} \ No newline at end of file
diff --git a/templates/client.html b/templates/client.html
new file mode 100644
index 0000000..785fbcf
--- /dev/null
+++ b/templates/client.html
@@ -0,0 +1,173 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Client {{ client_id }} - Camera Streams</title>
+ <link rel="stylesheet" href="{{ url_for('static', path='css/main.css') }}">
+ <style>
+ .camera-stream { display: inline-block; margin: 10px; }
+ video { border: 1px solid #333; }
+ </style>
+</head>
+<body>
+ <h1>Camera Streams for client: {{ client_id }}</h1>
+ <div class="streams-container">
+ {% for camera_id in camera_ids %}
+ <div class="camera-stream">
+ <h2>Camera: {{ camera_id }}</h2>
+ <img
+ id="video-{{ camera_id }}"
+ width="640"
+ height="480" />
+ <div>
+ <h3>Modify Camera Name</h3>
+ <input
+ type="text"
+ id="new-name-{{ camera_id }}"
+ placeholder="New Name"
+ required pattern= "^\S+$"
+ title="No whitespace allowed.">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_name')">Send</button>
+ </div>
+ <div>
+ <h3>Modify Camera Threshold</h3>
+ <input
+ type="number"
+ id="threshold-value-{{ camera_id }}"
+ placeholder="Threshold"
+ required step="1"
+ min="0"
+ inputmode="numeric"
+ title="Enter a non-negative integer.">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_threshold')">Send</button>
+ </div>
+ <div>
+ <h3>Modify Camera Grace Period</h3>
+ <input
+ type="number"
+ id="grace-value-{{ camera_id }}"
+ placeholder="Grace Period"
+ required step="1"
+ min="0"
+ inputmode="numeric"
+ title="Enter a non-negative integer.">
+ <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_grace_pd')">Send</button>
+ </div>
+ <div>
+ <h3>Recorded Videos for {{ camera_id }}</h3>
+ <ul id="video-list-{{ camera_id }}" class="scroll-box">
+ {% for filename, video_url, timestamp in client_videos[camera_id] %}
+ <li><a href="{{ video_url }}">{{ filename }}</a> ({{ timestamp }})</li>
+ <!-- <h3>video file: {{ filename }}</h3>
+ <video src="{{ video_url }}" type="video/ogg" width="320" height="240" controls></video>
+ <hr4>creation: {{ timestamp }}</h3>
+ -->
+ {% endfor %}
+ </ul>
+ </div>
+ </div>
+ {% endfor %}
+<!--
+ <div>
+ <h3>Add Camera</h3>
+ <input type="text" id="add-name" placeholder="Camera Name">
+ <input type="text" id="add-address" placeholder="Address">
+ <button onclick="sendConfig('add_camera')">Send</button>
+ </div>
+-->
+ <div>
+ <h3>Remove Camera</h3>
+ <input
+ type="text"
+ id="remove-name"
+ placeholder="Camera Name"
+ required pattern="^\S+$"
+ title="No whitespace allowed.">
+ <button onclick="sendConfig('remove_camera')">Send</button>
+ </div>
+ </div>
+ <script>
+ const wsMap = {};
+ {% for camera_id in camera_ids %}
+ // For each camera, open a WebSocket and update the corresponding <img>
+ (function() {
+ const cameraId = '{{ camera_id }}';
+ const ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId);
+ let currentUrl = null;
+ ws.onmessage = function(event) {
+ let image = document.getElementById('video-' + cameraId);
+ if (currentUrl) {
+ URL.revokeObjectURL(currentUrl);
+ }
+ currentUrl = URL.createObjectURL(event.data);
+ image.src = currentUrl;
+ };
+ ws.onclose = function(event) {
+ console.log('WebSocket closed for camera ' + cameraId + ':', event);
+ };
+ ws.onerror = function(event) {
+ console.log('WebSocket error for camera ' + cameraId + ':', event);
+ };
+ window.addEventListener('beforeunload', function() {
+ ws.close();
+ });
+ wsMap[cameraId] = ws;
+ })();
+ // FIXME: Move to a separate function
+ // For each camera, open a WebSocket to receive notificationf about new videos
+ // (function() {
+ // const cameraId = '{{ camera_id }}';
+ // const wsVideos = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId + '/videos');
+ // wsVideos.onmessage = function(event) {
+ // let videoFilename = event.data;
+ // console.log('New video available for camera ' + cameraId + ': ' + videoFilename);
+ // // On new video notification,
+ // };
+ // })();
+ {% endfor %}
+
+ function sendConfig(cameraId, type) {
+ let msg = {};
+ switch(type) {
+ case 'modify_camera_name':
+ msg[type] = [
+ document.getElementById('new-name-' + cameraId).value
+ ];
+ break;
+ case 'modify_camera_threshold':
+ msg[type] = [
+ document.getElementById('threshold-value-' + cameraId).value
+ ];
+ break;
+ case 'modify_camera_grace_pd':
+ msg[type] = [
+ parseInt(document.getElementById('grace-value-' + cameraId).value)
+ ];
+ break;
+ case 'modify_camera_address':
+ msg[type] = [
+ document.getElementById('address-value-' + cameraId).value
+ ];
+ break;
+ // case 'add_camera':
+ // msg[type] = [
+ // document.getElementById('add-name').value,
+ // document.getElementById('add-address').value
+ // ];
+ // break;
+ case 'remove_camera':
+ msg[type] = [
+ document.getElementById('remove-name').value
+ ];
+ break;
+ }
+ const ws = wsMap[cameraId] && wsMap[cameraId] ? wsMap[cameraId] : Object.values(wsMap)[0];
+ if (ws && ws.readyState === WebSocket.OPEN) {
+ console.log("Sending message:", msg, "on ws:", ws);
+ ws.send(JSON.stringify(msg));
+ } else {
+ alert('WebSocket is not open for camera ' + cameraId);
+ }
+ }
+ </script>
+</body>
+</html>
diff --git a/templates/main.html b/templates/main.html
new file mode 100644
index 0000000..e0eaa52
--- /dev/null
+++ b/templates/main.html
@@ -0,0 +1,16 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Live Streaming</title>
+</head>
+<body>
+ <h1>Streaming live.</h1>
+ {% for client_id in clients.keys() %}
+ <h2>Client {{ client_id }}</h2>
+ <li>
+ <a href="/clients/{{ client_id }}"> Client {{ client_id }} streams
+ </a>
+ <li>
+ {% endfor %}
+</body>
+</html>
diff --git a/webserver.py b/webserver.py
new file mode 100644
index 0000000..41d1d30
--- /dev/null
+++ b/webserver.py
@@ -0,0 +1,524 @@
+#!/usr/bin/env python
+"""
+Module serving video from zmq to a webserver.
+"""
+__author__ = "Franoosh Corporation"
+
+import os
+from collections import defaultdict
+import json
+import logging
+import asyncio
+import datetime
+from contextlib import asynccontextmanager
+import zmq
+import zmq.asyncio
+import uvicorn
+from fastapi import (
+ FastAPI,
+ Request,
+ HTTPException,
+ WebSocket,
+ WebSocketDisconnect,
+ templating,
+)
+from fastapi.responses import HTMLResponse, FileResponse
+from fastapi.staticfiles import StaticFiles
+from typing import AsyncGenerator
+
+from helpers import (
+ CustomLoggingFormatter,
+ DIRECTIVES,
+ MAX_CAMERA_NAME_LENGTH,
+)
+
+
+CWD = os.getcwd()
+CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
+CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...]
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGDIR = 'logs'
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+LOGLEVEL = logging.INFO
+# LOGLEVEL = logging.DEBUG
+# This should come from the config file:
+VIDEO_DIR = os.path.join(CWD, "videos")
+
+HOST = "127.0.0.1"
+ZMQ_PORT = "9979"
+WEB_PORT = "8008"
+CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}"
+TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+# Track websocket connections by (client_id, camera_id):
+ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
+ws_queues = defaultdict(dict)
+ctrl_msg_que = asyncio.Queue()
+# Create ZMQ context and socket:
+zmq_context = zmq.asyncio.Context()
+zmq_socket = zmq_context.socket(zmq.DEALER)
+# Connect to ZMQ backend:
+zmq_socket.connect(WEB_BACKEND_ADDR)
+
+
+async def zmq_bridge():
+ """
+ Bridge between ZMQ backend and websocket clients.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ poll = zmq.asyncio.Poller()
+ poll.register(zmq_socket, zmq.POLLIN)
+ while True:
+ try:
+ sockets = dict(await poll.poll(100))
+ if zmq_socket in sockets:
+ logger.debug("Receiving message from ZMQ backend ...")
+ topic, frame_data = await zmq_socket.recv_multipart()
+ # messages can also now contain directives, like 'new_camera' and 'removed_camera' with empty frame_data
+ # Send those to a control queue to be handled appropriately
+ client_id, camera_id = topic.decode('utf-8').split(':', 1) # <-- strings from now on
+ # client_id, camera_id = topic.split(b':', 1)
+ if not camera_id in CLIENTS_DICT[client_id]:
+ CLIENTS_DICT[client_id].append(camera_id)
+ queue = ws_queues.get(client_id, {}).get(camera_id)
+ if queue:
+ if queue.full():
+ logger.debug(
+ "WebsSocket queue full for '/ws/%r/%r', discarding oldest frame.",
+ client_id,
+ camera_id
+ )
+ _ = queue.get_nowait() # Discard oldest frame
+ await queue.put(frame_data)
+ else:
+ logger.debug("Creating new websocket queue for '/ws/%r/%r'.", client_id, camera_id)
+ ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10)
+ if not ctrl_msg_que.empty():
+ client_id, camera_id, command, args_list = None, None, None, None
+ try:
+ client_id, camera_id, command, args_list = await ctrl_msg_que.get()
+ except ValueError as ve:
+ logger.error("Invalid control message on queue: %r", ve)
+ if client_id and camera_id and command and args_list is not None:
+ try:
+ zmq_socket.send_multipart([
+ client_id,
+ camera_id,
+ command,
+ ] + args_list, flags=zmq.NOBLOCK)
+ logger.info("Sent control command.")
+ except zmq.Again:
+ logger.error(
+ "ZMQ socket busy, could not send control command '%r' with args: %r for '/ws/%r/%r' to backend.",
+ command,
+ args_list,
+ client_id,
+ camera_id
+ )
+ except Exception as e:
+ logger.error("Error in ZMQ bridge: %r", e)
+
+@asynccontextmanager
+async def lifespan(app: FastAPI) -> AsyncGenerator:
+ """
+ Create lifespan context for FastAPI app.
+
+ Parameters
+ ----------
+ app : FastAPI
+ The FastAPI application instance.
+
+ Returns
+ -------
+ AsyncGenerator
+ Yields control to the application context.
+ """
+ asyncio.create_task(zmq_bridge())
+ yield
+
+_app = FastAPI(lifespan=lifespan)
+_app.mount("/static", StaticFiles(directory="static"), name="static")
+templates = templating.Jinja2Templates(directory='templates')
+
+@_app.get("/")
+async def main_route(request: Request) -> HTMLResponse:
+ """
+ Main route serving the index page.
+
+ Parameters
+ ----------
+ request : Request
+ The incoming HTTP request.
+ """
+ logger.debug("Main route visited")
+ return templates.TemplateResponse(
+ "main.html",
+ {
+ "request": request,
+ "clients": CLIENTS_DICT,
+ }
+ )
+
+@_app.get("/clients/{client_id}", response_class=HTMLResponse)
+async def client_route(request: Request, client_id: str) -> HTMLResponse:
+ """
+ Serve a particular client page.
+
+ Parameters
+ ----------
+ request : Request
+ The incoming HTTP request.
+ client_id : str
+ The client ID to serve.
+ """
+ logger.debug("Checking client_id: %s in clients_dict: %r.", client_id, CLIENTS_DICT)
+ if not client_id in CLIENTS_DICT:
+ raise HTTPException(status_code=404, detail="No such client ID.")
+ return templates.TemplateResponse(
+ "client.html",
+ {
+ "request": request,
+ "client_id": client_id,
+ "camera_ids": CLIENTS_DICT[client_id],
+ "client_videos": list_video_files(client_id),
+ },
+ )
+
+@_app.websocket("/ws/{client_id}/{camera_id}")
+async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> None:
+ """
+ Serve a particular camera page.
+
+ Parameters
+ ----------
+ websocket : WebSocket
+ The incoming WebSocket connection.
+ client_id : str
+ The client ID.
+ camera_id : str
+ The camera ID.
+ """
+ logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
+ await websocket.accept()
+ ws_connections[client_id][camera_id] = {'ws': websocket}
+ queue = ws_queues[client_id][camera_id]
+
+ async def send_frames():
+ while True:
+ logger.debug("Getting from queue frame for '/ws/%s/%s'.", client_id, camera_id)
+ try:
+ # Don't wait indefinitely to allow checking for client disconnection:
+ frame_data = queue.get_nowait()
+ except asyncio.QueueEmpty:
+ await asyncio.sleep(0.1)
+ continue
+ try:
+ logger.debug("Sending frame to '/ws/%s/%s'.", client_id, camera_id)
+ await websocket.send_bytes(frame_data)
+ logger.debug("Sent frame to '/ws/%s/%s'.", client_id, camera_id)
+ # This exception is raised when client disconnects:
+ except Exception as exc:
+ logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc)
+ ws_connections[client_id].pop(camera_id, None)
+ break
+
+ async def receive_control():
+ while True:
+ try:
+ data = await websocket.receive_text()
+ logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data)
+ # Handle control messages from the client:
+ frontend_message = json.loads(data)
+ for command, args in frontend_message.items():
+ if validate_directive(command, args, client_id, camera_id):
+ args_list = [str(arg).encode('utf-8') for arg in args]
+ ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list))
+ logger.info(
+ "Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.",
+ command,
+ args_list,
+ client_id,
+ camera_id
+ )
+ except json.JSONDecodeError:
+ logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data)
+ except WebSocketDisconnect:
+ logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id)
+ ws_connections[client_id].pop(camera_id, None)
+ break
+ except Exception as exc:
+ logger.warning("Error receiving control message: %r", exc)
+
+ send_task = asyncio.create_task(send_frames())
+ receive_task = asyncio.create_task(receive_control())
+ try:
+ await asyncio.gather(send_task, receive_task)
+ finally:
+ send_task.cancel()
+ receive_task.cancel()
+ ws_connections[client_id].pop(camera_id, None)
+ ws_queues[client_id].pop(camera_id, None)
+ logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id)
+
+@_app.get("/videos/{client_id}/{camera_id}/{filename}")
+async def serve_video(client_id: str, camera_id: str, filename: str) -> FileResponse:
+ """
+ Route serving video files.
+
+ Parameters
+ ----------
+ client_id : str
+ The client ID.
+ camera_id : str
+ The camera ID.
+ filename : str
+ The video filename.
+ """
+ video_path = os.path.join(VIDEO_DIR, client_id, camera_id, filename)
+
+ if not os.path.exists(video_path):
+ raise HTTPException(status_code=404, detail="Video not found")
+
+ logger.debug("Serving video file: '%s', video_path")
+
+ return FileResponse(video_path, media_type="video/ogg")
+
+def list_video_files(client_id: str) -> dict:
+ """
+ Return a dictionary of recorded video files for a given client.
+ {<camera_id>: [(<filename>, <file url>, <date>)]}
+
+ Parameters
+ ----------
+ client_id : str
+ The client ID.
+
+ Returns
+ -------
+ dict
+ Dictionary of recorded video files for the client.
+ """
+ client_videos = {}
+ client_dir = os.path.join(VIDEO_DIR, client_id)
+ if os.path.exists(client_dir):
+ for (_, dirnames, _) in os.walk(client_dir):
+ for camera_dir in dirnames:
+ client_videos[camera_dir] = []
+ camera_path = os.path.join(client_dir, camera_dir)
+ for (_, _, filenames) in os.walk(camera_path):
+ for filename in filenames:
+ try:
+ filepath = os.path.join(VIDEO_DIR, camera_path, filename)
+ timestamp = datetime.datetime.strftime(
+ datetime.datetime.fromtimestamp(
+ os.path.getmtime(filepath)
+ ),
+ TIME_FORMAT)
+ except Exception as exc:
+ logger.warning("Could not determine creation time for file %r: %r", filename, exc)
+ timestamp = 'unknown'
+ video_url = f"/videos/{client_id}/{camera_dir}/{filename}"
+ client_videos[camera_dir].append((filename, video_url, timestamp))
+ return client_videos
+
+def validate_directive(command: str, args: list, client_id: str, camera_id: str) -> bool:
+ """
+ Validate if the directive is known.
+
+ Parameters
+ ----------
+ command : str
+ The directive command.
+ args : list
+ The arguments for the directive.
+ client_id : str
+ The client ID.
+ camera_id : str
+ The camera ID.
+
+ Returns
+ -------
+ bool
+ True if the directive is valid, False otherwise.
+ """
+ retval = False
+ # First, check if command is known:
+ if command not in DIRECTIVES:
+ logger.warning("Unknown directive received: %s", command)
+ return retval
+ if len(args) != 1:
+ logger.error("Directive '%s' requires exactly one argument.", command)
+ return retval
+ # Validate arguments passed from the frontend:
+ match command:
+ case 'modify_camera_name':
+ retval = validate_camera_name(args, client_id, camera_id)
+ case 'modify_camera_threshold':
+ retval = validate_camera_threshold(args)
+ case 'modify_camera_grace_pd':
+ retval = validate_camera_grace_period(args)
+ case 'modify_camera_address':
+ retval = validate_camera_address(args)
+ # case 'add_camera':
+ # retval = validate_add_camera(args, client_id)
+ case 'remove_camera':
+ retval = True # No specific validation needed
+ return retval
+
+def validate_camera_name(name, client_id, camera_id) -> bool:
+ """
+ Validate camera name to avoid problematic characters and whitespace.
+
+ Parameters
+ ----------
+ name : str
+ The new camera name.
+ client_id : str
+ The client ID.
+ camera_id : str
+ The old camera ID.
+
+ Returns
+ -------
+ bool
+ True if the camera name is valid, False otherwise.
+ """
+ retval = True
+ new_name = name # TODO: check if sending old name is needed if we have camera_id
+ old_name = camera_id
+ if old_name not in CLIENTS_DICT[client_id]:
+ logger.error("Old camera name does not exist.")
+ retval = False
+ if new_name in CLIENTS_DICT[client_id]:
+ logger.error("New camera name already exists.")
+ retval = False
+ if not new_name or any(char in new_name for char in ':') or any(char.isspace() for char in new_name):
+ logger.error("Camera name cannot be empty or contain spaces or colons.")
+ retval = False
+ if len(new_name) > MAX_CAMERA_NAME_LENGTH:
+ logger.error("Camera name is too long (max %s characters).", MAX_CAMERA_NAME_LENGTH)
+ retval = False
+ return retval
+
+def validate_camera_threshold(threshold) -> bool:
+ """
+ Validate contour size threshold.
+
+ Parameters
+ ----------
+ threshold : str
+ The contour size threshold.
+
+ Returns
+ -------
+ bool
+ True if the contour size threshold is valid, False otherwise.
+ """
+ retval = True
+ try:
+ value = int(threshold)
+ if value <= 0:
+ logger.error("Contour size threshold must be positive: %r.", threshold)
+ retval = False
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold: %r, exception: %r", threshold, exc)
+ retval = False
+ return retval
+
+def validate_camera_grace_period(grace_period) -> bool:
+ """
+ Validate movement grace period.
+
+ Parameters
+ ----------
+ grace_period : str
+ The movement grace period.
+
+ Returns
+ -------
+ bool
+ True if the movement grace period is valid, False otherwise.
+ """
+ retval = True
+ try:
+ value = int(grace_period)
+ if value < 0:
+ logger.error("Movement grace period cannot be negative: %r.", grace_period)
+ retval = False
+ except ValueError as exc:
+ logger.error("Invalid movement grace period: %r, exception: ", grace_period, exc)
+ retval = False
+ return retval
+
+def validate_camera_address(address) -> bool:
+ """
+ Validate camera address.
+
+ Parameters
+ ----------
+ address : str
+ The camera address.
+
+ Returns
+ -------
+ bool
+ True if the camera address is valid, False otherwise.
+ """
+ retval = True
+ if not address or any(char.isspace() for char in address):
+ logger.error("Camera address %r cannot be empty.", address)
+ retval = False
+ if not address.startswith('/dev/video'):
+ logger.error("Camera address must start with '/dev/video': %r.", address)
+ retval = False
+ return retval
+
+def validate_remove_camera(name, client_id) -> bool:
+ """
+ Validate remove camera directive.
+
+ Parameters
+ ----------
+ name : str
+ The camera name to remove.
+ client_id : str
+ The client ID.
+
+ Returns
+ -------
+ bool
+ True if the camera name is valid, False otherwise.
+ """
+ retval = True
+ if name not in CLIENTS_DICT[client_id]:
+ logger.error("Camera name %r does not exist.", client_id)
+ retval = False
+ return retval
+
+if __name__ == "__main__":
+ uvicorn.run(
+ _app,
+ port=8007,
+ host='127.0.0.1',
+ log_level='info',
+ )
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..26ea8aa
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,628 @@
+#!/usr/bin/env python
+"""
+A module consisting of a zeromq worker receiving video stream
+via router from clients. Able to send control messages to clients.
+Needs libzmq and pyzmq with 'drafts' support.
+"""
+
+import os
+import sys
+import struct
+import datetime
+from threading import Thread, Event
+import time
+import signal
+import logging
+from queue import Queue
+from collections import defaultdict
+import tempfile
+import json
+import zmq
+import cv2
+import numpy
+
+from helpers import (
+ CustomLoggingFormatter,
+ bytes_to_str,
+ TIME_FORMAT_STRING,
+)
+
+__author__ = "Franoosh Corporation"
+
+# Various constants: # TODO: put them in a config
+
+HOST = "127.0.0.1"
+ZMQPORT = "9979"
+# WSPORT = "8000"
+WSPORT = "8008"
+ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
+# LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
+
+# File paths:
+
+LOGDIR = 'logs'
+if not os.path.exists(LOGDIR):
+ try:
+ os.makedirs(LOGDIR)
+ except Exception as exc:
+ print(f"Could not create log directory: {exc}")
+ sys.exit()
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+
+CWD = os.getcwd()
+TMP_DIR = os.path.join(CWD, "tmp")
+CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
+# This should come from the config file:
+VIDEO_DIR = os.path.join(CWD, "videos")
+
+# Other:
+
+CLIENTS_DICT = {}
+
+stop_event = Event()
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+
+if not os.path.exists(TMP_DIR):
+ try:
+ os.makedirs(TMP_DIR)
+ except Exception as exc:
+ logger.error("Could not create temporary directory: %r", exc)
+ sys.exit()
+
+class MonitorTask(Thread):
+ """
+ Monitor the connection status of the zmq socket.
+
+ Parameters
+ ----------
+ socket : zmq.Socket
+ The zmq socket to monitor.
+
+ Methods
+ -------
+ run()
+ Monitor connection on the initial socket.
+ stop()
+ Stop thread
+ """
+ def __init__(self, socket) -> None:
+ """
+ Docstring for __init__
+
+ Parameters
+ ----------
+ socket : zmq.Socket
+ The zmq socket to monitor.
+ """
+ super().__init__(daemon=True)
+ self.socket = socket
+ self.running = True
+
+ def run(self) -> None:
+ """
+ Monitor connection on the initial socket.
+ This is heartbeat monitoring
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ monitor = self.socket.get_monitor_socket()
+ monitor.setsockopt(zmq.RCVTIMEO, 5000)
+ logger.debug("Monitor socket started.")
+ while self.running:
+ try:
+ event, _ = monitor.recv_multipart()
+ # Resolve the received event type:
+ event_type = int.from_bytes(event[:2], "little")
+ if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED):
+ logger.warning("Monitor socket: closed | disconnected")
+ stop_event.set()
+ elif event_type == zmq.EVENT_CONNECT_DELAYED:
+ logger.debug("Monitor socket: event connect delayed")
+ elif event_type == zmq.EVENT_CONNECT_RETRIED:
+ logger.debug("Monitor socket: event connect retried")
+ elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED):
+ logger.debug("Monitor socket: client connected to router, handshake OK.")
+ stop_event.clear()
+ else:
+ logger.warning("Monitor socket: other event: %r", event_type)
+ except zmq.Again:
+ logger.debug("Timeout on monitoring socket.")
+ except Exception as exc: # W: Catching too general exception Exception
+ logger.error("Other exception on monitoring socket: %r", exc)
+
+ # monitor.close()
+
+ def stop(self) -> None:
+ """
+ Stop thread
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("Stopping monitor thread ...")
+ self.running = False
+
+
+class ServerWorker(Thread):
+ """
+ ServerWorker class for zmq video messaging.
+
+ Parameters
+ ----------
+ identity : bytes
+ Worker identity
+ context : zmq.Context, optional
+ ZMQ context (default is None, which creates a new context)
+
+ Methods
+ -------
+ start_client(client_id, camera_id, filename)
+ Start a video thread for new client_id and camera_id.
+ stop_client(client_id, camera_id)
+ Stop video thread for a client_id and camera_id.
+ run()
+ Main loop of the worker.
+ """
+ def __init__(self, identity, context=None) -> None:
+ """
+ Docstring for __init__
+
+ Parameters
+ ----------
+ identity : bytes
+ Worker identity
+ context : zmq.Context, optional
+ ZMQ context (default is None, which creates a new context)
+ """
+ super().__init__(daemon=True)
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.id = self.socket.identity = identity
+ self.monitor = MonitorTask(self.socket)
+ self.video_threads = defaultdict(lambda: defaultdict(dict))
+ self.poller = zmq.Poller()
+ self.web_sock = self.context.socket(zmq.DEALER)
+ self.connected = False
+ self.running = True
+
+ def start_client(self, client_id, camera_id, filename) -> None:
+ """
+ Start a video thread for new client_id and camera_id.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ filename : str
+ Filename to write video to
+
+ Returns
+ -------
+ None
+ """
+ # New client or new camera for existing client:
+ if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):
+ q = Queue()
+ logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id)
+ video_worker = VideoWorker(client_id, camera_id, filename, q)
+ video_worker.start()
+ self.video_threads[client_id][camera_id] = video_worker
+
+ def stop_client(self, client_id, camera_id) -> None:
+ """
+ Stop video thread for a client_id and camera_id.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+
+ Returns
+ -------
+ None
+ """
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ logger.debug("Stopping video thread for client %r, camera %r", client_id, camera_id)
+ self.video_threads[client_id][camera_id].stop() # Stop the thread
+ del self.video_threads[client_id][camera_id]
+ logger.info("Stopped video thread for client %r, camera %r", client_id, camera_id)
+
+ def handle_metadata(
+ self,
+ client_id: bytes,
+ camera_id: bytes,
+ directive: bytes,
+ timestamp: bytes
+ ) -> None:
+ """
+ Handle metadata messages.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ directive : bytes
+ Action to perform (e.g., 'start', 'stop')
+ timestamp : bytes
+ Timestamp of the action
+
+ Returns
+ -------
+ None
+ """
+ logger.info(
+ "Received metadata directive: %r for client: %r, camera: %r, timestamp: %r",
+ directive, client_id, camera_id, timestamp
+ )
+ update_clients(client_id, camera_id)
+ if directive in (b'start', b'stop'):
+ camera_id_string = camera_id.decode('utf-8')
+ timestamp_string = datetime.datetime.fromtimestamp(
+ struct.unpack('d', timestamp)[0]
+ ).strftime(TIME_FORMAT_STRING)
+ filename = f"{camera_id_string}-{timestamp_string}.mp4"
+ logger.debug("Directive: %r, Timestamp: %r, Filename: %r", directive, timestamp, filename)
+ if directive == b"start":
+ self.start_client(client_id, camera_id, filename)
+ elif directive == b"stop":
+ self.stop_client(client_id, camera_id)
+ else:
+ logger.error("Unknown directive: %r", directive)
+ # elif directive == b'rename':
+ # old_name = new_name = timestamp = None
+ # try:
+ # old_name, new_name, timestamp = data.split(b":")
+ # logger.info("Renamed video thread from %r to %r.", old_name, new_name)
+ # except ValueError:
+ # logger.error("Invalid rename data format: %r", data)
+ # if old_name and new_name and timestamp:
+ # # I think it's better to stop the old thread and start a new one,
+ # # rather than reuse the old one as it's less mucking about.
+ # self.stop_client(old_name, camera_id)
+ # new_name_string = new_name.decode('utf-8')
+ # camera_id_string = camera_id.decode('utf-8')
+ # timestamp_string = timestamp.decode('utf-8')
+ # self.start_client(
+ # new_name,
+ # camera_id,
+ # f"{new_name_string}_{camera_id_string}-{timestamp_string}.mp4"
+ # )
+ # to_remove= b':'.join([client_id, camera_id])
+ # try:
+ # self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename
+ # except Exception as exc:
+ # logger.error("Sending rename notification to websocket failed: %r", exc)
+
+ # elif directive in (b'new_camera', b'removed_camera'):
+ # logger.info("Received camera metadata update: %r for client %r, camera %r",
+ # directive,
+ # client_id,
+ # camera_id,
+ # )
+ # # Just forward the message to the webserver:
+ # try:
+ # self.web_sock.send_multipart([client_id, camera_id, directive, data], flags=zmq.NOBLOCK)
+ # except Exception as exc:
+ # logger.error("Sending camera metadata update to websocket failed: %r", exc)
+ else:
+ logger.error("Unknown directive: %r", directive)
+
+ return False
+
+ def handle_video_message(self, msg: list) -> None:
+ logger.debug("Received video message with data only.")
+ client_id, camera_id, *content = msg
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ self.video_threads[client_id][camera_id].queue.put(content[0])
+ # Send only [client_id, camera_id, jpeg_bytes] to the webserver:
+ # zmq subsciber can subscribe to a topic defined by the first
+ # part of the multipart message, so in order to allow for a
+ # per camera subscription, we need to join client_id and camera_id
+ topic = b':'.join([client_id, camera_id])
+ try:
+ self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK)
+ except Exception as exc:
+ logger.error("Sending message to websocket failed: %r", exc)
+ else:
+ logger.error("No video thread found for client %r, camera %r", client_id, camera_id)
+ logger.debug("Available video threads keys: %r", self.video_threads.keys())
+ logger.debug("Available video threads values: %r", self.video_threads.values())
+
+ def run(self) -> None:
+ """
+ Main loop of the worker.
+ Full of wonders.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.debug("ServerWorker %r starting ...", self.id)
+ self.socket.connect(ZMQ_BACKEND_ADDR)
+ try:
+ self.web_sock.bind(WEB_BACKEND_ADDR)
+ except Exception as exc:
+ logger.error("Connection to zmq websocket failed: %r", exc)
+ self.poller.register(self.socket, zmq.POLLIN)
+ self.poller.register(self.web_sock, zmq.POLLIN)
+ self.monitor.start()
+
+ while self.running:
+ logger.debug("ServerWorker %r waiting for a message ...", self.id)
+ if not self.connected or stop_event.is_set():
+ self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this
+ time.sleep(0.1) # Wait a bit before trying to connect again
+
+ sockets = dict(self.poller.poll(10))
+ if self.socket in sockets:
+ self.connected = True
+ msg = self.socket.recv_multipart()
+ logger.debug("ServerWorker %r received message of length: %d.", self.id, len(msg))
+ filename = None
+ # At the moment we don't expect any other message than a
+ # start/stop message (of length 4) and a video message:
+ if len(msg) == 4: # This is a message with start/stop/rename directive and no video data.
+ self.handle_metadata(*msg)
+ elif len(msg) == 3: # This is a video message with data
+ self.handle_video_message(msg)
+ else:
+ logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg))
+ else:
+ logger.debug("No message received, polling again ...")
+ time.sleep(0.1)
+ if self.web_sock in sockets:
+ frontend_msg = self.web_sock.recv_multipart()
+ logger.info("Received message from frontend: %r", frontend_msg)
+ self.socket.send_multipart(frontend_msg)
+ logger.info("Forwarded message to backend: %r", frontend_msg)
+
+ self.monitor.stop()
+ self.monitor.join()
+
+ for camera_thread in self.video_threads.values():
+ for thread in camera_thread.values():
+ thread.queue.put(None) # Sentinel to unblock queue.get()
+ thread.stop()
+ thread.join()
+
+ def send_control_message(self, client_id, camera_id, message) -> None:
+ """
+ Send control message to a specific client and camera.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ message : bytes
+ Control message to send
+
+ Returns
+ -------
+ None
+ """
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ # self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message])
+ self.socket.send_multipart([client_id, camera_id, message])
+ logger.info("Sent control message to client %r, camera %r: %r", client_id, camera_id, message)
+ else:
+ logger.error("No video thread found for client %r, camera %r", client_id, camera_id)
+
+ def stop(self) -> None:
+ """
+ Stop the worker thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("ServerWorker %r exiting ...", self.id)
+ self.running = False
+
+
+class VideoWorker(Thread):
+ """
+ Class for video threads writing video stream to files.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ filename : str
+ Filename to write video to
+ queue : Queue
+ Queue to receive video frames
+
+ Methods
+ -------
+ run()
+ Main loop of the video worker thread.
+ stop()
+ Stop thread
+ """
+
+ def __init__(self, client_id, camera_id, filename, queue) -> None:
+ """
+ Docstring for __init__
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+ filename : str
+ Filename to write video to
+ queue : Queue
+ Queue to receive video frames
+ """
+ super().__init__(daemon=True)
+ self.context = zmq.Context()
+ self.context.setsockopt(zmq.LINGER, 0)
+ self.client_id = client_id
+ self.camera_id = camera_id
+ self.filename = filename
+ self.queue = queue
+ self.live = True
+
+ def stop(self) -> None:
+ """
+ Docstring for stop
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("VideoWorker %r exiting ...", self.camera_id)
+ self.live = False
+
+ def run(self) -> None:
+ """
+ Main loop of the video worker thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ # Create a VIDEO_DIR/client_id/camera_id/ directory if it doesn't exist
+ dirname = os.path.join(
+ VIDEO_DIR,
+ self.client_id.decode('utf-8'),
+ self.camera_id.decode('utf-8'),
+ )
+ filename = os.path.join(
+ dirname,
+ self.filename
+ )
+ try:
+ os.makedirs(dirname, exist_ok=True)
+ except PermissionError as exc:
+ logger.error("Could not create directory '%s': %r", dirname, exc)
+ return
+ if os.path.exists(filename):
+ logger.warning("File '%s' already exists, overwriting ...", filename)
+
+ fourcc = cv2.VideoWriter_fourcc(*'avc1')
+ out = cv2.VideoWriter(filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution
+ logger.info("This is the first run, binding websocket ...")
+ while self.live:
+ logger.debug("VideoWorker writing to file: %s", filename)
+ frame_bytes = self.queue.get()
+ if frame_bytes is None:
+ logger.debug("Received None, stopping video worker for camera: '%r'", self.camera_id)
+ break
+ frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR)
+ logger.debug("Processing ('writing to a file') frame for camera: '%r'", self.camera_id)
+ # Write frame to file:
+ out.write(frame)
+
+ # Release video writer
+ out.release()
+ logger.info("VideoWorker finished writing to file: %s", self.filename)
+
+def signal_handler(sig, frame) -> None:
+ """
+ Docstring for signal_handler
+
+ sig : Signal
+ Signal number
+ frame : Frame
+ Current stack frame
+ """
+ worker.stop()
+
+def update_clients(client_id, camera_id) -> None:
+ """
+ Update client and camera dictionary and write to a shared file.
+
+ Parameters
+ ----------
+ client_id : bytes
+ Client ID
+ camera_id : bytes
+ Camera ID
+
+ Returns
+ -------
+ None
+ """
+ global CLIENTS_DICT
+ if client_id not in CLIENTS_DICT:
+ logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.")
+ CLIENTS_DICT[client_id] = []
+ if camera_id not in CLIENTS_DICT[client_id]:
+ logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id)
+ CLIENTS_DICT[client_id].append(camera_id)
+ # Atomic write using tempfile. Works only when both files on the same filesystem
+ with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf:
+ logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT)
+ json.dump(bytes_to_str(CLIENTS_DICT), tf)
+ tempname = tf.name
+ os.replace(tempname, CLIENTS_JSON_FILE)
+
+
+if __name__ == "__main__":
+ logger.info("Starting up ...")
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ worker = ServerWorker(b"worker-task")
+ worker.start()
+
+ worker.join()
+ try:
+ os.remove(CLIENTS_JSON_FILE)
+ except FileNotFoundError:
+ pass