aboutsummaryrefslogtreecommitdiff
path: root/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'client.py')
-rw-r--r--client.py1006
1 files changed, 1006 insertions, 0 deletions
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..fc0d3f7
--- /dev/null
+++ b/client.py
@@ -0,0 +1,1006 @@
+#!/usr/bin/env python
+
+"""
+A module containing client for streaming video to a zmq router.
+"""
+
+__author__ = "Franoosh Corporation"
+
+import sys
+import os
+from threading import Thread, Event
+from queue import Queue, Empty
+from collections import deque, defaultdict
+import time
+import datetime
+import signal
+import logging
+from configparser import ConfigParser
+import zmq
+import cv2
+import inotify.adapters
+import yappi
+from concurrent.futures import ThreadPoolExecutor
+
+from helpers import (
+ CustomLoggingFormatter,
+ process_frame,
+ compute_contours,
+ detect_movement,
+ scale_contours,
+ draw_contours,
+ str_to_bytes,
+ timestamp_to_bytes,
+ bytes_to_timestamp,
+ write_yappi_stats,
+)
+
+###################################################
+# Defaults and constants #
+###################################################
+
+TESTING = False # For now this turns on yappi profiling
+CONFIG_FILE = "client.cfg"
+# Those will be updated from config file if present:
+LOGDIR = 'logs'
+if not os.path.exists(LOGDIR):
+ try:
+ os.makedirs(LOGDIR)
+ except Exception as exc:
+ print(f"Could not create log directory: {exc}")
+ sys.exit()
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+YAAPI_LOG_FILE_BASE = "client_yaapi"
+# LOGLEVEL = logging.INFO
+LOGLEVEL = logging.DEBUG
+
+ROUTER_ADDRESS = "localhost"
+PORT = "5569"
+ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+# This is populated at startup with all cameras present in /dev
+# (like /dev/video0, /dev/video1, etc.).
+# This will include dummy devices!
+# If cameras are already defined in config file, those will override present devicesl:
+KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period')
+CONTOUR_SIZE_THRESHOLD = 4000 # Minimum contour area to consider
+MOVEMENT_GRACE_PERIOD = 10 # Seconds to wait after movement stops
+TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+START_STOP_MESSAGE_FMT = "{action}:{data}"
+MAX_CAMERA_NAME_LENGTH = 256
+FPS = 30
+# This should be unique, each box different.
+# If client ID is wrong, should server reject connection?
+# TODO: Devise and implement schema for client ID assignment.
+# Maybe serial number from /proc/cpuinfo.
+# In bytes.
+CLIENT_ID = b"client_001"
+
+#####################################################
+# End defaults and constants #
+#####################################################
+
+log_formatter = CustomLoggingFormatter()
+# Attempt to create logging directory if it does not exist:
+if not os.path.exists(LOGDIR):
+ try:
+ os.makedirs(LOGDIR)
+ except Exception as exc:
+ print("Could not create log directory: %r", exc)
+try:
+ handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+except Exception as exc:
+ handler = logging.StreamHandler()
+# Prevent log pollution from inotify when LOG_LEVEL set to debug:
+logging.getLogger("inotify").setLevel(logging.WARNING)
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt=TIME_FORMAT,
+ level=LOGLEVEL,
+)
+
+CAMERAS = {}
+stop_event = Event()
+
+
+def _encoder_worker(
+ encode_queue: Queue,
+ output_queue: Queue,
+ jpeg_quality=60,
+ ) -> None:
+ while True:
+ try:
+ identity, frame = encode_queue.get(timeout=1)
+ except Empty:
+ logger.debug("Encoder worker: encode queue empty, continuing ...")
+ continue
+ # Encode frame as JPEG with lower quality:
+ encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality]
+ try:
+ _, buffer = cv2.imencode('.jpg', frame, encode_param)
+ except Exception as exc:
+ logger.error("Encoder worker: Error encoding frame: %r", exc)
+ try:
+ message = [identity, buffer.tobytes()]
+ output_queue.put_nowait(message)
+ except Exception as exc:
+ logger.debug("Send queue full, dropping encoded frame: %r", exc)
+ time.sleep(0.1)
+
+class ClientVideo(Thread):
+ """
+ Class for sending video stream
+
+ Attributes:
+ ----------
+ _id: (bytes)
+ Camera ID
+ device_dict: dict
+ Dictionary with device parameters (bytes for strings, integers for ints)
+ queue: Queue
+ Queue for sending messages to main client task
+
+ Methods:
+ --------
+ run
+ Main video streaming logic
+ stop
+ Stop video streaming
+ """
+
+ def __init__(self, _id, device_dict, queue) -> None:
+ """
+ Parameters
+ ----------
+ _id : bytes
+ Camera ID
+ device_dict : dict
+ Dictionary with device parameters (bytes for strings, integers for ints)
+ queue : Queue
+ Queue for sending messages to main client task
+ """
+ super().__init__(daemon=True)
+ self.identity = _id
+ self.device_dict = device_dict # <-- A dict with bytes for strings and integers for ints.
+ self.device_addr = self.device_dict.get(b'address') # No default.
+ self.device_threshold = device_dict[b'contour_size_threshold']
+ self.device_grace_period = device_dict[b'movement_grace_period']
+ self.queue = queue
+ # Store only FPS number of frames, compare first and last:
+ self.frame_deque = deque(maxlen=FPS)
+ self.encode_queue = Queue(maxsize=3)
+ self._enc_executor = ThreadPoolExecutor(max_workers=1)
+ self.live = True
+
+ def send_frame(self, contours: list) -> None:
+ """
+ Prepare and send frame with drawn contours.
+
+ Parameters
+ ----------
+ contours: list
+ list of contours to draw on a frame
+ """
+
+ raw_frame = self.frame_deque[-1][2]
+ scaling_factor = self.frame_deque[-1][1]
+ scaled_contours = scale_contours(contours, scaling_factor)
+ try:
+ frame_out = draw_contours(
+ raw_frame,
+ scaled_contours,
+ min_contour_area=self.device_threshold,
+ )
+ except Exception as exc:
+ logger.error("ClientVideo %r: Error drawing contours: %r", self.identity, exc)
+ frame_out = raw_frame
+
+ # Send video message with contour and frame:
+ try:
+ logger.debug("ClientVideo %r: Queueing frame for encoding ...", self.identity)
+ self.encode_queue.put_nowait((self.identity, frame_out))
+ except Exception as exc:
+ try:
+ logger.debug("ClientVideo %r: Encode queue full, dropping oldest frame: %r", self.identity, exc)
+ _ = self.encode_queue.get_nowait() # Remove oldest frame
+ self.encode_queue.put_nowait((self.identity, frame_out))
+ except Exception:
+ pass
+
+ def run(self) -> None:
+ """
+ Video streaming logic.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.debug("ClientVideo '%s' starting ...", self.identity)
+ # The cv2.CAP_V4l2 below forces reading using v4l backend rather than gstreamer.
+ # If opencv was compiled with gstreamer support, it will default to gstreamer.
+ # Then we need to make sure that appropriate gstreamer plugins are installed.
+ # Although I haven't profiled it, it seems that using v4l uses less juice.
+ # Some devices can be opened with address (/dev/video0) and some only with integer index (0).
+ cap = cv2.VideoCapture(
+ self.device_addr.decode('utf-8'),
+ cv2.CAP_V4L2
+ )
+ if not cap.isOpened():
+ logger.error("ClientVideo %r: Unable to open video device %r", self.identity, self.device_addr)
+ cap.release()
+ return
+
+ # Flag to track movement state. Expires after grace period:
+ movement_detected = False
+ movement_detected_start = movement_now = None
+ while self.live and not stop_event.is_set():
+ ret, frame = cap.read()
+ if not ret:
+ logger.error("ClientVideo %r: Failed to read frame from device %r", self.identity, self.device_addr)
+ break
+ try:
+ # Produce grayscale blurred frame for contour detection:
+ processed_frame_tuple = process_frame(frame)
+ except RuntimeError as exc:
+ logger.error("ClientVideo %r: Error processing frame: %r", self.identity, exc)
+ continue
+ # Append both original and processed frame to deque:
+ # processed_frame_tuple = (processed_frame, scaling_factor, raw_frame)
+ self.frame_deque.append(processed_frame_tuple + (frame,))
+ # Skip on the first frame:
+ # To detect movement, compare only first and last frame from the queue of length FPS
+ # in order to reduce load:
+ if len(self.frame_deque) >= 2:
+ sample_frames = self.frame_deque[0][0], self.frame_deque[-1][0]
+ try:
+ contours = compute_contours(sample_frames)
+ logger.debug("ClientVideo %r: Found %d contours.", self.identity, len(contours))
+ logger.debug("ClientVideo %r: Contours: %r", self.identity, contours)
+ except Exception as exc:
+ logger.error("ClientVideo %r: Error computing contours: %r", self.identity, exc)
+ continue
+ try:
+ movement_now = detect_movement(contours, min_area=self.device_threshold)
+ logger.debug("ClientVideo %r: Movement detected in frame.", self.identity)
+ except Exception as exc:
+ logger.error("ClientVideo %r: Error detecting movement: %r", self.identity, exc)
+ continue
+
+ # Only update movement start time and send start message
+ # if movement is detected and was not detected before:
+ if movement_now:
+ if not movement_detected:
+ # Update movement detected start time:
+ movement_detected_start = time.time()
+ queue_metadata_message(
+ identity=self.identity,
+ queue=self.queue,
+ action=b'start',
+ timestamp=timestamp_to_bytes(movement_detected_start),
+ )
+ movement_detected = True
+
+ # Prepare and send frame with drawn contours:
+ self.send_frame(contours)
+
+ else:
+ if movement_detected:
+ # Movement was detected before, but not anymore.
+ # We wait for grace period before sending stop message.
+ now = time.time()
+ delta_seconds = now - movement_detected_start
+ logger.debug(
+ "ClientVideo %r: delta seconds since movement detected: %d, grace remaining: %d.",
+ self.identity,
+ delta_seconds,
+ self.device_grace_period - delta_seconds
+ )
+ if delta_seconds >= self.device_grace_period:
+ queue_metadata_message(
+ identity=self.identity,
+ queue=self.queue,
+ action=b'stop',
+ timestamp=timestamp_to_bytes(now),
+ )
+ movement_detected = False
+ movement_detected_start = None
+ else:
+ # Still in grace period, send frame with contours:
+ self.send_frame(contours)
+ else:
+ pass # No movement detected currently or before, do nothing.
+ logger.debug("Calling cap.release() ...")
+ cap.release()
+ logger.info("ClientVideo %r exiting ...", self.identity)
+
+ def stop(self) -> None:
+ """
+ Stop video streaming thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("Client video %r sending stop message ...", self.identity)
+ self.live = False
+ global stop_event
+ stop_event.set()
+ # self.queue.put([]) # Put a dummy message to unblock queue if needed
+
+
+class ClientTask(Thread):
+ """
+ Main Client Task with logic, message handling and setup.
+
+ Attributes:
+ ----------
+ context: zmq.Context
+ ZMQ context
+ socket: zmq.Socket
+ ZMQ DEALER socket
+ identity: bytes
+ Client identity
+ poll: zmq.Poller
+ ZMQ poller for socket
+ video_threads: dict
+ Dictionary of ClientVideo threads
+ running: bool
+ Flag indicating if client is running
+ queue: Queue
+ Queue for sending messages to server
+
+ Methods:
+ --------
+ run
+ Main client logic
+ stop
+ Stop client task
+ receive_messages
+ Receive messages from the server
+ send_messages
+ Send messages to the server
+ update_config(message)
+ Update configuration based on message from server
+ watch_for_new_cameras
+ Watch for new camera devices using inotify
+ """
+
+ def __init__(self, _id, context: zmq.Context=None) -> None:
+ """
+ Parameters
+ ----------
+ _id : bytes
+ Client ID
+ context : zmq.Context, optional
+ ZMQ context, by default None
+ """
+ super().__init__(daemon=True)
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.identity = self.socket.identity = _id # <-- bytes
+ self.poll = zmq.Poller()
+ self.poll.register(self.socket, zmq.POLLIN)
+ self.video_threads = {}
+ self.running = True
+ self.connected = False
+ self.queue = Queue()
+
+ def receive_messages(self) -> None:
+ """
+ Receive messages from the server.
+ Messages are expected to be multipart with:
+ [camera_id (bytes), directive (bytes), value (bytes)]
+ Update configuration based on received messages running update_config(message).
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ while self.running:
+ try:
+ sockets = dict(self.poll.poll(1000))
+ if self.socket in sockets:
+ logger.debug("Self socket in sockets, receiving message ...")
+ try:
+ message = self.socket.recv_multipart()
+ logger.info("Client '%r' received message: %r.", self.identity, message)
+ self.update_config(message)
+ except Exception as exc:
+ logger.error("Failed to receive message: %r", exc)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%r': socket error: %r", self.identity, exc)
+ time.sleep(0.01) # Sleep to avoid busy waiting
+
+ def send_messages(self) -> None:
+ """
+ Send messages from queue to the server.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ while self.running:
+ try:
+ logger.debug("Client '%r': Waiting for message to send ...", self.identity)
+ frame = self.queue.get(timeout=0.1)
+ logger.debug("Sending message of length %r ...", len(frame))
+ try:
+ if len(frame) > 1:
+ # TODO: This blocks?
+ self.socket.send_multipart(frame)
+ else:
+ logger.debug("Client '%r': Dummy message received, not sending.", self.identity)
+ except zmq.ZMQError as exc:
+ logger.error("Client '%r': socket error: %r", self.identity, exc)
+ except Empty:
+ continue # No message to send, continue waiting
+
+ def update_config(self, message: list) -> None:
+ """
+ Update configuration based on message from server.
+ Only 'cameras' section of the config file can be updated.
+ Message format:
+ [camera_id (bytes), directive (bytes), value (bytes)]
+ Defined directives are:
+ modify_camera_name,
+ modify_camera_threshold,
+ modify_camera_grace_pd,
+ add_camera,
+ remove_camera,
+
+ Parameters
+ ----------
+ message : list
+ Message received from server
+
+ Returns
+ -------
+ None
+ """
+ try:
+ logger.debug("Client received config message: %r", message)
+ try:
+ camera_id, directive, value = message # <-- bytes
+ except ValueError as exc:
+ logger.error("Invalid config message format: %r", exc)
+ return
+ # Parameter validation happens on the server side before sending the message
+ if directive in (
+ b'modify_camera_name',
+ b'modify_camera_threshold',
+ b'modify_camera_grace_pd',
+ ):
+ # Ok, just validate that camera exists:
+ if camera_id not in CAMERAS: # <-- bytes
+ logger.error("Cannot update unknown camera: %r", camera_id)
+ logger.debug("Known cameras: %r", list(CAMERAS.keys()))
+ else:
+ if directive == b'modify_camera_name':
+ old_name, new_name = camera_id, value # <-- bytes
+ # Update global CAMERAS dict:
+ CAMERAS[new_name] = CAMERAS.pop(old_name)
+ # Update config file accordingly:
+ for key, value in cfg.items('cameras'): # <-- strings
+ if key.startswith(old_name.encode('utf-8') + '.'):
+ # Take the second part of a key like 'camera1.address' -> 'address':
+ param = key.split('.', 1)[1]
+ cfg.set('cameras', f"{new_name.encode('utf-8')}.{param}", value.encode('utf-8'))
+ cfg.remove_option('cameras', key)
+ # Update video threads dict:
+ self.video_threads[new_name] = self.video_threads.pop(old_name)
+ # Send message to server to update routing:
+ # FIXME: This is a mess. Settle on a format for data field.
+ queue_metadata_message(
+ identity=self.identity,
+ queue=self.queue,
+ action=b'rename',
+ # data=':'.join([old_name.decode('utf-8'), new_name.decode('utf-8'), timestamp]) # FIXME: this should be datetime object
+ data=timestamp_to_bytes(time.time()),
+ )
+ self.send_messages()
+ logger.info("Renamed and restarted thread for camera %r to %r.", old_name, new_name)
+ elif directive == b'modify_camera_threshold':
+ try:
+ _value = int(value)
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold value: %r, %r", value, exc)
+ return
+ CAMERAS[camera_id][b'contour_size_threshold'] = _value # <-- integer!
+ self.video_threads[camera_id].device_threshold = _value # <-- integer!
+ # configparser requires strings:
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.contour_size_threshold", str(value))
+ logger.info("Modified contour size threshold of camera %r to %d.", camera_id, value)
+ elif directive == b'modify_camera_grace_pd':
+ try:
+ _value = int(value)
+ except ValueError as exc:
+ logger.error("Invalid movement grace period value: %r, %r", value, exc)
+ return
+ CAMERAS[camera_id][b'movement_grace_period'] = _value # <-- integer!
+ self.video_threads[camera_id].device_grace_period = _value # <-- integer!
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.movement_grace_period", str(value))
+ logger.info("Modified movement grace period of camera %r to %d.", camera_id, value)
+ elif directive == b'add_camera':
+ CAMERAS[camera_id] = {b'address': value}
+ # vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue)
+ # vid_cl.start()
+ # self.video_threads[camera_id] = vid_cl
+ cfg.set('cameras', f"{camera_id.decode('utf-8')}.address", value.decode('utf-8'))
+ logger.info("Added and started a thread for new camera %r with address %r.", camera_id, value)
+ elif directive == b'remove_camera':
+ if camera_id in self.video_threads:
+ self.video_threads[camera_id].stop()
+ # self.video_threads[camera_id].join() # TODO: Check this is needed and wanted!
+ # del self.video_threads[camera_id]
+ if camera_id in CAMERAS:
+ del CAMERAS[camera_id]
+ # Remove from config file:
+ for key in list(cfg.options('cameras')): # <-- strings
+ if key.startswith(camera_id.decode('utf-8') + '.'):
+ cfg.remove_option('cameras', key)
+ logger.info("Removed and stopped thread for camera %r.", camera_id)
+ else:
+ logger.warning("Unknown config directive: %r", directive)
+
+ except (ValueError, IndexError) as exc:
+ logger.error("Failed to handle config message: %r", exc)
+ with open(CONFIG_FILE, 'w') as f:
+ cfg.write(f)
+
+ def watch_for_new_cameras(self) -> None:
+ """
+ Watch for new camera devices using inotify.
+ When a new camera device is detected in /dev (e.g., /dev/video2),
+ send a message to the server to start receiving video from it.
+ When a camera device is removed, send a message to stop receiving video from it.
+ Update config file accordingly.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ i = inotify.adapters.Inotify()
+ i.add_watch('/dev')
+
+ while self.running:
+ for event in i.event_gen(timeout_s=1, yield_nones=False):
+ (_, type_names, path, filename) = event
+ if filename.startswith('video'):
+ camera_pathname = os.path.join(path, filename)
+ if 'IN_CREATE' in type_names:
+ logger.info("Detected new camera device: %s", camera_pathname)
+ self.update_config([filename.encode('utf-8'), b'add_camera', camera_pathname.encode('utf-8')])
+ video_dict = {b'address': camera_pathname.encode('utf-8')}
+ self.start_video_thread(filename.encode('utf-8'), video_dict)
+ if 'IN_DELETE' in type_names:
+ logger.info("Detected removed camera device: %s", camera_pathname)
+ self.update_config([filename.encode('utf-8'), b'remove_camera', b''])
+ self.video_threads[filename.encode('utf-8')].stop()
+
+ def start_video_thread(self, camera_id: bytes, device_dict: dict) -> None:
+ """
+ Start a video thread for a given camera.
+
+ Parameters
+ ----------
+ camera_id : bytes
+ Camera ID
+ device_dict : dict
+ Dictionary with device parameters
+
+ Returns
+ -------
+ None
+ """
+ vid_cl = ClientVideo(camera_id, device_dict, self.queue)
+ vid_cl.start()
+ self.video_threads[camera_id] = vid_cl
+ logger.info("Started video thread for camera %r.", camera_id)
+ Thread(
+ target=_encoder_worker,
+ args=(vid_cl.encode_queue, self.queue,),
+ daemon=True,
+ ).start()
+
+ def run(self) -> None:
+ """
+ Run main client logic thread.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ self.socket.connect(ADDRESS)
+ logger.debug("Client '%r' connected to %r", self.identity, ADDRESS)
+ logger.debug("Starting video threads ...")
+ for _id, device_dict in CAMERAS.items():
+ self.start_video_thread(_id, device_dict)
+
+ recv_thread = Thread(target=self.receive_messages) # Not a daemon
+ send_thread = Thread(target=self.send_messages) # Not a daemon
+ watchdog_thread = Thread(target=self.watch_for_new_cameras, daemon=True)
+
+ recv_thread.start()
+ logger.debug("Client '%s' started receiving thread.", self.identity)
+ send_thread.start()
+ logger.debug("Client '%s' started sending thread.", self.identity)
+ watchdog_thread.start()
+ logger.debug("Client '%s' started watchdog thread.", self.identity)
+
+ logger.debug("Client '%s' waiting for threads to finish ...", self.identity)
+ recv_thread.join()
+ send_thread.join()
+ watchdog_thread.join(timeout=5)
+
+ logger.info("Closing socket ...")
+ try:
+ self.socket.close()
+ except Exception as exc:
+ logger.warning("Failed to close socket: %r", exc)
+ logger.debug("Terminating context ...")
+ try:
+ self.context.term()
+ except Exception as exc:
+ logger.warning("Failed to terminate context: %r", exc)
+
+ def stop(self) -> None:
+ """
+ Stop task
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ logger.info("ClientTask cleaning up ...")
+ for _id in list(self.video_threads.keys()):
+ logger.info("Cleaning video thread %s ...", _id)
+ _thread = self.video_threads[_id]
+ _thread.stop()
+ _thread.join(timeout=3)
+ if _thread.is_alive():
+ logger.warning("Video thread %r did not stop in time.", _id)
+ self.video_threads.pop(_id, None)
+
+ self.running = False
+ self.queue.put([]) # Put a dummy message to unblock send_messages if waiting
+ try:
+ self.socket.close(linger=0)
+ except Exception as exc:
+ logger.warning("Error closing socket: %r", exc)
+ logger.info("Client task exiting ...")
+
+
+def queue_video_message(identity: bytes, queue: Queue, frame) -> None:
+ """
+ Docstring for queue_video_message
+
+ Parameters
+ ----------
+ identity : bytes
+ Camera identity
+ queue : Queue
+ Queue to put message to
+ frame : ndarray
+ Video frame to send
+
+ Returns
+ -------
+ None
+ """
+ # Encode frame as JPEG:
+ _, buffer = cv2.imencode('.jpg', frame)
+ # Add identity to message:
+ message = (identity, buffer.tobytes())
+ # Put message to queue:
+ queue.put(message)
+
+def queue_metadata_message(
+ identity: bytes,
+ queue: Queue,
+ action: bytes,
+ timestamp: float,
+ ) -> None:
+ """
+ Queue metadata message to be sent to server.
+
+ Parameters
+ ----------
+ identity : bytes
+ Camera ID
+ queue : Queue
+ Queue to put message to
+ action : bytes
+ Action as bytes ('start' or 'stop')
+ data : float
+ Data string: this depends on action, for start/stop it's a timestamp, for rename it's old_name:new_name:timestamp
+
+ Returns
+ -------
+ None
+ """
+ logger.debug("ClientVideo: %r at: %r", action, datetime.datetime.fromtimestamp(
+ bytes_to_timestamp(timestamp)).strftime(TIME_FORMAT))
+ message = (identity, action, timestamp)
+ queue.put(message)
+
+def find_present_cameras() -> dict:
+ """
+ Find currently present camera devices in /dev.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ dict
+ Dictionary of present cameras with device names as keys and paths as values
+ """
+ present_cameras = {}
+ for entry in os.listdir('/dev'):
+ if entry.startswith('video'):
+ present_cameras[entry] = os.path.join('/dev', entry)
+ return present_cameras
+
+def read_config(conf_file) -> ConfigParser:
+ """
+ Read config file and return as dictionary.
+ The only required section is 'cameras' with at least one camera.
+ Do basic sanity checks and update global variables.
+ Log warnings if config file or sections/options are missing.
+
+ Parameters
+ ----------
+ conf_file : str
+ Path to config file
+
+ Returns
+ -------
+ ConfigParser
+ ConfigParser object with configuration
+ """
+ cfg = ConfigParser()
+ cfg.read(conf_file)
+ cameras_dict = None
+ # Need to update logging info:
+ if 'main' in cfg:
+ global TESTING
+ try:
+ TESTING = cfg.getboolean('main', 'testing')
+ logger.info("Testing mode is set to: %r.", TESTING)
+ except ValueError as exc:
+ logger.error("Invalid value for 'testing' option in 'main' section: %r, using default: %r.", exc, TESTING)
+
+ if 'logging' in cfg:
+ global LOGDIR, LOGFILE, LOGLEVEL
+
+ LOGDIR = cfg.get('logging', 'logdir', fallback='.')
+ LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE)
+ LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL)
+ logger.setLevel(LOGLEVEL)
+ for _handler in logger.handlers:
+ _handler.setLevel(LOGLEVEL)
+ if 'network' in cfg:
+ global ROUTER_ADDRESS, PORT, ADDRESS
+ if cfg.has_option('network', 'router_address'):
+ ROUTER_ADDRESS = cfg.get('network', 'router_address')
+ else:
+ logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS)
+ if cfg.has_option('network', 'router_port'):
+ PORT = cfg.get('network', 'router_port')
+ else:
+ logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT)
+ ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}"
+ else:
+ logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS)
+
+ if 'cameras' in cfg:
+ # Need to take care: camera names are stored as strings but comunicated as bytes:
+ cameras_dict = set_up_cameras(cfg.items('cameras')) # <-- dict with strings
+ if cameras_dict:
+ # Only update global CAMERAS if we have at least one valid camera with an address:
+ CAMERAS = str_to_bytes(cameras_dict) # <-- now bytes
+ logger.info("Using camera configuration updated from config file: %r", CAMERAS)
+
+ if 'cameras' not in cfg or not cameras_dict:
+ # No cameras section or no valid cameras found:
+ logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS)
+ found_cameras = find_present_cameras()
+ for cam_name, cam_addr in found_cameras.items():
+ CAMERAS[cam_name.encode('utf-8')] = {'address': cam_addr.encode('utf-8')} # <-- bytes
+ # Use default parameters:
+ cfg.set('cameras', f"{cam_name}.address", cam_addr)
+ cfg.set('cameras', f"{cam_name}.contour_size_threshold", str(CONTOUR_SIZE_THRESHOLD))
+ cfg.set('cameras', f"{cam_name}.movement_grace_period", str(MOVEMENT_GRACE_PERIOD))
+ logger.info("CAMERAS configuration: %r", CAMERAS)
+ with open(conf_file, 'w') as configfile:
+ cfg.write(configfile)
+
+ return cfg
+
+def validate_camera_address(address: str) -> bool:
+ """
+ Validate camera address.
+
+ Parameters
+ ----------
+ address : str
+ Camera address
+
+ Returns
+ -------
+ bool
+ True if address is valid, False otherwise
+ """
+ # Check if address is an integer (device index):
+ if address.startswith('/dev/video'):
+ return address
+ raise ValueError("Invalid camera address: %r" % address)
+
+def validate_camera_threshold(threshold: str) -> bool:
+ """
+ Validate camera contour size threshold.
+
+ Parameters
+ ----------
+ threshold : str
+ Contour size threshold
+
+ Returns
+ -------
+ bool
+ True if threshold is valid, False otherwise
+ """
+ try:
+ if int(threshold) > 0:
+ return threshold
+ except ValueError:
+ raise ValueError("Invalid contour size threshold: %r" % threshold)
+ raise ValueError("Invalid contour size threshold: %r" % threshold)
+
+def validate_camera_grace_pd(grace_pd: str) -> bool:
+ """
+ Validate camera movement grace period.
+
+ Parameters
+ ----------
+ grace_pd : str
+ Movement grace period
+
+ Returns
+ -------
+ bool
+ True if grace period is valid, False otherwise
+ """
+ try:
+ if int(grace_pd) >= 0:
+ return grace_pd
+ except ValueError:
+ raise ValueError("Invalid movement grace period: %r" % grace_pd)
+ raise ValueError("Invalid movement grace period: %r" % grace_pd)
+
+def set_up_cameras(cameras_cfg_section: dict) -> None:
+ """
+ Set up camera configuration from config file section.
+ Validate camera parameters and apply defaults if necessary.
+ Parameters
+ ----------
+ cameras_section : dict
+ Dictionary of camera configuration from config file
+ Returns
+ -------
+ cameras_dict : dict
+ Dictionary to store validated camera configuration
+ """
+ # ConfigParser does not support nested sections.
+ # These shenanigans below allow for a nested config simulation with configparser
+ # and so per camera settings:
+ cameras_dict = defaultdict(dict)
+ for key, val in cameras_cfg_section: # <-- strings
+ try:
+ cam, param = key.split('.', 1)
+ except ValueError as exc:
+ logger.error(
+ "Invalid camera configuration entry: %r, error: %r, using defaults.", key, exc)
+ match param:
+ case 'address':
+ try:
+ cameras_dict[cam][param] = validate_camera_address(val) # <-- still strings
+ except ValueError as exc:
+ logger.error("Invalid camera address for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('address', ''))
+ continue
+ case 'contour_size_threshold':
+ try:
+ cameras_dict[cam][param] = validate_camera_threshold(val) # <-- still strings
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('contour_size_threshold', ''))
+ cameras_dict[cam][param] = str(CONTOUR_SIZE_THRESHOLD)
+ continue
+ case 'movement_grace_period':
+ try:
+ cameras_dict[cam][param] = validate_camera_grace_pd(val) # <-- still strings
+ except ValueError as exc:
+ logger.error("Invalid movement grace period for camera %r: %r, using default: %r.", cam, exc, cameras_dict[cam].get('movement_grace_period', ''))
+ cameras_dict[cam][param] = str(MOVEMENT_GRACE_PERIOD)
+ continue
+ case _:
+ logger.warning("Unknown camera %r parameter: %r", cam, param)
+
+ # Camera section needs to contain at least an address:
+ for key in list(cameras_dict.keys()):
+ if 'address' not in cameras_dict[key].keys():
+ logger.error("Camera %r has no address configured, removing from configuration.", key)
+ del cameras_dict[key]
+
+ return cameras_dict
+
+def signal_handler(sig, frame) -> None:
+ """
+ Signal handler for graceful shutdown.
+ """
+ logger.info("Received signal handler '%r', stopping client ...", sig)
+ # Set flag to stop VideoClient thread(s):
+ stop_event.set()
+ if client.is_alive():
+ client.stop()
+
+
+if __name__ == "__main__":
+ logger.info("Client starting ...")
+
+ # Read configuration file before starting logging as it may contain logging settings:
+ cfg = read_config(CONFIG_FILE)
+
+ if TESTING:
+ logger.info("Starting in testing mode with yappi profiling ...")
+ yappi.set_clock_type("cpu")
+ yappi.start(builtins=True, profile_threads=True)
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+
+ client = ClientTask(CLIENT_ID)
+
+ client.start()
+ client.join()
+
+ if TESTING:
+ yappi.stop()
+ if write_yappi_stats(yappi):
+ logger.error("Error writing yappi stats.")
+
+ logger.info("Terminating ...")