diff options
| -rw-r--r-- | client.py | 412 | ||||
| -rw-r--r-- | helpers.py | 63 | ||||
| -rw-r--r-- | router.py | 160 | ||||
| -rw-r--r-- | templates/client.html | 98 | ||||
| -rw-r--r-- | webserver.py | 214 | ||||
| -rw-r--r-- | worker.py | 247 | 
6 files changed, 927 insertions, 267 deletions
| @@ -6,28 +6,51 @@ A module containing client for streaming video to a zmq router.  __author__ = "Franoosh Corporation" +import sys  from os import path  from threading import Thread, Event -from queue import Queue +from queue import Queue, Empty +from collections import deque  import time  import datetime  import signal  import logging  from configparser import ConfigParser -import traceback  import zmq +import cv2 +from collections import defaultdict -from helpers import CustomLoggingFormatter +from helpers import ( +    CustomLoggingFormatter, +    compute_contours, +    detect_movement, +    draw_contours, +    ) -ROUTER_ADDRESS = "tcp://localhost" -PORT = "5569" -ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" +################################################### +# Configuration, defaults and logging setup +###################################################  CONFIG_FILE = "client.cfg" -LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG -CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} -SLEEP_TIME = 5 +# Those will be updated from config file if present: +LOGDIR = '.' +LOGFILE = path.join(LOGDIR, 'client.log') +LOGLEVEL = logging.INFO + +ROUTER_ADDRESS = "localhost" +PORT = "5569" +ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +# This is a default: +CAMERAS = { +    'default camera': { +        'address': '/dev/video0', +    }, +} +KNOWN_CAMERA_PARAMETERS = ('address', 'contour_size_threshold', 'movement_grace_period') +CONTOUR_SIZE_THRESHOLD = 4000  # Minimum contour area to consider +MOVEMENT_GRACE_PERIOD = 10  # Seconds to wait after movement stops +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +START_STOP_MESSAGE_FMT = "{action}:{data}"  stop_event = Event() @@ -40,57 +63,115 @@ logging.root.setLevel(LOGLEVEL)  logger = logging.getLogger(__name__)  logging.basicConfig(      filename=LOGFILE, -    datefmt='%Y-%m-%d %I:%M:%S', +    datefmt=TIME_FORMAT,      level=LOGLEVEL,  ) - -def read_config(conf_file): -    """Read config file and return as dictionary.""" -    cfg = ConfigParser() -    cfg.read(conf_file) - -    return {key: dict(cfg.items(key)) for key in cfg.sections()} +################################################### +# End configuration, defaults and logging setup +###################################################  class ClientVideo(Thread):      """Class for sending video stream""" -    def __init__(self, _id, device, queue): +    def __init__(self, _id, device_dict, queue):          super().__init__(daemon=True)          self.identity = _id -        self.device = device +        self.device_dict = device_dict +        self.device_addr = device_dict.get('address', '/dev/video0') +        try: +            # Could move parameter validation to config reading function: +            self.device_threshold = int(device_dict['contour_size_threshold']) +        except (ValueError, KeyError) as exc: +            logger.error("ClientVideo '%s': Invalid contour size threshold: %r, using default: %d", self.identity, exc, CONTOUR_SIZE_THRESHOLD) +            self.device_threshold = CONTOUR_SIZE_THRESHOLD +        try: +            self.device_grace_period = int(device_dict['movement_grace_period']) +        except (ValueError, KeyError) as exc: +            logger.error("ClientVideo '%s': Invalid movement grace period: %r, using default: %d", self.identity, exc, MOVEMENT_GRACE_PERIOD) +            self.device_grace_period = MOVEMENT_GRACE_PERIOD          self.queue = queue +        self.frame_deque = deque(maxlen=2)  # Store last 2 frames          self.live = True +    def put_metadata_message(self, action, data): +        """Put metadata message to queue.""" +        metadata = START_STOP_MESSAGE_FMT.format( +            action=action, +            data=data, +        ) +        logger.info("ClientVideo '%s': %s at: %r", self.identity, action, data) +        message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] +        self.queue.put(message) + +    def put_video_message(self, frame): +        # Encode frame as JPEG: +        _, buffer = cv2.imencode('.jpg', frame) +        # Add identity to message: +        message = [self.identity.encode('utf-8'), buffer.tobytes()] +        # Put message to queue: +        self.queue.put(message) +      def run(self): -        """Replace with actual video streaming logic.""" -        ping_no = 0 +        """Video streaming logic.""" +        logger.debug("ClientVideo '%s' starting ...", self.identity) +        cap = cv2.VideoCapture(self.device_addr) +        if not cap.isOpened(): +            logger.error("ClientVideo '%s': Unable to open video device '%s'", self.identity, self.device_addr) +            return + +        movement_detected = False +        movement_detected_start = None          while self.live and not stop_event.is_set(): -            try: -                # Four parts required to start/stop video stream, three parts for ping: -                if ping_no == 0:  # Start video stream -                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") -                    metadata = f"start:{timestamp}" -                    logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) -                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] -                    self.queue.put(message) -                    text = f"ping-{ping_no}" -                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] -                elif ping_no >= 5:  # Stop video stream -                    logger.debug("ClientVideo '%s' sending stop signal", self.identity) -                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") -                    metadata = f"stop:{timestamp}" -                    logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') -                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] -                    self.live = False -                else:  # Send ping -                    text = f"ping-{ping_no}" -                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] -                logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) -                self.queue.put(message) -                ping_no += 1 -            except Exception as exc: -                logger.error("ClientVideo: socket error: %r", exc) -            time.sleep(SLEEP_TIME) +            ret, frame = cap.read() +            if not ret: +                logger.error("ClientVideo '%s': Failed to read frame from device '%s'", self.identity, self.device_addr) +                break +            self.frame_deque.append(frame) +            # Skip on the first frame: +            if len(self.frame_deque) >= 2: +                contours = compute_contours(self.frame_deque) +                logger.debug("ClientVideo '%s': Found %d contours.", self.identity, len(contours)) +                logger.debug("ClientVideo '%s': Contours: %r", self.identity, contours) +                movement_now = detect_movement(contours, min_area=self.device_threshold) +                if movement_now: +                    logger.debug("ClientVideo '%s': Movement detected in frame.", self.identity) +                    # Only update movement start time if movement was not detected before: +                    if not movement_detected: +                        # Update movement detected start time: +                        movement_detected_start = datetime.datetime.now() +                        # Only send start message if movement was not detected before: +                        timestamp = movement_detected_start.strftime(TIME_FORMAT) +                        self.put_metadata_message( +                            action='start', +                            data=timestamp, +                        ) +                        # and set movement detected flag: +                        movement_detected = True +                else: +                    logger.debug("ClientVideo '%s': No movement detected in frame.", self.identity) + +                if movement_detected: +                    # Draw contours on frame: +                    frame = draw_contours(frame, contours, min_contour_area=self.device_threshold) +                    # Send video message with contour and frame: +                    self.put_video_message(frame) + +                    # Check if movement has stopped, taking into account grace period: +                    now = datetime.datetime.now() +                    delta_seconds = (now - movement_detected_start).total_seconds() +                    logger.debug("ClientVideo '%s': delta seconds since movement detected: %d, grace remaining: %d.", self.identity, delta_seconds, self.device_grace_period - delta_seconds) +                    # Wait seconds before deciding movement has stopped: +                    if delta_seconds > self.device_grace_period: +                        timestamp = now.strftime(TIME_FORMAT) +                        self.put_metadata_message( +                            action='stop', +                            data=timestamp, +                        ) +                        movement_detected = False +                        movement_detected_start = None + +            else: +                logger.debug("ClientVideo '%s': Frame deque length: %d", self.identity, len(self.frame_deque))          logger.info("ClientVideo '%s' closing socket ...", self.identity) @@ -98,7 +179,6 @@ class ClientVideo(Thread):          logger.info("Client video '%s' exiting ...", self.identity)          self.live = False -  class ClientTask(Thread):      """Main Client Task with logic,      message handling and setup.""" @@ -111,35 +191,152 @@ class ClientTask(Thread):          self.socket.identity = self.identity.encode("utf-8")          self.poll = zmq.Poller()          self.poll.register(self.socket, zmq.POLLIN) -        self.video_threads = [] +        # self.video_threads = [] +        self.video_threads = {}          self.running = True          self.connected = False +        self.queue = Queue() -    def run(self): -        self.socket.connect(ADDRESS) -        logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) -        logger.debug("starting video threads ...") -        q = Queue() -        for _id, device in CAMERAS.items(): -            vid_cl = ClientVideo(_id, device, q) -            vid_cl.start() -            self.video_threads.append(vid_cl) +    def receive_messages(self): +        """Receive messages from the server."""          while self.running:              try:                  sockets = dict(self.poll.poll(1000))                  if self.socket in sockets:                      try:                          message = self.socket.recv_multipart() -                        logger.debug("Client '%s' received message: %r", self.identity, message) -                    except Exception: -                        logger.error("Failed to receive message: %r", traceback.format_exc()) -                if q.qsize() > 0: -                    frame = q.get() -                    if frame is not None: -                        logger.debug("Processing frame: %r", frame) -                        self.socket.send_multipart(frame) +                        logger.info("Client '%s' received message: %r.", self.identity, message) +                        self.update_config(message) +                    except Exception as exc: +                        logger.error("Failed to receive message: %r", exc)              except zmq.ZMQError as exc:                  logger.error("Client '%s': socket error: %r", self.identity, exc) +            time.sleep(0.01)  # Sleep to avoid busy waiting + +    def send_messages(self): +        """Send messages to the server.""" +        while self.running: +            try: +                frame = self.queue.get(timeout=0.1) +                logger.debug("Sending message of length %r ...", len(frame)) +                try: +                    self.socket.send_multipart(frame) +                except zmq.ZMQError as exc: +                    logger.error("Client '%s': socket error: %r", self.identity, exc) +            except Empty: +                continue  # No message to send, continue waiting + +    def update_config(self, message): +        """ +        Update configuration based on message from server. +        Only 'cameras' section can be updated. +        directives = ( +            'modify_camera_name', +            'modify_camera_threshold', +            'modify_camera_grace_pd', +            'modify_camera_address', +            'add_camera', +            'remove_camera', +        ) +        """ +        try: +            msg_list = [part.decode('utf-8') for part in message] +            logger.debug("Client received config message: %r", msg_list) +            camera_id, directive, value = msg_list[0], msg_list[1], msg_list[2] +            if directive in ( +                'modify_camera_name', +                'modify_camera_threshold', +                'modify_camera_grace_pd', +                'modify_camera_address', +                ): +                if camera_id not in CAMERAS: +                    logger.warning("Cannot rename unknown camera: %r", camera_id) +                    logger.warning("Known cameras: %r", list(CAMERAS.keys())) +                else: +                    if directive == 'modify_camera_name': +                        old_name, new_name = camera_id, value +                        if new_name in CAMERAS: +                            raise ValueError("New camera name already exists.") +                        CAMERAS[new_name] = CAMERAS.pop(old_name) +                        self.video_threads[new_name] = self.video_threads.pop(old_name) +                        # Send message to server to update routing: +                        timestamp = datetime.datetime.now().strftime(TIME_FORMAT) +                        self.video_threads[new_name].put_metadata_message( +                            action='rename', +                            data=':'.join([old_name, new_name, timestamp]) +                        ) +                        self.send_messages +                        for item in cfg.items('cameras'): +                            if item[0].startswith(old_name + '.'): +                                param = item[0].split('.', 1)[1] +                                cfg.set('cameras', f"{new_name}.{param}", item[1]) +                                cfg.remove_option('cameras', item[0]) +                        logger.info("Renamed and restarted thread for camera '%s' to '%s'.", old_name, new_name) +                    elif directive == 'modify_camera_threshold': +                        new_threshold = int(value) +                        if new_threshold <= 0: +                            raise ValueError("Threshold must be positive.") +                        CAMERAS[camera_id]['contour_size_threshold'] = new_threshold +                        self.video_threads[camera_id].device_threshold = new_threshold +                        cfg.set('cameras', f"{camera_id}.contour_size_threshold", str(new_threshold)) +                        logger.info("Modified contour size threshold of camera '%s' to %d.", camera_id, new_threshold) +                    elif directive == 'modify_camera_grace_pd': +                        new_grace_pd = int(value) +                        if new_grace_pd < 0: +                            raise ValueError("Grace period cannot be negative.") +                        CAMERAS[camera_id]['movement_grace_period'] = new_grace_pd +                        self.video_threads[camera_id].device_grace_period = new_grace_pd +                        cfg.set('cameras', f"{camera_id}.movement_grace_period", str(new_grace_pd)) +                        logger.info("Modified movement grace period of camera '%s' to %d.", camera_id, new_grace_pd) +                    elif directive == 'modify_camera_address': +                        new_address = value +                        CAMERAS[camera_id]['address'] = new_address +                        self.video_threads[camera_id].device_addr = new_address +                        # Changing address on the fly requires restarting the video thread: +                        self.video_threads[camera_id].stop() +                        self.video_threads[camera_id].join() +                        vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) +                        vid_cl.start() +                        self.video_threads[camera_id] = vid_cl +                        logger.info("Modified address of camera '%s' to '%s'.", camera_id, new_address) +                        cfg.set('cameras', f"{camera_id}.address", new_address) +            elif directive == 'add_camera': +                cam_address = value +                if camera_id in CAMERAS: +                    raise ValueError("Camera name already exists.") +                CAMERAS[camera_id] = {'address': cam_address} +                vid_cl = ClientVideo(camera_id, CAMERAS[camera_id], self.queue) +                vid_cl.start() +                self.video_threads[camera_id] = vid_cl +                cfg.set('cameras', f"{camera_id}.address", cam_address) +                logger.info("Added and started a thread for new camera '%s' with address '%s'.", camera_id, cam_address) +            else: +                logger.warning("Unknown config directive: %r", directive) + +        except (ValueError, IndexError) as exc: +            logger.error("Failed to handle config message: %r", exc) +        cfg.write(open(CONFIG_FILE, 'w')) + +    def run(self): +        """Main client logic.""" +        self.socket.connect(ADDRESS) +        logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) +        logger.debug("Starting video threads ...") +        for _id, device_dict in CAMERAS.items(): +            vid_cl = ClientVideo(_id, device_dict, self.queue) +            vid_cl.start() +            self.video_threads[_id] = (vid_cl) + +        recv_thread = Thread(target=self.receive_messages, daemon=True) +        send_thread = Thread(target=self.send_messages, daemon=True) + +        recv_thread.start() +        send_thread.start() +        logger.debug("Client '%s' started receiving and sending threads.", self.identity) + +        logger.debug("Client '%s' waiting for threads to finish ...", self.identity) +        recv_thread.join() +        send_thread.join()          logger.info("Closing socket ...")          self.socket.close() @@ -149,18 +346,86 @@ class ClientTask(Thread):      def stop(self):          """Stop task"""          logger.info("ClientTask cleaning up ...") -        for thread in self.video_threads: -            logger.info("Cleaning u video thread ...") +        for _id, thread in self.video_threads.items(): +            logger.info("Cleaning video thread %s ...", _id)              thread.stop()              thread.join() -        self.video_threads = [] +        # self.video_threads = [] +        self.video_threads = {}          logger.info("Client task exiting ...")          self.running = False +def read_config(conf_file): +    """ +    Read config file and return as dictionary. +    The only required section is 'cameras' with at least one camera. +    Do basic sanity checks and update global variables. +    Log warnings if config file or sections/options are missing. +    """ +    cfg = ConfigParser() +    cfg.read(conf_file) +    # Need to update logging info: +    if 'logging' in cfg: +        global LOGDIR, LOGFILE, LOGLEVEL + +        LOGDIR = cfg.get('logging', 'logdir', fallback='.') +        LOGFILE = cfg.get('logging', 'logfile', fallback=LOGFILE) +        LOGLEVEL = cfg.get('logging', 'loglevel', fallback=LOGLEVEL) +        logger.setLevel(LOGLEVEL) +        for _handler in logger.handlers: +            _handler.setLevel(LOGLEVEL) +    if 'network' in cfg: +        global ROUTER_ADDRESS, PORT, ADDRESS +        if cfg.has_option('network', 'router_address'): +            ROUTER_ADDRESS = cfg.get('network', 'router_address') +        else: +            logger.warning("No 'router_address' option in 'network' section, using default: %r.", ROUTER_ADDRESS) +        if cfg.has_option('network', 'router_port'): +            PORT = cfg.get('network', 'router_port') +        else: +            logger.warning("No 'router_port' option in 'network' section, using default: %r.", PORT) +        ADDRESS = f"tcp://{ROUTER_ADDRESS}:{PORT}" +    else: +        logger.warning("No 'network' section in config file, using defaults: %r.", ADDRESS) +    if 'cameras' in cfg: +        global CAMERAS +        # These shenanigans below allow for a nested config simulation with configparser +        # and so per camera settings: +        cameras_dict = defaultdict(dict) +        for key, val in cfg.items('cameras'): +            try: +                cam, param = key.split('.', 1) +                if param in KNOWN_CAMERA_PARAMETERS: +                    cameras_dict[cam][param] = val +                else: +                    logger.warning("Unknown camera %r parameter: %r", cam, param) +            except ValueError as exc: +                logger.error("Invalid camera configuration entry: %r, error: %r, using defaults: %r.", key, exc, CAMERAS) +        # Check that each camera has at least an address and remove if not: +        for key in list(cameras_dict.keys()): +            if 'address' not in cameras_dict[key].keys(): +                logger.error("DEBUG: cameras_dict: %r", cameras_dict) +                logger.error("Camera %r has no address configured, removing from configuration.", key) +                del cameras_dict[key] +        if not cameras_dict: +            logger.warning("No valid camera configurations found in config file, using defaults: %r.", CAMERAS) +        else: +            # Only update global CAMERAS if we have at least one valid camera with an address: +            CAMERAS = cameras_dict +            logger.info("Using camera configuration: %r", CAMERAS) +    else: +        logger.warning("No 'cameras' section in config file, using defaults: %r.", CAMERAS) +    logger.info("CAMERAS configuration: %r", CAMERAS) +    with open(conf_file, 'w') as configfile: +        cfg.write(configfile) + +    return cfg +  def signal_handler(sig, frame):      logger.info("Received signal handler '%r', stopping client ...", sig)      client.stop() +    sys.exit(0)  if __name__ == "__main__": @@ -168,9 +433,14 @@ if __name__ == "__main__":      signal.signal(signal.SIGINT, signal_handler)      signal.signal(signal.SIGTERM, signal_handler) +    # Read configuration file before starting logging as it may contain logging settings: +    cfg = read_config(CONFIG_FILE) +      logger.info("Starting up ...") +      client = ClientTask("client_task")      client.start() -      client.join() + +    logger.info("Terminating ...")
\ No newline at end of file @@ -8,6 +8,8 @@ __author__ = "Franoosh Corporation"  import logging +import subprocess +import cv2  class CustomLoggingFormatter(logging.Formatter): @@ -42,3 +44,64 @@ class CustomLoggingFormatter(logging.Formatter):          return result +def process_frame(frame): +    """Process frame for contour detection.""" +    # Convert to grayscale: +    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) +    # Apply Gaussian blur: +    blurred = cv2.GaussianBlur(gray, (21, 21), 0) + +    return blurred + +def compute_contours(frame_deque): +    """Compute contours from a deque of frames.""" +    contours = [] +    if len(frame_deque) < 2: +        return contours +    all_contours = [] + +    for idx, frame in enumerate(frame_deque): +        frame_0 = process_frame(frame) +        try: +            frame_1 = process_frame(frame_deque[idx+1]) +        except IndexError: +            break +        frame_delta = cv2.absdiff(frame_0, frame_1) +        threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] +        threshold = cv2.dilate(threshold, None, iterations=2) +        contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) +        all_contours.extend(contours) + +    return all_contours + +def draw_contours(frame, contours, min_contour_area=500): +    """Draw contours on the frame.""" +    for contour in contours: +        if cv2.contourArea(contour) > min_contour_area: +            (x, y, w, h) = cv2.boundingRect(contour) +            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) + +    return frame + +def detect_movement(contours, min_area=500): +    """Detect movement based on contours found from frame diff.""" +    for contour in contours: +        if cv2.contourArea(contour) >= min_area: +            return True +    return False + +def get_available_cameras(): +    """ +    Get list of available camera devices. +    At the moment it does not work. At all. It is useless. +    """ +    proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) +    stdout, stderr = proc.communicate() +    candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]] +    verified_devices = [] +    for device in candidate_devices: +        cap = cv2.VideoCapture(device) +        if cap.isOpened(): +            verified_devices.append(device) +        cap.release() +    return verified_devices
\ No newline at end of file @@ -18,17 +18,17 @@ from helpers import CustomLoggingFormatter  # TODO: add configparser -ADDR_FRONTEND = 'tcp://localhost' -ADDR_BACKEND = 'tcp://localhost' +IP_FRONTEND = '127.0.0.1' +IP_BACKEND = '127.0.0.1'  PORT_FRONTEND = "5569"  PORT_BACKEND = "9979" -INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" -INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" +FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" +BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"  LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO  log_formatter = CustomLoggingFormatter()  handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -52,13 +52,13 @@ def custom_proxy():      # Set up frontend:      frontend_socket = context.socket(zmq.ROUTER)      frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) -    frontend_socket.bind(INITIAL_FRONTEND_ADDR) +    frontend_socket.bind(FRONTEND_ADDR)      poller.register(frontend_socket, zmq.POLLIN)      # Set up backend:      backend_socket = context.socket(zmq.ROUTER)      backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) -    backend_socket.bind(INITIAL_BACKEND_ADDR) +    backend_socket.bind(BACKEND_ADDR)      poller.register(backend_socket, zmq.POLLIN)      awailable_workers = [] @@ -78,101 +78,137 @@ def custom_proxy():      pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))      while RUNNING: -        events = dict(poller.poll(1000))  # Poll both frontend and backend sockets: -        if frontend_socket in events:  # If message from client: -            msg = frontend_socket.recv_multipart()  # Receive message from client: -            logger.debug("Received message: %r", msg) +        # Poll both frontend and backend sockets: +        events = dict(poller.poll(1000)) +        # If message from client: +        if frontend_socket in events: +            # Receive message from client: +            msg = frontend_socket.recv_multipart() +            logger.debug("Received message.")              client_id, content = msg[0], msg[1:] -            if not client_id in client_worker_dict:  # If client is not in client-worker dictionary: -                logger.info("Received ID from client: %r.", client_id)              # Check if client is already connected to worker:              try: -                worker_id = client_worker_dict[client_id]  # Get worker ID for this client from the client-worker dictionary -                while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker and send them first: -                    logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) +                # Get worker ID for this client from the client-worker dictionary: +                worker_id = client_worker_dict[client_id] +                # Check if there are any pending messages for this worker and send them first: +                while pending_messages_client_worker[client_id][worker_id]: +                    # In order to take a peek at the size of all messages, perhaps check out: +                    # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ +                    logger.debug( +                        "There are '%d' pending messages from client %r for worker %r", +                        len(pending_messages_client_worker[client_id][worker_id]), +                        client_id, +                        worker_id, +                    )                      pending_msg = pending_messages_client_worker[client_id][worker_id].pop()                      try: -                        logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                        logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)                          backend_socket.send_multipart([worker_id, client_id, *pending_msg])                      except Exception as e: -                        logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e) -                        pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message -                        break # Break to avoid infinite loop -                if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: -                    pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                        logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) +                        # Re-queue the message: +                        pending_messages_client_worker[client_id][worker_id].append(pending_msg) +                        # Break to avoid infinite loop: +                        break +                # If there are still pending messages for this worker: +                if pending_messages_client_worker[client_id][worker_id]: +                    # Store message for later delivery: +                    pending_messages_client_worker[client_id][worker_id].appendleft(content) +                # At last send new message to worker:                  else:                      try: -                        logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) -                        backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                        logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) +                        backend_socket.send_multipart([worker_id, client_id, *content])                      except Exception as e: -                        logger.error("Failed to send message to backend: %r, error: %s", msg, e) -                        pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery -            except KeyError:  # If client is not connected to any worker: -                logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) -                if awailable_workers:  # If there are available workers: -                    worker_id = random.choice(awailable_workers)  # Assign random worker to client -                    client_worker_dict[client_id] = worker_id # Update client-worker dictionary -                    if pending_messages_no_worker[client_id]:  # Check if there are any pending messages for this client: -                        pending_messages_client_worker[client_id][worker_id].extendleft(msg)  # Move messages to pending messages for this worker +                        logger.error("Failed to send message to backend, error: %s", e) +                        # Store message for later delivery: +                        # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? +                        pending_messages_client_worker[client_id][worker_id].appendleft(content) +            # If client is not connected to any worker: +            except KeyError: +                logger.info("Received ID from client: %r.", client_id) +                # Check if there are available workers. +                if awailable_workers: +                    # Assign random worker to client: +                    worker_id = random.choice(awailable_workers) +                    # Update client-worker dictionary: +                    client_worker_dict[client_id] = worker_id +                    # Check if there are any pending messages for this client: +                    if pending_messages_no_worker[client_id]: +                        # Move messages to pending messages for this worker: +                        pending_messages_client_worker[client_id][worker_id].extendleft(msg)                          logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) -                    while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker: +                    # Check if there are any pending messages for this worker: +                    while pending_messages_client_worker[client_id][worker_id]:                          pending_msg = pending_messages_client_worker[client_id][worker_id].pop()                          try: -                            logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                            logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)                              backend_socket.send_multipart([worker_id, client_id, *pending_msg])                          except Exception as e: -                            logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) -                            pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message +                            logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e) +                            # Re-queue the message: +                            pending_messages_client_worker[client_id][worker_id].append(pending_msg)                              break -                    if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: -                        logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) -                        pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                    # If there are still pending messages for this worker: +                    if pending_messages_client_worker[client_id][worker_id]: +                        logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id) +                        # Store message for later delivery: +                        pending_messages_client_worker[client_id][worker_id].appendleft(content)                      else: -                        logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) +                        logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id)                          try: -                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) -                            backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) +                            # At last, send the message to worker: +                            backend_socket.send_multipart([worker_id, client_id, *content])                          except Exception as e: -                            logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) -                            pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery +                            logger.error("Failed to send message for client %r to backend, requeing. Error: %s", client_id, e) +                            # Store message for later delivery: +                            pending_messages_client_worker[client_id][worker_id].appendleft(msg) -                else:  # If no workers are available, assign a new worker:   -                    pending_messages_no_worker[client_id].append(content)  # Store message for later delivery +                else: +                    # Store message for later delivery: +                    pending_messages_no_worker[client_id].append(content)                      logger.debug("No available workers, storing client '%s' with no worker assigned", client_id)          if backend_socket in events:              _msg = backend_socket.recv_multipart() +            # These messages should be safe to log as they are short strings or bytes.              logger.debug("Received from worker: %r", _msg)              if len(_msg) == 2: # Worker is sending its identity:                  worker_id, msg = _msg[0], _msg[1:]                  logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)                  logger.info("Received ID from worker: %r.", worker_id) -                awailable_workers.append(worker_id)  # Add worker to available workers list +                # Add worker to available workers list: +                awailable_workers.append(worker_id)                  for client, worker in client_worker_dict.items(): -                    if worker is None:  # If client has no worker assigned: +                    # If client has no worker assigned, assign it a new worker: +                    if worker is None:                          client_worker_dict[client] = worker                          logger.debug("Assigning worker %r to client %r", worker, client)                  if pending_messages_no_worker:                      # If there are pending messages for clients with no worker assigned:                      for client_id, messages in pending_messages_no_worker.items():                          if messages: -                            pending_messages_client_worker[client_id][worker_id].extendleft(messages)  # Move messages to pending messages for this worker -                            pending_messages_no_worker[client_id].clear()  # Clear pending messages for this client +                            # Move messages to pending messages for this worker: +                            pending_messages_client_worker[client_id][worker_id].extendleft(messages) +                            # Clear pending messages for this client: +                            pending_messages_no_worker[client_id].clear()                              logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)                              try: -                                pending_msg = pending_messages_client_worker[client_id][worker_id].pop()  # Get the first message for this client and worker -                                logger.debug( -                                    "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) -                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])  # Send the message to worker +                                # Get the first message for this client and worker: +                                pending_msg = pending_messages_client_worker[client_id][worker_id].pop() +                                logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) +                                # Send the message to worker: +                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])                              except Exception as e:                                  pending_messages_client_worker[client_id][worker_id].append(pending_msg) -                                logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) +                                logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)              else:  # Worker is sending a message to client:                  worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]                  logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg)                  # First check if there are any pending messages for this worker: -                while pending_messages_worker_client[worker_id][client_id]:  # Check if there are any pending messages for this worker: +                while pending_messages_worker_client[worker_id][client_id]:                      pending_msg = pending_messages_worker_client[worker_id][client_id].pop()                      try:                          logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) @@ -180,10 +216,12 @@ def custom_proxy():                      except Exception as e:                          # It is safe to log the message content as it is a short string or bytes.                          logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) -                        pending_messages_worker_client[worker_id][client_id].append(pending_msg)  # Re-queue the message   -                        break  # Break to avoid infinite loop -                # Now send the message to the client if no pending messages for this client: -                if not pending_messages_worker_client[worker_id][client_id]:  # If there are no pending messages for this worker: +                        # Re-queue the message: +                        pending_messages_worker_client[worker_id][client_id].append(pending_msg) +                        # Break to avoid infinite loop: +                        break +                # If no pending messages for this client, send new message to the client : +                if not pending_messages_worker_client[worker_id][client_id]:                      logger.debug("Sending message %r to client '%s'", msg, client_id)                      try:                          frontend_socket.send_multipart([client_id, *msg]) diff --git a/templates/client.html b/templates/client.html index e03394a..5a52f43 100644 --- a/templates/client.html +++ b/templates/client.html @@ -12,19 +12,52 @@      <div class="streams-container">          {% for camera_id in camera_ids %}          <div class="camera-stream"> -            <h2>Camera {{ camera_id }}</h2> +            <h2>Camera: {{ camera_id }}</h2>              <img id="video-{{ camera_id }}" width="640" height="480" /> +            <div> +                <h3>Modify Camera Name</h3> +                <input type="text" id="new-name-{{ camera_id }}" placeholder="New Name"> +                <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_name')">Send</button> +            </div> +            <div> +                <h3>Modify Camera Threshold</h3> +                <input type="number" id="threshold-value-{{ camera_id }}" placeholder="Threshold"> +                <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_threshold')">Send</button> +            </div> +            <div> +                <h3>Modify Camera Grace Period</h3> +                <input type="number" id="grace-value-{{ camera_id }}" placeholder="Grace Period"> +                <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_grace_pd')">Send</button> +            </div> +            <div> +                <h3>Modify Camera Address</h3> +                <input type="text" id="address-value-{{ camera_id }}" placeholder="New Address"> +                <button onclick="sendConfig('{{ camera_id }}', 'modify_camera_address')">Send</button> +            </div>          </div> -        {% endfor %} -    </div>  +    {% endfor %} +    <div> +        <h3>Add Camera</h3> +        <input type="text" id="add-name" placeholder="Camera Name"> +        <input type="text" id="add-address" placeholder="Address"> +        <button onclick="sendConfig('add_camera')">Send</button> +    </div> +    <div> +        <h3>Remove Camera</h3> +        <input type="text" id="remove-name" placeholder="Camera Name"> +        <button onclick="sendConfig('remove_camera')">Send</button> +    </div> +    </div>      <script> -    // For each camera, open a WebSocket and update the corresponding <img> +    const wsMap = {};      {% for camera_id in camera_ids %} +    // For each camera, open a WebSocket and update the corresponding <img>      (function() { -        let ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/{{ camera_id }}'); -        let image = document.getElementById('video-{{ camera_id }}'); +        const cameraId = '{{ camera_id }}'; +        const ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/' + cameraId);          let currentUrl = null;          ws.onmessage = function(event) { +            let image = document.getElementById('video-' + cameraId);              if (currentUrl) {                  URL.revokeObjectURL(currentUrl);              } @@ -32,16 +65,65 @@              image.src = currentUrl;          };          ws.onclose = function(event) { -            console.log('WebSocket closed for camera {{ camera_id }}:', event); +            console.log('WebSocket closed for camera ' + cameraId + ':', event);          };          ws.onerror = function(event) { -            console.log('WebSocket error for camera {{ camera_id }}:', event); +            console.log('WebSocket error for camera ' + cameraId + ':', event);          };          window.addEventListener('beforeunload', function() {              ws.close();          }); +        wsMap[cameraId] = ws;      })();      {% endfor %} + +    function sendConfig(cameraId, type) { +        let msg = {}; +        switch(type) { +            case 'modify_camera_name': +                msg[type] = [ +                    // cameraId, +                    document.getElementById('new-name-' + cameraId).value +                ]; +                break; +            case 'modify_camera_threshold': +                msg[type] = [ +                    // cameraId, +                    document.getElementById('threshold-value-' + cameraId).value +                ]; +                break; +            case 'modify_camera_grace_pd': +                msg[type] = [ +                    // cameraId, +                    parseInt(document.getElementById('grace-value-' + cameraId).value) +                ]; +                break; +            case 'modify_camera_address': +                msg[type] = [ +                    // cameraId, +                    document.getElementById('address-value-' + cameraId).value +                ]; +                break; +            case 'add_camera': +                msg[type] = [ +                    document.getElementById('add-name').value, +                    document.getElementById('add-address').value +                ]; +                break; +            case 'remove_camera': +                msg[type] = [ +                    document.getElementById('remove-name').value +                ]; +                break; +        } +        const ws = wsMap[cameraId] && wsMap[cameraId] ? wsMap[cameraId] : Object.values(wsMap)[0]; +        if (ws && ws.readyState === WebSocket.OPEN) { +            console.log("Sending message:", msg, "on ws:", ws); +            ws.send(JSON.stringify(msg)); +        } else { +            alert('WebSocket is not open for camera ' + cameraId); +        } +    }      </script>  </body>  </html> diff --git a/webserver.py b/webserver.py index d1c0a1e..162badc 100644 --- a/webserver.py +++ b/webserver.py @@ -1,42 +1,43 @@  #!/usr/bin/env python -  """  Module serving video from zmq to a webserver.  """ -  __author__ = "Franoosh Corporation" -  import os -from collections import defaultdict +from collections import defaultdict, deque  import json  import logging +import zmq.asyncio  import asyncio -from threading import Thread +from collections import defaultdict +from contextlib import asynccontextmanager  import uvicorn +import zmq  from fastapi import (      FastAPI,      Request,      HTTPException,      WebSocket, +    WebSocketDisconnect,      templating,  )  from fastapi.responses import HTMLResponse  from fastapi.staticfiles import StaticFiles  from helpers import CustomLoggingFormatter -import zmq  CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json') +CLIENTS_DICT = defaultdict(list)  # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...]   LOGFILE = 'webserver.log' -LOGLEVEL = logging.INFO +LOGLEVEL = logging.DEBUG  HOST = "127.0.0.1" -ZMQPORT = "9979" -WSPORT = "8008" -ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" -WS_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +ZMQ_PORT = "9979" +WEB_PORT = "8008" +CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}"  log_formatter = CustomLoggingFormatter()  handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -50,88 +51,175 @@ logging.basicConfig(      level=LOGLEVEL,  ) - - -app = FastAPI() +# Track websocket connections by (client_id, camera_id): +ws_connections = defaultdict(dict)  # ws_connections[client_id][camera_id] = websocket +ws_queues = defaultdict(dict) +ctrl_msg_que = asyncio.Queue() +# Create ZMQ context and socket: +zmq_context = zmq.asyncio.Context() +zmq_socket = zmq_context.socket(zmq.DEALER) +# Connect to ZMQ backend: +zmq_socket.connect(WEB_BACKEND_ADDR) + + +async def zmq_bridge(): +    """Bridge between ZMQ backend and websocket clients.""" +    while True: +        try: +            data = await zmq_socket.recv_multipart() +            topic, frame_data = None, None +            if len(data) == 2: +                topic, frame_data = data +            elif len(data) == 3: +                topic, _, _ = data +            else: +                logger.warning("Received invalid ZMQ message: %r", data) +                continue +            if topic: +                client_id, camera_id = topic.decode('utf-8').split(':', 1) +                if frame_data: +                    # Add client and camera to CLIENTS_DICT if new: +                    if not camera_id in CLIENTS_DICT[client_id]: +                        CLIENTS_DICT[client_id].append(camera_id) +                else: +                    # No frame data means a notification to remove camera: +                    try: +                        CLIENTS_DICT[client_id].remove(camera_id) +                    except ValueError: +                        pass + +            queue = ws_queues.get(client_id, {}).get(camera_id) +            if queue and frame_data: +                if queue.full(): +                    _ = queue.get_nowait()  # Discard oldest frame +                await queue.put(frame_data) +            if not ctrl_msg_que.empty(): +                client_id, camera_id, command, args_list = await ctrl_msg_que.get() +                zmq_socket.send_multipart([ +                    client_id.encode('utf-8'), +                    camera_id.encode('utf-8'), +                    command.encode('utf-8'), +                ] +  args_list) +                logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id) +        except Exception as e: +            logger.error("Error in ZMQ bridge: %r", e) +    # TODO: Check if this loop can be optimized to avoid busy waiting. +    # Alternative implementation using zmq Poller: +    # poll = zmq.asyncio.Poller() +    # poll.register(zmq_socket, zmq.POLLIN) +    # while True: +    #     try: +    #         sockets = dict(await poll.poll()) +    #         if zmq_socket in sockets: +    #             topic, frame_data = await zmq_socket.recv_multipart() +    #             client_id, camera_id = topic.decode('utf-8').split(':', 1) +    #             set_clients(client_id, camera_id) +    #             queue = ws_queues.get(client_id, {}).get(camera_id) +    #             if queue: +    #                 if queue.full(): +    #                     _ = queue.get_nowait()  # Discard oldest frame +    #                 await queue.put(frame_data) +    #         if not ctrl_msg_que.empty(): +    #             client_id, camera_id, command, args_list = await ctrl_msg_que.get() +    #             zmq_socket.send_multipart([ +    #                 client_id.encode('utf-8'), +    #                 camera_id.encode('utf-8'), +    #                 command.encode('utf-8'), +    #             ] +  args_list) +    #             logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id) +    #     except Exception as e: +    #         logger.error("Error in ZMQ bridge: %r", e) + +@asynccontextmanager +async def lifespan(app: FastAPI): +    """Create lifespan context for FastAPI app.""" +    asyncio.create_task(zmq_bridge()) +    yield + +app = FastAPI(lifespan=lifespan)  app.mount("/static", StaticFiles(directory="static"), name="static")  templates = templating.Jinja2Templates(directory='templates') -# Track websocket connections by (client_id, camera_id) -ws_connections = defaultdict(dict)  # ws_connections[client_id][camera_id] = websocket - -# Set up a single ZMQ SUB socket for all websocket connections -zmq_context = zmq.Context() -zmq_socket = zmq_context.socket(zmq.SUB) -zmq_socket.bind(WS_BACKEND_ADDR) -zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")  # Subscribe to all topics -poller = zmq.Poller() -poller.register(zmq_socket, zmq.POLLIN) - -def load_clients(): -    try: -        with open(CLIENTS_JSON_FILE) as f: -            clients_dict = json.load(f) -    except FileNotFoundError: -        clients_dict = {} -    return clients_dict  @app.get("/")  async def main_route(request: Request): -    logger.error("DEBUG: main route visited") -    clients = load_clients() +    """Serve main page.""" +    logger.debug("Main route visited")      return templates.TemplateResponse(          "main.html",          {              "request": request, -            "clients": clients, +            "clients": CLIENTS_DICT,           }      )  @app.get("/clients/{client_id}", response_class=HTMLResponse)  async def client_route(request: Request, client_id: str):      """Serve client page.""" -    clients_dict = load_clients() -    logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict) -    if not client_id in clients_dict: -        return HTTPException(status_code=404, detail="No such client ID.") +    logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, CLIENTS_DICT) +    if not client_id in CLIENTS_DICT: +        raise HTTPException(status_code=404, detail="No such client ID.")      return templates.TemplateResponse(          "client.html",          {              "request": request,              "client_id": client_id, -            "camera_ids": clients_dict[client_id], +            "camera_ids": CLIENTS_DICT[client_id],          },      ) -  @app.websocket("/ws/{client_id}/{camera_id}")  async def camera_route(websocket: WebSocket, client_id: str, camera_id: str):      """Serve a particular camera page."""      logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)      await websocket.accept() -    if client_id not in ws_connections: -        ws_connections[client_id] = {} -    ws_connections[client_id][camera_id] = websocket -    try: +    ws_connections[client_id][camera_id] = {'ws': websocket} +    ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10) +    queue = ws_queues[client_id][camera_id] +     +    async def send_frames(): +        while True: +            frame_data = await queue.get() +            await websocket.send_bytes(frame_data) +     +    async def receive_control():          while True: -            # Wait for a frame for this client/camera -            sockets = dict(poller.poll(1000)) -            if zmq_socket in sockets: -                msg = zmq_socket.recv_multipart() -                if len(msg) == 3: -                    recv_client_id, recv_camera_id, content = msg -                    recv_client_id = recv_client_id.decode("utf-8") -                    recv_camera_id = recv_camera_id.decode("utf-8") -                    # Only send to the websocket for this client/camera -                    if recv_client_id == client_id and recv_camera_id == camera_id: -                        await websocket.send_bytes(content) -    except Exception as exc: -        logger.warning("Connection closed: %r", exc) +            try: +                data = await websocket.receive_text() +                logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data) +                # Handle control messages from the client: +                frontend_message = json.loads(data) +                for command, args in frontend_message.items(): +                    args_list = [str(arg).encode('utf-8') for arg in args] +                    ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list)) +                    # zmq_control_socket.send_multipart([ +                    #     client_id.encode('utf-8'), +                    #     camera_id.encode('utf-8'), +                    #     command.encode('utf-8'), +                    #     b'', +                    # ] + args_list) +                    # logger.info("Sent control command '%s' with args: %r for '/ws/%s/%s' to backend.", command, args_list, client_id, camera_id) +                    logger.info("Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.", command, args_list, client_id, camera_id) +            except json.JSONDecodeError: +                logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) +            except WebSocketDisconnect: +                logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) +                break +            except Exception as exc: +                logger.warning("Error receiving control message: %r", exc) +    send_task = asyncio.create_task(send_frames()) +    receive_task = asyncio.create_task(receive_control()) +    try: +        # await asyncio.gather(send_task, receive_task) +        await asyncio.wait( +            [send_task, receive_task], +            return_when=asyncio.FIRST_COMPLETED, +        )      finally: -        if client_id in ws_connections and camera_id in ws_connections[client_id]: -            del ws_connections[client_id][camera_id] -        await websocket.close() - +        send_task.cancel() +        receive_task.cancel() +        ws_connections[client_id].pop(camera_id, None) +        ws_queues[client_id].pop(camera_id, None)  if __name__ == "__main__":      uvicorn.run( @@ -139,4 +227,4 @@ if __name__ == "__main__":          port=8007,          host='127.0.0.1',          log_level='info', -    )
\ No newline at end of file +    ) @@ -1,29 +1,51 @@  #!/usr/bin/env python -__author__ = "Franoosh Corporation"  """  A module consisting of a zeromq worker receiving video stream  via router from clients. Able to send control messages to clients. +Needs libzmq and pyzmq with 'drafts' support.  """  import os +import sys  from threading import Thread, Event  import time  import signal  import logging  from queue import Queue  from collections import defaultdict +import tempfile +import json  import zmq +import cv2 +import numpy  from helpers import CustomLoggingFormatter +__author__ = "Franoosh Corporation" + +# Various constants:  # TODO: put them in a config + +HOST = "127.0.0.1" +ZMQPORT = "9979" +# WSPORT = "8000" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" +# LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO + +#   File paths: -ADDR = "tcp://localhost" -PORT = "9979" -INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}"  LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +CWD = os.getcwd() +TMP_DIR = os.path.join(CWD, "tmp") +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') + +# Other: + +CLIENTS_DICT = {}  stop_event = Event() @@ -39,6 +61,14 @@ logging.basicConfig(      level=LOGLEVEL,  ) + +if not os.path.exists(TMP_DIR): +    try: +        os.makedirs(TMP_DIR) +    except Exception as exc: +        logger.error("Could not create temporary directory: %r", exc) +        sys.exit() +  class MonitorTask(Thread):      """Monitor task"""      def __init__(self, socket): @@ -92,14 +122,45 @@ class ServerWorker(Thread):          self.socket.identity = self.id.encode("utf-8")          self.monitor = MonitorTask(self.socket)          self.video_threads = defaultdict(lambda: defaultdict(dict)) +        self.poller = zmq.Poller() +        self.web_sock = self.context.socket(zmq.DEALER)          self.connected = False          self.running = True +    def start_client(self, client_id, camera_id, filename): +        """ +        Start a video thread for new client_id and camera_id. +        """ +        if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):  # New client or new camera +            q = Queue() +            logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) +            video_worker = VideoWorker(client_id, camera_id, filename, q) +            video_worker.start() +            self.video_threads[client_id][camera_id] = video_worker + +    def stop_client(self, client_id, camera_id): +        """ +        Stop video thread for a client_id and camera_id. +        """ +        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +            logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) +            self.video_threads[client_id][camera_id].stop() # Stop the thread +            del self.video_threads[client_id][camera_id] +            logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) +      def run(self): +        """ +        Main loop of the worker. +        Full of wonders. +        """          logger.debug("ServerWorker '%s' starting ...", self.id) -        self.socket.connect(INITIAL_BACKEND_ADDR) -        self.poller = zmq.Poller() +        self.socket.connect(ZMQ_BACKEND_ADDR) +        try: +           self.web_sock.bind(WEB_BACKEND_ADDR) +        except Exception as exc: +            logger.error("Connection to zmq websocket failed: %r", exc)          self.poller.register(self.socket, zmq.POLLIN) +        self.poller.register(self.web_sock, zmq.POLLIN)          self.monitor.start()          while self.running: @@ -112,67 +173,93 @@ class ServerWorker(Thread):              if self.socket in sockets:                  self.connected = True                  msg = self.socket.recv_multipart() -                logger.debug("ServerWorker '%s' received message: %r", self.id, msg) +                logger.debug("ServerWorker '%s' received message of length: %d.", self.id, len(msg))                  filename = None -                # At the moment we don't expect any other message than a video message: +                # At the moment we don't expect any other message than a start/stop message (of length 4) and a video message:                  if len(msg) == 4:  # This is a message with start/stop directive and no video data. -                    logger.debug("Received start/stop directive: %r", msg) +                    logger.debug("Received start/stop directive: (?)")                      client_id, camera_id, metadata = msg[0], msg[1], msg[2] +                    # Convert bytes to str once and for all: +                    client_id = client_id.decode('utf-8') +                    camera_id = camera_id.decode('utf-8') +                    update_clients(client_id, camera_id)                      try: -                        directive, timestamp = metadata.decode("utf-8").split(":") -                        filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" -                        logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) +                        # Directive and data are fron now on converted to strings: +                        directive, data = metadata.decode("utf-8").split(":") +                        logger.info( +                            "Received directive '%s' with data: %r for client '%s', camera '%s'", +                            directive, +                            data, +                            client_id, +                            camera_id, +                        )                      except ValueError:                          logger.error("Invalid metadata format: %r", metadata) +                        directive = None                          continue -                    if directive == "start": -                        if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):  # New client or new camera -                            q = Queue() -                            logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) -                            video_worker = VideoWorker(filename, q) -                            video_worker.start() -                            self.video_threads[client_id][camera_id] = video_worker -                    elif directive == "stop": -                        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: -                            logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) -                            self.video_threads[client_id][camera_id].queue.put(None)  # Sentinel to stop the thread -                            del self.video_threads[client_id][camera_id] -                            logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) +                    if directive == 'rename': +                        old_name, new_name, timestamp = None, None, None +                        try: +                            old_name, new_name, timestamp = data.split(":") +                            logger.info("Renamed video thread from '%s' to '%s'.", old_name, new_name) +                        except ValueError: +                            logger.error("Invalid rename data format: %r", data) +                            continue +                        if old_name and new_name and timestamp: +                            # I think it's better to stop the old thread and start a new one, +                            # rather than reuse the old one as it's less mucking about. +                            self.stop_client(old_name, camera_id) +                            self.start_client(new_name, camera_id, f"{new_name}_{camera_id}-{timestamp}.mkv") +                            to_remove= b':'.join([client_id, camera_id]) +                            try: +                                self.web_sock.send_multipart([to_remove, b'', b''])  # Notify webserver of rename +                            except Exception as exc: +                                logger.error("Sending rename notification to websocket failed: %r", exc)                      else: -                        logger.error("Unknown directive: '%s'", directive) -                elif len(msg) == 3:  # This is a video message with data only -                    logger.debug("Received video message with data only: %r", msg) -                    client_id, camera_id, msg = msg[0], msg[1], msg[2] +                        timestamp = data +                        filename = f"{client_id}_{camera_id}-{timestamp}.mkv" +                        logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) +                        if directive == "start": +                            self.start_client(client_id, camera_id, filename) +                        elif directive == "stop": +                            self.stop_client(client_id, camera_id) +                        else: +                            logger.error("Unknown directive: %r", directive) + +                elif len(msg) == 3:  # This is a video message with data +                    logger.debug("Received video message with data only.") +                    client_id, camera_id, content = msg[0], msg[1], msg[2] +                    client_id = client_id.decode('utf-8') +                    camera_id = camera_id.decode('utf-8')                      if client_id in self.video_threads and camera_id in self.video_threads[client_id]: -                        self.video_threads[client_id][camera_id].queue.put(msg) +                        self.video_threads[client_id][camera_id].queue.put(content) +                        # Send only [client_id, camera_id, jpeg_bytes] to the webserver: +                        # zmq subsciber can subscribe to a topic defined by the first +                        # part of the multipart message, so in order to allow for a  +                        # per camera subscription, we need to join client_id and camera_id +                        topic = ':'.join([client_id, camera_id]).encode('utf-8') +                        try: +                            self.web_sock.send_multipart([topic, content], flags=zmq.NOBLOCK) +                        except Exception as exc: +                            logger.error("Sending message to websocket failed: %r", exc)                      else:                          logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)                          logger.error("Available video threads keys: %r", self.video_threads.keys())                          logger.error("Available video threads values: %r", self.video_threads.values()) -                elif len(msg) == 2:  # This is a ping message from client -                    logger.debug("Received ping message: %r", msg) -                    client_id, camera_id = msg[0], msg[1] -                    if client_id in self.video_threads and camera_id in self.video_threads[client_id]: -                        # Respond with a pong message -                        try: -                            self.socket.send_multipart([client_id, camera_id, b'pong']) -                            logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) -                        except zmq.ZMQError as exc: -                            logger.error("Failed to send pong: %r", exc) -                  else: -                    logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) -                try: -                    self.socket.send_multipart([client_id, camera_id, b'pong']) -                    logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) -                except zmq.ZMQError as exc: -                    logger.error("Failed to send pong: %r", exc) +                    logger.warning("Received a message of unexpected length from client. Message length: %d", len(msg))              else:                  logger.debug("No message received, polling again ...")                  time.sleep(5) +            if self.web_sock in sockets: +                frontend_msg = self.web_sock.recv_multipart() +                logger.info("Received message from frontend: %r", frontend_msg) +                self.socket.send_multipart(frontend_msg) +                logger.info("Forwarded message to backend: %r", frontend_msg)          self.monitor.stop()          self.monitor.join() +          for camera_thread in self.video_threads.values():              for thread in camera_thread.values():                  thread.queue.put(None)  # Sentinel to unblock queue.get() @@ -183,7 +270,7 @@ class ServerWorker(Thread):          """Send control message to a specific client and camera."""          if client_id in self.video_threads and camera_id in self.video_threads[client_id]:              self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) -            logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) +            logger.info("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)          else:              logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) @@ -195,34 +282,62 @@ class ServerWorker(Thread):  class VideoWorker(Thread):      """Class for video threads.""" -    def __init__(self, filename, queue): +    def __init__(self, client_id, camera_id, filename, queue):          super().__init__(daemon=True) +        self.context = zmq.Context() +        self.context.setsockopt(zmq.LINGER, 0) +        self.client_id = client_id +        self.camera_id = camera_id          self.filename = filename          self.queue = queue          self.live = True +    def stop(self): +        logger.info("VideoWorker %r exiting ...", self.camera_id) +        self.live = False +      def run(self):          if os.path.exists(self.filename):              logger.warning("File '%s' already exists, overwriting ...", self.filename) -        with open(self.filename, 'wb') as f: -            logger.info("VideoWorker started writing to file: %s", self.filename) -            while self.live: -                # Simulate video processing -                frame = self.queue.get()  # Get frame from queue -                logger.debug("Processing ('writing to a file') frame for camera: %r", frame) -                if frame is None: -                    logger.info("VideoWorker received stop signal, exiting ...") -                    break -                f.write(frame + b'\n')  # Write frame to file - -    def stop(self): -        logger.info("VideoWorker '' exiting ...") -        self.live = False +        fourcc = cv2.VideoWriter_fourcc(*'VP80') +        out = cv2.VideoWriter(self.filename, fourcc, 30.0, (640, 480))  # Assuming 640x480 resolution +        logger.info("This is the first run, binding websocket ...") +        while self.live: +            logger.debug("VideoWorker writing to file: %s", self.filename) +            frame_bytes = self.queue.get() +            if frame_bytes is None: +                logger.debug("Received None, stopping video worker for camera: '%s'", self.camera_id) +                break +            frame = cv2.imdecode(numpy.frombuffer(frame_bytes, dtype=numpy.uint8), cv2.IMREAD_COLOR) +            logger.debug("Processing ('writing to a file') frame for camera: '%s'", self.camera_id) +            # Write frame to file: +            out.write(frame) + +        # Release video writer +        out.release() +        logger.info("VideoWorker finished writing to file: %s", self.filename)  def signal_handler(sig, frame):      worker.stop() +def update_clients(client_id, camera_id): +    """Update client and camera dictionary and write to a shared file.""" +    global CLIENTS_DICT +    if client_id not in CLIENTS_DICT: +        logger.debug("Client_id not in CLIENTS_DICT, adding an empty list for it.") +        CLIENTS_DICT[client_id] = [] +    if camera_id not in CLIENTS_DICT[client_id]: +        logger.debug("Camera_id not in CLIENTS_DICT[%s] list, adding", client_id) +        CLIENTS_DICT[client_id].append(camera_id) +    # Atomic write using tempfile. Works only when both files on the same filesystem +    with tempfile.NamedTemporaryFile('w', dir=TMP_DIR, delete=False) as tf: +        logger.debug("Dumping to file CLIENTS_DICT: %r", CLIENTS_DICT) +        json.dump(CLIENTS_DICT, tf) +        tempname = tf.name +    os.replace(tempname, CLIENTS_JSON_FILE) + +  if __name__ == "__main__":      logger.info("Starting up ...") @@ -233,3 +348,7 @@ if __name__ == "__main__":      worker.start()      worker.join() +    try: +        os.remove(CLIENTS_JSON_FILE) +    except FileNotFoundError: +        pass
\ No newline at end of file | 
