From a241288241a9dc300014ecfc93c4f537900f3e9f Mon Sep 17 00:00:00 2001 From: Franoosh Date: Sun, 11 Jan 2026 14:26:25 +0100 Subject: Cleanup. --- client.py | 85 ++++++++++++++++++++++++++++-------------------------------- router.py | 17 ++++++------ webserver.py | 5 +++- worker.py | 9 +------ 4 files changed, 52 insertions(+), 64 deletions(-) diff --git a/client.py b/client.py index 4b1c6a9..81c8f8b 100644 --- a/client.py +++ b/client.py @@ -118,14 +118,14 @@ def _encoder_worker( encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] try: _, buffer = cv2.imencode('.jpg', frame, encode_param) + try: + message = [identity, buffer.tobytes()] + output_queue.put_nowait(message) + except Exception as exc: + logger.debug("Send queue full, dropping encoded frame: %r", exc) + time.sleep(0.1) except Exception as exc: logger.error("Encoder worker: Error encoding frame: %r", exc) - try: - message = [identity, buffer.tobytes()] - output_queue.put_nowait(message) - except Exception as exc: - logger.debug("Send queue full, dropping encoded frame: %r", exc) - time.sleep(0.1) class ClientVideo(Thread): """ @@ -161,15 +161,15 @@ class ClientVideo(Thread): """ super().__init__(daemon=True) self.identity = _id - self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints. + self.device_dict = device_dict # <-- A dict with bytes try: self.device_addr = self.device_dict[b'address'] # No default. except KeyError: logger.error("ClientVideo %r: No address specified for device, cannot start video thread.", self.identity) logger.error("ClientVideo %r: Device dict: %r", self.identity, self.device_dict) raise - self.device_threshold = float(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD).decode()) - self.device_grace_period = float(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD).decode()) + self.device_threshold = int(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD)) + self.device_grace_period = int(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD)) self.queue = queue # Store only FPS number of frames, compare first and last: self.frame_deque = deque(maxlen=FPS) @@ -184,7 +184,7 @@ class ClientVideo(Thread): Parameters ---------- contours: list - list of contours to draw on a frame + List of contours to draw on a frame """ raw_frame = self.frame_deque[-1][2] @@ -218,7 +218,6 @@ class ClientVideo(Thread): Parameters ---------- - None Returns ------- @@ -241,7 +240,7 @@ class ClientVideo(Thread): # Flag to track movement state. Expires after grace period: movement_detected = False - movement_detected_start = movement_now = None + movement_detected_start = None while self.live and not stop_event.is_set(): ret, frame = cap.read() if not ret: @@ -325,7 +324,6 @@ class ClientVideo(Thread): Parameters ---------- - None Returns ------- @@ -364,7 +362,7 @@ class ClientTask(Thread): run Main client logic stop - Stop client task + Terminate client task receive_messages Receive messages from the server send_messages @@ -404,7 +402,6 @@ class ClientTask(Thread): Parameters ---------- - None Returns ------- @@ -420,6 +417,7 @@ class ClientTask(Thread): logger.info("Client '%r' received message: %r.", self.identity, message) except Exception as exc: logger.error("Failed to receive message: %r", exc) + continue try: logger.debug("Client '%r': Updating config from message ...", self.identity) self.update_config(message) @@ -436,7 +434,6 @@ class ClientTask(Thread): Parameters ---------- - None Returns ------- @@ -520,7 +517,7 @@ class ClientTask(Thread): self.send_messages() logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name) except Exception as exc: - logger.error("Error renaming camera %r to %r: %r", old_name, new_name, exc) + logger.error("Error renaming camera %r to %r: %r", camera_id, value, exc) case b'modify_camera_threshold': try: _value = int(value) @@ -577,7 +574,6 @@ class ClientTask(Thread): Parameters ---------- - None Returns ------- @@ -632,7 +628,6 @@ class ClientTask(Thread): Parameters ---------- - None Returns ------- @@ -677,7 +672,6 @@ class ClientTask(Thread): Parameters ---------- - None Returns ------- @@ -744,7 +738,7 @@ def queue_metadata_message( Queue to put message to action : bytes Action as bytes ('start' or 'stop') - data : float + timestamp : float Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp Returns @@ -763,7 +757,6 @@ def find_present_cameras() -> dict: Parameters ---------- - None Returns ------- @@ -874,8 +867,8 @@ def validate_camera_address(address: str) -> bool: """ # Check if address is an integer (device index): if address.startswith('/dev/video'): - return address - raise ValueError("Invalid camera address: %r" % address) + return True + raise False def validate_camera_threshold(threshold: str) -> bool: @@ -894,10 +887,10 @@ def validate_camera_threshold(threshold: str) -> bool: """ try: if int(threshold) > 0: - return threshold + return True except ValueError: - raise ValueError("Invalid contour size threshold: %r" % threshold) - raise ValueError("Invalid contour size threshold: %r" % threshold) + raise False + return False def validate_camera_grace_pd(grace_pd: str) -> bool: @@ -916,10 +909,10 @@ def validate_camera_grace_pd(grace_pd: str) -> bool: """ try: if int(grace_pd) >= 0: - return grace_pd + return True except ValueError: - raise ValueError("Invalid movement grace period: %r" % grace_pd) - raise ValueError("Invalid movement grace period: %r" % grace_pd) + return False + return False def camera_address_unique(cfg: ConfigParser, address: str) -> bool: @@ -948,18 +941,18 @@ def camera_address_unique(cfg: ConfigParser, address: str) -> bool: return True -def set_up_cameras(cameras_cfg_section: dict) -> None: +def set_up_cameras(cameras_cfg_section: list[tuple[str, str]]) -> dict: """ Set up camera configuration from config file section. Validate camera parameters and apply defaults if necessary. Parameters ---------- - cameras_section : dict - Dictionary of camera configuration from config file + cameras_cfg_section : list[tuple(str, str)] + List of (key, val) tuples of camera configuration items from config file. Returns ------- cameras_dict : dict - Dictionary to store validated camera configuration + Dictionary to store validated camera configuration. """ # ConfigParser does not support nested sections. # These shenanigans below allow for a nested config simulation with configparser @@ -970,23 +963,23 @@ def set_up_cameras(cameras_cfg_section: dict) -> None: cam, param = key.split('.', 1) match param: case 'address': - try: - cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings - except ValueError as exc: - logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', '')) + if validate_camera_address(val): + cameras_dict[cam][param] = val # <-- still strings + else: + logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, val, cameras_dict[cam].get('address', '')) continue case 'contour_size_threshold': - try: - cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings - except ValueError as exc: - logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', '')) + if validate_camera_threshold(val): + cameras_dict[cam][param] = val # <-- still strings + else: + logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, val, cameras_dict[cam].get('contour_size_threshold', '')) cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD) continue case 'movement_grace_period': - try: - cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings - except ValueError as exc: - logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', '')) + if validate_camera_grace_pd(val): + cameras_dict[cam][param] = val # <-- still strings + else: + logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, val, cameras_dict[cam].get('movement_grace_period', '')) cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD) continue case _: diff --git a/router.py b/router.py index 3c92826..f76b75f 100644 --- a/router.py +++ b/router.py @@ -64,7 +64,6 @@ def custom_proxy() -> None: Parameters ---------- - None Returns ------- @@ -86,13 +85,13 @@ def custom_proxy() -> None: backend_socket.bind(BACKEND_ADDR) poller.register(backend_socket, zmq.POLLIN) - awailable_workers = [] + available_workers = [] client_worker_dict = {} # There are three pending messages dictionaries: # 1. pending_messages_no_worker[client_id] - messages that are pending for a client # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker - # To assure delivey of the entire video we need to store messages from start to end until they are delivered + # To assure delivery of the entire video we need to store messages from start to end until they are delivered # to the client. # If a worker is not available for certain amount of time, we will reassign it to another worker. # For now, there is only one worker. Remember to implement this later. @@ -153,9 +152,9 @@ def custom_proxy() -> None: except KeyError: logger.info("Received ID from client: %r.", client_id) # Check if there are available workers. - if awailable_workers: + if available_workers: # Assign random worker to client: - worker_id = random.choice(awailable_workers) + worker_id = random.choice(available_workers) # Update client-worker dictionary: client_worker_dict[client_id] = worker_id # Check if there are any pending messages for this client: @@ -186,7 +185,7 @@ def custom_proxy() -> None: # At last, send the message to worker: backend_socket.send_multipart([worker_id, client_id, *content]) except Exception as e: - logger.error("Failed to send message for client %r to backend, requeing. Error: %r", client_id, e) + logger.error("Failed to send message for client %r to backend, requeuing. Error: %r", client_id, e) # Store message for later delivery: pending_messages_client_worker[client_id][worker_id].appendleft(msg) @@ -204,7 +203,7 @@ def custom_proxy() -> None: logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) logger.info("Received ID from worker: %r.", worker_id) # Add worker to available workers list: - awailable_workers.append(worker_id) + available_workers.append(worker_id) for client, worker in client_worker_dict.items(): # If client has no worker assigned, assign it a new worker: if worker is None: @@ -252,9 +251,9 @@ def custom_proxy() -> None: try: frontend_socket.send_multipart([client_id, *msg]) logger.debug("Sent message %r to client %r", msg, client_id) - except Exception as e: + except Exception as exc: pending_messages_worker_client[worker_id][client_id].appendleft(msg) - logger.error("Could not deliver message: %r from worker: %r to client: %r", msg, worker_id, client_id) + logger.error("Could not deliver message: %r from worker: %r to client: %r, exc: %r", msg, worker_id, client_id, exc) def signal_handler(sig, frame) -> None: global RUNNING diff --git a/webserver.py b/webserver.py index 7fa2e3c..3a95dca 100644 --- a/webserver.py +++ b/webserver.py @@ -85,7 +85,6 @@ async def zmq_bridge(): Parameters ---------- - None Returns ------- @@ -242,6 +241,9 @@ async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) await websocket.accept() ws_connections[client_id][camera_id] = {'ws': websocket} + # Ensure queue exists (zmq_bridge may not have created it yet) + if camera_id not in ws_queues[client_id]: + ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10) queue = ws_queues[client_id][camera_id] async def send_frames(): @@ -555,4 +557,5 @@ if __name__ == "__main__": port=8007, host='127.0.0.1', log_level='info', + # uds="/run/zmq_webserver.sock" ) diff --git a/worker.py b/worker.py index c3dc9d6..9f6086b 100644 --- a/worker.py +++ b/worker.py @@ -122,7 +122,6 @@ class MonitorTask(Thread): Parameters ---------- - None Returns ------- @@ -161,7 +160,6 @@ class MonitorTask(Thread): Parameters ---------- - None Returns ------- @@ -340,7 +338,6 @@ class ServerWorker(Thread): # except Exception as exc: # logger.error("Sending camera metadata update to websocket failed: %r", exc) - return False def handle_video_message(self, msg: list) -> None: logger.debug("Received video message with data only.") @@ -352,7 +349,7 @@ class ServerWorker(Thread): self.video_threads[client_id][camera_id].queue.put(content[0]) # Send only [client_id, camera_id, jpeg_bytes] to the webserver: - # zmq subsciber can subscribe to a topic defined by the first + # zmq subscriber can subscribe to a topic defined by the first # part of the multipart message, so in order to allow for a # per camera subscription, we need to join client_id and camera_id topic = b':'.join([client_id, camera_id]) @@ -368,7 +365,6 @@ class ServerWorker(Thread): Parameters ---------- - None Returns ------- @@ -459,7 +455,6 @@ class ServerWorker(Thread): Parameters ---------- - None Returns ------- @@ -524,7 +519,6 @@ class VideoWorker(Thread): Parameters ---------- - None Returns ------- @@ -539,7 +533,6 @@ class VideoWorker(Thread): Parameters ---------- - None Returns ------- -- cgit v1.3