diff options
| -rw-r--r-- | .gitignore | 3 | ||||
| -rw-r--r-- | README.md | 25 | ||||
| -rw-r--r-- | client.cfg | 17 | ||||
| -rw-r--r-- | client.py | 1006 | ||||
| -rw-r--r-- | helpers.py | 256 | ||||
| -rw-r--r-- | router.py | 266 | ||||
| -rw-r--r-- | static/css/main.css | 36 | ||||
| -rw-r--r-- | templates/client.html | 173 | ||||
| -rw-r--r-- | templates/main.html | 16 | ||||
| -rw-r--r-- | webserver.py | 524 | ||||
| -rw-r--r-- | worker.py | 628 |
11 files changed, 2950 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c70b96d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*/logs +*/videos diff --git a/README.md b/README.md new file mode 100644 index 0000000..f0ba6a6 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +Movement detection application. +Once movement is detected, application starts streaming via router to one of available servers. +Server stores video stream and makes them avaiable via a web gui, also live. + +This repo includes only the application layer. +Although this is written for Gentoo Linux, it should be trivial to run on any other nix. + +There shall be another git repo for the OS bits necessary for deployment. + +Technologies used: + +- Gentoo Linux; +- V4l for handling of camera stream; +- OpenCV (Python bindings) for movement detection; +- ZeroMQ (Python bindings) for messaging; +- FastAPI for web part. + + +Necessary Gentoo packages (may not be complete): + +media-libs/opencv[python,v4l] - USE list may be incomplete +net-libs/zeromq[drafts, sodium] + +Necessary python modules (may not be complete): +dev-python/inotify diff --git a/client.cfg b/client.cfg new file mode 100644 index 0000000..c407c4e --- /dev/null +++ b/client.cfg @@ -0,0 +1,17 @@ +[main] +testing = True + +[network] +router_address = 127.0.0.1 +router_port = 5569 + +[logging] +logdir = logs +logfile = client.log +loglevel = INFO + +[cameras] +front.address = /dev/video0 +front.contour_size_threshold = 4000 +front.movement_grace_period = 10 + diff --git a/client.py b/client.py new file mode 100644 index 0000000..fc0d3f7 --- /dev/null +++ b/client.py @@ -0,0 +1,1006 @@ +#!/usr/bin/env python + +""" +A module containing client for streaming video to a zmq router. +""" + +__author__ = "Franoosh Corporation" + +import sys +import os +from threading import Thread, Event +from queue import Queue, Empty +from collections import deque, defaultdict +import time +import datetime +import signal +import logging +from configparser import ConfigParser +import zmq +import cv2 +import inotify.adapters +import yappi +from concurrent.futures import ThreadPoolExecutor + +from helpers import ( + CustomLoggingFormatter, + process_frame, + compute_contours, + detect_movement, + scale_contours, + draw_contours, + str_to_bytes, + timestamp_to_bytes, + bytes_to_timestamp, + write_yappi_stats, +) + +################################################### +# Defaults and constants # +################################################### + +TESTING = False # For now this turns on yappi profiling +CONFIG_FILE = "client.cfg" +# Those will be updated from config file if present: +LOGDIR = 'logs' +if not os.path.exists(LOGDIR): + try: + os.makedirs(LOGDIR) + except Exception as exc: + print(f"Could not create log directory: {exc}") + sys.exit() +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") +YAAPI_LOG_FILE_BASE = "client_yaapi" +# LOGLEVEL = logging.INFO +LOGLEVEL = logging.DEBUG + +ROUTER_ADDRESS = "localhost" +PORT = "5569" +ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +# This is populated at startup with all cameras present in /dev +# (like /dev/video0, /dev/video1, etc.). +# This will include dummy devices! +# If cameras are already defined in config file, those will override present devicesl: +KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') +CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider +MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +START_STOP_MESSAGE_FMT = "{action}:{data}" +MAX_CAMERA_NAME_LENGTH = 256 +FPS = 30 +# This should be unique, each box different. +# If client ID is wrong, should server reject connection? +# TODO: Devise and implement schema for client ID assignment. +# Maybe serial number from /proc/cpuinfo. +# In bytes. +CLIENT_ID = b"client_001" + +##################################################### +# End defaults and constants # +##################################################### + +log_formatter = CustomLoggingFormatter() +# Attempt to create logging directory if it does not exist: +if not os.path.exists(LOGDIR): + try: + os.makedirs(LOGDIR) + except Exception as exc: + print("Could not create log directory: %r", exc) +try: + handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +except Exception as exc: + handler = logging.StreamHandler() +# Prevent log pollution from inotify when LOG_LEVEL set to debug: +logging.getLogger("inotify").setLevel(logging.WARNING) +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt=TIME_FORMAT, + level=LOGLEVEL, +) + +CAMERAS = {} +stop_event = Event() + + +def _encoder_worker( + encode_queue: Queue, + output_queue: Queue, + jpeg_quality=60, + ) -> None: + while True: + try: + identity, frame = encode_queue.get(timeout=1) + except Empty: + logger.debug("Encoder worker: encode queue empty, continuing ...") + continue + # Encode frame as JPEG with lower quality: + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] + try: + _, buffer = cv2.imencode('.jpg', frame, encode_param) + except Exception as exc: + logger.error("Encoder worker: Error encoding frame: %r", exc) + try: + message = [identity, buffer.tobytes()] + output_queue.put_nowait(message) + except Exception as exc: + logger.debug("Send queue full, dropping encoded frame: %r", exc) + time.sleep(0.1) + +class ClientVideo(Thread): + """ + Class for sending video stream + + Attributes: + ---------- + _id: (bytes) + Camera ID + device_dict: dict + Dictionary with device parameters (bytes for strings, integers for ints) + queue: Queue + Queue for sending messages to main client task + + Methods: + -------- + run + Main video streaming logic + stop + Stop video streaming + """ + + def __init__(self, _id, device_dict, queue) -> None: + """ + Parameters + ---------- + _id : bytes + Camera ID + device_dict : dict + Dictionary with device parameters (bytes for strings, integers for ints) + queue : Queue + Queue for sending messages to main client task + """ + super().__init__(daemon=True) + self.identity = _id + self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints. + self.device_addr = self.device_dict.get(b'address') # No default. + self.device_threshold = device_dict[b'contour_size_threshold'] + self.device_grace_period = device_dict[b'movement_grace_period'] + self.queue = queue + # Store only FPS number of frames, compare first and last: + self.frame_deque = deque(maxlen=FPS) + self.encode_queue = Queue(maxsize=3) + self._enc_executor = ThreadPoolExecutor(max_workers=1) + self.live = True + + def send_frame(self, contours: list) -> None: + """ + Prepare and send frame with drawn contours. + + Parameters + ---------- + contours: list + list of contours to draw on a frame + """ + + raw_frame = self.frame_deque[-1][2] + scaling_factor = self.frame_deque[-1][1] + scaled_contours = scale_contours(contours, scaling_factor) + try: + frame_out = draw_contours( + raw_frame, + scaled_contours, + min_contour_area=self.device_threshold, + ) + except Exception as exc: + logger.error("ClientVideo %r: Error drawing contours: %r", self.identity, exc) + frame_out = raw_frame + + # Send video message with contour and frame: + try: + logger.debug("ClientVideo %r: Queueing frame for encoding ...", self.identity) + self.encode_queue.put_nowait((self.identity, frame_out)) + except Exception as exc: + try: + logger.debug("ClientVideo %r: Encode queue full, dropping oldest frame: %r", self.identity, exc) + _ = self.encode_queue.get_nowait() # Remove oldest frame + self.encode_queue.put_nowait((self.identity, frame_out)) + except Exception: + pass + + def run(self) -> None: + """ + Video streaming logic. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.debug("ClientVideo '%s' starting ...", self.identity) + # The cv2.CAP_V4l2 below forces reading using v4l backend rather than gstreamer. + # If opencv was compiled with gstreamer support, it will default to gstreamer. + # Then we need to make sure that appropriate gstreamer plugins are installed. + # Although I haven't profiled it, it seems that using v4l uses less juice. + # Some devices can be opened with address (/dev/video0) and some only with integer index (0). + cap = cv2.VideoCapture( + self.device_addr.decode('utf-8'), + cv2.CAP_V4L2 + ) + if not cap.isOpened(): + logger.error("ClientVideo %r: Unable to open video device %r", self.identity, self.device_addr) + cap.release() + return + + # Flag to track movement state. Expires after grace period: + movement_detected = False + movement_detected_start = movement_now = None + while self.live and not stop_event.is_set(): + ret, frame = cap.read() + if not ret: + logger.error("ClientVideo %r: Failed to read frame from device %r", self.identity, self.device_addr) + break + try: + # Produce grayscale blurred frame for contour detection: + processed_frame_tuple = process_frame(frame) + except RuntimeError as exc: + logger.error("ClientVideo %r: Error processing frame: %r", self.identity, exc) + continue + # Append both original and processed frame to deque: + # processed_frame_tuple = (processed_frame, scaling_factor, raw_frame) + self.frame_deque.append(processed_frame_tuple + (frame,)) + # Skip on the first frame: + # To detect movement, compare only first and last frame from the queue of length FPS + # in order to reduce load: + if len(self.frame_deque) >= 2: + sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0] + try: + contours = compute_contours(sample_frames) + logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours)) + logger.debug("ClientVideo %r: Contours: %r", self.identity, contours) + except Exception as exc: + logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc) + continue + try: + movement_now = detect_movement(contours, min_area=self.device_threshold) + logger.debug("ClientVideo %r: Movement detected in frame.", self.identity) + except Exception as exc: + logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc) + continue + + # Only update movement start time and send start message + # if movement is detected and was not detected before: + if movement_now: + if not movement_detected: + # Update movement detected start time: + movement_detected_start = time.time() + queue_metadata_message( + identity=self.identity, + queue=self.queue, + action=b'start', + timestamp=timestamp_to_bytes(movement_detected_start), + ) + movement_detected = True + + # Prepare and send frame with drawn contours: + self.send_frame(contours) + + else: + if movement_detected: + # Movement was detected before, but not anymore. + # We wait for grace period before sending stop message. + now = time.time() + delta_seconds = now - movement_detected_start + logger.debug( + "ClientVideo %r: delta seconds since movement detected: %d, grace remaining: %d.", + self.identity, + delta_seconds, + self.device_grace_period - delta_seconds + ) + if delta_seconds >= self.device_grace_period: + queue_metadata_message( + identity=self.identity, + queue=self.queue, + action=b'stop', + timestamp=timestamp_to_bytes(now), + ) + movement_detected = False + movement_detected_start = None + else: + # Still in grace period, send frame with contours: + self.send_frame(contours) + else: + pass # No movement detected currently or before, do nothing. + logger.debug("Calling cap.release() ...") + cap.release() + logger.info("ClientVideo %r exiting ...", self.identity) + + def stop(self) -> None: + """ + Stop video streaming thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("Client video %r sending stop message ...", self.identity) + self.live = False + global stop_event + stop_event.set() + # self.queue.put([]) # Put a dummy message to unblock queue if needed + + +class ClientTask(Thread): + """ + Main Client Task with logic, message handling and setup. + + Attributes: + ---------- + context: zmq.Context + ZMQ context + socket: zmq.Socket + ZMQ DEALER socket + identity: bytes + Client identity + poll: zmq.Poller + ZMQ poller for socket + video_threads: dict + Dictionary of ClientVideo threads + running: bool + Flag indicating if client is running + queue: Queue + Queue for sending messages to server + + Methods: + -------- + run + Main client logic + stop + Stop client task + receive_messages + Receive messages from the server + send_messages + Send messages to the server + update_config(message) + Update configuration based on message from server + watch_for_new_cameras + Watch for new camera devices using inotify + """ + + def __init__(self, _id, context: zmq.Context=None) -> None: + """ + Parameters + ---------- + _id : bytes + Client ID + context : zmq.Context, optional + ZMQ context, by default None + """ + super().__init__(daemon=True) + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.identity = self.socket.identity = _id # <-- bytes + self.poll = zmq.Poller() + self.poll.register(self.socket, zmq.POLLIN) + self.video_threads = {} + self.running = True + self.connected = False + self.queue = Queue() + + def receive_messages(self) -> None: + """ + Receive messages from the server. + Messages are expected to be multipart with: + [camera_id (bytes), directive (bytes), value (bytes)] + Update configuration based on received messages running update_config(message). + + Parameters + ---------- + None + + Returns + ------- + None + """ + while self.running: + try: + sockets = dict(self.poll.poll(1000)) + if self.socket in sockets: + logger.debug("Self socket in sockets, receiving message ...") + try: + message = self.socket.recv_multipart() + logger.info("Client '%r' received message: %r.", self.identity, message) + self.update_config(message) + except Exception as exc: + logger.error("Failed to receive message: %r", exc) + except zmq.ZMQError as exc: + logger.error("Client '%r': socket error: %r", self.identity, exc) + time.sleep(0.01) # Sleep to avoid busy waiting + + def send_messages(self) -> None: + """ + Send messages from queue to the server. + + Parameters + ---------- + None + + Returns + ------- + None + """ + while self.running: + try: + logger.debug("Client '%r': Waiting for message to send ...", self.identity) + frame = self.queue.get(timeout=0.1) + logger.debug("Sending message of length %r ...", len(frame)) + try: + if len(frame) > 1: + # TODO: This blocks? + self.socket.send_multipart(frame) + else: + logger.debug("Client '%r': Dummy message received, not sending.", self.identity) + except zmq.ZMQError as exc: + logger.error("Client '%r': socket error: %r", self.identity, exc) + except Empty: + continue # No message to send, continue waiting + + def update_config(self, message: list) -> None: + """ + Update configuration based on message from server. + Only 'cameras' section of the config file can be updated. + Message format: + [camera_id (bytes), directive (bytes), value (bytes)] + Defined directives are: + modify_camera_name, + modify_camera_threshold, + modify_camera_grace_pd, + add_camera, + remove_camera, + + Parameters + ---------- + message : list + Message received from server + + Returns + ------- + None + """ + try: + logger.debug("Client received config message: %r", message) + try: + camera_id, directive, value = message # <-- bytes + except ValueError as exc: + logger.error("Invalid config message format: %r", exc) + return + # Parameter validation happens on the server side before sending the message + if directive in ( + b'modify_camera_name', + b'modify_camera_threshold', + b'modify_camera_grace_pd', + ): + # Ok, just validate that camera exists: + if camera_id not in CAMERAS: # <-- bytes + logger.error("Cannot update unknown camera: %r", camera_id) + logger.debug("Known cameras: %r", list(CAMERAS.keys())) + else: + if directive == b'modify_camera_name': + old_name, new_name = camera_id, value # <-- bytes + # Update global CAMERAS dict: + CAMERAS[new_name] = CAMERAS.pop(old_name) + # Update config file accordingly: + for key, value in cfg.items('cameras'): # <-- strings + if key.startswith(old_name.encode('utf-8') + '.'): + # Take the second part of a key like 'camera1.address' -> 'address': + param = key.split('.', 1)[1] + cfg.set('cameras', f"{new_name.encode('utf-8')}.{param}", value.encode('utf-8')) + cfg.remove_option('cameras', key) + # Update video threads dict: + self.video_threads[new_name] = self.video_threads.pop(old_name) + # Send message to server to update routing: + # FIXME: This is a mess. Settle on a format for data field. + queue_metadata_message( + identity=self.identity, + queue=self.queue, + action=b'rename', + # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) # FIXME: this should be datetime object + data=timestamp_to_bytes(time.time()), + ) + self.send_messages() + logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name) + elif directive == b'modify_camera_threshold': + try: + _value = int(value) + except ValueError as exc: + logger.error("Invalid contour size threshold value: %r, %r", value, exc) + return + CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer! + self.video_threads[camera_id].device_threshold = _value # <-- integer! + # configparser requires strings: + cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value)) + logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value) + elif directive == b'modify_camera_grace_pd': + try: + _value = int(value) + except ValueError as exc: + logger.error("Invalid movement grace period value: %r, %r", value, exc) + return + CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer! + self.video_threads[camera_id].device_grace_period = _value # <-- integer! + cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value)) + logger.info("Modified movement grace period of camera %r to %d.", camera_id, value) + elif directive == b'add_camera': + CAMERAS[camera_id] = {b'address': value} + # vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) + # vid_cl.start() + # self.video_threads[camera_id] = vid_cl + cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8')) + logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value) + elif directive == b'remove_camera': + if camera_id in self.video_threads: + self.video_threads[camera_id].stop() + # self.video_threads[camera_id].join() # TODO: Check this is needed and wanted! + # del self.video_threads[camera_id] + if camera_id in CAMERAS: + del CAMERAS[camera_id] + # Remove from config file: + for key in list(cfg.options('cameras')): # <-- strings + if key.startswith(camera_id.decode('utf-8') + '.'): + cfg.remove_option('cameras', key) + logger.info("Removed and stopped thread for camera %r.", camera_id) + else: + logger.warning("Unknown config directive: %r", directive) + + except (ValueError, IndexError) as exc: + logger.error("Failed to handle config message: %r", exc) + with open(CONFIG_FILE, 'w') as f: + cfg.write(f) + + def watch_for_new_cameras(self) -> None: + """ + Watch for new camera devices using inotify. + When a new camera device is detected in /dev (e.g., /dev/video2), + send a message to the server to start receiving video from it. + When a camera device is removed, send a message to stop receiving video from it. + Update config file accordingly. + + Parameters + ---------- + None + + Returns + ------- + None + """ + i = inotify.adapters.Inotify() + i.add_watch('/dev') + + while self.running: + for event in i.event_gen(timeout_s=1, yield_nones=False): + (_, type_names, path, filename) = event + if filename.startswith('video'): + camera_pathname = os.path.join(path, filename) + if 'IN_CREATE' in type_names: + logger.info("Detected new camera device: %s", camera_pathname) + self.update_config([filename.encode('utf-8'), b'add_camera', camera_pathname.encode('utf-8')]) + video_dict = {b'address': camera_pathname.encode('utf-8')} + self.start_video_thread(filename.encode('utf-8'), video_dict) + if 'IN_DELETE' in type_names: + logger.info("Detected removed camera device: %s", camera_pathname) + self.update_config([filename.encode('utf-8'), b'remove_camera', b'']) + self.video_threads[filename.encode('utf-8')].stop() + + def start_video_thread(self, camera_id: bytes, device_dict: dict) -> None: + """ + Start a video thread for a given camera. + + Parameters + ---------- + camera_id : bytes + Camera ID + device_dict : dict + Dictionary with device parameters + + Returns + ------- + None + """ + vid_cl = ClientVideo(camera_id, device_dict, self.queue) + vid_cl.start() + self.video_threads[camera_id] = vid_cl + logger.info("Started video thread for camera %r.", camera_id) + Thread( + target=_encoder_worker, + args=(vid_cl.encode_queue, self.queue,), + daemon=True, + ).start() + + def run(self) -> None: + """ + Run main client logic thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + self.socket.connect(ADDRESS) + logger.debug("Client '%r' connected to %r", self.identity, ADDRESS) + logger.debug("Starting video threads ...") + for _id, device_dict in CAMERAS.items(): + self.start_video_thread(_id, device_dict) + + recv_thread = Thread(target=self.receive_messages) # Not a daemon + send_thread = Thread(target=self.send_messages) # Not a daemon + watchdog_thread = Thread(target=self.watch_for_new_cameras, daemon=True) + + recv_thread.start() + logger.debug("Client '%s' started receiving thread.", self.identity) + send_thread.start() + logger.debug("Client '%s' started sending thread.", self.identity) + watchdog_thread.start() + logger.debug("Client '%s' started watchdog thread.", self.identity) + + logger.debug("Client '%s' waiting for threads to finish ...", self.identity) + recv_thread.join() + send_thread.join() + watchdog_thread.join(timeout=5) + + logger.info("Closing socket ...") + try: + self.socket.close() + except Exception as exc: + logger.warning("Failed to close socket: %r", exc) + logger.debug("Terminating context ...") + try: + self.context.term() + except Exception as exc: + logger.warning("Failed to terminate context: %r", exc) + + def stop(self) -> None: + """ + Stop task + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("ClientTask cleaning up ...") + for _id in list(self.video_threads.keys()): + logger.info("Cleaning video thread %s ...", _id) + _thread = self.video_threads[_id] + _thread.stop() + _thread.join(timeout=3) + if _thread.is_alive(): + logger.warning("Video thread %r did not stop in time.", _id) + self.video_threads.pop(_id, None) + + self.running = False + self.queue.put([]) # Put a dummy message to unblock send_messages if waiting + try: + self.socket.close(linger=0) + except Exception as exc: + logger.warning("Error closing socket: %r", exc) + logger.info("Client task exiting ...") + + +def queue_video_message(identity: bytes, queue: Queue, frame) -> None: + """ + Docstring for queue_video_message + + Parameters + ---------- + identity : bytes + Camera identity + queue : Queue + Queue to put message to + frame : ndarray + Video frame to send + + Returns + ------- + None + """ + # Encode frame as JPEG: + _, buffer = cv2.imencode('.jpg', frame) + # Add identity to message: + message = (identity, buffer.tobytes()) + # Put message to queue: + queue.put(message) + +def queue_metadata_message( + identity: bytes, + queue: Queue, + action: bytes, + timestamp: float, + ) -> None: + """ + Queue metadata message to be sent to server. + + Parameters + ---------- + identity : bytes + Camera ID + queue : Queue + Queue to put message to + action : bytes + Action as bytes ('start' or 'stop') + data : float + Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp + + Returns + ------- + None + """ + logger.debug("ClientVideo: %r at: %r", action, datetime.datetime.fromtimestamp( + bytes_to_timestamp(timestamp)).strftime(TIME_FORMAT)) + message = (identity, action, timestamp) + queue.put(message) + +def find_present_cameras() -> dict: + """ + Find currently present camera devices in /dev. + + Parameters + ---------- + None + + Returns + ------- + dict + Dictionary of present cameras with device names as keys and paths as values + """ + present_cameras = {} + for entry in os.listdir('/dev'): + if entry.startswith('video'): + present_cameras[entry] = os.path.join('/dev', entry) + return present_cameras + +def read_config(conf_file) -> ConfigParser: + """ + Read config file and return as dictionary. + The only required section is 'cameras' with at least one camera. + Do basic sanity checks and update global variables. + Log warnings if config file or sections/options are missing. + + Parameters + ---------- + conf_file : str + Path to config file + + Returns + ------- + ConfigParser + ConfigParser object with configuration + """ + cfg = ConfigParser() + cfg.read(conf_file) + cameras_dict = None + # Need to update logging info: + if 'main' in cfg: + global TESTING + try: + TESTING = cfg.getboolean('main', 'testing') + logger.info("Testing mode is set to: %r.", TESTING) + except ValueError as exc: + logger.error("Invalid value for 'testing' option in 'main' section: %r, using default: %r.", exc, TESTING) + + if 'logging' in cfg: + global LOGDIR, LOGFILE, LOGLEVEL + + LOGDIR = cfg.get('logging', 'logdir', fallback='.') + LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) + LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) + logger.setLevel(LOGLEVEL) + for _handler in logger.handlers: + _handler.setLevel(LOGLEVEL) + if 'network' in cfg: + global ROUTER_ADDRESS, PORT, ADDRESS + if cfg.has_option('network', 'router_address'): + ROUTER_ADDRESS = cfg.get('network', 'router_address') + else: + logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) + if cfg.has_option('network', 'router_port'): + PORT = cfg.get('network', 'router_port') + else: + logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) + ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" + else: + logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) + + if 'cameras' in cfg: + # Need to take care: camera names are stored as strings but comunicated as bytes: + cameras_dict = set_up_cameras(cfg.items('cameras')) # <-- dict with strings + if cameras_dict: + # Only update global CAMERAS if we have at least one valid camera with an address: + CAMERAS = str_to_bytes(cameras_dict) # <-- now bytes + logger.info("Using camera configuration updated from config file: %r", CAMERAS) + + if 'cameras' not in cfg or not cameras_dict: + # No cameras section or no valid cameras found: + logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) + found_cameras = find_present_cameras() + for cam_name, cam_addr in found_cameras.items(): + CAMERAS[cam_name.encode('utf-8')] = {'address': cam_addr.encode('utf-8')} # <-- bytes + # Use default parameters: + cfg.set('cameras', f"{cam_name}.address", cam_addr) + cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD)) + cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD)) + logger.info("CAMERAS configuration: %r", CAMERAS) + with open(conf_file, 'w') as configfile: + cfg.write(configfile) + + return cfg + +def validate_camera_address(address: str) -> bool: + """ + Validate camera address. + + Parameters + ---------- + address : str + Camera address + + Returns + ------- + bool + True if address is valid, False otherwise + """ + # Check if address is an integer (device index): + if address.startswith('/dev/video'): + return address + raise ValueError("Invalid camera address: %r" % address) + +def validate_camera_threshold(threshold: str) -> bool: + """ + Validate camera contour size threshold. + + Parameters + ---------- + threshold : str + Contour size threshold + + Returns + ------- + bool + True if threshold is valid, False otherwise + """ + try: + if int(threshold) > 0: + return threshold + except ValueError: + raise ValueError("Invalid contour size threshold: %r" % threshold) + raise ValueError("Invalid contour size threshold: %r" % threshold) + +def validate_camera_grace_pd(grace_pd: str) -> bool: + """ + Validate camera movement grace period. + + Parameters + ---------- + grace_pd : str + Movement grace period + + Returns + ------- + bool + True if grace period is valid, False otherwise + """ + try: + if int(grace_pd) >= 0: + return grace_pd + except ValueError: + raise ValueError("Invalid movement grace period: %r" % grace_pd) + raise ValueError("Invalid movement grace period: %r" % grace_pd) + +def set_up_cameras(cameras_cfg_section: dict) -> None: + """ + Set up camera configuration from config file section. + Validate camera parameters and apply defaults if necessary. + Parameters + ---------- + cameras_section : dict + Dictionary of camera configuration from config file + Returns + ------- + cameras_dict : dict + Dictionary to store validated camera configuration + """ + # ConfigParser does not support nested sections. + # These shenanigans below allow for a nested config simulation with configparser + # and so per camera settings: + cameras_dict = defaultdict(dict) + for key, val in cameras_cfg_section: # <-- strings + try: + cam, param = key.split('.', 1) + except ValueError as exc: + logger.error( + "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc) + match param: + case 'address': + try: + cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings + except ValueError as exc: + logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', '')) + continue + case 'contour_size_threshold': + try: + cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings + except ValueError as exc: + logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', '')) + cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD) + continue + case 'movement_grace_period': + try: + cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings + except ValueError as exc: + logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', '')) + cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD) + continue + case _: + logger.warning("Unknown camera %r parameter: %r", cam, param) + + # Camera section needs to contain at least an address: + for key in list(cameras_dict.keys()): + if 'address' not in cameras_dict[key].keys(): + logger.error("Camera %r has no address configured, removing from configuration.", key) + del cameras_dict[key] + + return cameras_dict + +def signal_handler(sig, frame) -> None: + """ + Signal handler for graceful shutdown. + """ + logger.info("Received signal handler '%r', stopping client ...", sig) + # Set flag to stop VideoClient thread(s): + stop_event.set() + if client.is_alive(): + client.stop() + + +if __name__ == "__main__": + logger.info("Client starting ...") + + # Read configuration file before starting logging as it may contain logging settings: + cfg = read_config(CONFIG_FILE) + + if TESTING: + logger.info("Starting in testing mode with yappi profiling ...") + yappi.set_clock_type("cpu") + yappi.start(builtins=True, profile_threads=True) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + + client = ClientTask(CLIENT_ID) + + client.start() + client.join() + + if TESTING: + yappi.stop() + if write_yappi_stats(yappi): + logger.error("Error writing yappi stats.") + + logger.info("Terminating ...") diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..a40b060 --- /dev/null +++ b/helpers.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python + +""" +Helper functions for zmq video messaging. +""" + +__author__ = "Franoosh Corporation" + + +import os +import io +import pstats +import logging +import subprocess +import cv2 +import struct +import datetime + +# Known directives for camera configuration: +DIRECTIVES = ( + 'modify_camera_name', + 'modify_camera_threshold', + 'modify_camera_grace_pd', + 'add_camera', + 'remove_camera', +) +MAX_CAMERA_NAME_LENGTH = 256 +TIME_FORMAT_STRING = '%Y-%m-%d %H:%M:%S.%f' +LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') + +class CustomLoggingFormatter(logging.Formatter): + """Custom logging formatter""" + debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s' + info_fmt = 'INFO: %(asctime)s %(message)s' + warning_fmt = 'WARNING: %(asctime)s %(message)s' + error_fmt = 'ERROR: line: %(lineno)d, %(asctime)s %(message)s' + critical_fmt = 'CRITICAL: line: %(lineno)d, %(asctime)s %(message)s' + + def __init__(self): + super().__init__( + fmt="%(levelno)d: %s(asctime)s %(message)s", + datefmt=None, + ) + + def format(self, record): + orig_fmt = self._style._fmt + if record.levelno == logging.DEBUG: + self._style._fmt = CustomLoggingFormatter.debug_fmt + elif record.levelno == logging.INFO: + self._style._fmt = CustomLoggingFormatter.info_fmt + elif record.levelno == logging.WARNING: + self._style._fmt = CustomLoggingFormatter.warning_fmt + elif record.levelno == logging.ERROR: + self._style._fmt = CustomLoggingFormatter.error_fmt + elif record.levelno == logging.CRITICAL: + self._style._fmt = CustomLoggingFormatter.critical_fmt + + result = logging.Formatter.format(self, record) + self._style._fmt = orig_fmt + + return result + +def process_frame(frame, detect_width=320): + """Process frame for contour detection.""" + try: + height, width = frame.shape[:2] + if width > detect_width: + scaling_factor = detect_width / float(width) + small_frame = cv2.resize(frame, (detect_width, int(height * scaling_factor))) + else: + scaling_factor = 1.0 + small_frame = frame + # Convert to grayscale: + gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY) + # Apply Gaussian blur: + blurred = cv2.GaussianBlur(gray, (21, 21), 0) + except Exception as exc: + raise RuntimeError(f"Error processing frame: {exc}") + + return blurred, scaling_factor + + +def timestamp_to_bytes(timestamp): + """Convert timestamp to bytes.""" + return struct.pack('d', timestamp) + +def bytes_to_timestamp(byte_data): + """Convert bytes to timestamp.""" + return struct.unpack('d', byte_data)[0] + +def compute_contours(sample_frames): + """Compute contours between two frames""" + all_contours = [] + frame_0, frame_1 = sample_frames + frame_delta = cv2.absdiff(frame_0, frame_1) + threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] + threshold = cv2.dilate(threshold, None, iterations=2) + # contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + all_contours.extend(contours) + + return all_contours + +def scale_contours(contours, scaling_factor): + """Scale contours by the given scaling factor.""" + if scaling_factor == 1.0: + return contours + + scaled_contours = [] + for contour in contours: + scaled_contour = (contour * (1.0 / scaling_factor)).astype(int) + scaled_contours.append(scaled_contour) + + return scaled_contours + +def draw_contours(frame, contours, min_contour_area=500): + """Draw contours on the frame.""" + for contour in contours: + if cv2.contourArea(contour) > min_contour_area: + (x, y, w, h) = cv2.boundingRect(contour) + cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) + + return frame + +def detect_movement(contours, min_area=500): + """Detect movement based on contours found from frame diff.""" + for contour in contours: + if cv2.contourArea(contour) >= min_area: + return True + return False + +def get_available_cameras(): + """ + Get list of available camera devices. + At the moment it does not work. At all. It is useless. + """ + proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]] + verified_devices = [] + for device in candidate_devices: + cap = cv2.VideoCapture(device) + if cap.isOpened(): + verified_devices.append(device) + cap.release() + return verified_devices + +def bytes_to_str(obj): + """Recursively convert bytes to strings in dicts and lists.""" + if isinstance(obj, bytes): + return obj.decode('utf-8') + elif isinstance(obj, dict): + return {bytes_to_str(k): bytes_to_str(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [bytes_to_str(item) for item in obj] + else: + return obj + +def str_to_bytes(obj): + """Recursively convert strings to bytes in dicts and lists.""" + if isinstance(obj, str): + return obj.encode('utf-8') + elif isinstance(obj, dict): + return {str_to_bytes(k): str_to_bytes(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [str_to_bytes(item) for item in obj] + else: + return obj + +def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool: + """ + Function writing yaapi stats to .pstat files and + converting stats to readable .txt files + + Parameters + ---------- + yappi_instance : yappi + Yappi instance + + Returns + ------- + bool + True on error, False on success + """ + logger = logging.getLogger(__name__) + threads = yappi_instance.get_thread_stats() + # combined text output: + combined_out = [] + + time_now = datetime.datetime.strftime(datetime.datetime.now(), TIME_FORMAT_STRING) + subdir = f"yaapi_{time_now}" + yaapi_dir = os.path.join(logdir, 'yaapi', subdir) + try: + os.makedirs(yaapi_dir) + except Exception as exc: + print("Couldn't create directory for yaapi stats: %r", exc) + return True + + logfile = os.path.join(yaapi_dir, "yaapi.log") + main_yaapi_png = os.path.join(yaapi_dir, "yaapi.png") + gprof2dot_cmd = ["gprof2dot", "-f", "pstats"] + dot_cmd = ["dot", "-Tpng"] + try: + for thread in threads: + func_stats = yappi_instance.get_func_stats(ctx_id=thread.id) + yappi_instance.get_func_stats().save('profile.callgrind', type="callgrind") # For possible future use + thread_filename_base = f"{logfile}.thread{thread.id}" + pstat_filename = f"{thread_filename_base}.pstat" + txt_filename = f"{thread_filename_base}.txt" + dot_filename = f"{thread_filename_base}.dot" + png_filename = f"{thread_filename_base}.png" + + # save pstat (can be opened with pstats or profiling tools) + func_stats.save(pstat_filename, type="pstat") + + # convert pstat to readable text using pstats + sio = io.StringIO() + ps = pstats.Stats(pstat_filename, stream=sio) + ps.sort_stats("tottime") + ps.print_stats() + + text = sio.getvalue() + combined_out.append(f"--- Thread {thread.id} ({thread.name}) ---\n{text}") + + # also write per-thread text file + with open(txt_filename, "w", encoding='utf-8') as f: + f.write(text) + + # generate call graph png using gprof2dot and dot + _gprof2dot_cmd = gprof2dot_cmd + [pstat_filename, "-o", dot_filename] + _dot_cmd = dot_cmd + [dot_filename, "-o", png_filename] + try: + gprof2dot_proc = subprocess.Popen(_gprof2dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gprof2dot_stdout, gprof2dot_stderr = gprof2dot_proc.communicate() + if gprof2dot_proc.returncode != 0: + logger.error("Error generating gprof2dot for thread %r: %r", thread.id, gprof2dot_stderr.decode('utf-8')) + continue + except Exception as exc: + logger.error("Exception generating gprof2dot for thread %r: %r", thread.id, exc) + continue + try: + dot_proc = subprocess.Popen(_dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gprof2dot_proc.stdout.close() # Allow gprof2dot_proc to receive a SIGPIPE if dot_proc exits. + out, err = dot_proc.communicate() + if dot_proc.returncode != 0: + logger.error("Error generating call graph PNG for thread %r: %r", thread.id, err.decode('utf-8')) + except Exception as exc: + logger.error("Exception generating call graph PNG for thread %r: %r", thread.id, exc) + + # write combined text file + with open('.'.join((logfile, "txt")), "w", encoding='utf-8') as f: + f.write("\n\n".join(combined_out)) + except Exception as exc: + return True + + return False
\ No newline at end of file diff --git a/router.py b/router.py new file mode 100644 index 0000000..ce733b9 --- /dev/null +++ b/router.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python + +""" +A module containing zeromq router for video streaming +from a client and sending control messages from server. +""" + +__author__ = "Franoosh Corporation" + +import sys +import os +import signal +import logging +import random +from collections import defaultdict, deque +import zmq + +from helpers import CustomLoggingFormatter + +# TODO: add configparser + +IP_FRONTEND = '127.0.0.1' +IP_BACKEND = '127.0.0.1' + +PORT_FRONTEND = "5569" # This is a client (producer) +PORT_BACKEND = "9979" + +FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" # This is a client (producer) +BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}" + +LOGDID = 'logs' +if not os.path.exists(LOGDID): + try: + os.makedirs(LOGDID) + except Exception as exc: + print(f"Could not create log directory: {exc}") + sys.exit() +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGFILE = os.path.join(LOGDID, f"{BASENAME}.log") +LOGLEVEL = logging.INFO + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + + +RUNNING = True + +def custom_proxy() -> None: + """ + A custom zmq proxy to route messages between clients and workers. + + Parameters + ---------- + None + + Returns + ------- + None + """ + context = zmq.Context.instance() + poller = zmq.Poller() + + # Set up frontend: + frontend_socket = context.socket(zmq.ROUTER) + frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) + frontend_socket.bind(FRONTEND_ADDR) + poller.register(frontend_socket, zmq.POLLIN) + + # Set up backend: + backend_socket = context.socket(zmq.ROUTER) + backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) + backend_socket.bind(BACKEND_ADDR) + poller.register(backend_socket, zmq.POLLIN) + + awailable_workers = [] + client_worker_dict = {} + # There are three pending messages dictionaries: + # 1. pending_messages_no_worker[client_id] - messages that are pending for a client + # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker + # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker + # To assure delivey of the entire video we need to store messages from start to end until they are delivered + # to the client. + # If a worker is not available for certain amount of time, we will reassign it to another worker. + # For now, there is only one worker. Remember to implement this later. + # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker. + + pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned + pending_messages_client_worker = defaultdict(lambda: defaultdict(deque)) + pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) + + while RUNNING: + # Poll both frontend and backend sockets: + events = dict(poller.poll(10)) + # If message from client: + if frontend_socket in events: + # Receive message from client: + msg = frontend_socket.recv_multipart() + logger.debug("Received message.") + client_id, content = msg[0], msg[1:] + # Check if client is already connected to worker: + try: + # Get worker ID for this client from the client-worker dictionary: + worker_id = client_worker_dict[client_id] + # Check if there are any pending messages for this worker and send them first: + while pending_messages_client_worker[client_id][worker_id]: + # In order to take a peek at the size of all messages, perhaps check out: + # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ + logger.debug( + "There are '%d' pending messages from client %r for worker %r", + len(pending_messages_client_worker[client_id][worker_id]), + client_id, + worker_id, + ) + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + try: + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # At last send new message to worker: + else: + try: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *content]) + except Exception as e: + logger.error("Failed to send message to backend, error: %s", e) + # Store message for later delivery: + # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # If client is not connected to any worker: + except KeyError: + logger.info("Received ID from client: %r.", client_id) + # Check if there are available workers. + if awailable_workers: + # Assign random worker to client: + worker_id = random.choice(awailable_workers) + # Update client-worker dictionary: + client_worker_dict[client_id] = worker_id + # Check if there are any pending messages for this client: + if pending_messages_no_worker[client_id]: + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(msg) + logger.debug("Moved pending messages for client '%r' to worker '%r'", client_id, worker_id) + # Check if there are any pending messages for this worker: + while pending_messages_client_worker[client_id][worker_id]: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + try: + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + break + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + logger.debug("There are still pending messages for client %r and worker %r, storing message for later delivery.", client_id, worker_id) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) + else: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + try: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + # At last, send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *content]) + except Exception as e: + logger.error("Failed to send message for client %r to backend, requeing. Error: %r", client_id, e) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(msg) + + else: + # Store message for later delivery: + pending_messages_no_worker[client_id].append(content) + logger.debug("No available workers, storing client %r with no worker assigned", client_id) + + if backend_socket in events: + _msg = backend_socket.recv_multipart() + # These messages should be safe to log as they are short strings or bytes. + logger.debug("Received from worker: %r", _msg) + if len(_msg) == 2: # Worker is sending its identity: + worker_id, msg = _msg[0], _msg[1:] + logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) + logger.info("Received ID from worker: %r.", worker_id) + # Add worker to available workers list: + awailable_workers.append(worker_id) + for client, worker in client_worker_dict.items(): + # If client has no worker assigned, assign it a new worker: + if worker is None: + client_worker_dict[client] = worker + logger.debug("Assigning worker %r to client %r", worker, client) + if pending_messages_no_worker: + # If there are pending messages for clients with no worker assigned: + for client_id, messages in pending_messages_no_worker.items(): + if messages: + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(messages) + # Clear pending messages for this client: + pending_messages_no_worker[client_id].clear() + logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) + try: + # Get the first message for this client and worker: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) + # Send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e) + + else: # Worker is sending a message to client: + # worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] + worker_id, client_id, *msg = _msg + logger.debug("Received message from worker %r for client %r: %r", worker_id, client_id, msg) + # First check if there are any pending messages for this worker: + while pending_messages_worker_client[worker_id][client_id]: + pending_msg = pending_messages_worker_client[worker_id][client_id].pop() + try: + logger.debug("Sending pending message %r from %r to client %r", pending_msg, client_id, worker_id) + backend_socket.send_multipart([client_id, *pending_msg]) + except Exception as e: + # It is safe to log the message content as it is a short string or bytes. + logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + # Re-queue the message: + pending_messages_worker_client[worker_id][client_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If no pending messages for this client, send new message to the client : + if not pending_messages_worker_client[worker_id][client_id]: + logger.debug("Sending message %r to client %r", msg, client_id) + try: + frontend_socket.send_multipart([client_id, *msg]) + logger.debug("Sent message %r to client %r", msg, client_id) + except Exception as e: + pending_messages_worker_client[worker_id][client_id].appendleft(msg) + logger.error("Could not deliver message: %r from worker: %r to client: %r", msg, worker_id, client_id) + +def signal_handler(sig, frame) -> None: + global RUNNING + logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name) + RUNNING = False + + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + + custom_proxy() diff --git a/static/css/main.css b/static/css/main.css new file mode 100644 index 0000000..af1f40f --- /dev/null +++ b/static/css/main.css @@ -0,0 +1,36 @@ +.streams-container { + display: flex; + flex-wrap: wrap; + gap: 16px; + /* justify-content: center; */ +} +.camera-stream { + flex: 0 1 320px; + margin: 10px; + /* text-align: center; */ +} +.scroll-box { + flex: 0 1 300px; + max-height: 200px; + overflow-y: auto; + border: 1px solid #ccc; + padding: 5px; +} + +@media (max-width: 600px) { +.streams-container { + flex-direction: column; + align-items: center; +} +.camera-stream { + width: 100%; + max-width: 100vw; +} +.mob-scroll-box { + width: 100px; + height: 150px; + overflow: scroll; + border: 1px solid #ccc; + padding: 10px; +} +}
\ No newline at end of file diff --git a/templates/client.html b/templates/client.html new file mode 100644 index 0000000..785fbcf --- /dev/null +++ b/templates/client.html @@ -0,0 +1,173 @@ +<!DOCTYPE html> +<html> +<head> + <title>Client {{ client_id }} - Camera Streams</title> + <link rel="stylesheet" href="{{ url_for('static', path='css/main.css') }}"> + <style> + .camera-stream { display: inline-block; margin: 10px; } + video { border: 1px solid #333; } + </style> +</head> +<body> + <h1>Camera Streams for client: {{ client_id }}</h1> + <div class="streams-container"> + {% for camera_id in camera_ids %} + <div class="camera-stream"> + <h2>Camera: {{ camera_id }}</h2> + <img + id="video-{{ camera_id }}" + width="640" + height="480" /> + <div> + <h3>Modify Camera Name</h3> + <input + type="text" + id="new-name-{{ camera_id }}" + placeholder="New Name" + required pattern= "^\S+$" + title="No whitespace allowed."> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_name')">Send</button> + </div> + <div> + <h3>Modify Camera Threshold</h3> + <input + type="number" + id="threshold-value-{{ camera_id }}" + placeholder="Threshold" + required step="1" + min="0" + inputmode="numeric" + title="Enter a non-negative integer."> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_threshold')">Send</button> + </div> + <div> + <h3>Modify Camera Grace Period</h3> + <input + type="number" + id="grace-value-{{ camera_id }}" + placeholder="Grace Period" + required step="1" + min="0" + inputmode="numeric" + title="Enter a non-negative integer."> + <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_grace_pd')">Send</button> + </div> + <div> + <h3>Recorded Videos for {{ camera_id }}</h3> + <ul id="video-list-{{ camera_id }}" class="scroll-box"> + {% for filename, video_url, timestamp in client_videos[camera_id] %} + <li><a href="{{ video_url }}">{{ filename }}</a> ({{ timestamp }})</li> + <!-- <h3>video file: {{ filename }}</h3> + <video src="{{ video_url }}" type="video/ogg" width="320" height="240" controls></video> + <hr4>creation: {{ timestamp }}</h3> + --> + {% endfor %} + </ul> + </div> + </div> + {% endfor %} +<!-- + <div> + <h3>Add Camera</h3> + <input type="text" id="add-name" placeholder="Camera Name"> + <input type="text" id="add-address" placeholder="Address"> + <button onclick="sendConfig('add_camera')">Send</button> + </div> +--> + <div> + <h3>Remove Camera</h3> + <input + type="text" + id="remove-name" + placeholder="Camera Name" + required pattern="^\S+$" + title="No whitespace allowed."> + <button onclick="sendConfig('remove_camera')">Send</button> + </div> + </div> + <script> + const wsMap = {}; + {% for camera_id in camera_ids %} + // For each camera, open a WebSocket and update the corresponding <img> + (function() { + const cameraId = '{{ camera_id }}'; + const ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId); + let currentUrl = null; + ws.onmessage = function(event) { + let image = document.getElementById('video-' + cameraId); + if (currentUrl) { + URL.revokeObjectURL(currentUrl); + } + currentUrl = URL.createObjectURL(event.data); + image.src = currentUrl; + }; + ws.onclose = function(event) { + console.log('WebSocket closed for camera ' + cameraId + ':', event); + }; + ws.onerror = function(event) { + console.log('WebSocket error for camera ' + cameraId + ':', event); + }; + window.addEventListener('beforeunload', function() { + ws.close(); + }); + wsMap[cameraId] = ws; + })(); + // FIXME: Move to a separate function + // For each camera, open a WebSocket to receive notificationf about new videos + // (function() { + // const cameraId = '{{ camera_id }}'; + // const wsVideos = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId + '/videos'); + // wsVideos.onmessage = function(event) { + // let videoFilename = event.data; + // console.log('New video available for camera ' + cameraId + ': ' + videoFilename); + // // On new video notification, + // }; + // })(); + {% endfor %} + + function sendConfig(cameraId, type) { + let msg = {}; + switch(type) { + case 'modify_camera_name': + msg[type] = [ + document.getElementById('new-name-' + cameraId).value + ]; + break; + case 'modify_camera_threshold': + msg[type] = [ + document.getElementById('threshold-value-' + cameraId).value + ]; + break; + case 'modify_camera_grace_pd': + msg[type] = [ + parseInt(document.getElementById('grace-value-' + cameraId).value) + ]; + break; + case 'modify_camera_address': + msg[type] = [ + document.getElementById('address-value-' + cameraId).value + ]; + break; + // case 'add_camera': + // msg[type] = [ + // document.getElementById('add-name').value, + // document.getElementById('add-address').value + // ]; + // break; + case 'remove_camera': + msg[type] = [ + document.getElementById('remove-name').value + ]; + break; + } + const ws = wsMap[cameraId] && wsMap[cameraId] ? wsMap[cameraId] : Object.values(wsMap)[0]; + if (ws && ws.readyState === WebSocket.OPEN) { + console.log("Sending message:", msg, "on ws:", ws); + ws.send(JSON.stringify(msg)); + } else { + alert('WebSocket is not open for camera ' + cameraId); + } + } + </script> +</body> +</html> diff --git a/templates/main.html b/templates/main.html new file mode 100644 index 0000000..e0eaa52 --- /dev/null +++ b/templates/main.html @@ -0,0 +1,16 @@ +<!DOCTYPE html> +<html> +<head> + <title>Live Streaming</title> +</head> +<body> + <h1>Streaming live.</h1> + {% for client_id in clients.keys() %} + <h2>Client {{ client_id }}</h2> + <li> + <a href="/clients/{{ client_id }}"> Client {{ client_id }} streams + </a> + <li> + {% endfor %} +</body> +</html> diff --git a/webserver.py b/webserver.py new file mode 100644 index 0000000..41d1d30 --- /dev/null +++ b/webserver.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python +""" +Module serving video from zmq to a webserver. +""" +__author__ = "Franoosh Corporation" + +import os +from collections import defaultdict +import json +import logging +import asyncio +import datetime +from contextlib import asynccontextmanager +import zmq +import zmq.asyncio +import uvicorn +from fastapi import ( + FastAPI, + Request, + HTTPException, + WebSocket, + WebSocketDisconnect, + templating, +) +from fastapi.responses import HTMLResponse, FileResponse +from fastapi.staticfiles import StaticFiles +from typing import AsyncGenerator + +from helpers import ( + CustomLoggingFormatter, + DIRECTIVES, + MAX_CAMERA_NAME_LENGTH, +) + + +CWD = os.getcwd() +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') +CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...] +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGDIR = 'logs' +LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") +LOGLEVEL = logging.INFO +# LOGLEVEL = logging.DEBUG +# This should come from the config file: +VIDEO_DIR = os.path.join(CWD, "videos") + +HOST = "127.0.0.1" +ZMQ_PORT = "9979" +WEB_PORT = "8008" +CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}" +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + +# Track websocket connections by (client_id, camera_id): +ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket +ws_queues = defaultdict(dict) +ctrl_msg_que = asyncio.Queue() +# Create ZMQ context and socket: +zmq_context = zmq.asyncio.Context() +zmq_socket = zmq_context.socket(zmq.DEALER) +# Connect to ZMQ backend: +zmq_socket.connect(WEB_BACKEND_ADDR) + + +async def zmq_bridge(): + """ + Bridge between ZMQ backend and websocket clients. + + Parameters + ---------- + None + + Returns + ------- + None + """ + poll = zmq.asyncio.Poller() + poll.register(zmq_socket, zmq.POLLIN) + while True: + try: + sockets = dict(await poll.poll(100)) + if zmq_socket in sockets: + logger.debug("Receiving message from ZMQ backend ...") + topic, frame_data = await zmq_socket.recv_multipart() + # messages can also now contain directives, like 'new_camera' and 'removed_camera' with empty frame_data + # Send those to a control queue to be handled appropriately + client_id, camera_id = topic.decode('utf-8').split(':', 1) # <-- strings from now on + # client_id, camera_id = topic.split(b':', 1) + if not camera_id in CLIENTS_DICT[client_id]: + CLIENTS_DICT[client_id].append(camera_id) + queue = ws_queues.get(client_id, {}).get(camera_id) + if queue: + if queue.full(): + logger.debug( + "WebsSocket queue full for '/ws/%r/%r', discarding oldest frame.", + client_id, + camera_id + ) + _ = queue.get_nowait() # Discard oldest frame + await queue.put(frame_data) + else: + logger.debug("Creating new websocket queue for '/ws/%r/%r'.", client_id, camera_id) + ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10) + if not ctrl_msg_que.empty(): + client_id, camera_id, command, args_list = None, None, None, None + try: + client_id, camera_id, command, args_list = await ctrl_msg_que.get() + except ValueError as ve: + logger.error("Invalid control message on queue: %r", ve) + if client_id and camera_id and command and args_list is not None: + try: + zmq_socket.send_multipart([ + client_id, + camera_id, + command, + ] + args_list, flags=zmq.NOBLOCK) + logger.info("Sent control command.") + except zmq.Again: + logger.error( + "ZMQ socket busy, could not send control command '%r' with args: %r for '/ws/%r/%r' to backend.", + command, + args_list, + client_id, + camera_id + ) + except Exception as e: + logger.error("Error in ZMQ bridge: %r", e) + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator: + """ + Create lifespan context for FastAPI app. + + Parameters + ---------- + app : FastAPI + The FastAPI application instance. + + Returns + ------- + AsyncGenerator + Yields control to the application context. + """ + asyncio.create_task(zmq_bridge()) + yield + +_app = FastAPI(lifespan=lifespan) +_app.mount("/static", StaticFiles(directory="static"), name="static") +templates = templating.Jinja2Templates(directory='templates') + +@_app.get("/") +async def main_route(request: Request) -> HTMLResponse: + """ + Main route serving the index page. + + Parameters + ---------- + request : Request + The incoming HTTP request. + """ + logger.debug("Main route visited") + return templates.TemplateResponse( + "main.html", + { + "request": request, + "clients": CLIENTS_DICT, + } + ) + +@_app.get("/clients/{client_id}", response_class=HTMLResponse) +async def client_route(request: Request, client_id: str) -> HTMLResponse: + """ + Serve a particular client page. + + Parameters + ---------- + request : Request + The incoming HTTP request. + client_id : str + The client ID to serve. + """ + logger.debug("Checking client_id: %s in clients_dict: %r.", client_id, CLIENTS_DICT) + if not client_id in CLIENTS_DICT: + raise HTTPException(status_code=404, detail="No such client ID.") + return templates.TemplateResponse( + "client.html", + { + "request": request, + "client_id": client_id, + "camera_ids": CLIENTS_DICT[client_id], + "client_videos": list_video_files(client_id), + }, + ) + +@_app.websocket("/ws/{client_id}/{camera_id}") +async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> None: + """ + Serve a particular camera page. + + Parameters + ---------- + websocket : WebSocket + The incoming WebSocket connection. + client_id : str + The client ID. + camera_id : str + The camera ID. + """ + logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) + await websocket.accept() + ws_connections[client_id][camera_id] = {'ws': websocket} + queue = ws_queues[client_id][camera_id] + + async def send_frames(): + while True: + logger.debug("Getting from queue frame for '/ws/%s/%s'.", client_id, camera_id) + try: + # Don't wait indefinitely to allow checking for client disconnection: + frame_data = queue.get_nowait() + except asyncio.QueueEmpty: + await asyncio.sleep(0.1) + continue + try: + logger.debug("Sending frame to '/ws/%s/%s'.", client_id, camera_id) + await websocket.send_bytes(frame_data) + logger.debug("Sent frame to '/ws/%s/%s'.", client_id, camera_id) + # This exception is raised when client disconnects: + except Exception as exc: + logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc) + ws_connections[client_id].pop(camera_id, None) + break + + async def receive_control(): + while True: + try: + data = await websocket.receive_text() + logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data) + # Handle control messages from the client: + frontend_message = json.loads(data) + for command, args in frontend_message.items(): + if validate_directive(command, args, client_id, camera_id): + args_list = [str(arg).encode('utf-8') for arg in args] + ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list)) + logger.info( + "Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.", + command, + args_list, + client_id, + camera_id + ) + except json.JSONDecodeError: + logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) + except WebSocketDisconnect: + logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) + ws_connections[client_id].pop(camera_id, None) + break + except Exception as exc: + logger.warning("Error receiving control message: %r", exc) + + send_task = asyncio.create_task(send_frames()) + receive_task = asyncio.create_task(receive_control()) + try: + await asyncio.gather(send_task, receive_task) + finally: + send_task.cancel() + receive_task.cancel() + ws_connections[client_id].pop(camera_id, None) + ws_queues[client_id].pop(camera_id, None) + logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id) + +@_app.get("/videos/{client_id}/{camera_id}/{filename}") +async def serve_video(client_id: str, camera_id: str, filename: str) -> FileResponse: + """ + Route serving video files. + + Parameters + ---------- + client_id : str + The client ID. + camera_id : str + The camera ID. + filename : str + The video filename. + """ + video_path = os.path.join(VIDEO_DIR, client_id, camera_id, filename) + + if not os.path.exists(video_path): + raise HTTPException(status_code=404, detail="Video not found") + + logger.debug("Serving video file: '%s', video_path") + + return FileResponse(video_path, media_type="video/ogg") + +def list_video_files(client_id: str) -> dict: + """ + Return a dictionary of recorded video files for a given client. + {<camera_id>: [(<filename>, <file url>, <date>)]} + + Parameters + ---------- + client_id : str + The client ID. + + Returns + ------- + dict + Dictionary of recorded video files for the client. + """ + client_videos = {} + client_dir = os.path.join(VIDEO_DIR, client_id) + if os.path.exists(client_dir): + for (_, dirnames, _) in os.walk(client_dir): + for camera_dir in dirnames: + client_videos[camera_dir] = [] + camera_path = os.path.join(client_dir, camera_dir) + for (_, _, filenames) in os.walk(camera_path): + for filename in filenames: + try: + filepath = os.path.join(VIDEO_DIR, camera_path, filename) + timestamp = datetime.datetime.strftime( + datetime.datetime.fromtimestamp( + os.path.getmtime(filepath) + ), + TIME_FORMAT) + except Exception as exc: + logger.warning("Could not determine creation time for file %r: %r", filename, exc) + timestamp = 'unknown' + video_url = f"/videos/{client_id}/{camera_dir}/{filename}" + client_videos[camera_dir].append((filename, video_url, timestamp)) + return client_videos + +def validate_directive(command: str, args: list, client_id: str, camera_id: str) -> bool: + """ + Validate if the directive is known. + + Parameters + ---------- + command : str + The directive command. + args : list + The arguments for the directive. + client_id : str + The client ID. + camera_id : str + The camera ID. + + Returns + ------- + bool + True if the directive is valid, False otherwise. + """ + retval = False + # First, check if command is known: + if command not in DIRECTIVES: + logger.warning("Unknown directive received: %s", command) + return retval + if len(args) != 1: + logger.error("Directive '%s' requires exactly one argument.", command) + return retval + # Validate arguments passed from the frontend: + match command: + case 'modify_camera_name': + retval = validate_camera_name(args, client_id, camera_id) + case 'modify_camera_threshold': + retval = validate_camera_threshold(args) + case 'modify_camera_grace_pd': + retval = validate_camera_grace_period(args) + case 'modify_camera_address': + retval = validate_camera_address(args) + # case 'add_camera': + # retval = validate_add_camera(args, client_id) + case 'remove_camera': + retval = True # No specific validation needed + return retval + +def validate_camera_name(name, client_id, camera_id) -> bool: + """ + Validate camera name to avoid problematic characters and whitespace. + + Parameters + ---------- + name : str + The new camera name. + client_id : str + The client ID. + camera_id : str + The old camera ID. + + Returns + ------- + bool + True if the camera name is valid, False otherwise. + """ + retval = True + new_name = name # TODO: check if sending old name is needed if we have camera_id + old_name = camera_id + if old_name not in CLIENTS_DICT[client_id]: + logger.error("Old camera name does not exist.") + retval = False + if new_name in CLIENTS_DICT[client_id]: + logger.error("New camera name already exists.") + retval = False + if not new_name or any(char in new_name for char in ':') or any(char.isspace() for char in new_name): + logger.error("Camera name cannot be empty or contain spaces or colons.") + retval = False + if len(new_name) > MAX_CAMERA_NAME_LENGTH: + logger.error("Camera name is too long (max %s characters).", MAX_CAMERA_NAME_LENGTH) + retval = False + return retval + +def validate_camera_threshold(threshold) -> bool: + """ + Validate contour size threshold. + + Parameters + ---------- + threshold : str + The contour size threshold. + + Returns + ------- + bool + True if the contour size threshold is valid, False otherwise. + """ + retval = True + try: + value = int(threshold) + if value <= 0: + logger.error("Contour size threshold must be positive: %r.", threshold) + retval = False + except ValueError as exc: + logger.error("Invalid contour size threshold: %r, exception: %r", threshold, exc) + retval = False + return retval + +def validate_camera_grace_period(grace_period) -> bool: + """ + Validate movement grace period. + + Parameters + ---------- + grace_period : str + The movement grace period. + + Returns + ------- + bool + True if the movement grace period is valid, False otherwise. + """ + retval = True + try: + value = int(grace_period) + if value < 0: + logger.error("Movement grace period cannot be negative: %r.", grace_period) + retval = False + except ValueError as exc: + logger.error("Invalid movement grace period: %r, exception: ", grace_period, exc) + retval = False + return retval + +def validate_camera_address(address) -> bool: + """ + Validate camera address. + + Parameters + ---------- + address : str + The camera address. + + Returns + ------- + bool + True if the camera address is valid, False otherwise. + """ + retval = True + if not address or any(char.isspace() for char in address): + logger.error("Camera address %r cannot be empty.", address) + retval = False + if not address.startswith('/dev/video'): + logger.error("Camera address must start with '/dev/video': %r.", address) + retval = False + return retval + +def validate_remove_camera(name, client_id) -> bool: + """ + Validate remove camera directive. + + Parameters + ---------- + name : str + The camera name to remove. + client_id : str + The client ID. + + Returns + ------- + bool + True if the camera name is valid, False otherwise. + """ + retval = True + if name not in CLIENTS_DICT[client_id]: + logger.error("Camera name %r does not exist.", client_id) + retval = False + return retval + +if __name__ == "__main__": + uvicorn.run( + _app, + port=8007, + host='127.0.0.1', + log_level='info', + ) diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..26ea8aa --- /dev/null +++ b/worker.py @@ -0,0 +1,628 @@ +#!/usr/bin/env python +""" +A module consisting of a zeromq worker receiving video stream +via router from clients. Able to send control messages to clients. +Needs libzmq and pyzmq with 'drafts' support. +""" + +import os +import sys +import struct +import datetime +from threading import Thread, Event +import time +import signal +import logging +from queue import Queue +from collections import defaultdict +import tempfile +import json +import zmq +import cv2 +import numpy + +from helpers import ( + CustomLoggingFormatter, + bytes_to_str, + TIME_FORMAT_STRING, +) + +__author__ = "Franoosh Corporation" + +# Various constants: # TODO: put them in a config + +HOST = "127.0.0.1" +ZMQPORT = "9979" +# WSPORT = "8000" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +# LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO + +# File paths: + +LOGDIR = 'logs' +if not os.path.exists(LOGDIR): + try: + os.makedirs(LOGDIR) + except Exception as exc: + print(f"Could not create log directory: {exc}") + sys.exit() +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") + +CWD = os.getcwd() +TMP_DIR = os.path.join(CWD, "tmp") +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') +# This should come from the config file: +VIDEO_DIR = os.path.join(CWD, "videos") + +# Other: + +CLIENTS_DICT = {} + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + + +if not os.path.exists(TMP_DIR): + try: + os.makedirs(TMP_DIR) + except Exception as exc: + logger.error("Could not create temporary directory: %r", exc) + sys.exit() + +class MonitorTask(Thread): + """ + Monitor the connection status of the zmq socket. + + Parameters + ---------- + socket : zmq.Socket + The zmq socket to monitor. + + Methods + ------- + run() + Monitor connection on the initial socket. + stop() + Stop thread + """ + def __init__(self, socket) -> None: + """ + Docstring for __init__ + + Parameters + ---------- + socket : zmq.Socket + The zmq socket to monitor. + """ + super().__init__(daemon=True) + self.socket = socket + self.running = True + + def run(self) -> None: + """ + Monitor connection on the initial socket. + This is heartbeat monitoring + + Parameters + ---------- + None + + Returns + ------- + None + """ + monitor = self.socket.get_monitor_socket() + monitor.setsockopt(zmq.RCVTIMEO, 5000) + logger.debug("Monitor socket started.") + while self.running: + try: + event, _ = monitor.recv_multipart() + # Resolve the received event type: + event_type = int.from_bytes(event[:2], "little") + if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED): + logger.warning("Monitor socket: closed | disconnected") + stop_event.set() + elif event_type == zmq.EVENT_CONNECT_DELAYED: + logger.debug("Monitor socket: event connect delayed") + elif event_type == zmq.EVENT_CONNECT_RETRIED: + logger.debug("Monitor socket: event connect retried") + elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED): + logger.debug("Monitor socket: client connected to router, handshake OK.") + stop_event.clear() + else: + logger.warning("Monitor socket: other event: %r", event_type) + except zmq.Again: + logger.debug("Timeout on monitoring socket.") + except Exception as exc: # W: Catching too general exception Exception + logger.error("Other exception on monitoring socket: %r", exc) + + # monitor.close() + + def stop(self) -> None: + """ + Stop thread + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("Stopping monitor thread ...") + self.running = False + + +class ServerWorker(Thread): + """ + ServerWorker class for zmq video messaging. + + Parameters + ---------- + identity : bytes + Worker identity + context : zmq.Context, optional + ZMQ context (default is None, which creates a new context) + + Methods + ------- + start_client(client_id, camera_id, filename) + Start a video thread for new client_id and camera_id. + stop_client(client_id, camera_id) + Stop video thread for a client_id and camera_id. + run() + Main loop of the worker. + """ + def __init__(self, identity, context=None) -> None: + """ + Docstring for __init__ + + Parameters + ---------- + identity : bytes + Worker identity + context : zmq.Context, optional + ZMQ context (default is None, which creates a new context) + """ + super().__init__(daemon=True) + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.id = self.socket.identity = identity + self.monitor = MonitorTask(self.socket) + self.video_threads = defaultdict(lambda: defaultdict(dict)) + self.poller = zmq.Poller() + self.web_sock = self.context.socket(zmq.DEALER) + self.connected = False + self.running = True + + def start_client(self, client_id, camera_id, filename) -> None: + """ + Start a video thread for new client_id and camera_id. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + filename : str + Filename to write video to + + Returns + ------- + None + """ + # New client or new camera for existing client: + if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): + q = Queue() + logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id) + video_worker = VideoWorker(client_id, camera_id, filename, q) + video_worker.start() + self.video_threads[client_id][camera_id] = video_worker + + def stop_client(self, client_id, camera_id) -> None: + """ + Stop video thread for a client_id and camera_id. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + + Returns + ------- + None + """ + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + logger.debug("Stopping video thread for client %r, camera %r", client_id, camera_id) + self.video_threads[client_id][camera_id].stop() # Stop the thread + del self.video_threads[client_id][camera_id] + logger.info("Stopped video thread for client %r, camera %r", client_id, camera_id) + + def handle_metadata( + self, + client_id: bytes, + camera_id: bytes, + directive: bytes, + timestamp: bytes + ) -> None: + """ + Handle metadata messages. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + directive : bytes + Action to perform (e.g., 'start', 'stop') + timestamp : bytes + Timestamp of the action + + Returns + ------- + None + """ + logger.info( + "Received metadata directive: %r for client: %r, camera: %r, timestamp: %r", + directive, client_id, camera_id, timestamp + ) + update_clients(client_id, camera_id) + if directive in (b'start', b'stop'): + camera_id_string = camera_id.decode('utf-8') + timestamp_string = datetime.datetime.fromtimestamp( + struct.unpack('d', timestamp)[0] + ).strftime(TIME_FORMAT_STRING) + filename = f"{camera_id_string}-{timestamp_string}.mp4" + logger.debug("Directive: %r, Timestamp: %r, Filename: %r", directive, timestamp, filename) + if directive == b"start": + self.start_client(client_id, camera_id, filename) + elif directive == b"stop": + self.stop_client(client_id, camera_id) + else: + logger.error("Unknown directive: %r", directive) + # elif directive == b'rename': + # old_name = new_name = timestamp = None + # try: + # old_name, new_name, timestamp = data.split(b":") + # logger.info("Renamed video thread from %r to %r.", old_name, new_name) + # except ValueError: + # logger.error("Invalid rename data format: %r", data) + # if old_name and new_name and timestamp: + # # I think it's better to stop the old thread and start a new one, + # # rather than reuse the old one as it's less mucking about. + # self.stop_client(old_name, camera_id) + # new_name_string = new_name.decode('utf-8') + # camera_id_string = camera_id.decode('utf-8') + # timestamp_string = timestamp.decode('utf-8') + # self.start_client( + # new_name, + # camera_id, + # f"{new_name_string}_{camera_id_string}-{timestamp_string}.mp4" + # ) + # to_remove= b':'.join([client_id, camera_id]) + # try: + # self.web_sock.send_multipart([to_remove, b'', b'']) # Notify webserver of rename + # except Exception as exc: + # logger.error("Sending rename notification to websocket failed: %r", exc) + + # elif directive in (b'new_camera', b'removed_camera'): + # logger.info("Received camera metadata update: %r for client %r, camera %r", + # directive, + # client_id, + # camera_id, + # ) + # # Just forward the message to the webserver: + # try: + # self.web_sock.send_multipart([client_id, camera_id, directive, data], flags=zmq.NOBLOCK) + # except Exception as exc: + # logger.error("Sending camera metadata update to websocket failed: %r", exc) + else: + logger.error("Unknown directive: %r", directive) + + return False + + def handle_video_message(self, msg: list) -> None: + logger.debug("Received video message with data only.") + client_id, camera_id, *content = msg + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + self.video_threads[client_id][camera_id].queue.put(content[0]) + # Send only [client_id, camera_id, jpeg_bytes] to the webserver: + # zmq subsciber can subscribe to a topic defined by the first + # part of the multipart message, so in order to allow for a + # per camera subscription, we need to join client_id and camera_id + topic = b':'.join([client_id, camera_id]) + try: + self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK) + except Exception as exc: + logger.error("Sending message to websocket failed: %r", exc) + else: + logger.error("No video thread found for client %r, camera %r", client_id, camera_id) + logger.debug("Available video threads keys: %r", self.video_threads.keys()) + logger.debug("Available video threads values: %r", self.video_threads.values()) + + def run(self) -> None: + """ + Main loop of the worker. + Full of wonders. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.debug("ServerWorker %r starting ...", self.id) + self.socket.connect(ZMQ_BACKEND_ADDR) + try: + self.web_sock.bind(WEB_BACKEND_ADDR) + except Exception as exc: + logger.error("Connection to zmq websocket failed: %r", exc) + self.poller.register(self.socket, zmq.POLLIN) + self.poller.register(self.web_sock, zmq.POLLIN) + self.monitor.start() + + while self.running: + logger.debug("ServerWorker %r waiting for a message ...", self.id) + if not self.connected or stop_event.is_set(): + self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this + time.sleep(0.1) # Wait a bit before trying to connect again + + sockets = dict(self.poller.poll(10)) + if self.socket in sockets: + self.connected = True + msg = self.socket.recv_multipart() + logger.debug("ServerWorker %r received message of length: %d.", self.id, len(msg)) + filename = None + # At the moment we don't expect any other message than a + # start/stop message (of length 4) and a video message: + if len(msg) == 4: # This is a message with start/stop/rename directive and no video data. + self.handle_metadata(*msg) + elif len(msg) == 3: # This is a video message with data + self.handle_video_message(msg) + else: + logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg)) + else: + logger.debug("No message received, polling again ...") + time.sleep(0.1) + if self.web_sock in sockets: + frontend_msg = self.web_sock.recv_multipart() + logger.info("Received message from frontend: %r", frontend_msg) + self.socket.send_multipart(frontend_msg) + logger.info("Forwarded message to backend: %r", frontend_msg) + + self.monitor.stop() + self.monitor.join() + + for camera_thread in self.video_threads.values(): + for thread in camera_thread.values(): + thread.queue.put(None) # Sentinel to unblock queue.get() + thread.stop() + thread.join() + + def send_control_message(self, client_id, camera_id, message) -> None: + """ + Send control message to a specific client and camera. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + message : bytes + Control message to send + + Returns + ------- + None + """ + if client_id in self.video_threads and camera_id in self.video_threads[client_id]: + # self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) + self.socket.send_multipart([client_id, camera_id, message]) + logger.info("Sent control message to client %r, camera %r: %r", client_id, camera_id, message) + else: + logger.error("No video thread found for client %r, camera %r", client_id, camera_id) + + def stop(self) -> None: + """ + Stop the worker thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("ServerWorker %r exiting ...", self.id) + self.running = False + + +class VideoWorker(Thread): + """ + Class for video threads writing video stream to files. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + filename : str + Filename to write video to + queue : Queue + Queue to receive video frames + + Methods + ------- + run() + Main loop of the video worker thread. + stop() + Stop thread + """ + + def __init__(self, client_id, camera_id, filename, queue) -> None: + """ + Docstring for __init__ + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + filename : str + Filename to write video to + queue : Queue + Queue to receive video frames + """ + super().__init__(daemon=True) + self.context = zmq.Context() + self.context.setsockopt(zmq.LINGER, 0) + self.client_id = client_id + self.camera_id = camera_id + self.filename = filename + self.queue = queue + self.live = True + + def stop(self) -> None: + """ + Docstring for stop + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("VideoWorker %r exiting ...", self.camera_id) + self.live = False + + def run(self) -> None: + """ + Main loop of the video worker thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + # Create a VIDEO_DIR/client_id/camera_id/ directory if it doesn't exist + dirname = os.path.join( + VIDEO_DIR, + self.client_id.decode('utf-8'), + self.camera_id.decode('utf-8'), + ) + filename = os.path.join( + dirname, + self.filename + ) + try: + os.makedirs(dirname, exist_ok=True) + except PermissionError as exc: + logger.error("Could not create directory '%s': %r", dirname, exc) + return + if os.path.exists(filename): + logger.warning("File '%s' already exists, overwriting ...", filename) + + fourcc = cv2.VideoWriter_fourcc(*'avc1') + out = cv2.VideoWriter(filename, fourcc, 30.0, (640, 480)) # Assuming 640x480 resolution + logger.info("This is the first run, binding websocket ...") + while self.live: + logger.debug("VideoWorker writing to file: %s", filename) + frame_bytes = self.queue.get() + if frame_bytes is None: + logger.debug("Received None, stopping video worker for camera: '%r'", self.camera_id) + break + frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR) + logger.debug("Processing ('writing to a file') frame for camera: '%r'", self.camera_id) + # Write frame to file: + out.write(frame) + + # Release video writer + out.release() + logger.info("VideoWorker finished writing to file: %s", self.filename) + +def signal_handler(sig, frame) -> None: + """ + Docstring for signal_handler + + sig : Signal + Signal number + frame : Frame + Current stack frame + """ + worker.stop() + +def update_clients(client_id, camera_id) -> None: + """ + Update client and camera dictionary and write to a shared file. + + Parameters + ---------- + client_id : bytes + Client ID + camera_id : bytes + Camera ID + + Returns + ------- + None + """ + global CLIENTS_DICT + if client_id not in CLIENTS_DICT: + logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.") + CLIENTS_DICT[client_id] = [] + if camera_id not in CLIENTS_DICT[client_id]: + logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id) + CLIENTS_DICT[client_id].append(camera_id) + # Atomic write using tempfile. Works only when both files on the same filesystem + with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf: + logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT) + json.dump(bytes_to_str(CLIENTS_DICT), tf) + tempname = tf.name + os.replace(tempname, CLIENTS_JSON_FILE) + + +if __name__ == "__main__": + logger.info("Starting up ...") + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + worker = ServerWorker(b"worker-task") + worker.start() + + worker.join() + try: + os.remove(CLIENTS_JSON_FILE) + except FileNotFoundError: + pass |
