aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client.py85
-rw-r--r--router.py17
-rw-r--r--webserver.py5
-rw-r--r--worker.py9
4 files changed, 52 insertions, 64 deletions
diff --git a/client.py b/client.py
index 4b1c6a9..81c8f8b 100644
--- a/client.py
+++ b/client.py
@@ -118,14 +118,14 @@ def _encoder_worker(
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality]
try:
_, buffer = cv2.imencode('.jpg', frame, encode_param)
+ try:
+ message = [identity, buffer.tobytes()]
+ output_queue.put_nowait(message)
+ except Exception as exc:
+ logger.debug("Send queue full, dropping encoded frame: %r", exc)
+ time.sleep(0.1)
except Exception as exc:
logger.error("Encoder worker: Error encoding frame: %r", exc)
- try:
- message = [identity, buffer.tobytes()]
- output_queue.put_nowait(message)
- except Exception as exc:
- logger.debug("Send queue full, dropping encoded frame: %r", exc)
- time.sleep(0.1)
class ClientVideo(Thread):
"""
@@ -161,15 +161,15 @@ class ClientVideo(Thread):
"""
super().__init__(daemon=True)
self.identity = _id
- self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints.
+ self.device_dict = device_dict # <-- A dict with bytes
try:
self.device_addr = self.device_dict[b'address'] # No default.
except KeyError:
logger.error("ClientVideo %r: No address specified for device, cannot start video thread.", self.identity)
logger.error("ClientVideo %r: Device dict: %r", self.identity, self.device_dict)
raise
- self.device_threshold = float(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD).decode())
- self.device_grace_period = float(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD).decode())
+ self.device_threshold = int(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD))
+ self.device_grace_period = int(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD))
self.queue = queue
# Store only FPS number of frames, compare first and last:
self.frame_deque = deque(maxlen=FPS)
@@ -184,7 +184,7 @@ class ClientVideo(Thread):
Parameters
----------
contours: list
- list of contours to draw on a frame
+ List of contours to draw on a frame
"""
raw_frame = self.frame_deque[-1][2]
@@ -218,7 +218,6 @@ class ClientVideo(Thread):
Parameters
----------
- None
Returns
-------
@@ -241,7 +240,7 @@ class ClientVideo(Thread):
# Flag to track movement state. Expires after grace period:
movement_detected = False
- movement_detected_start = movement_now = None
+ movement_detected_start = None
while self.live and not stop_event.is_set():
ret, frame = cap.read()
if not ret:
@@ -325,7 +324,6 @@ class ClientVideo(Thread):
Parameters
----------
- None
Returns
-------
@@ -364,7 +362,7 @@ class ClientTask(Thread):
run
Main client logic
stop
- Stop client task
+ Terminate client task
receive_messages
Receive messages from the server
send_messages
@@ -404,7 +402,6 @@ class ClientTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -420,6 +417,7 @@ class ClientTask(Thread):
logger.info("Client '%r' received message: %r.", self.identity, message)
except Exception as exc:
logger.error("Failed to receive message: %r", exc)
+ continue
try:
logger.debug("Client '%r': Updating config from message ...", self.identity)
self.update_config(message)
@@ -436,7 +434,6 @@ class ClientTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -520,7 +517,7 @@ class ClientTask(Thread):
self.send_messages()
logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name)
except Exception as exc:
- logger.error("Error renaming camera %r to %r: %r", old_name, new_name, exc)
+ logger.error("Error renaming camera %r to %r: %r", camera_id, value, exc)
case b'modify_camera_threshold':
try:
_value = int(value)
@@ -577,7 +574,6 @@ class ClientTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -632,7 +628,6 @@ class ClientTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -677,7 +672,6 @@ class ClientTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -744,7 +738,7 @@ def queue_metadata_message(
Queue to put message to
action : bytes
Action as bytes ('start' or 'stop')
- data : float
+ timestamp : float
Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp
Returns
@@ -763,7 +757,6 @@ def find_present_cameras() -> dict:
Parameters
----------
- None
Returns
-------
@@ -874,8 +867,8 @@ def validate_camera_address(address: str) -> bool:
"""
# Check if address is an integer (device index):
if address.startswith('/dev/video'):
- return address
- raise ValueError("Invalid camera address: %r" % address)
+ return True
+ raise False
def validate_camera_threshold(threshold: str) -> bool:
@@ -894,10 +887,10 @@ def validate_camera_threshold(threshold: str) -> bool:
"""
try:
if int(threshold) > 0:
- return threshold
+ return True
except ValueError:
- raise ValueError("Invalid contour size threshold: %r" % threshold)
- raise ValueError("Invalid contour size threshold: %r" % threshold)
+ raise False
+ return False
def validate_camera_grace_pd(grace_pd: str) -> bool:
@@ -916,10 +909,10 @@ def validate_camera_grace_pd(grace_pd: str) -> bool:
"""
try:
if int(grace_pd) >= 0:
- return grace_pd
+ return True
except ValueError:
- raise ValueError("Invalid movement grace period: %r" % grace_pd)
- raise ValueError("Invalid movement grace period: %r" % grace_pd)
+ return False
+ return False
def camera_address_unique(cfg: ConfigParser, address: str) -> bool:
@@ -948,18 +941,18 @@ def camera_address_unique(cfg: ConfigParser, address: str) -> bool:
return True
-def set_up_cameras(cameras_cfg_section: dict) -> None:
+def set_up_cameras(cameras_cfg_section: list[tuple[str, str]]) -> dict:
"""
Set up camera configuration from config file section.
Validate camera parameters and apply defaults if necessary.
Parameters
----------
- cameras_section : dict
- Dictionary of camera configuration from config file
+ cameras_cfg_section : list[tuple(str, str)]
+ List of (key, val) tuples of camera configuration items from config file.
Returns
-------
cameras_dict : dict
- Dictionary to store validated camera configuration
+ Dictionary to store validated camera configuration.
"""
# ConfigParser does not support nested sections.
# These shenanigans below allow for a nested config simulation with configparser
@@ -970,23 +963,23 @@ def set_up_cameras(cameras_cfg_section: dict) -> None:
cam, param = key.split('.', 1)
match param:
case 'address':
- try:
- cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings
- except ValueError as exc:
- logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', ''))
+ if validate_camera_address(val):
+ cameras_dict[cam][param] = val # <-- still strings
+ else:
+ logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, val, cameras_dict[cam].get('address', ''))
continue
case 'contour_size_threshold':
- try:
- cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings
- except ValueError as exc:
- logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', ''))
+ if validate_camera_threshold(val):
+ cameras_dict[cam][param] = val # <-- still strings
+ else:
+ logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, val, cameras_dict[cam].get('contour_size_threshold', ''))
cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD)
continue
case 'movement_grace_period':
- try:
- cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings
- except ValueError as exc:
- logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', ''))
+ if validate_camera_grace_pd(val):
+ cameras_dict[cam][param] = val # <-- still strings
+ else:
+ logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, val, cameras_dict[cam].get('movement_grace_period', ''))
cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD)
continue
case _:
diff --git a/router.py b/router.py
index 3c92826..f76b75f 100644
--- a/router.py
+++ b/router.py
@@ -64,7 +64,6 @@ def custom_proxy() -> None:
Parameters
----------
- None
Returns
-------
@@ -86,13 +85,13 @@ def custom_proxy() -> None:
backend_socket.bind(BACKEND_ADDR)
poller.register(backend_socket, zmq.POLLIN)
- awailable_workers = []
+ available_workers = []
client_worker_dict = {}
# There are three pending messages dictionaries:
# 1. pending_messages_no_worker[client_id] - messages that are pending for a client
# 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker
# 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker
- # To assure delivey of the entire video we need to store messages from start to end until they are delivered
+ # To assure delivery of the entire video we need to store messages from start to end until they are delivered
# to the client.
# If a worker is not available for certain amount of time, we will reassign it to another worker.
# For now, there is only one worker. Remember to implement this later.
@@ -153,9 +152,9 @@ def custom_proxy() -> None:
except KeyError:
logger.info("Received ID from client: %r.", client_id)
# Check if there are available workers.
- if awailable_workers:
+ if available_workers:
# Assign random worker to client:
- worker_id = random.choice(awailable_workers)
+ worker_id = random.choice(available_workers)
# Update client-worker dictionary:
client_worker_dict[client_id] = worker_id
# Check if there are any pending messages for this client:
@@ -186,7 +185,7 @@ def custom_proxy() -> None:
# At last, send the message to worker:
backend_socket.send_multipart([worker_id, client_id, *content])
except Exception as e:
- logger.error("Failed to send message for client %r to backend, requeing. Error: %r", client_id, e)
+ logger.error("Failed to send message for client %r to backend, requeuing. Error: %r", client_id, e)
# Store message for later delivery:
pending_messages_client_worker[client_id][worker_id].appendleft(msg)
@@ -204,7 +203,7 @@ def custom_proxy() -> None:
logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
logger.info("Received ID from worker: %r.", worker_id)
# Add worker to available workers list:
- awailable_workers.append(worker_id)
+ available_workers.append(worker_id)
for client, worker in client_worker_dict.items():
# If client has no worker assigned, assign it a new worker:
if worker is None:
@@ -252,9 +251,9 @@ def custom_proxy() -> None:
try:
frontend_socket.send_multipart([client_id, *msg])
logger.debug("Sent message %r to client %r", msg, client_id)
- except Exception as e:
+ except Exception as exc:
pending_messages_worker_client[worker_id][client_id].appendleft(msg)
- logger.error("Could not deliver message: %r from worker: %r to client: %r", msg, worker_id, client_id)
+ logger.error("Could not deliver message: %r from worker: %r to client: %r, exc: %r", msg, worker_id, client_id, exc)
def signal_handler(sig, frame) -> None:
global RUNNING
diff --git a/webserver.py b/webserver.py
index 7fa2e3c..3a95dca 100644
--- a/webserver.py
+++ b/webserver.py
@@ -85,7 +85,6 @@ async def zmq_bridge():
Parameters
----------
- None
Returns
-------
@@ -242,6 +241,9 @@ async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) ->
logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
await websocket.accept()
ws_connections[client_id][camera_id] = {'ws': websocket}
+ # Ensure queue exists (zmq_bridge may not have created it yet)
+ if camera_id not in ws_queues[client_id]:
+ ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10)
queue = ws_queues[client_id][camera_id]
async def send_frames():
@@ -555,4 +557,5 @@ if __name__ == "__main__":
port=8007,
host='127.0.0.1',
log_level='info',
+ # uds="/run/zmq_webserver.sock"
)
diff --git a/worker.py b/worker.py
index c3dc9d6..9f6086b 100644
--- a/worker.py
+++ b/worker.py
@@ -122,7 +122,6 @@ class MonitorTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -161,7 +160,6 @@ class MonitorTask(Thread):
Parameters
----------
- None
Returns
-------
@@ -340,7 +338,6 @@ class ServerWorker(Thread):
# except Exception as exc:
# logger.error("Sending camera metadata update to websocket failed: %r", exc)
- return False
def handle_video_message(self, msg: list) -> None:
logger.debug("Received video message with data only.")
@@ -352,7 +349,7 @@ class ServerWorker(Thread):
self.video_threads[client_id][camera_id].queue.put(content[0])
# Send only [client_id, camera_id, jpeg_bytes] to the webserver:
- # zmq subsciber can subscribe to a topic defined by the first
+ # zmq subscriber can subscribe to a topic defined by the first
# part of the multipart message, so in order to allow for a
# per camera subscription, we need to join client_id and camera_id
topic = b':'.join([client_id, camera_id])
@@ -368,7 +365,6 @@ class ServerWorker(Thread):
Parameters
----------
- None
Returns
-------
@@ -459,7 +455,6 @@ class ServerWorker(Thread):
Parameters
----------
- None
Returns
-------
@@ -524,7 +519,6 @@ class VideoWorker(Thread):
Parameters
----------
- None
Returns
-------
@@ -539,7 +533,6 @@ class VideoWorker(Thread):
Parameters
----------
- None
Returns
-------