diff options
Diffstat (limited to 'client.py')
| -rw-r--r-- | client.py | 412 | 
1 files changed, 341 insertions, 71 deletions
| @@ -6,28 +6,51 @@ A module containing client for streaming video to a zmq router.  __author__ = "Franoosh Corporation" +import sys  from os import path  from threading import Thread, Event -from queue import Queue +from queue import Queue, Empty +from collections import deque  import time  import datetime  import signal  import logging  from configparser import ConfigParser -import traceback  import zmq +import cv2 +from collections import defaultdict -from helpers import CustomLoggingFormatter +from helpers import ( +    CustomLoggingFormatter, +    compute_contours, +    detect_movement, +    draw_contours, +    ) -ROUTER_ADDRESS = "tcp://localhost" -PORT = "5569" -ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" +################################################### +# Configuration, defaults and logging setup +###################################################  CONFIG_FILE = "client.cfg" -LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG -CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} -SLEEP_TIME = 5 +# Those will be updated from config file if present: +LOGDIR = '.' +LOGFILE = path.join(LOGDIR, 'client.log') +LOGLEVEL = logging.INFO + +ROUTER_ADDRESS = "localhost" +PORT = "5569" +ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +# This is a default: +CAMERAS = { +    'default camera': { +        'address': '/dev/video0', +    }, +} +KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') +CONTOUR_SIZE_THRESHOLD = 4000  # Minimum contour area to consider +MOVEMENT_GRACE_PERIOD = 10  # Seconds to wait after movement stops +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +START_STOP_MESSAGE_FMT = "{action}:{data}"  stop_event = Event() @@ -40,57 +63,115 @@ logging.root.setLevel(LOGLEVEL)  logger = logging.getLogger(__name__)  logging.basicConfig(      filename=LOGFILE, -    datefmt='%Y-%m-%d %I:%M:%S', +    datefmt=TIME_FORMAT,      level=LOGLEVEL,  ) - -def read_config(conf_file): -    """Read config file and return as dictionary.""" -    cfg = ConfigParser() -    cfg.read(conf_file) - -    return {key: dict(cfg.items(key)) for key in cfg.sections()} +################################################### +# End configuration, defaults and logging setup +###################################################  class ClientVideo(Thread):      """Class for sending video stream""" -    def __init__(self, _id, device, queue): +    def __init__(self, _id, device_dict, queue):          super().__init__(daemon=True)          self.identity = _id -        self.device = device +        self.device_dict = device_dict +        self.device_addr = device_dict.get('address', '/dev/video0') +        try: +            # Could move parameter validation to config reading function: +            self.device_threshold = int(device_dict['contour_size_threshold']) +        except (ValueError, KeyError) as exc: +            logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD) +            self.device_threshold = CONTOUR_SIZE_THRESHOLD +        try: +            self.device_grace_period = int(device_dict['movement_grace_period']) +        except (ValueError, KeyError) as exc: +            logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD) +            self.device_grace_period = MOVEMENT_GRACE_PERIOD          self.queue = queue +        self.frame_deque = deque(maxlen=2)  # Store last 2 frames          self.live = True +    def put_metadata_message(self, action, data): +        """Put metadata message to queue.""" +        metadata = START_STOP_MESSAGE_FMT.format( +            action=action, +            data=data, +        ) +        logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data) +        message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] +        self.queue.put(message) + +    def put_video_message(self, frame): +        # Encode frame as JPEG: +        _, buffer = cv2.imencode('.jpg', frame) +        # Add identity to message: +        message = [self.identity.encode('utf-8'), buffer.tobytes()] +        # Put message to queue: +        self.queue.put(message) +      def run(self): -        """Replace with actual video streaming logic.""" -        ping_no = 0 +        """Video streaming logic.""" +        logger.debug("ClientVideo '%s' starting ...", self.identity) +        cap = cv2.VideoCapture(self.device_addr) +        if not cap.isOpened(): +            logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr) +            return + +        movement_detected = False +        movement_detected_start = None          while self.live and not stop_event.is_set(): -            try: -                # Four parts required to start/stop video stream, three parts for ping: -                if ping_no == 0:  # Start video stream -                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") -                    metadata = f"start:{timestamp}" -                    logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) -                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] -                    self.queue.put(message) -                    text = f"ping-{ping_no}" -                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] -                elif ping_no >= 5:  # Stop video stream -                    logger.debug("ClientVideo '%s' sending stop signal", self.identity) -                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") -                    metadata = f"stop:{timestamp}" -                    logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') -                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] -                    self.live = False -                else:  # Send ping -                    text = f"ping-{ping_no}" -                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] -                logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) -                self.queue.put(message) -                ping_no += 1 -            except Exception as exc: -                logger.error("ClientVideo: socket error: %r", exc) -            time.sleep(SLEEP_TIME) +            ret, frame = cap.read() +            if not ret: +                logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr) +                break +            self.frame_deque.append(frame) +            # Skip on the first frame: +            if len(self.frame_deque) >= 2: +                contours = compute_contours(self.frame_deque) +                logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours)) +                logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours) +                movement_now = detect_movement(contours, min_area=self.device_threshold) +                if movement_now: +                    logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity) +                    # Only update movement start time if movement was not detected before: +                    if not movement_detected: +                        # Update movement detected start time: +                        movement_detected_start = datetime.datetime.now() +                        # Only send start message if movement was not detected before: +                        timestamp = movement_detected_start.strftime(TIME_FORMAT) +                        self.put_metadata_message( +                            action='start', +                            data=timestamp, +                        ) +                        # and set movement detected flag: +                        movement_detected = True +                else: +                    logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity) + +                if movement_detected: +                    # Draw contours on frame: +                    frame = draw_contours(frame, contours, min_contour_area=self.device_threshold) +                    # Send video message with contour and frame: +                    self.put_video_message(frame) + +                    # Check if movement has stopped, taking into account grace period: +                    now = datetime.datetime.now() +                    delta_seconds = (now - movement_detected_start).total_seconds() +                    logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds) +                    # Wait seconds before deciding movement has stopped: +                    if delta_seconds > self.device_grace_period: +                        timestamp = now.strftime(TIME_FORMAT) +                        self.put_metadata_message( +                            action='stop', +                            data=timestamp, +                        ) +                        movement_detected = False +                        movement_detected_start = None + +            else: +                logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque))          logger.info("ClientVideo '%s' closing socket ...", self.identity) @@ -98,7 +179,6 @@ class ClientVideo(Thread):          logger.info("Client video '%s' exiting ...", self.identity)          self.live = False -  class ClientTask(Thread):      """Main Client Task with logic,      message handling and setup.""" @@ -111,35 +191,152 @@ class ClientTask(Thread):          self.socket.identity = self.identity.encode("utf-8")          self.poll = zmq.Poller()          self.poll.register(self.socket, zmq.POLLIN) -        self.video_threads = [] +        # self.video_threads = [] +        self.video_threads = {}          self.running = True          self.connected = False +        self.queue = Queue() -    def run(self): -        self.socket.connect(ADDRESS) -        logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) -        logger.debug("starting video threads ...") -        q = Queue() -        for _id, device in CAMERAS.items(): -            vid_cl = ClientVideo(_id, device, q) -            vid_cl.start() -            self.video_threads.append(vid_cl) +    def receive_messages(self): +        """Receive messages from the server."""          while self.running:              try:                  sockets = dict(self.poll.poll(1000))                  if self.socket in sockets:                      try:                          message = self.socket.recv_multipart() -                        logger.debug("Client '%s' received message: %r", self.identity, message) -                    except Exception: -                        logger.error("Failed to receive message: %r", traceback.format_exc()) -                if q.qsize() > 0: -                    frame = q.get() -                    if frame is not None: -                        logger.debug("Processing frame: %r", frame) -                        self.socket.send_multipart(frame) +                        logger.info("Client '%s' received message: %r.", self.identity, message) +                        self.update_config(message) +                    except Exception as exc: +                        logger.error("Failed to receive message: %r", exc)              except zmq.ZMQError as exc:                  logger.error("Client '%s': socket error: %r", self.identity, exc) +            time.sleep(0.01)  # Sleep to avoid busy waiting + +    def send_messages(self): +        """Send messages to the server.""" +        while self.running: +            try: +                frame = self.queue.get(timeout=0.1) +                logger.debug("Sending message of length %r ...", len(frame)) +                try: +                    self.socket.send_multipart(frame) +                except zmq.ZMQError as exc: +                    logger.error("Client '%s': socket error: %r", self.identity, exc) +            except Empty: +                continue  # No message to send, continue waiting + +    def update_config(self, message): +        """ +        Update configuration based on message from server. +        Only 'cameras' section can be updated. +        directives = ( +            'modify_camera_name', +            'modify_camera_threshold', +            'modify_camera_grace_pd', +            'modify_camera_address', +            'add_camera', +            'remove_camera', +        ) +        """ +        try: +            msg_list = [part.decode('utf-8') for part in message] +            logger.debug("Client received config message: %r", msg_list) +            camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2] +            if directive in ( +                'modify_camera_name', +                'modify_camera_threshold', +                'modify_camera_grace_pd', +                'modify_camera_address', +                ): +                if camera_id not in CAMERAS: +                    logger.warning("Cannot rename unknown camera: %r", camera_id) +                    logger.warning("Known cameras: %r", list(CAMERAS.keys())) +                else: +                    if directive == 'modify_camera_name': +                        old_name, new_name = camera_id, value +                        if new_name in CAMERAS: +                            raise ValueError("New camera name already exists.") +                        CAMERAS[new_name] = CAMERAS.pop(old_name) +                        self.video_threads[new_name] = self.video_threads.pop(old_name) +                        # Send message to server to update routing: +                        timestamp = datetime.datetime.now().strftime(TIME_FORMAT) +                        self.video_threads[new_name].put_metadata_message( +                            action='rename', +                            data=':'.join([old_name, new_name, timestamp]) +                        ) +                        self.send_messages +                        for item in cfg.items('cameras'): +                            if item[0].startswith(old_name + '.'): +                                param = item[0].split('.', 1)[1] +                                cfg.set('cameras', f"{new_name}.{param}", item[1]) +                                cfg.remove_option('cameras', item[0]) +                        logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name) +                    elif directive == 'modify_camera_threshold': +                        new_threshold = int(value) +                        if new_threshold <= 0: +                            raise ValueError("Threshold must be positive.") +                        CAMERAS[camera_id]['contour_size_threshold'] = new_threshold +                        self.video_threads[camera_id].device_threshold = new_threshold +                        cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold)) +                        logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold) +                    elif directive == 'modify_camera_grace_pd': +                        new_grace_pd = int(value) +                        if new_grace_pd < 0: +                            raise ValueError("Grace period cannot be negative.") +                        CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd +                        self.video_threads[camera_id].device_grace_period = new_grace_pd +                        cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd)) +                        logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd) +                    elif directive == 'modify_camera_address': +                        new_address = value +                        CAMERAS[camera_id]['address'] = new_address +                        self.video_threads[camera_id].device_addr = new_address +                        # Changing address on the fly requires restarting the video thread: +                        self.video_threads[camera_id].stop() +                        self.video_threads[camera_id].join() +                        vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) +                        vid_cl.start() +                        self.video_threads[camera_id] = vid_cl +                        logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address) +                        cfg.set('cameras', f"{camera_id}.address", new_address) +            elif directive == 'add_camera': +                cam_address = value +                if camera_id in CAMERAS: +                    raise ValueError("Camera name already exists.") +                CAMERAS[camera_id] = {'address': cam_address} +                vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) +                vid_cl.start() +                self.video_threads[camera_id] = vid_cl +                cfg.set('cameras', f"{camera_id}.address", cam_address) +                logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address) +            else: +                logger.warning("Unknown config directive: %r", directive) + +        except (ValueError, IndexError) as exc: +            logger.error("Failed to handle config message: %r", exc) +        cfg.write(open(CONFIG_FILE, 'w')) + +    def run(self): +        """Main client logic.""" +        self.socket.connect(ADDRESS) +        logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) +        logger.debug("Starting video threads ...") +        for _id, device_dict in CAMERAS.items(): +            vid_cl = ClientVideo(_id, device_dict, self.queue) +            vid_cl.start() +            self.video_threads[_id] = (vid_cl) + +        recv_thread = Thread(target=self.receive_messages, daemon=True) +        send_thread = Thread(target=self.send_messages, daemon=True) + +        recv_thread.start() +        send_thread.start() +        logger.debug("Client '%s' started receiving and sending threads.", self.identity) + +        logger.debug("Client '%s' waiting for threads to finish ...", self.identity) +        recv_thread.join() +        send_thread.join()          logger.info("Closing socket ...")          self.socket.close() @@ -149,18 +346,86 @@ class ClientTask(Thread):      def stop(self):          """Stop task"""          logger.info("ClientTask cleaning up ...") -        for thread in self.video_threads: -            logger.info("Cleaning u video thread ...") +        for _id, thread in self.video_threads.items(): +            logger.info("Cleaning video thread %s ...", _id)              thread.stop()              thread.join() -        self.video_threads = [] +        # self.video_threads = [] +        self.video_threads = {}          logger.info("Client task exiting ...")          self.running = False +def read_config(conf_file): +    """ +    Read config file and return as dictionary. +    The only required section is 'cameras' with at least one camera. +    Do basic sanity checks and update global variables. +    Log warnings if config file or sections/options are missing. +    """ +    cfg = ConfigParser() +    cfg.read(conf_file) +    # Need to update logging info: +    if 'logging' in cfg: +        global LOGDIR, LOGFILE, LOGLEVEL + +        LOGDIR = cfg.get('logging', 'logdir', fallback='.') +        LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) +        LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) +        logger.setLevel(LOGLEVEL) +        for _handler in logger.handlers: +            _handler.setLevel(LOGLEVEL) +    if 'network' in cfg: +        global ROUTER_ADDRESS, PORT, ADDRESS +        if cfg.has_option('network', 'router_address'): +            ROUTER_ADDRESS = cfg.get('network', 'router_address') +        else: +            logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) +        if cfg.has_option('network', 'router_port'): +            PORT = cfg.get('network', 'router_port') +        else: +            logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) +        ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +    else: +        logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) +    if 'cameras' in cfg: +        global CAMERAS +        # These shenanigans below allow for a nested config simulation with configparser +        # and so per camera settings: +        cameras_dict = defaultdict(dict) +        for key, val in cfg.items('cameras'): +            try: +                cam, param = key.split('.', 1) +                if param in KNOWN_CAMERA_PARAMETERS: +                    cameras_dict[cam][param] = val +                else: +                    logger.warning("Unknown camera %r parameter: %r", cam, param) +            except ValueError as exc: +                logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS) +        # Check that each camera has at least an address and remove if not: +        for key in list(cameras_dict.keys()): +            if 'address' not in cameras_dict[key].keys(): +                logger.error("DEBUG: cameras_dict: %r", cameras_dict) +                logger.error("Camera %r has no address configured, removing from configuration.", key) +                del cameras_dict[key] +        if not cameras_dict: +            logger.warning("No valid camera configurations found in config file, using defaults: %r.", CAMERAS) +        else: +            # Only update global CAMERAS if we have at least one valid camera with an address: +            CAMERAS = cameras_dict +            logger.info("Using camera configuration: %r", CAMERAS) +    else: +        logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) +    logger.info("CAMERAS configuration: %r", CAMERAS) +    with open(conf_file, 'w') as configfile: +        cfg.write(configfile) + +    return cfg +  def signal_handler(sig, frame):      logger.info("Received signal handler '%r', stopping client ...", sig)      client.stop() +    sys.exit(0)  if __name__ == "__main__": @@ -168,9 +433,14 @@ if __name__ == "__main__":      signal.signal(signal.SIGINT, signal_handler)      signal.signal(signal.SIGTERM, signal_handler) +    # Read configuration file before starting logging as it may contain logging settings: +    cfg = read_config(CONFIG_FILE) +      logger.info("Starting up ...") +      client = ClientTask("client_task")      client.start() -      client.join() + +    logger.info("Terminating ...")
\ No newline at end of file | 
