diff options
Diffstat (limited to 'client.py')
| -rw-r--r-- | client.py | 1006 |
1 files changed, 1006 insertions, 0 deletions
diff --git a/client.py b/client.py new file mode 100644 index 0000000..fc0d3f7 --- /dev/null +++ b/client.py @@ -0,0 +1,1006 @@ +#!/usr/bin/env python + +""" +A module containing client for streaming video to a zmq router. +""" + +__author__ = "Franoosh Corporation" + +import sys +import os +from threading import Thread, Event +from queue import Queue, Empty +from collections import deque, defaultdict +import time +import datetime +import signal +import logging +from configparser import ConfigParser +import zmq +import cv2 +import inotify.adapters +import yappi +from concurrent.futures import ThreadPoolExecutor + +from helpers import ( + CustomLoggingFormatter, + process_frame, + compute_contours, + detect_movement, + scale_contours, + draw_contours, + str_to_bytes, + timestamp_to_bytes, + bytes_to_timestamp, + write_yappi_stats, +) + +################################################### +# Defaults and constants # +################################################### + +TESTING = False # For now this turns on yappi profiling +CONFIG_FILE = "client.cfg" +# Those will be updated from config file if present: +LOGDIR = 'logs' +if not os.path.exists(LOGDIR): + try: + os.makedirs(LOGDIR) + except Exception as exc: + print(f"Could not create log directory: {exc}") + sys.exit() +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") +YAAPI_LOG_FILE_BASE = "client_yaapi" +# LOGLEVEL = logging.INFO +LOGLEVEL = logging.DEBUG + +ROUTER_ADDRESS = "localhost" +PORT = "5569" +ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +# This is populated at startup with all cameras present in /dev +# (like /dev/video0, /dev/video1, etc.). +# This will include dummy devices! +# If cameras are already defined in config file, those will override present devicesl: +KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') +CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider +MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +START_STOP_MESSAGE_FMT = "{action}:{data}" +MAX_CAMERA_NAME_LENGTH = 256 +FPS = 30 +# This should be unique, each box different. +# If client ID is wrong, should server reject connection? +# TODO: Devise and implement schema for client ID assignment. +# Maybe serial number from /proc/cpuinfo. +# In bytes. +CLIENT_ID = b"client_001" + +##################################################### +# End defaults and constants # +##################################################### + +log_formatter = CustomLoggingFormatter() +# Attempt to create logging directory if it does not exist: +if not os.path.exists(LOGDIR): + try: + os.makedirs(LOGDIR) + except Exception as exc: + print("Could not create log directory: %r", exc) +try: + handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +except Exception as exc: + handler = logging.StreamHandler() +# Prevent log pollution from inotify when LOG_LEVEL set to debug: +logging.getLogger("inotify").setLevel(logging.WARNING) +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt=TIME_FORMAT, + level=LOGLEVEL, +) + +CAMERAS = {} +stop_event = Event() + + +def _encoder_worker( + encode_queue: Queue, + output_queue: Queue, + jpeg_quality=60, + ) -> None: + while True: + try: + identity, frame = encode_queue.get(timeout=1) + except Empty: + logger.debug("Encoder worker: encode queue empty, continuing ...") + continue + # Encode frame as JPEG with lower quality: + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] + try: + _, buffer = cv2.imencode('.jpg', frame, encode_param) + except Exception as exc: + logger.error("Encoder worker: Error encoding frame: %r", exc) + try: + message = [identity, buffer.tobytes()] + output_queue.put_nowait(message) + except Exception as exc: + logger.debug("Send queue full, dropping encoded frame: %r", exc) + time.sleep(0.1) + +class ClientVideo(Thread): + """ + Class for sending video stream + + Attributes: + ---------- + _id: (bytes) + Camera ID + device_dict: dict + Dictionary with device parameters (bytes for strings, integers for ints) + queue: Queue + Queue for sending messages to main client task + + Methods: + -------- + run + Main video streaming logic + stop + Stop video streaming + """ + + def __init__(self, _id, device_dict, queue) -> None: + """ + Parameters + ---------- + _id : bytes + Camera ID + device_dict : dict + Dictionary with device parameters (bytes for strings, integers for ints) + queue : Queue + Queue for sending messages to main client task + """ + super().__init__(daemon=True) + self.identity = _id + self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints. + self.device_addr = self.device_dict.get(b'address') # No default. + self.device_threshold = device_dict[b'contour_size_threshold'] + self.device_grace_period = device_dict[b'movement_grace_period'] + self.queue = queue + # Store only FPS number of frames, compare first and last: + self.frame_deque = deque(maxlen=FPS) + self.encode_queue = Queue(maxsize=3) + self._enc_executor = ThreadPoolExecutor(max_workers=1) + self.live = True + + def send_frame(self, contours: list) -> None: + """ + Prepare and send frame with drawn contours. + + Parameters + ---------- + contours: list + list of contours to draw on a frame + """ + + raw_frame = self.frame_deque[-1][2] + scaling_factor = self.frame_deque[-1][1] + scaled_contours = scale_contours(contours, scaling_factor) + try: + frame_out = draw_contours( + raw_frame, + scaled_contours, + min_contour_area=self.device_threshold, + ) + except Exception as exc: + logger.error("ClientVideo %r: Error drawing contours: %r", self.identity, exc) + frame_out = raw_frame + + # Send video message with contour and frame: + try: + logger.debug("ClientVideo %r: Queueing frame for encoding ...", self.identity) + self.encode_queue.put_nowait((self.identity, frame_out)) + except Exception as exc: + try: + logger.debug("ClientVideo %r: Encode queue full, dropping oldest frame: %r", self.identity, exc) + _ = self.encode_queue.get_nowait() # Remove oldest frame + self.encode_queue.put_nowait((self.identity, frame_out)) + except Exception: + pass + + def run(self) -> None: + """ + Video streaming logic. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.debug("ClientVideo '%s' starting ...", self.identity) + # The cv2.CAP_V4l2 below forces reading using v4l backend rather than gstreamer. + # If opencv was compiled with gstreamer support, it will default to gstreamer. + # Then we need to make sure that appropriate gstreamer plugins are installed. + # Although I haven't profiled it, it seems that using v4l uses less juice. + # Some devices can be opened with address (/dev/video0) and some only with integer index (0). + cap = cv2.VideoCapture( + self.device_addr.decode('utf-8'), + cv2.CAP_V4L2 + ) + if not cap.isOpened(): + logger.error("ClientVideo %r: Unable to open video device %r", self.identity, self.device_addr) + cap.release() + return + + # Flag to track movement state. Expires after grace period: + movement_detected = False + movement_detected_start = movement_now = None + while self.live and not stop_event.is_set(): + ret, frame = cap.read() + if not ret: + logger.error("ClientVideo %r: Failed to read frame from device %r", self.identity, self.device_addr) + break + try: + # Produce grayscale blurred frame for contour detection: + processed_frame_tuple = process_frame(frame) + except RuntimeError as exc: + logger.error("ClientVideo %r: Error processing frame: %r", self.identity, exc) + continue + # Append both original and processed frame to deque: + # processed_frame_tuple = (processed_frame, scaling_factor, raw_frame) + self.frame_deque.append(processed_frame_tuple + (frame,)) + # Skip on the first frame: + # To detect movement, compare only first and last frame from the queue of length FPS + # in order to reduce load: + if len(self.frame_deque) >= 2: + sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0] + try: + contours = compute_contours(sample_frames) + logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours)) + logger.debug("ClientVideo %r: Contours: %r", self.identity, contours) + except Exception as exc: + logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc) + continue + try: + movement_now = detect_movement(contours, min_area=self.device_threshold) + logger.debug("ClientVideo %r: Movement detected in frame.", self.identity) + except Exception as exc: + logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc) + continue + + # Only update movement start time and send start message + # if movement is detected and was not detected before: + if movement_now: + if not movement_detected: + # Update movement detected start time: + movement_detected_start = time.time() + queue_metadata_message( + identity=self.identity, + queue=self.queue, + action=b'start', + timestamp=timestamp_to_bytes(movement_detected_start), + ) + movement_detected = True + + # Prepare and send frame with drawn contours: + self.send_frame(contours) + + else: + if movement_detected: + # Movement was detected before, but not anymore. + # We wait for grace period before sending stop message. + now = time.time() + delta_seconds = now - movement_detected_start + logger.debug( + "ClientVideo %r: delta seconds since movement detected: %d, grace remaining: %d.", + self.identity, + delta_seconds, + self.device_grace_period - delta_seconds + ) + if delta_seconds >= self.device_grace_period: + queue_metadata_message( + identity=self.identity, + queue=self.queue, + action=b'stop', + timestamp=timestamp_to_bytes(now), + ) + movement_detected = False + movement_detected_start = None + else: + # Still in grace period, send frame with contours: + self.send_frame(contours) + else: + pass # No movement detected currently or before, do nothing. + logger.debug("Calling cap.release() ...") + cap.release() + logger.info("ClientVideo %r exiting ...", self.identity) + + def stop(self) -> None: + """ + Stop video streaming thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("Client video %r sending stop message ...", self.identity) + self.live = False + global stop_event + stop_event.set() + # self.queue.put([]) # Put a dummy message to unblock queue if needed + + +class ClientTask(Thread): + """ + Main Client Task with logic, message handling and setup. + + Attributes: + ---------- + context: zmq.Context + ZMQ context + socket: zmq.Socket + ZMQ DEALER socket + identity: bytes + Client identity + poll: zmq.Poller + ZMQ poller for socket + video_threads: dict + Dictionary of ClientVideo threads + running: bool + Flag indicating if client is running + queue: Queue + Queue for sending messages to server + + Methods: + -------- + run + Main client logic + stop + Stop client task + receive_messages + Receive messages from the server + send_messages + Send messages to the server + update_config(message) + Update configuration based on message from server + watch_for_new_cameras + Watch for new camera devices using inotify + """ + + def __init__(self, _id, context: zmq.Context=None) -> None: + """ + Parameters + ---------- + _id : bytes + Client ID + context : zmq.Context, optional + ZMQ context, by default None + """ + super().__init__(daemon=True) + self.context = context or zmq.Context.instance() + self.socket = self.context.socket(zmq.DEALER) + self.identity = self.socket.identity = _id # <-- bytes + self.poll = zmq.Poller() + self.poll.register(self.socket, zmq.POLLIN) + self.video_threads = {} + self.running = True + self.connected = False + self.queue = Queue() + + def receive_messages(self) -> None: + """ + Receive messages from the server. + Messages are expected to be multipart with: + [camera_id (bytes), directive (bytes), value (bytes)] + Update configuration based on received messages running update_config(message). + + Parameters + ---------- + None + + Returns + ------- + None + """ + while self.running: + try: + sockets = dict(self.poll.poll(1000)) + if self.socket in sockets: + logger.debug("Self socket in sockets, receiving message ...") + try: + message = self.socket.recv_multipart() + logger.info("Client '%r' received message: %r.", self.identity, message) + self.update_config(message) + except Exception as exc: + logger.error("Failed to receive message: %r", exc) + except zmq.ZMQError as exc: + logger.error("Client '%r': socket error: %r", self.identity, exc) + time.sleep(0.01) # Sleep to avoid busy waiting + + def send_messages(self) -> None: + """ + Send messages from queue to the server. + + Parameters + ---------- + None + + Returns + ------- + None + """ + while self.running: + try: + logger.debug("Client '%r': Waiting for message to send ...", self.identity) + frame = self.queue.get(timeout=0.1) + logger.debug("Sending message of length %r ...", len(frame)) + try: + if len(frame) > 1: + # TODO: This blocks? + self.socket.send_multipart(frame) + else: + logger.debug("Client '%r': Dummy message received, not sending.", self.identity) + except zmq.ZMQError as exc: + logger.error("Client '%r': socket error: %r", self.identity, exc) + except Empty: + continue # No message to send, continue waiting + + def update_config(self, message: list) -> None: + """ + Update configuration based on message from server. + Only 'cameras' section of the config file can be updated. + Message format: + [camera_id (bytes), directive (bytes), value (bytes)] + Defined directives are: + modify_camera_name, + modify_camera_threshold, + modify_camera_grace_pd, + add_camera, + remove_camera, + + Parameters + ---------- + message : list + Message received from server + + Returns + ------- + None + """ + try: + logger.debug("Client received config message: %r", message) + try: + camera_id, directive, value = message # <-- bytes + except ValueError as exc: + logger.error("Invalid config message format: %r", exc) + return + # Parameter validation happens on the server side before sending the message + if directive in ( + b'modify_camera_name', + b'modify_camera_threshold', + b'modify_camera_grace_pd', + ): + # Ok, just validate that camera exists: + if camera_id not in CAMERAS: # <-- bytes + logger.error("Cannot update unknown camera: %r", camera_id) + logger.debug("Known cameras: %r", list(CAMERAS.keys())) + else: + if directive == b'modify_camera_name': + old_name, new_name = camera_id, value # <-- bytes + # Update global CAMERAS dict: + CAMERAS[new_name] = CAMERAS.pop(old_name) + # Update config file accordingly: + for key, value in cfg.items('cameras'): # <-- strings + if key.startswith(old_name.encode('utf-8') + '.'): + # Take the second part of a key like 'camera1.address' -> 'address': + param = key.split('.', 1)[1] + cfg.set('cameras', f"{new_name.encode('utf-8')}.{param}", value.encode('utf-8')) + cfg.remove_option('cameras', key) + # Update video threads dict: + self.video_threads[new_name] = self.video_threads.pop(old_name) + # Send message to server to update routing: + # FIXME: This is a mess. Settle on a format for data field. + queue_metadata_message( + identity=self.identity, + queue=self.queue, + action=b'rename', + # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) # FIXME: this should be datetime object + data=timestamp_to_bytes(time.time()), + ) + self.send_messages() + logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name) + elif directive == b'modify_camera_threshold': + try: + _value = int(value) + except ValueError as exc: + logger.error("Invalid contour size threshold value: %r, %r", value, exc) + return + CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer! + self.video_threads[camera_id].device_threshold = _value # <-- integer! + # configparser requires strings: + cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value)) + logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value) + elif directive == b'modify_camera_grace_pd': + try: + _value = int(value) + except ValueError as exc: + logger.error("Invalid movement grace period value: %r, %r", value, exc) + return + CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer! + self.video_threads[camera_id].device_grace_period = _value # <-- integer! + cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value)) + logger.info("Modified movement grace period of camera %r to %d.", camera_id, value) + elif directive == b'add_camera': + CAMERAS[camera_id] = {b'address': value} + # vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) + # vid_cl.start() + # self.video_threads[camera_id] = vid_cl + cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8')) + logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value) + elif directive == b'remove_camera': + if camera_id in self.video_threads: + self.video_threads[camera_id].stop() + # self.video_threads[camera_id].join() # TODO: Check this is needed and wanted! + # del self.video_threads[camera_id] + if camera_id in CAMERAS: + del CAMERAS[camera_id] + # Remove from config file: + for key in list(cfg.options('cameras')): # <-- strings + if key.startswith(camera_id.decode('utf-8') + '.'): + cfg.remove_option('cameras', key) + logger.info("Removed and stopped thread for camera %r.", camera_id) + else: + logger.warning("Unknown config directive: %r", directive) + + except (ValueError, IndexError) as exc: + logger.error("Failed to handle config message: %r", exc) + with open(CONFIG_FILE, 'w') as f: + cfg.write(f) + + def watch_for_new_cameras(self) -> None: + """ + Watch for new camera devices using inotify. + When a new camera device is detected in /dev (e.g., /dev/video2), + send a message to the server to start receiving video from it. + When a camera device is removed, send a message to stop receiving video from it. + Update config file accordingly. + + Parameters + ---------- + None + + Returns + ------- + None + """ + i = inotify.adapters.Inotify() + i.add_watch('/dev') + + while self.running: + for event in i.event_gen(timeout_s=1, yield_nones=False): + (_, type_names, path, filename) = event + if filename.startswith('video'): + camera_pathname = os.path.join(path, filename) + if 'IN_CREATE' in type_names: + logger.info("Detected new camera device: %s", camera_pathname) + self.update_config([filename.encode('utf-8'), b'add_camera', camera_pathname.encode('utf-8')]) + video_dict = {b'address': camera_pathname.encode('utf-8')} + self.start_video_thread(filename.encode('utf-8'), video_dict) + if 'IN_DELETE' in type_names: + logger.info("Detected removed camera device: %s", camera_pathname) + self.update_config([filename.encode('utf-8'), b'remove_camera', b'']) + self.video_threads[filename.encode('utf-8')].stop() + + def start_video_thread(self, camera_id: bytes, device_dict: dict) -> None: + """ + Start a video thread for a given camera. + + Parameters + ---------- + camera_id : bytes + Camera ID + device_dict : dict + Dictionary with device parameters + + Returns + ------- + None + """ + vid_cl = ClientVideo(camera_id, device_dict, self.queue) + vid_cl.start() + self.video_threads[camera_id] = vid_cl + logger.info("Started video thread for camera %r.", camera_id) + Thread( + target=_encoder_worker, + args=(vid_cl.encode_queue, self.queue,), + daemon=True, + ).start() + + def run(self) -> None: + """ + Run main client logic thread. + + Parameters + ---------- + None + + Returns + ------- + None + """ + self.socket.connect(ADDRESS) + logger.debug("Client '%r' connected to %r", self.identity, ADDRESS) + logger.debug("Starting video threads ...") + for _id, device_dict in CAMERAS.items(): + self.start_video_thread(_id, device_dict) + + recv_thread = Thread(target=self.receive_messages) # Not a daemon + send_thread = Thread(target=self.send_messages) # Not a daemon + watchdog_thread = Thread(target=self.watch_for_new_cameras, daemon=True) + + recv_thread.start() + logger.debug("Client '%s' started receiving thread.", self.identity) + send_thread.start() + logger.debug("Client '%s' started sending thread.", self.identity) + watchdog_thread.start() + logger.debug("Client '%s' started watchdog thread.", self.identity) + + logger.debug("Client '%s' waiting for threads to finish ...", self.identity) + recv_thread.join() + send_thread.join() + watchdog_thread.join(timeout=5) + + logger.info("Closing socket ...") + try: + self.socket.close() + except Exception as exc: + logger.warning("Failed to close socket: %r", exc) + logger.debug("Terminating context ...") + try: + self.context.term() + except Exception as exc: + logger.warning("Failed to terminate context: %r", exc) + + def stop(self) -> None: + """ + Stop task + + Parameters + ---------- + None + + Returns + ------- + None + """ + logger.info("ClientTask cleaning up ...") + for _id in list(self.video_threads.keys()): + logger.info("Cleaning video thread %s ...", _id) + _thread = self.video_threads[_id] + _thread.stop() + _thread.join(timeout=3) + if _thread.is_alive(): + logger.warning("Video thread %r did not stop in time.", _id) + self.video_threads.pop(_id, None) + + self.running = False + self.queue.put([]) # Put a dummy message to unblock send_messages if waiting + try: + self.socket.close(linger=0) + except Exception as exc: + logger.warning("Error closing socket: %r", exc) + logger.info("Client task exiting ...") + + +def queue_video_message(identity: bytes, queue: Queue, frame) -> None: + """ + Docstring for queue_video_message + + Parameters + ---------- + identity : bytes + Camera identity + queue : Queue + Queue to put message to + frame : ndarray + Video frame to send + + Returns + ------- + None + """ + # Encode frame as JPEG: + _, buffer = cv2.imencode('.jpg', frame) + # Add identity to message: + message = (identity, buffer.tobytes()) + # Put message to queue: + queue.put(message) + +def queue_metadata_message( + identity: bytes, + queue: Queue, + action: bytes, + timestamp: float, + ) -> None: + """ + Queue metadata message to be sent to server. + + Parameters + ---------- + identity : bytes + Camera ID + queue : Queue + Queue to put message to + action : bytes + Action as bytes ('start' or 'stop') + data : float + Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp + + Returns + ------- + None + """ + logger.debug("ClientVideo: %r at: %r", action, datetime.datetime.fromtimestamp( + bytes_to_timestamp(timestamp)).strftime(TIME_FORMAT)) + message = (identity, action, timestamp) + queue.put(message) + +def find_present_cameras() -> dict: + """ + Find currently present camera devices in /dev. + + Parameters + ---------- + None + + Returns + ------- + dict + Dictionary of present cameras with device names as keys and paths as values + """ + present_cameras = {} + for entry in os.listdir('/dev'): + if entry.startswith('video'): + present_cameras[entry] = os.path.join('/dev', entry) + return present_cameras + +def read_config(conf_file) -> ConfigParser: + """ + Read config file and return as dictionary. + The only required section is 'cameras' with at least one camera. + Do basic sanity checks and update global variables. + Log warnings if config file or sections/options are missing. + + Parameters + ---------- + conf_file : str + Path to config file + + Returns + ------- + ConfigParser + ConfigParser object with configuration + """ + cfg = ConfigParser() + cfg.read(conf_file) + cameras_dict = None + # Need to update logging info: + if 'main' in cfg: + global TESTING + try: + TESTING = cfg.getboolean('main', 'testing') + logger.info("Testing mode is set to: %r.", TESTING) + except ValueError as exc: + logger.error("Invalid value for 'testing' option in 'main' section: %r, using default: %r.", exc, TESTING) + + if 'logging' in cfg: + global LOGDIR, LOGFILE, LOGLEVEL + + LOGDIR = cfg.get('logging', 'logdir', fallback='.') + LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) + LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) + logger.setLevel(LOGLEVEL) + for _handler in logger.handlers: + _handler.setLevel(LOGLEVEL) + if 'network' in cfg: + global ROUTER_ADDRESS, PORT, ADDRESS + if cfg.has_option('network', 'router_address'): + ROUTER_ADDRESS = cfg.get('network', 'router_address') + else: + logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) + if cfg.has_option('network', 'router_port'): + PORT = cfg.get('network', 'router_port') + else: + logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) + ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" + else: + logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) + + if 'cameras' in cfg: + # Need to take care: camera names are stored as strings but comunicated as bytes: + cameras_dict = set_up_cameras(cfg.items('cameras')) # <-- dict with strings + if cameras_dict: + # Only update global CAMERAS if we have at least one valid camera with an address: + CAMERAS = str_to_bytes(cameras_dict) # <-- now bytes + logger.info("Using camera configuration updated from config file: %r", CAMERAS) + + if 'cameras' not in cfg or not cameras_dict: + # No cameras section or no valid cameras found: + logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) + found_cameras = find_present_cameras() + for cam_name, cam_addr in found_cameras.items(): + CAMERAS[cam_name.encode('utf-8')] = {'address': cam_addr.encode('utf-8')} # <-- bytes + # Use default parameters: + cfg.set('cameras', f"{cam_name}.address", cam_addr) + cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD)) + cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD)) + logger.info("CAMERAS configuration: %r", CAMERAS) + with open(conf_file, 'w') as configfile: + cfg.write(configfile) + + return cfg + +def validate_camera_address(address: str) -> bool: + """ + Validate camera address. + + Parameters + ---------- + address : str + Camera address + + Returns + ------- + bool + True if address is valid, False otherwise + """ + # Check if address is an integer (device index): + if address.startswith('/dev/video'): + return address + raise ValueError("Invalid camera address: %r" % address) + +def validate_camera_threshold(threshold: str) -> bool: + """ + Validate camera contour size threshold. + + Parameters + ---------- + threshold : str + Contour size threshold + + Returns + ------- + bool + True if threshold is valid, False otherwise + """ + try: + if int(threshold) > 0: + return threshold + except ValueError: + raise ValueError("Invalid contour size threshold: %r" % threshold) + raise ValueError("Invalid contour size threshold: %r" % threshold) + +def validate_camera_grace_pd(grace_pd: str) -> bool: + """ + Validate camera movement grace period. + + Parameters + ---------- + grace_pd : str + Movement grace period + + Returns + ------- + bool + True if grace period is valid, False otherwise + """ + try: + if int(grace_pd) >= 0: + return grace_pd + except ValueError: + raise ValueError("Invalid movement grace period: %r" % grace_pd) + raise ValueError("Invalid movement grace period: %r" % grace_pd) + +def set_up_cameras(cameras_cfg_section: dict) -> None: + """ + Set up camera configuration from config file section. + Validate camera parameters and apply defaults if necessary. + Parameters + ---------- + cameras_section : dict + Dictionary of camera configuration from config file + Returns + ------- + cameras_dict : dict + Dictionary to store validated camera configuration + """ + # ConfigParser does not support nested sections. + # These shenanigans below allow for a nested config simulation with configparser + # and so per camera settings: + cameras_dict = defaultdict(dict) + for key, val in cameras_cfg_section: # <-- strings + try: + cam, param = key.split('.', 1) + except ValueError as exc: + logger.error( + "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc) + match param: + case 'address': + try: + cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings + except ValueError as exc: + logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', '')) + continue + case 'contour_size_threshold': + try: + cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings + except ValueError as exc: + logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', '')) + cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD) + continue + case 'movement_grace_period': + try: + cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings + except ValueError as exc: + logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', '')) + cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD) + continue + case _: + logger.warning("Unknown camera %r parameter: %r", cam, param) + + # Camera section needs to contain at least an address: + for key in list(cameras_dict.keys()): + if 'address' not in cameras_dict[key].keys(): + logger.error("Camera %r has no address configured, removing from configuration.", key) + del cameras_dict[key] + + return cameras_dict + +def signal_handler(sig, frame) -> None: + """ + Signal handler for graceful shutdown. + """ + logger.info("Received signal handler '%r', stopping client ...", sig) + # Set flag to stop VideoClient thread(s): + stop_event.set() + if client.is_alive(): + client.stop() + + +if __name__ == "__main__": + logger.info("Client starting ...") + + # Read configuration file before starting logging as it may contain logging settings: + cfg = read_config(CONFIG_FILE) + + if TESTING: + logger.info("Starting in testing mode with yappi profiling ...") + yappi.set_clock_type("cpu") + yappi.start(builtins=True, profile_threads=True) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + + client = ClientTask(CLIENT_ID) + + client.start() + client.join() + + if TESTING: + yappi.stop() + if write_yappi_stats(yappi): + logger.error("Error writing yappi stats.") + + logger.info("Terminating ...") |
