aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client.py251
1 files changed, 155 insertions, 96 deletions
diff --git a/client.py b/client.py
index fc0d3f7..58fed68 100644
--- a/client.py
+++ b/client.py
@@ -14,13 +14,14 @@ from collections import deque, defaultdict
import time
import datetime
import signal
+import uuid
import logging
from configparser import ConfigParser
+from concurrent.futures import ThreadPoolExecutor
import zmq
import cv2
import inotify.adapters
import yappi
-from concurrent.futures import ThreadPoolExecutor
from helpers import (
CustomLoggingFormatter,
@@ -52,21 +53,14 @@ if not os.path.exists(LOGDIR):
BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
YAAPI_LOG_FILE_BASE = "client_yaapi"
-# LOGLEVEL = logging.INFO
-LOGLEVEL = logging.DEBUG
+LOGLEVEL = logging.INFO
ROUTER_ADDRESS = "localhost"
PORT = "5569"
ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
-# This is populated at startup with all cameras present in /dev
-# (like /dev/video0, /dev/video1, etc.).
-# This will include dummy devices!
-# If cameras are already defined in config file, those will override present devicesl:
-KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period')
CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider
MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops
TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
-START_STOP_MESSAGE_FMT = "{action}:{data}"
MAX_CAMERA_NAME_LENGTH = 256
FPS = 30
# This should be unique, each box different.
@@ -111,7 +105,7 @@ def _encoder_worker(
encode_queue: Queue,
output_queue: Queue,
jpeg_quality=60,
- ) -> None:
+ ) -> None:
while True:
try:
identity, frame = encode_queue.get(timeout=1)
@@ -166,9 +160,14 @@ class ClientVideo(Thread):
super().__init__(daemon=True)
self.identity = _id
self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints.
- self.device_addr = self.device_dict.get(b'address') # No default.
- self.device_threshold = device_dict[b'contour_size_threshold']
- self.device_grace_period = device_dict[b'movement_grace_period']
+ try:
+ self.device_addr = self.device_dict[b'address'] # No default.
+ except KeyError:
+ logger.error("ClientVideo %r: No address specified for device, cannot start video thread.", self.identity)
+ logger.error("ClientVideo %r: Device dict: %r", self.identity, self.device_dict)
+ raise
+ self.device_threshold = float(device_dict.get(b'contour_size_threshold', CONTOUR_SIZE_THRESHOLD).decode())
+ self.device_grace_period = float(device_dict.get(b'movement_grace_period', MOVEMENT_GRACE_PERIOD).decode())
self.queue = queue
# Store only FPS number of frames, compare first and last:
self.frame_deque = deque(maxlen=FPS)
@@ -262,14 +261,14 @@ class ClientVideo(Thread):
sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0]
try:
contours = compute_contours(sample_frames)
- logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours))
- logger.debug("ClientVideo %r: Contours: %r", self.identity, contours)
+ # logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours))
+ # logger.debug("ClientVideo %r: Contours: %r", self.identity, contours)
except Exception as exc:
logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc)
continue
try:
movement_now = detect_movement(contours, min_area=self.device_threshold)
- logger.debug("ClientVideo %r: Movement detected in frame.", self.identity)
+ # logger.debug("ClientVideo %r: Movement detected in frame.", self.identity)
except Exception as exc:
logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc)
continue
@@ -420,9 +419,14 @@ class ClientTask(Thread):
try:
message = self.socket.recv_multipart()
logger.info("Client '%r' received message: %r.", self.identity, message)
- self.update_config(message)
except Exception as exc:
logger.error("Failed to receive message: %r", exc)
+ try:
+ logger.debug("Client '%r': Updating config from message ...", self.identity)
+ self.update_config(message)
+ except Exception as exc:
+ logger.error("Failed to update config: %r", exc)
+
except zmq.ZMQError as exc:
logger.error("Client '%r': socket error: %r", self.identity, exc)
time.sleep(0.01) # Sleep to avoid busy waiting
@@ -446,7 +450,6 @@ class ClientTask(Thread):
logger.debug("Sending message of length %r ...", len(frame))
try:
if len(frame) > 1:
- # TODO: This blocks?
self.socket.send_multipart(frame)
else:
logger.debug("Client '%r': Dummy message received, not sending.", self.identity)
@@ -477,77 +480,77 @@ class ClientTask(Thread):
-------
None
"""
+ logger.debug("Client received config message: %r", message)
try:
- logger.debug("Client received config message: %r", message)
- try:
- camera_id, directive, value = message # <-- bytes
- except ValueError as exc:
- logger.error("Invalid config message format: %r", exc)
- return
- # Parameter validation happens on the server side before sending the message
- if directive in (
- b'modify_camera_name',
- b'modify_camera_threshold',
- b'modify_camera_grace_pd',
- ):
- # Ok, just validate that camera exists:
- if camera_id not in CAMERAS: # <-- bytes
- logger.error("Cannot update unknown camera: %r", camera_id)
- logger.debug("Known cameras: %r", list(CAMERAS.keys()))
- else:
- if directive == b'modify_camera_name':
- old_name, new_name = camera_id, value # <-- bytes
- # Update global CAMERAS dict:
- CAMERAS[new_name] = CAMERAS.pop(old_name)
- # Update config file accordingly:
- for key, value in cfg.items('cameras'): # <-- strings
- if key.startswith(old_name.encode('utf-8') + '.'):
- # Take the second part of a key like 'camera1.address' -> 'address':
- param = key.split('.', 1)[1]
- cfg.set('cameras', f"{new_name.encode('utf-8')}.{param}", value.encode('utf-8'))
- cfg.remove_option('cameras', key)
- # Update video threads dict:
- self.video_threads[new_name] = self.video_threads.pop(old_name)
- # Send message to server to update routing:
- # FIXME: This is a mess. Settle on a format for data field.
- queue_metadata_message(
- identity=self.identity,
- queue=self.queue,
- action=b'rename',
- # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) # FIXME: this should be datetime object
- data=timestamp_to_bytes(time.time()),
- )
- self.send_messages()
- logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name)
- elif directive == b'modify_camera_threshold':
- try:
- _value = int(value)
- except ValueError as exc:
- logger.error("Invalid contour size threshold value: %r, %r", value, exc)
- return
- CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer!
- self.video_threads[camera_id].device_threshold = _value # <-- integer!
- # configparser requires strings:
- cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value))
- logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value)
- elif directive == b'modify_camera_grace_pd':
- try:
- _value = int(value)
- except ValueError as exc:
- logger.error("Invalid movement grace period value: %r, %r", value, exc)
- return
- CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer!
- self.video_threads[camera_id].device_grace_period = _value # <-- integer!
- cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value))
- logger.info("Modified movement grace period of camera %r to %d.", camera_id, value)
- elif directive == b'add_camera':
+ camera_id, directive, value = message # <-- bytes
+ except ValueError as exc:
+ logger.error("Invalid config message format: %r", exc)
+ return
+ # Parameter validation happens on the server side before sending the message
+ # Just validate that camera exists:
+ if camera_id not in CAMERAS: # <-- bytes
+ logger.error("Cannot update unknown camera: %r", camera_id)
+ logger.error("DEBUG: Known cameras: %r", list(CAMERAS.keys()))
+
+ match directive:
+ case b'modify_camera_name':
+ logger.debug("Renaming camera %r to %r ...", camera_id, value)
+ try:
+ new_name_bytes, old_name_bytes = value, camera_id
+ # Update global CAMERAS dict:
+ CAMERAS[new_name_bytes] = CAMERAS.pop(old_name_bytes)
+ # Update config file accordingly:
+ old_name, new_name = camera_id.decode('utf-8'), value.decode('utf-8')
+ for key, val in cfg.items('cameras'): # <-- strings
+ if key.startswith(old_name + '.'):
+ # Take the second part of a key like 'camera1.address' -> 'address':
+ param = key.split('.', 1)[1]
+ cfg.set('cameras', f"{new_name}.{param}", val)
+ cfg.remove_option('cameras', key)
+ # Update video threads dict:
+ self.video_threads[value] = self.video_threads.pop(camera_id)
+ # Send message to server to update routing:
+ # FIXME: This is a mess. Settle on a format for data field.
+ queue_metadata_message(
+ identity=new_name_bytes,
+ queue=self.queue,
+ action=b'rename',
+ # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp])
+ timestamp=timestamp_to_bytes(time.time()),
+ )
+ self.send_messages()
+ logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name)
+ except Exception as exc:
+ logger.error("Error renaming camera %r to %r: %r", old_name, new_name, exc)
+ case b'modify_camera_threshold':
+ try:
+ _value = int(value)
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold value: %r, %r", value, exc)
+ return
+ CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer!
+ self.video_threads[camera_id].device_threshold = _value # <-- integer!
+ # configparser requires strings:
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value))
+ logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value)
+ case b'modify_camera_grace_pd':
+ try:
+ _value = int(value)
+ except ValueError as exc:
+ logger.error("Invalid movement grace period value: %r, %r", value, exc)
+ return
+ CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer!
+ self.video_threads[camera_id].device_grace_period = _value # <-- integer!
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value))
+ logger.info("Modified movement grace period of camera %r to %d.", camera_id, value)
+ case b'add_camera':
CAMERAS[camera_id] = {b'address': value}
# vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
# vid_cl.start()
# self.video_threads[camera_id] = vid_cl
cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8'))
logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value)
- elif directive == b'remove_camera':
+ case b'remove_camera':
if camera_id in self.video_threads:
self.video_threads[camera_id].stop()
# self.video_threads[camera_id].join() # TODO: Check this is needed and wanted!
@@ -559,11 +562,9 @@ class ClientTask(Thread):
if key.startswith(camera_id.decode('utf-8') + '.'):
cfg.remove_option('cameras', key)
logger.info("Removed and stopped thread for camera %r.", camera_id)
- else:
+ case _:
logger.warning("Unknown config directive: %r", directive)
- except (ValueError, IndexError) as exc:
- logger.error("Failed to handle config message: %r", exc)
with open(CONFIG_FILE, 'w') as f:
cfg.write(f)
@@ -726,12 +727,13 @@ def queue_video_message(identity: bytes, queue: Queue, frame) -> None:
# Put message to queue:
queue.put(message)
+
def queue_metadata_message(
identity: bytes,
queue: Queue,
action: bytes,
timestamp: float,
- ) -> None:
+ ) -> None:
"""
Queue metadata message to be sent to server.
@@ -755,6 +757,7 @@ def queue_metadata_message(
message = (identity, action, timestamp)
queue.put(message)
+
def find_present_cameras() -> dict:
"""
Find currently present camera devices in /dev.
@@ -774,6 +777,7 @@ def find_present_cameras() -> dict:
present_cameras[entry] = os.path.join('/dev', entry)
return present_cameras
+
def read_config(conf_file) -> ConfigParser:
"""
Read config file and return as dictionary.
@@ -794,6 +798,7 @@ def read_config(conf_file) -> ConfigParser:
cfg = ConfigParser()
cfg.read(conf_file)
cameras_dict = None
+ global CAMERAS
# Need to update logging info:
if 'main' in cfg:
global TESTING
@@ -839,17 +844,21 @@ def read_config(conf_file) -> ConfigParser:
logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS)
found_cameras = find_present_cameras()
for cam_name, cam_addr in found_cameras.items():
- CAMERAS[cam_name.encode('utf-8')] = {'address': cam_addr.encode('utf-8')} # <-- bytes
+ CAMERAS[cam_name.encode('utf-8')] = {b'address': cam_addr.encode('utf-8')} # <-- bytes
# Use default parameters:
- cfg.set('cameras', f"{cam_name}.address", cam_addr)
- cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD))
- cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD))
+ if camera_address_unique(cfg, cam_addr):
+ cfg.set('cameras', f"{cam_name}.address", cam_addr)
+ cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD))
+ cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD))
+ else:
+ logger.warning("Camera address %r already present in config file, skipping adding camera %r.", cam_addr, cam_name)
logger.info("CAMERAS configuration: %r", CAMERAS)
with open(conf_file, 'w') as configfile:
cfg.write(configfile)
return cfg
+
def validate_camera_address(address: str) -> bool:
"""
Validate camera address.
@@ -869,6 +878,7 @@ def validate_camera_address(address: str) -> bool:
return address
raise ValueError("Invalid camera address: %r" % address)
+
def validate_camera_threshold(threshold: str) -> bool:
"""
Validate camera contour size threshold.
@@ -890,6 +900,7 @@ def validate_camera_threshold(threshold: str) -> bool:
raise ValueError("Invalid contour size threshold: %r" % threshold)
raise ValueError("Invalid contour size threshold: %r" % threshold)
+
def validate_camera_grace_pd(grace_pd: str) -> bool:
"""
Validate camera movement grace period.
@@ -911,6 +922,33 @@ def validate_camera_grace_pd(grace_pd: str) -> bool:
raise ValueError("Invalid movement grace period: %r" % grace_pd)
raise ValueError("Invalid movement grace period: %r" % grace_pd)
+
+def camera_address_unique(cfg: ConfigParser, address: str) -> bool:
+ """
+ Check if camera address is unique in config file.
+
+ Parameters
+ ----------
+ cfg : ConfigParser
+ ConfigParser object with configuration
+ address : str
+ Camera address
+
+ Returns
+ -------
+ bool
+ True if address is unique, False otherwise
+ """
+ for key, val in cfg.items('cameras'):
+ try:
+ _, param = key.split('.', 1)
+ if param == 'address' and val == address:
+ return False
+ except ValueError:
+ continue
+ return True
+
+
def set_up_cameras(cameras_cfg_section: dict) -> None:
"""
Set up camera configuration from config file section.
@@ -931,19 +969,16 @@ def set_up_cameras(cameras_cfg_section: dict) -> None:
for key, val in cameras_cfg_section: # <-- strings
try:
cam, param = key.split('.', 1)
- except ValueError as exc:
- logger.error(
- "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc)
match param:
case 'address':
try:
- cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings
+ cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings
except ValueError as exc:
logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', ''))
continue
case 'contour_size_threshold':
try:
- cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings
+ cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings
except ValueError as exc:
logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', ''))
cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD)
@@ -957,6 +992,9 @@ def set_up_cameras(cameras_cfg_section: dict) -> None:
continue
case _:
logger.warning("Unknown camera %r parameter: %r", cam, param)
+ except ValueError as exc:
+ logger.error(
+ "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc)
# Camera section needs to contain at least an address:
for key in list(cameras_dict.keys()):
@@ -976,6 +1014,27 @@ def signal_handler(sig, frame) -> None:
if client.is_alive():
client.stop()
+def get_serial_number() -> bytes:
+ """
+ Get the serial number of the machine.
+ If unable to get serial number, use uuid.getnode() as fallback.
+
+ Returns
+ -------
+ bytes
+ Serial number as string
+ """
+ try:
+ with open('/proc/cpuinfo', 'r') as f:
+ for line in f:
+ if line.startswith('Serial'):
+ serial = line.split(':')[1].strip()
+ return serial.encode('utf-8')
+ except Exception as exc:
+ logger.error("Error getting serial number: %r, using uuid.", exc)
+
+ return str(uuid.getnode()).encode('utf-8')
+
if __name__ == "__main__":
logger.info("Client starting ...")
@@ -993,7 +1052,7 @@ if __name__ == "__main__":
logger.info("Starting up ...")
- client = ClientTask(CLIENT_ID)
+ client = ClientTask(get_serial_number())
client.start()
client.join()