#!/usr/bin/env python """ A module containing client for streaming video to a zmq router. """ __author__ = "Franoosh Corporation" import sys import os from threading import Thread, Event from queue import Queue, Empty from collections import deque, defaultdict import time import datetime import signal import uuid import logging from configparser import ConfigParser from concurrent.futures import ThreadPoolExecutor import zmq import cv2 import inotify.adapters import yappi from helpers import ( CustomLoggingFormatter, process_frame, compute_contours, detect_movement, scale_contours, draw_contours, str_to_bytes, timestamp_to_bytes, bytes_to_timestamp, write_yappi_stats, ) ################################################### # Defaults and constants # ################################################### TESTING = False # For now this turns on yappi profiling CONFIG_FILE = "client.cfg" # Those will be updated from config file if present: LOGDIR = 'logs' if not os.path.exists(LOGDIR): try: os.makedirs(LOGDIR) except Exception as exc: print(f"Could not create log directory: {exc}") sys.exit() BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") YAAPI_LOG_FILE_BASE = "client_yaapi" LOGLEVEL = logging.INFO ROUTER_ADDRESS = "localhost" PORT = "5569" ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' MAX_CAMERA_NAME_LENGTH = 256 FPS = 30 # This should be unique, each box different. # If client ID is wrong, should server reject connection? # TODO: Devise and implement schema for client ID assignment. # Maybe serial number from /proc/cpuinfo. # In bytes. CLIENT_ID = b"client_001" ##################################################### # End defaults and constants # ##################################################### log_formatter = CustomLoggingFormatter() # Attempt to create logging directory if it does not exist: if not os.path.exists(LOGDIR): try: os.makedirs(LOGDIR) except Exception as exc: print("Could not create log directory: %r", exc) try: handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') except Exception as exc: handler = logging.StreamHandler() # Prevent log pollution from inotify when LOG_LEVEL set to debug: logging.getLogger("inotify").setLevel(logging.WARNING) handler.setFormatter(log_formatter) logging.root.addHandler(handler) logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, datefmt=TIME_FORMAT, level=LOGLEVEL, ) CAMERAS = {} stop_event = Event() def _encoder_worker( encode_queue: Queue, output_queue: Queue, jpeg_quality=60, ) -> None: while True: try: identity, frame = encode_queue.get(timeout=1) except Empty: logger.debug("Encoder worker: encode queue empty, continuing ...") continue # Encode frame as JPEG with lower quality: encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] try: _, buffer = cv2.imencode('.jpg', frame, encode_param) except Exception as exc: logger.error("Encoder worker: Error encoding frame: %r", exc) try: message = [identity, buffer.tobytes()] output_queue.put_nowait(message) except Exception as exc: logger.debug("Send queue full, dropping encoded frame: %r", exc) time.sleep(0.1) class ClientVideo(Thread): """ Class for sending video stream Attributes: ---------- _id: (bytes) Camera ID device_dict: dict Dictionary with device parameters (bytes for strings, integers for ints) queue: Queue Queue for sending messages to main client task Methods: -------- run Main video streaming logic stop Stop video streaming """ def __init__(self, _id, device_dict, queue) -> None: """ Parameters ---------- _id : bytes Camera ID device_dict : dict Dictionary with device parameters (bytes for strings, integers for ints) queue : Queue Queue for sending messages to main client task """ super().__init__(daemon=True) self.identity = _id self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints. try: self.device_addr = self.device_dict[b'address'] # No default. except KeyError: logger.error("ClientVideo %r: No address specified for device, cannot start video thread.", self.identity) logger.error("ClientVideo %r: Device dict: %r", self.identity, self.device_dict) raise self.device_threshold = float(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD).decode()) self.device_grace_period = float(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD).decode()) self.queue = queue # Store only FPS number of frames, compare first and last: self.frame_deque = deque(maxlen=FPS) self.encode_queue = Queue(maxsize=3) self._enc_executor = ThreadPoolExecutor(max_workers=1) self.live = True def send_frame(self, contours: list) -> None: """ Prepare and send frame with drawn contours. Parameters ---------- contours: list list of contours to draw on a frame """ raw_frame = self.frame_deque[-1][2] scaling_factor = self.frame_deque[-1][1] scaled_contours = scale_contours(contours, scaling_factor) try: frame_out = draw_contours( raw_frame, scaled_contours, min_contour_area=self.device_threshold, ) except Exception as exc: logger.error("ClientVideo %r: Error drawing contours: %r", self.identity, exc) frame_out = raw_frame # Send video message with contour and frame: try: logger.debug("ClientVideo %r: Queueing frame for encoding ...", self.identity) self.encode_queue.put_nowait((self.identity, frame_out)) except Exception as exc: try: logger.debug("ClientVideo %r: Encode queue full, dropping oldest frame: %r", self.identity, exc) _ = self.encode_queue.get_nowait() # Remove oldest frame self.encode_queue.put_nowait((self.identity, frame_out)) except Exception: pass def run(self) -> None: """ Video streaming logic. Parameters ---------- None Returns ------- None """ logger.debug("ClientVideo '%s' starting ...", self.identity) # The cv2.CAP_V4l2 below forces reading using v4l backend rather than gstreamer. # If opencv was compiled with gstreamer support, it will default to gstreamer. # Then we need to make sure that appropriate gstreamer plugins are installed. # Although I haven't profiled it, it seems that using v4l uses less juice. # Some devices can be opened with address (/dev/video0) and some only with integer index (0). cap = cv2.VideoCapture( self.device_addr.decode('utf-8'), cv2.CAP_V4L2 ) if not cap.isOpened(): logger.error("ClientVideo %r: Unable to open video device %r", self.identity, self.device_addr) cap.release() return # Flag to track movement state. Expires after grace period: movement_detected = False movement_detected_start = movement_now = None while self.live and not stop_event.is_set(): ret, frame = cap.read() if not ret: logger.error("ClientVideo %r: Failed to read frame from device %r", self.identity, self.device_addr) break try: # Produce grayscale blurred frame for contour detection: processed_frame_tuple = process_frame(frame) except RuntimeError as exc: logger.error("ClientVideo %r: Error processing frame: %r", self.identity, exc) continue # Append both original and processed frame to deque: # processed_frame_tuple = (processed_frame, scaling_factor, raw_frame) self.frame_deque.append(processed_frame_tuple + (frame,)) # Skip on the first frame: # To detect movement, compare only first and last frame from the queue of length FPS # in order to reduce load: if len(self.frame_deque) >= 2: sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0] try: contours = compute_contours(sample_frames) except Exception as exc: logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc) continue try: movement_now = detect_movement(contours, min_area=self.device_threshold) except Exception as exc: logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc) continue # Only update movement start time and send start message # if movement is detected and was not detected before: if movement_now: if not movement_detected: # Update movement detected start time: movement_detected_start = time.time() queue_metadata_message( identity=self.identity, queue=self.queue, action=b'start', timestamp=timestamp_to_bytes(movement_detected_start), ) movement_detected = True # Prepare and send frame with drawn contours: self.send_frame(contours) else: if movement_detected: # Movement was detected before, but not anymore. # We wait for grace period before sending stop message. now = time.time() delta_seconds = now - movement_detected_start logger.debug( "ClientVideo %r: delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds ) if delta_seconds >= self.device_grace_period: queue_metadata_message( identity=self.identity, queue=self.queue, action=b'stop', timestamp=timestamp_to_bytes(now), ) movement_detected = False movement_detected_start = None else: # Still in grace period, send frame with contours: self.send_frame(contours) else: pass # No movement detected currently or before, do nothing. logger.debug("Calling cap.release() ...") cap.release() logger.info("ClientVideo %r exiting ...", self.identity) def stop(self) -> None: """ Stop video streaming thread. Parameters ---------- None Returns ------- None """ logger.info("Client video %r sending stop message ...", self.identity) self.live = False global stop_event stop_event.set() # self.queue.put([]) # Put a dummy message to unblock queue if needed class ClientTask(Thread): """ Main Client Task with logic, message handling and setup. Attributes: ---------- context: zmq.Context ZMQ context socket: zmq.Socket ZMQ DEALER socket identity: bytes Client identity poll: zmq.Poller ZMQ poller for socket video_threads: dict Dictionary of ClientVideo threads running: bool Flag indicating if client is running queue: Queue Queue for sending messages to server Methods: -------- run Main client logic stop Stop client task receive_messages Receive messages from the server send_messages Send messages to the server update_config(message) Update configuration based on message from server watch_for_new_cameras Watch for new camera devices using inotify """ def __init__(self, _id, context: zmq.Context=None) -> None: """ Parameters ---------- _id : bytes Client ID context : zmq.Context, optional ZMQ context, by default None """ super().__init__(daemon=True) self.context = context or zmq.Context.instance() self.socket = self.context.socket(zmq.DEALER) self.identity = self.socket.identity = _id # <-- bytes self.poll = zmq.Poller() self.poll.register(self.socket, zmq.POLLIN) self.video_threads = {} self.running = True self.connected = False self.queue = Queue() def receive_messages(self) -> None: """ Receive messages from the server. Messages are expected to be multipart with: [camera_id (bytes), directive (bytes), value (bytes)] Update configuration based on received messages running update_config(message). Parameters ---------- None Returns ------- None """ while self.running: try: sockets = dict(self.poll.poll(1000)) if self.socket in sockets: logger.debug("Self socket in sockets, receiving message ...") try: message = self.socket.recv_multipart() logger.info("Client '%r' received message: %r.", self.identity, message) except Exception as exc: logger.error("Failed to receive message: %r", exc) try: logger.debug("Client '%r': Updating config from message ...", self.identity) self.update_config(message) except Exception as exc: logger.error("Failed to update config: %r", exc) except zmq.ZMQError as exc: logger.error("Client '%r': socket error: %r", self.identity, exc) time.sleep(0.01) # Sleep to avoid busy waiting def send_messages(self) -> None: """ Send messages from queue to the server. Parameters ---------- None Returns ------- None """ while self.running: try: logger.debug("Client '%r': Waiting for message to send ...", self.identity) frame = self.queue.get(timeout=0.1) logger.debug("Sending message of length %r ...", len(frame)) try: if len(frame) > 1: self.socket.send_multipart(frame) else: logger.debug("Client '%r': Dummy message received, not sending.", self.identity) except zmq.ZMQError as exc: logger.error("Client '%r': socket error: %r", self.identity, exc) except Empty: continue # No message to send, continue waiting def update_config(self, message: list) -> None: """ Update configuration based on message from server. Only 'cameras' section of the config file can be updated. Message format: [camera_id (bytes), directive (bytes), value (bytes)] Defined directives are: modify_camera_name, modify_camera_threshold, modify_camera_grace_pd, add_camera, remove_camera, Parameters ---------- message : list Message received from server Returns ------- None """ logger.debug("Client received config message: %r", message) try: camera_id, directive, value = message # <-- bytes except ValueError as exc: logger.error("Invalid config message format: %r", exc) return # Parameter validation happens on the server side before sending the message # Just validate that camera exists: if camera_id not in CAMERAS: # <-- bytes logger.error("Cannot update unknown camera: %r", camera_id) logger.error("DEBUG: Known cameras: %r", list(CAMERAS.keys())) match directive: case b'modify_camera_name': logger.debug("Renaming camera %r to %r ...", camera_id, value) try: new_name_bytes, old_name_bytes = value, camera_id # Update global CAMERAS dict: CAMERAS[new_name_bytes] = CAMERAS.pop(old_name_bytes) # Update config file accordingly: old_name, new_name = camera_id.decode('utf-8'), value.decode('utf-8') for key, val in cfg.items('cameras'): # <-- strings if key.startswith(old_name + '.'): # Take the second part of a key like 'camera1.address' -> 'address': param = key.split('.', 1)[1] cfg.set('cameras', f"{new_name}.{param}", val) cfg.remove_option('cameras', key) # Update video threads dict: self.video_threads[value] = self.video_threads.pop(camera_id) # Send message to server to update routing: # FIXME: This is a mess. Settle on a format for data field. queue_metadata_message( identity=new_name_bytes, queue=self.queue, action=b'rename', # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) timestamp=timestamp_to_bytes(time.time()), ) self.send_messages() logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name) except Exception as exc: logger.error("Error renaming camera %r to %r: %r", old_name, new_name, exc) case b'modify_camera_threshold': try: _value = int(value) except ValueError as exc: logger.error("Invalid contour size threshold value: %r, %r", value, exc) return CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer! self.video_threads[camera_id].device_threshold = _value # <-- integer! # configparser requires strings: cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value)) logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value) case b'modify_camera_grace_pd': try: _value = int(value) except ValueError as exc: logger.error("Invalid movement grace period value: %r, %r", value, exc) return CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer! self.video_threads[camera_id].device_grace_period = _value # <-- integer! cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value)) logger.info("Modified movement grace period of camera %r to %d.", camera_id, value) case b'add_camera': CAMERAS[camera_id] = {b'address': value} # vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) # vid_cl.start() # self.video_threads[camera_id] = vid_cl cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8')) logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value) case b'remove_camera': if camera_id in self.video_threads: self.video_threads[camera_id].stop() # self.video_threads[camera_id].join() # TODO: Check this is needed and wanted! # del self.video_threads[camera_id] if camera_id in CAMERAS: del CAMERAS[camera_id] # Remove from config file: for key in list(cfg.options('cameras')): # <-- strings if key.startswith(camera_id.decode('utf-8') + '.'): cfg.remove_option('cameras', key) logger.info("Removed and stopped thread for camera %r.", camera_id) case _: logger.warning("Unknown config directive: %r", directive) with open(CONFIG_FILE, 'w') as f: cfg.write(f) def watch_for_new_cameras(self) -> None: """ Watch for new camera devices using inotify. When a new camera device is detected in /dev (e.g., /dev/video2), send a message to the server to start receiving video from it. When a camera device is removed, send a message to stop receiving video from it. Update config file accordingly. Parameters ---------- None Returns ------- None """ i = inotify.adapters.Inotify() i.add_watch('/dev') while self.running: for event in i.event_gen(timeout_s=1, yield_nones=False): (_, type_names, path, filename) = event if filename.startswith('video'): camera_pathname = os.path.join(path, filename) if 'IN_CREATE' in type_names: logger.info("Detected new camera device: %s", camera_pathname) self.update_config([filename.encode('utf-8'), b'add_camera', camera_pathname.encode('utf-8')]) video_dict = {b'address': camera_pathname.encode('utf-8')} self.start_video_thread(filename.encode('utf-8'), video_dict) if 'IN_DELETE' in type_names: logger.info("Detected removed camera device: %s", camera_pathname) self.update_config([filename.encode('utf-8'), b'remove_camera', b'']) self.video_threads[filename.encode('utf-8')].stop() def start_video_thread(self, camera_id: bytes, device_dict: dict) -> None: """ Start a video thread for a given camera. Parameters ---------- camera_id : bytes Camera ID device_dict : dict Dictionary with device parameters Returns ------- None """ vid_cl = ClientVideo(camera_id, device_dict, self.queue) vid_cl.start() self.video_threads[camera_id] = vid_cl logger.info("Started video thread for camera %r.", camera_id) Thread( target=_encoder_worker, args=(vid_cl.encode_queue, self.queue,), daemon=True, ).start() def run(self) -> None: """ Run main client logic thread. Parameters ---------- None Returns ------- None """ self.socket.connect(ADDRESS) logger.debug("Client '%r' connected to %r", self.identity, ADDRESS) logger.debug("Starting video threads ...") for _id, device_dict in CAMERAS.items(): self.start_video_thread(_id, device_dict) recv_thread = Thread(target=self.receive_messages) # Not a daemon send_thread = Thread(target=self.send_messages) # Not a daemon watchdog_thread = Thread(target=self.watch_for_new_cameras, daemon=True) recv_thread.start() logger.debug("Client '%s' started receiving thread.", self.identity) send_thread.start() logger.debug("Client '%s' started sending thread.", self.identity) watchdog_thread.start() logger.debug("Client '%s' started watchdog thread.", self.identity) logger.debug("Client '%s' waiting for threads to finish ...", self.identity) recv_thread.join() send_thread.join() watchdog_thread.join(timeout=5) logger.info("Closing socket ...") try: self.socket.close() except Exception as exc: logger.warning("Failed to close socket: %r", exc) logger.debug("Terminating context ...") try: self.context.term() except Exception as exc: logger.warning("Failed to terminate context: %r", exc) def stop(self) -> None: """ Stop task Parameters ---------- None Returns ------- None """ logger.info("ClientTask cleaning up ...") for _id in list(self.video_threads.keys()): logger.info("Cleaning video thread %s ...", _id) _thread = self.video_threads[_id] _thread.stop() _thread.join(timeout=3) if _thread.is_alive(): logger.warning("Video thread %r did not stop in time.", _id) self.video_threads.pop(_id, None) self.running = False self.queue.put([]) # Put a dummy message to unblock send_messages if waiting try: self.socket.close(linger=0) except Exception as exc: logger.warning("Error closing socket: %r", exc) logger.info("Client task exiting ...") def queue_video_message(identity: bytes, queue: Queue, frame) -> None: """ Docstring for queue_video_message Parameters ---------- identity : bytes Camera identity queue : Queue Queue to put message to frame : ndarray Video frame to send Returns ------- None """ # Encode frame as JPEG: _, buffer = cv2.imencode('.jpg', frame) # Add identity to message: message = (identity, buffer.tobytes()) # Put message to queue: queue.put(message) def queue_metadata_message( identity: bytes, queue: Queue, action: bytes, timestamp: float, ) -> None: """ Queue metadata message to be sent to server. Parameters ---------- identity : bytes Camera ID queue : Queue Queue to put message to action : bytes Action as bytes ('start' or 'stop') data : float Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp Returns ------- None """ logger.debug("ClientVideo: %r at: %r", action, datetime.datetime.fromtimestamp( bytes_to_timestamp(timestamp)).strftime(TIME_FORMAT)) message = (identity, action, timestamp) queue.put(message) def find_present_cameras() -> dict: """ Find currently present camera devices in /dev. Parameters ---------- None Returns ------- dict Dictionary of present cameras with device names as keys and paths as values """ present_cameras = {} for entry in os.listdir('/dev'): if entry.startswith('video'): present_cameras[entry] = os.path.join('/dev', entry) return present_cameras def read_config(conf_file) -> ConfigParser: """ Read config file and return as dictionary. The only required section is 'cameras' with at least one camera. Do basic sanity checks and update global variables. Log warnings if config file or sections/options are missing. Parameters ---------- conf_file : str Path to config file Returns ------- ConfigParser ConfigParser object with configuration """ cfg = ConfigParser() cfg.read(conf_file) cameras_dict = None global CAMERAS # Need to update logging info: if 'main' in cfg: global TESTING try: TESTING = cfg.getboolean('main', 'testing') logger.info("Testing mode is set to: %r.", TESTING) except ValueError as exc: logger.error("Invalid value for 'testing' option in 'main' section: %r, using default: %r.", exc, TESTING) if 'logging' in cfg: global LOGDIR, LOGFILE, LOGLEVEL LOGDIR = cfg.get('logging', 'logdir', fallback='.') LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) logger.setLevel(LOGLEVEL) for _handler in logger.handlers: _handler.setLevel(LOGLEVEL) if 'network' in cfg: global ROUTER_ADDRESS, PORT, ADDRESS if cfg.has_option('network', 'router_address'): ROUTER_ADDRESS = cfg.get('network', 'router_address') else: logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) if cfg.has_option('network', 'router_port'): PORT = cfg.get('network', 'router_port') else: logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" else: logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) if 'cameras' in cfg: # Need to take care: camera names are stored as strings but comunicated as bytes: cameras_dict = set_up_cameras(cfg.items('cameras')) # <-- dict with strings if cameras_dict: # Only update global CAMERAS if we have at least one valid camera with an address: CAMERAS = str_to_bytes(cameras_dict) # <-- now bytes logger.info("Using camera configuration updated from config file: %r", CAMERAS) if 'cameras' not in cfg or not cameras_dict: # No cameras section or no valid cameras found: logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) found_cameras = find_present_cameras() for cam_name, cam_addr in found_cameras.items(): CAMERAS[cam_name.encode('utf-8')] = {b'address': cam_addr.encode('utf-8')} # <-- bytes # Use default parameters: if camera_address_unique(cfg, cam_addr): cfg.set('cameras', f"{cam_name}.address", cam_addr) cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD)) cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD)) else: logger.warning("Camera address %r already present in config file, skipping adding camera %r.", cam_addr, cam_name) logger.info("CAMERAS configuration: %r", CAMERAS) with open(conf_file, 'w') as configfile: cfg.write(configfile) return cfg def validate_camera_address(address: str) -> bool: """ Validate camera address. Parameters ---------- address : str Camera address Returns ------- bool True if address is valid, False otherwise """ # Check if address is an integer (device index): if address.startswith('/dev/video'): return address raise ValueError("Invalid camera address: %r" % address) def validate_camera_threshold(threshold: str) -> bool: """ Validate camera contour size threshold. Parameters ---------- threshold : str Contour size threshold Returns ------- bool True if threshold is valid, False otherwise """ try: if int(threshold) > 0: return threshold except ValueError: raise ValueError("Invalid contour size threshold: %r" % threshold) raise ValueError("Invalid contour size threshold: %r" % threshold) def validate_camera_grace_pd(grace_pd: str) -> bool: """ Validate camera movement grace period. Parameters ---------- grace_pd : str Movement grace period Returns ------- bool True if grace period is valid, False otherwise """ try: if int(grace_pd) >= 0: return grace_pd except ValueError: raise ValueError("Invalid movement grace period: %r" % grace_pd) raise ValueError("Invalid movement grace period: %r" % grace_pd) def camera_address_unique(cfg: ConfigParser, address: str) -> bool: """ Check if camera address is unique in config file. Parameters ---------- cfg : ConfigParser ConfigParser object with configuration address : str Camera address Returns ------- bool True if address is unique, False otherwise """ for key, val in cfg.items('cameras'): try: _, param = key.split('.', 1) if param == 'address' and val == address: return False except ValueError: continue return True def set_up_cameras(cameras_cfg_section: dict) -> None: """ Set up camera configuration from config file section. Validate camera parameters and apply defaults if necessary. Parameters ---------- cameras_section : dict Dictionary of camera configuration from config file Returns ------- cameras_dict : dict Dictionary to store validated camera configuration """ # ConfigParser does not support nested sections. # These shenanigans below allow for a nested config simulation with configparser # and so per camera settings: cameras_dict = defaultdict(dict) for key, val in cameras_cfg_section: # <-- strings try: cam, param = key.split('.', 1) match param: case 'address': try: cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings except ValueError as exc: logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', '')) continue case 'contour_size_threshold': try: cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings except ValueError as exc: logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', '')) cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD) continue case 'movement_grace_period': try: cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings except ValueError as exc: logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', '')) cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD) continue case _: logger.warning("Unknown camera %r parameter: %r", cam, param) except ValueError as exc: logger.error( "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc) # Camera section needs to contain at least an address: for key in list(cameras_dict.keys()): if 'address' not in cameras_dict[key].keys(): logger.error("Camera %r has no address configured, removing from configuration.", key) del cameras_dict[key] return cameras_dict def signal_handler(sig, frame) -> None: """ Signal handler for graceful shutdown. """ logger.info("Received signal handler '%r', stopping client ...", sig) # Set flag to stop VideoClient thread(s): stop_event.set() if client.is_alive(): client.stop() def get_serial_number() -> bytes: """ Get the serial number of the machine. If unable to get serial number, use uuid.getnode() as fallback. Returns ------- bytes Serial number as string """ try: with open('/proc/cpuinfo', 'r') as f: for line in f: if line.startswith('Serial'): serial = line.split(':')[1].strip() return serial.encode('utf-8') except Exception as exc: logger.error("Error getting serial number: %r, using uuid.", exc) return str(uuid.getnode()).encode('utf-8') if __name__ == "__main__": logger.info("Client starting ...") # Read configuration file before starting logging as it may contain logging settings: cfg = read_config(CONFIG_FILE) if TESTING: logger.info("Starting in testing mode with yappi profiling ...") yappi.set_clock_type("cpu") yappi.start(builtins=True, profile_threads=True) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) logger.info("Starting up ...") client = ClientTask(get_serial_number()) client.start() client.join() if TESTING: yappi.stop() if write_yappi_stats(yappi): logger.error("Error writing yappi stats.") logger.info("Terminating ...")