diff options
| author | Franoosh <uinarf@autistici.org> | 2026-01-09 15:02:45 +0100 |
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2026-01-09 15:02:45 +0100 |
| commit | e803c9fa034187d6f530d8eb4513a4c1efb6edc7 (patch) | |
| tree | f02f5b3cf1a616bf7dd8655dadf9a867f34b43f2 | |
| parent | 1bf339c91139dc14ab85dfc25bd85d12f0dae7e6 (diff) | |
| download | ZeroMQ_Video_Streaming-e803c9fa034187d6f530d8eb4513a4c1efb6edc7.tar.gz ZeroMQ_Video_Streaming-e803c9fa034187d6f530d8eb4513a4c1efb6edc7.tar.bz2 ZeroMQ_Video_Streaming-e803c9fa034187d6f530d8eb4513a4c1efb6edc7.zip | |
Add generating client ID from serial number. Refactoring
| -rw-r--r-- | client.py | 251 |
1 files changed, 155 insertions, 96 deletions
@@ -14,13 +14,14 @@ from collections import deque, defaultdict import time import datetime import signal +import uuid import logging from configparser import ConfigParser +from concurrent.futures import ThreadPoolExecutor import zmq import cv2 import inotify.adapters import yappi -from concurrent.futures import ThreadPoolExecutor from helpers import ( CustomLoggingFormatter, @@ -52,21 +53,14 @@ if not os.path.exists(LOGDIR): BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") YAAPI_LOG_FILE_BASE = "client_yaapi" -# LOGLEVEL = logging.INFO -LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO ROUTER_ADDRESS = "localhost" PORT = "5569" ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" -# This is populated at startup with all cameras present in /dev -# (like /dev/video0, /dev/video1, etc.). -# This will include dummy devices! -# If cameras are already defined in config file, those will override present devicesl: -KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' -START_STOP_MESSAGE_FMT = "{action}:{data}" MAX_CAMERA_NAME_LENGTH = 256 FPS = 30 # This should be unique, each box different. @@ -111,7 +105,7 @@ def _encoder_worker( encode_queue: Queue, output_queue: Queue, jpeg_quality=60, - ) -> None: + ) -> None: while True: try: identity, frame = encode_queue.get(timeout=1) @@ -166,9 +160,14 @@ class ClientVideo(Thread): super().__init__(daemon=True) self.identity = _id self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints. - self.device_addr = self.device_dict.get(b'address') # No default. - self.device_threshold = device_dict[b'contour_size_threshold'] - self.device_grace_period = device_dict[b'movement_grace_period'] + try: + self.device_addr = self.device_dict[b'address'] # No default. + except KeyError: + logger.error("ClientVideo %r: No address specified for device, cannot start video thread.", self.identity) + logger.error("ClientVideo %r: Device dict: %r", self.identity, self.device_dict) + raise + self.device_threshold = float(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD).decode()) + self.device_grace_period = float(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD).decode()) self.queue = queue # Store only FPS number of frames, compare first and last: self.frame_deque = deque(maxlen=FPS) @@ -262,14 +261,14 @@ class ClientVideo(Thread): sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0] try: contours = compute_contours(sample_frames) - logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours)) - logger.debug("ClientVideo %r: Contours: %r", self.identity, contours) + # logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours)) + # logger.debug("ClientVideo %r: Contours: %r", self.identity, contours) except Exception as exc: logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc) continue try: movement_now = detect_movement(contours, min_area=self.device_threshold) - logger.debug("ClientVideo %r: Movement detected in frame.", self.identity) + # logger.debug("ClientVideo %r: Movement detected in frame.", self.identity) except Exception as exc: logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc) continue @@ -420,9 +419,14 @@ class ClientTask(Thread): try: message = self.socket.recv_multipart() logger.info("Client '%r' received message: %r.", self.identity, message) - self.update_config(message) except Exception as exc: logger.error("Failed to receive message: %r", exc) + try: + logger.debug("Client '%r': Updating config from message ...", self.identity) + self.update_config(message) + except Exception as exc: + logger.error("Failed to update config: %r", exc) + except zmq.ZMQError as exc: logger.error("Client '%r': socket error: %r", self.identity, exc) time.sleep(0.01) # Sleep to avoid busy waiting @@ -446,7 +450,6 @@ class ClientTask(Thread): logger.debug("Sending message of length %r ...", len(frame)) try: if len(frame) > 1: - # TODO: This blocks? self.socket.send_multipart(frame) else: logger.debug("Client '%r': Dummy message received, not sending.", self.identity) @@ -477,77 +480,77 @@ class ClientTask(Thread): ------- None """ + logger.debug("Client received config message: %r", message) try: - logger.debug("Client received config message: %r", message) - try: - camera_id, directive, value = message # <-- bytes - except ValueError as exc: - logger.error("Invalid config message format: %r", exc) - return - # Parameter validation happens on the server side before sending the message - if directive in ( - b'modify_camera_name', - b'modify_camera_threshold', - b'modify_camera_grace_pd', - ): - # Ok, just validate that camera exists: - if camera_id not in CAMERAS: # <-- bytes - logger.error("Cannot update unknown camera: %r", camera_id) - logger.debug("Known cameras: %r", list(CAMERAS.keys())) - else: - if directive == b'modify_camera_name': - old_name, new_name = camera_id, value # <-- bytes - # Update global CAMERAS dict: - CAMERAS[new_name] = CAMERAS.pop(old_name) - # Update config file accordingly: - for key, value in cfg.items('cameras'): # <-- strings - if key.startswith(old_name.encode('utf-8') + '.'): - # Take the second part of a key like 'camera1.address' -> 'address': - param = key.split('.', 1)[1] - cfg.set('cameras', f"{new_name.encode('utf-8')}.{param}", value.encode('utf-8')) - cfg.remove_option('cameras', key) - # Update video threads dict: - self.video_threads[new_name] = self.video_threads.pop(old_name) - # Send message to server to update routing: - # FIXME: This is a mess. Settle on a format for data field. - queue_metadata_message( - identity=self.identity, - queue=self.queue, - action=b'rename', - # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) # FIXME: this should be datetime object - data=timestamp_to_bytes(time.time()), - ) - self.send_messages() - logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name) - elif directive == b'modify_camera_threshold': - try: - _value = int(value) - except ValueError as exc: - logger.error("Invalid contour size threshold value: %r, %r", value, exc) - return - CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer! - self.video_threads[camera_id].device_threshold = _value # <-- integer! - # configparser requires strings: - cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value)) - logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value) - elif directive == b'modify_camera_grace_pd': - try: - _value = int(value) - except ValueError as exc: - logger.error("Invalid movement grace period value: %r, %r", value, exc) - return - CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer! - self.video_threads[camera_id].device_grace_period = _value # <-- integer! - cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value)) - logger.info("Modified movement grace period of camera %r to %d.", camera_id, value) - elif directive == b'add_camera': + camera_id, directive, value = message # <-- bytes + except ValueError as exc: + logger.error("Invalid config message format: %r", exc) + return + # Parameter validation happens on the server side before sending the message + # Just validate that camera exists: + if camera_id not in CAMERAS: # <-- bytes + logger.error("Cannot update unknown camera: %r", camera_id) + logger.error("DEBUG: Known cameras: %r", list(CAMERAS.keys())) + + match directive: + case b'modify_camera_name': + logger.debug("Renaming camera %r to %r ...", camera_id, value) + try: + new_name_bytes, old_name_bytes = value, camera_id + # Update global CAMERAS dict: + CAMERAS[new_name_bytes] = CAMERAS.pop(old_name_bytes) + # Update config file accordingly: + old_name, new_name = camera_id.decode('utf-8'), value.decode('utf-8') + for key, val in cfg.items('cameras'): # <-- strings + if key.startswith(old_name + '.'): + # Take the second part of a key like 'camera1.address' -> 'address': + param = key.split('.', 1)[1] + cfg.set('cameras', f"{new_name}.{param}", val) + cfg.remove_option('cameras', key) + # Update video threads dict: + self.video_threads[value] = self.video_threads.pop(camera_id) + # Send message to server to update routing: + # FIXME: This is a mess. Settle on a format for data field. + queue_metadata_message( + identity=new_name_bytes, + queue=self.queue, + action=b'rename', + # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) + timestamp=timestamp_to_bytes(time.time()), + ) + self.send_messages() + logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name) + except Exception as exc: + logger.error("Error renaming camera %r to %r: %r", old_name, new_name, exc) + case b'modify_camera_threshold': + try: + _value = int(value) + except ValueError as exc: + logger.error("Invalid contour size threshold value: %r, %r", value, exc) + return + CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer! + self.video_threads[camera_id].device_threshold = _value # <-- integer! + # configparser requires strings: + cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value)) + logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value) + case b'modify_camera_grace_pd': + try: + _value = int(value) + except ValueError as exc: + logger.error("Invalid movement grace period value: %r, %r", value, exc) + return + CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer! + self.video_threads[camera_id].device_grace_period = _value # <-- integer! + cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value)) + logger.info("Modified movement grace period of camera %r to %d.", camera_id, value) + case b'add_camera': CAMERAS[camera_id] = {b'address': value} # vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) # vid_cl.start() # self.video_threads[camera_id] = vid_cl cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8')) logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value) - elif directive == b'remove_camera': + case b'remove_camera': if camera_id in self.video_threads: self.video_threads[camera_id].stop() # self.video_threads[camera_id].join() # TODO: Check this is needed and wanted! @@ -559,11 +562,9 @@ class ClientTask(Thread): if key.startswith(camera_id.decode('utf-8') + '.'): cfg.remove_option('cameras', key) logger.info("Removed and stopped thread for camera %r.", camera_id) - else: + case _: logger.warning("Unknown config directive: %r", directive) - except (ValueError, IndexError) as exc: - logger.error("Failed to handle config message: %r", exc) with open(CONFIG_FILE, 'w') as f: cfg.write(f) @@ -726,12 +727,13 @@ def queue_video_message(identity: bytes, queue: Queue, frame) -> None: # Put message to queue: queue.put(message) + def queue_metadata_message( identity: bytes, queue: Queue, action: bytes, timestamp: float, - ) -> None: + ) -> None: """ Queue metadata message to be sent to server. @@ -755,6 +757,7 @@ def queue_metadata_message( message = (identity, action, timestamp) queue.put(message) + def find_present_cameras() -> dict: """ Find currently present camera devices in /dev. @@ -774,6 +777,7 @@ def find_present_cameras() -> dict: present_cameras[entry] = os.path.join('/dev', entry) return present_cameras + def read_config(conf_file) -> ConfigParser: """ Read config file and return as dictionary. @@ -794,6 +798,7 @@ def read_config(conf_file) -> ConfigParser: cfg = ConfigParser() cfg.read(conf_file) cameras_dict = None + global CAMERAS # Need to update logging info: if 'main' in cfg: global TESTING @@ -839,17 +844,21 @@ def read_config(conf_file) -> ConfigParser: logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) found_cameras = find_present_cameras() for cam_name, cam_addr in found_cameras.items(): - CAMERAS[cam_name.encode('utf-8')] = {'address': cam_addr.encode('utf-8')} # <-- bytes + CAMERAS[cam_name.encode('utf-8')] = {b'address': cam_addr.encode('utf-8')} # <-- bytes # Use default parameters: - cfg.set('cameras', f"{cam_name}.address", cam_addr) - cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD)) - cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD)) + if camera_address_unique(cfg, cam_addr): + cfg.set('cameras', f"{cam_name}.address", cam_addr) + cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD)) + cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD)) + else: + logger.warning("Camera address %r already present in config file, skipping adding camera %r.", cam_addr, cam_name) logger.info("CAMERAS configuration: %r", CAMERAS) with open(conf_file, 'w') as configfile: cfg.write(configfile) return cfg + def validate_camera_address(address: str) -> bool: """ Validate camera address. @@ -869,6 +878,7 @@ def validate_camera_address(address: str) -> bool: return address raise ValueError("Invalid camera address: %r" % address) + def validate_camera_threshold(threshold: str) -> bool: """ Validate camera contour size threshold. @@ -890,6 +900,7 @@ def validate_camera_threshold(threshold: str) -> bool: raise ValueError("Invalid contour size threshold: %r" % threshold) raise ValueError("Invalid contour size threshold: %r" % threshold) + def validate_camera_grace_pd(grace_pd: str) -> bool: """ Validate camera movement grace period. @@ -911,6 +922,33 @@ def validate_camera_grace_pd(grace_pd: str) -> bool: raise ValueError("Invalid movement grace period: %r" % grace_pd) raise ValueError("Invalid movement grace period: %r" % grace_pd) + +def camera_address_unique(cfg: ConfigParser, address: str) -> bool: + """ + Check if camera address is unique in config file. + + Parameters + ---------- + cfg : ConfigParser + ConfigParser object with configuration + address : str + Camera address + + Returns + ------- + bool + True if address is unique, False otherwise + """ + for key, val in cfg.items('cameras'): + try: + _, param = key.split('.', 1) + if param == 'address' and val == address: + return False + except ValueError: + continue + return True + + def set_up_cameras(cameras_cfg_section: dict) -> None: """ Set up camera configuration from config file section. @@ -931,19 +969,16 @@ def set_up_cameras(cameras_cfg_section: dict) -> None: for key, val in cameras_cfg_section: # <-- strings try: cam, param = key.split('.', 1) - except ValueError as exc: - logger.error( - "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc) match param: case 'address': try: - cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings + cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings except ValueError as exc: logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', '')) continue case 'contour_size_threshold': try: - cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings + cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings except ValueError as exc: logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', '')) cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD) @@ -957,6 +992,9 @@ def set_up_cameras(cameras_cfg_section: dict) -> None: continue case _: logger.warning("Unknown camera %r parameter: %r", cam, param) + except ValueError as exc: + logger.error( + "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc) # Camera section needs to contain at least an address: for key in list(cameras_dict.keys()): @@ -976,6 +1014,27 @@ def signal_handler(sig, frame) -> None: if client.is_alive(): client.stop() +def get_serial_number() -> bytes: + """ + Get the serial number of the machine. + If unable to get serial number, use uuid.getnode() as fallback. + + Returns + ------- + bytes + Serial number as string + """ + try: + with open('/proc/cpuinfo', 'r') as f: + for line in f: + if line.startswith('Serial'): + serial = line.split(':')[1].strip() + return serial.encode('utf-8') + except Exception as exc: + logger.error("Error getting serial number: %r, using uuid.", exc) + + return str(uuid.getnode()).encode('utf-8') + if __name__ == "__main__": logger.info("Client starting ...") @@ -993,7 +1052,7 @@ if __name__ == "__main__": logger.info("Starting up ...") - client = ClientTask(CLIENT_ID) + client = ClientTask(get_serial_number()) client.start() client.join() |
