diff options
| author | Franoosh <uinarf@autistici.org> | 2025-07-25 17:13:38 +0200 | 
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2025-10-15 14:33:59 +0200 | 
| commit | 68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (patch) | |
| tree | 5a7eab3022a7593bd3d9dbdcc99a1590ab0fc3bc | |
| download | ZeroMQ_Video_Streaming-68bd1bd052a7cd6438b92cb1059ef5e58b8d022c.tar.gz ZeroMQ_Video_Streaming-68bd1bd052a7cd6438b92cb1059ef5e58b8d022c.tar.bz2 ZeroMQ_Video_Streaming-68bd1bd052a7cd6438b92cb1059ef5e58b8d022c.zip | |
Initial commit. Proof of concept message passing between client <-> router <-> worker with rudimentary caching
| -rw-r--r-- | client.py | 176 | ||||
| -rw-r--r-- | helpers.py | 44 | ||||
| -rw-r--r-- | router.py | 207 | ||||
| -rw-r--r-- | static/css/main.css | 21 | ||||
| -rw-r--r-- | templates/client.html | 47 | ||||
| -rw-r--r-- | templates/client.html.bak | 41 | ||||
| -rw-r--r-- | templates/main.html | 16 | ||||
| -rw-r--r-- | templates/main.html.bak | 35 | ||||
| -rw-r--r-- | webserver.py | 142 | ||||
| -rw-r--r-- | worker.py | 235 | 
10 files changed, 964 insertions, 0 deletions
| diff --git a/client.py b/client.py new file mode 100644 index 0000000..e9dce4f --- /dev/null +++ b/client.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +""" +A module containing client for streaming video to a zmq router. +""" + +__author__ = "Franoosh Corporation" + +from os import path +from threading import Thread, Event +from queue import Queue +import time +import datetime +import signal +import logging +from configparser import ConfigParser +import traceback +import zmq + +from helpers import CustomLoggingFormatter + +ROUTER_ADDRESS = "tcp://localhost" +PORT = "5569" +ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" + +CONFIG_FILE = "client.cfg" +LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG +CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} +SLEEP_TIME = 5 + + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + +def read_config(conf_file): +    """Read config file and return as dictionary.""" +    cfg = ConfigParser() +    cfg.read(conf_file) + +    return {key: dict(cfg.items(key)) for key in cfg.sections()} + + +class ClientVideo(Thread): +    """Class for sending video stream""" +    def __init__(self, _id, device, queue): +        super().__init__(daemon=True) +        self.identity = _id +        self.device = device +        self.queue = queue +        self.live = True + +    def run(self): +        """Replace with actual video streaming logic.""" +        ping_no = 0 +        while self.live and not stop_event.is_set(): +            try: +                # Four parts required to start/stop video stream, three parts for ping: +                if ping_no == 0:  # Start video stream +                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") +                    metadata = f"start:{timestamp}" +                    logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) +                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] +                    self.queue.put(message) +                    text = f"ping-{ping_no}" +                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] +                elif ping_no >= 5:  # Stop video stream +                    logger.debug("ClientVideo '%s' sending stop signal", self.identity) +                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") +                    metadata = f"stop:{timestamp}" +                    logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') +                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] +                    self.live = False +                else:  # Send ping +                    text = f"ping-{ping_no}" +                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] +                logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) +                self.queue.put(message) +                ping_no += 1 +            except Exception as exc: +                logger.error("ClientVideo: socket error: %r", exc) +            time.sleep(SLEEP_TIME) + +        logger.info("ClientVideo '%s' closing socket ...", self.identity) + +    def stop(self): +        logger.info("Client video '%s' exiting ...", self.identity) +        self.live = False + + +class ClientTask(Thread): +    """Main Client Task with logic, +    message handling and setup.""" + +    def __init__(self, _id, context=None): +        super().__init__(daemon=True) +        self.identity = _id +        self.context = context or zmq.Context.instance() +        self.socket = self.context.socket(zmq.DEALER) +        self.socket.identity = self.identity.encode("utf-8") +        self.poll = zmq.Poller() +        self.poll.register(self.socket, zmq.POLLIN) +        self.video_threads = [] +        self.running = True +        self.connected = False + +    def run(self): +        self.socket.connect(ADDRESS) +        logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) +        logger.debug("starting video threads ...") +        q = Queue() +        for _id, device in CAMERAS.items(): +            vid_cl = ClientVideo(_id, device, q) +            vid_cl.start() +            self.video_threads.append(vid_cl) +        while self.running: +            try: +                sockets = dict(self.poll.poll(1000)) +                if self.socket in sockets: +                    try: +                        message = self.socket.recv_multipart() +                        logger.debug("Client '%s' received message: %r", self.identity, message) +                    except Exception: +                        logger.error("Failed to receive message: %r", traceback.format_exc()) +                if q.qsize() > 0: +                    frame = q.get() +                    if frame is not None: +                        logger.debug("Processing frame: %r", frame) +                        self.socket.send_multipart(frame) +            except zmq.ZMQError as exc: +                logger.error("Client '%s': socket error: %r", self.identity, exc) + +        logger.info("Closing socket ...") +        self.socket.close() +        logger.debug("Terminating context ...") +        self.context.term() + +    def stop(self): +        """Stop task""" +        logger.info("ClientTask cleaning up ...") +        for thread in self.video_threads: +            logger.info("Cleaning u video thread ...") +            thread.stop() +            thread.join() +        self.video_threads = [] +        logger.info("Client task exiting ...") +        self.running = False + + +def signal_handler(sig, frame): +    logger.info("Received signal handler '%r', stopping client ...", sig) +    client.stop() + + +if __name__ == "__main__": + +    signal.signal(signal.SIGINT, signal_handler) +    signal.signal(signal.SIGTERM, signal_handler) + +    logger.info("Starting up ...") +    client = ClientTask("client_task") + +    client.start() + +    client.join() diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..409a253 --- /dev/null +++ b/helpers.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python + +""" +Helper functions for zmq video messaging. +""" + +__author__ = "Franoosh Corporation" + + +import logging + + +class CustomLoggingFormatter(logging.Formatter): +    """Custom logging formatter""" +    debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s' +    info_fmt = 'INFO: %(asctime)s %(message)s' +    warning_fmt = 'WARNING: %(asctime)s %(message)s' +    error_fmt = 'ERROR: %(asctime)s %(message)s' +    critical_fmt = 'CRITICAL: %(asctime)s %(message)s' + +    def __init__(self): +        super().__init__( +            fmt="%(levelno)d: %s(asctime)s %(message)s", +            datefmt=None, +        ) + +    def format(self, record): +        orig_fmt = self._style._fmt +        if record.levelno == logging.DEBUG: +            self._style._fmt = CustomLoggingFormatter.debug_fmt +        elif record.levelno == logging.INFO: +            self._style._fmt = CustomLoggingFormatter.info_fmt +        elif record.levelno == logging.WARNING: +            self._style._fmt = CustomLoggingFormatter.warning_fmt +        elif record.levelno == logging.ERROR: +            self._style._fmt = CustomLoggingFormatter.error_fmt +        elif record.levelno == logging.CRITICAL: +            self._style._fmt = CustomLoggingFormatter.critical_fmt + +        result = logging.Formatter.format(self, record) +        self._style._fmt = orig_fmt + +        return result + diff --git a/router.py b/router.py new file mode 100644 index 0000000..61b2f72 --- /dev/null +++ b/router.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python + +""" +A module containing zeromq router for video streaming +from a client and sending control messages from server. +""" + +__author__ = "Franoosh Corporation" + +import os +import signal +import logging +import random +from collections import defaultdict, deque +import zmq + +from helpers import CustomLoggingFormatter + +# TODO: add configparser + +ADDR_FRONTEND = 'tcp://localhost' +ADDR_BACKEND = 'tcp://localhost' + +PORT_FRONTEND = "5569" +PORT_BACKEND = "9979" + +INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" +INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" + +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + + +RUNNING = True + +def custom_proxy(): +    context = zmq.Context.instance() +    poller = zmq.Poller() + +    # Set up frontend: +    frontend_socket = context.socket(zmq.ROUTER) +    frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) +    frontend_socket.bind(INITIAL_FRONTEND_ADDR) +    poller.register(frontend_socket, zmq.POLLIN) + +    # Set up backend: +    backend_socket = context.socket(zmq.ROUTER) +    backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) +    backend_socket.bind(INITIAL_BACKEND_ADDR) +    poller.register(backend_socket, zmq.POLLIN) + +    awailable_workers = [] +    client_worker_dict = {} +    # There are three pending messages dictionaries: +    # 1. pending_messages_no_worker[client_id] - messages that are pending for a client +    # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker +    # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker +    # To assure delivey of the entire video we need to store messages from start to end until they are delivered +    # to the client. +    # If a worker is not available for certain amount of time, we will reassign it to another worker. +    # For now, there is only one worker. Remember to implement this later. +    # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker. + +    pending_messages_no_worker = defaultdict(deque)  # Messages that are pending for a client with no worker assigned +    pending_messages_client_worker = defaultdict(lambda: defaultdict(deque)) +    pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) + +    while RUNNING: +        events = dict(poller.poll(1000))  # Poll both frontend and backend sockets: +        if frontend_socket in events:  # If message from client: +            msg = frontend_socket.recv_multipart()  # Receive message from client: +            logger.debug("Received message: %r", msg) +            client_id, content = msg[0], msg[1:] +            if not client_id in client_worker_dict:  # If client is not in client-worker dictionary: +                logger.info("Received ID from client: %r.", client_id) +            # Check if client is already connected to worker: +            try: +                worker_id = client_worker_dict[client_id]  # Get worker ID for this client from the client-worker dictionary +                while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker and send them first: +                    logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) +                    pending_msg = pending_messages_client_worker[client_id][worker_id].pop() +                    try: +                        logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                        backend_socket.send_multipart([worker_id, client_id, *pending_msg]) +                    except Exception as e: +                        logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e) +                        pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message +                        break # Break to avoid infinite loop +                if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: +                    pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                else: +                    try: +                        logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) +                        backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                    except Exception as e: +                        logger.error("Failed to send message to backend: %r, error: %s", msg, e) +                        pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery +            except KeyError:  # If client is not connected to any worker: +                logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) +                if awailable_workers:  # If there are available workers: +                    worker_id = random.choice(awailable_workers)  # Assign random worker to client +                    client_worker_dict[client_id] = worker_id # Update client-worker dictionary +                    if pending_messages_no_worker[client_id]:  # Check if there are any pending messages for this client: +                        pending_messages_client_worker[client_id][worker_id].extendleft(msg)  # Move messages to pending messages for this worker +                        logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) +                    while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker: +                        pending_msg = pending_messages_client_worker[client_id][worker_id].pop() +                        try: +                            logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                            backend_socket.send_multipart([worker_id, client_id, *pending_msg]) +                        except Exception as e: +                            logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) +                            pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message +                            break +                    if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: +                        logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) +                        pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                    else: +                        logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) +                        try: +                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) +                            backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                        except Exception as e: +                            logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) +                            pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery + +                else:  # If no workers are available, assign a new worker:   +                    pending_messages_no_worker[client_id].append(content)  # Store message for later delivery +                    logger.debug("No available workers, storing client '%s' with no worker assigned", client_id) + +        if backend_socket in events: +            _msg = backend_socket.recv_multipart() +            logger.debug("Received from worker: %r", _msg) +            if len(_msg) == 2: # Worker is sending its identity: +                worker_id, msg = _msg[0], _msg[1:] +                logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) +                logger.info("Received ID from worker: %r.", worker_id) +                awailable_workers.append(worker_id)  # Add worker to available workers list +                for client, worker in client_worker_dict.items(): +                    if worker is None:  # If client has no worker assigned: +                        client_worker_dict[client] = worker +                        logger.debug("Assigning worker %r to client %r", worker, client) +                if pending_messages_no_worker: +                    # If there are pending messages for clients with no worker assigned: +                    for client_id, messages in pending_messages_no_worker.items(): +                        if messages: +                            pending_messages_client_worker[client_id][worker_id].extendleft(messages)  # Move messages to pending messages for this worker +                            pending_messages_no_worker[client_id].clear()  # Clear pending messages for this client +                            logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) +                            try: +                                pending_msg = pending_messages_client_worker[client_id][worker_id].pop()  # Get the first message for this client and worker +                                logger.debug( +                                    "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) +                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])  # Send the message to worker +                            except Exception as e: +                                pending_messages_client_worker[client_id][worker_id].append(pending_msg) +                                logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + +            else:  # Worker is sending a message to client: +                worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] +                logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg) +                # First check if there are any pending messages for this worker: +                while pending_messages_worker_client[worker_id][client_id]:  # Check if there are any pending messages for this worker: +                    pending_msg = pending_messages_worker_client[worker_id][client_id].pop() +                    try: +                        logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) +                        backend_socket.send_multipart([client_id, *pending_msg]) +                    except Exception as e: +                        # It is safe to log the message content as it is a short string or bytes. +                        logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) +                        pending_messages_worker_client[worker_id][client_id].append(pending_msg)  # Re-queue the message   +                        break  # Break to avoid infinite loop +                # Now send the message to the client if no pending messages for this client: +                if not pending_messages_worker_client[worker_id][client_id]:  # If there are no pending messages for this worker: +                    logger.debug("Sending message %r to client '%s'", msg, client_id) +                    try: +                        frontend_socket.send_multipart([client_id, *msg]) +                        logger.debug("Sent message %r to client '%r'", msg, client_id) +                    except Exception as e: +                        pending_messages_worker_client[worker_id][client_id].appendleft(msg) +                        logger.error("Could not deliver message: %r from worker: %r to client: '%r'", msg, worker_id, client_id) + +def signal_handler(sig, frame): +    global RUNNING +    logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name) +    RUNNING = False + + +if __name__ == "__main__": +    signal.signal(signal.SIGINT, signal_handler) +    signal.signal(signal.SIGTERM, signal_handler) + +    logger.info("Starting up ...") + +    custom_proxy() diff --git a/static/css/main.css b/static/css/main.css new file mode 100644 index 0000000..4bb18b2 --- /dev/null +++ b/static/css/main.css @@ -0,0 +1,21 @@ +.streams-container { +    display: flex; +    flex-wrap: wrap; +    gap: 16px; +    justify-content: center; +} +.camera-stream { +    flex: 0 1 320px; +    margin: 10px; +    text-align: center; +} +@media (max-width: 600px) { +.streams-container { +    flex-direction: column; +    align-items: center; +} +.camera-stream { +    width: 100%; +    max-width: 100vw; +} +} diff --git a/templates/client.html b/templates/client.html new file mode 100644 index 0000000..e03394a --- /dev/null +++ b/templates/client.html @@ -0,0 +1,47 @@ +<!DOCTYPE html> +<html> +<head> +    <title>Client {{ client_id }} - Camera Streams</title> +    <style> +        .camera-stream { display: inline-block; margin: 10px; } +        video { border: 1px solid #333; } +    </style> +</head> +<body> +    <h1>Camera Streams for client: {{ client_id }}</h1> +    <div class="streams-container"> +        {% for camera_id in camera_ids %} +        <div class="camera-stream"> +            <h2>Camera {{ camera_id }}</h2> +            <img id="video-{{ camera_id }}" width="640" height="480" /> +        </div> +        {% endfor %} +    </div>  +    <script> +    // For each camera, open a WebSocket and update the corresponding <img> +    {% for camera_id in camera_ids %} +    (function() { +        let ws = new WebSocket('ws://' + window.location.host + '/ws/{{ client_id }}/{{ camera_id }}'); +        let image = document.getElementById('video-{{ camera_id }}'); +        let currentUrl = null; +        ws.onmessage = function(event) { +            if (currentUrl) { +                URL.revokeObjectURL(currentUrl); +            } +            currentUrl = URL.createObjectURL(event.data); +            image.src = currentUrl; +        }; +        ws.onclose = function(event) { +            console.log('WebSocket closed for camera {{ camera_id }}:', event); +        }; +        ws.onerror = function(event) { +            console.log('WebSocket error for camera {{ camera_id }}:', event); +        }; +        window.addEventListener('beforeunload', function() { +            ws.close(); +        }); +    })(); +    {% endfor %} +    </script> +</body> +</html> diff --git a/templates/client.html.bak b/templates/client.html.bak new file mode 100644 index 0000000..9a4ff08 --- /dev/null +++ b/templates/client.html.bak @@ -0,0 +1,41 @@ +<!DOCTYPE html> +<html> +<head> +    <title>Client {{ client_id }} - Camera Streams</title> +    <style> +        .camera-stream { display: inline-block; margin: 10px; } +        video { border: 1px solid #333; } +    </style> +</head> +<body> +    <h1>Camera Streams for Client {{ client_id }}</h1> +    {% for camera_id in camera_ids %} +    <div class="camera-stream"> +        <h2>Camera {{ camera_id }}</h2> +        <img id="video-{{ camera_id }}" width="640" height="480" /> +    </div> +    {% endfor %} +    <script> +    {% for camera_id in camera_ids %} +        (function() { +            // const ws = new WebSocket("ws://127.0.0.1:8008/ws/client_task/front_camera"); +            const ws = new WebSocket("ws://${location.host}/ws/{{ client_id }}/{{ camera_id }}"); +            const img = document.getElementById("video-{{ camera_id }}"); +            ws.binaryType = "arraybuffer"; +            ws.onopen = function(event) { +                console.log("Connection on websocket open"); +            } +            ws.onmessage = function(event) { +                console.log("Received frame data size: ", event.data.byteLength); +                const blob = new Blob([event.data], {type: "image/jpeg"}); +                // document.getElementById("video").src = URL.createObjectURL(blob); +                img.src = URL.createObjectURL(blob); +            }; +            ws.onerror = function(event) { +                console.error("Websocket error: ", event); +            }; +        })(); +    {% endfor %} +    </script> +</body> +</html> diff --git a/templates/main.html b/templates/main.html new file mode 100644 index 0000000..e0eaa52 --- /dev/null +++ b/templates/main.html @@ -0,0 +1,16 @@ +<!DOCTYPE html> +<html> +<head> +    <title>Live Streaming</title> +</head> +<body> +    <h1>Streaming live.</h1> +    {% for client_id in clients.keys() %} +        <h2>Client {{ client_id }}</h2> +            <li> +                <a href="/clients/{{ client_id }}"> Client {{ client_id }} streams +                </a> +            <li> +    {% endfor %} +</body> +</html> diff --git a/templates/main.html.bak b/templates/main.html.bak new file mode 100644 index 0000000..7bb2ee5 --- /dev/null +++ b/templates/main.html.bak @@ -0,0 +1,35 @@ +<!DOCTYPE html> +<html> +    <head> +        <title>Live Streaming</title> +    </head> +    <body> +        <img id="frame" src=""> +        <h1>Streaming. Live.</h1> +        <script> +            let ws = new WebSocket("ws://127.0.0.1:8880/{{client_id}}/{{camera_id}}"); +            let image = document.getElementById("frame"); +            let currentUrl = null; + +            ws.onmessage = function(event) { +                if (currentUrl) { +                    URL.revokeObjectURL(currentUrl); +                } +                currentUrl = URL.createObjectURL(event.data); +                image.src = currentUrl; +            }; + +            ws.onclose = function(event) { +                console.log("WebSocket closed:", event); +            }; + +            ws.onerror = function(event) { +                console.log("WebSocket error:", event); +            }; + +            window.addEventListener('beforeunload', function() { +                ws.close(); +            }); +        </script> +    </body> +</html> diff --git a/webserver.py b/webserver.py new file mode 100644 index 0000000..d1c0a1e --- /dev/null +++ b/webserver.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python + +""" +Module serving video from zmq to a webserver. +""" + +__author__ = "Franoosh Corporation" + + +import os +from collections import defaultdict +import json +import logging +import asyncio +from threading import Thread +import uvicorn +from fastapi import ( +    FastAPI, +    Request, +    HTTPException, +    WebSocket, +    templating, +) +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles + +from helpers import CustomLoggingFormatter +import zmq + + +CLIENTS_JSON_FILE = os.path.join(os.getcwd(), 'clients.json') +LOGFILE = 'webserver.log' +LOGLEVEL = logging.INFO + +HOST = "127.0.0.1" +ZMQPORT = "9979" +WSPORT = "8008" +ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}" +WS_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}" + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + + + +app = FastAPI() +app.mount("/static", StaticFiles(directory="static"), name="static") +templates = templating.Jinja2Templates(directory='templates') + +# Track websocket connections by (client_id, camera_id) +ws_connections = defaultdict(dict)  # ws_connections[client_id][camera_id] = websocket + +# Set up a single ZMQ SUB socket for all websocket connections +zmq_context = zmq.Context() +zmq_socket = zmq_context.socket(zmq.SUB) +zmq_socket.bind(WS_BACKEND_ADDR) +zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")  # Subscribe to all topics +poller = zmq.Poller() +poller.register(zmq_socket, zmq.POLLIN) + +def load_clients(): +    try: +        with open(CLIENTS_JSON_FILE) as f: +            clients_dict = json.load(f) +    except FileNotFoundError: +        clients_dict = {} +    return clients_dict + +@app.get("/") +async def main_route(request: Request): +    logger.error("DEBUG: main route visited") +    clients = load_clients() +    return templates.TemplateResponse( +        "main.html", +        { +            "request": request, +            "clients": clients, +         } +    ) + +@app.get("/clients/{client_id}", response_class=HTMLResponse) +async def client_route(request: Request, client_id: str): +    """Serve client page.""" +    clients_dict = load_clients() +    logger.debug("Checking client_id: '%s' in clients_dict: %r.", client_id, clients_dict) +    if not client_id in clients_dict: +        return HTTPException(status_code=404, detail="No such client ID.") +    return templates.TemplateResponse( +        "client.html", +        { +            "request": request, +            "client_id": client_id, +            "camera_ids": clients_dict[client_id], +        }, +    ) + + +@app.websocket("/ws/{client_id}/{camera_id}") +async def camera_route(websocket: WebSocket, client_id: str, camera_id: str): +    """Serve a particular camera page.""" +    logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) +    await websocket.accept() +    if client_id not in ws_connections: +        ws_connections[client_id] = {} +    ws_connections[client_id][camera_id] = websocket +    try: +        while True: +            # Wait for a frame for this client/camera +            sockets = dict(poller.poll(1000)) +            if zmq_socket in sockets: +                msg = zmq_socket.recv_multipart() +                if len(msg) == 3: +                    recv_client_id, recv_camera_id, content = msg +                    recv_client_id = recv_client_id.decode("utf-8") +                    recv_camera_id = recv_camera_id.decode("utf-8") +                    # Only send to the websocket for this client/camera +                    if recv_client_id == client_id and recv_camera_id == camera_id: +                        await websocket.send_bytes(content) +    except Exception as exc: +        logger.warning("Connection closed: %r", exc) +    finally: +        if client_id in ws_connections and camera_id in ws_connections[client_id]: +            del ws_connections[client_id][camera_id] +        await websocket.close() + + +if __name__ == "__main__": +    uvicorn.run( +        app, +        port=8007, +        host='127.0.0.1', +        log_level='info', +    )
\ No newline at end of file diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..ad5fe1e --- /dev/null +++ b/worker.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python + +__author__ = "Franoosh Corporation" + +""" +A module consisting of a zeromq worker receiving video stream +via router from clients. Able to send control messages to clients. +""" + +import os +from threading import Thread, Event +import time +import signal +import logging +from queue import Queue +from collections import defaultdict +import zmq + +from helpers import CustomLoggingFormatter + + +ADDR = "tcp://localhost" +PORT = "9979" +INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}" +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + +class MonitorTask(Thread): +    """Monitor task""" +    def __init__(self, socket): +        super().__init__(daemon=True) +        self.socket = socket +        self.running = True + +    def run(self): +        """Monitor connection on the initial socket. +        This is heartbeat monitoring""" +        monitor = self.socket.get_monitor_socket() +        monitor.setsockopt(zmq.RCVTIMEO, 5000) +        logger.debug("Monitor socket started.") +        while self.running: +            try: +                event, _ = monitor.recv_multipart() +                # Resolve the received event type: +                event_type = int.from_bytes(event[:2], "little") +                if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED): +                    logger.warning("Monitor socket: closed | disconnected") +                    stop_event.set() +                elif event_type == zmq.EVENT_CONNECT_DELAYED: +                    logger.debug("Monitor socket: event connect delayed") +                elif event_type == zmq.EVENT_CONNECT_RETRIED: +                    logger.debug("Monitor socket: event connect retried") +                elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED): +                    logger.debug("Monitor socket: client connected to router, handshake OK.") +                    stop_event.clear() +                else: +                    logger.warning("Monitor socket: other event: '%r'", event_type) +            except zmq.Again: +                logger.debug("Timeout on monitoring socket.") +            except Exception as exc: # W: Catching too general exception Exception +                logger.error("Other exception on monitoring socket: %r", exc) + +        # monitor.close() + +    def stop(self): +        """Stop thread""" +        logger.info("Stopping monitor thread ...") +        self.running = False + + +class ServerWorker(Thread): +    """ServerWorker""" +    def __init__(self, identity, context=None): +        super().__init__(daemon=True) +        self.id = identity +        self.context = context or zmq.Context.instance() +        self.socket = self.context.socket(zmq.DEALER) +        self.socket.identity = self.id.encode("utf-8") +        self.monitor = MonitorTask(self.socket) +        self.video_threads = defaultdict(lambda: defaultdict(dict)) +        self.connected = False +        self.running = True + +    def run(self): +        logger.debug("ServerWorker '%s' starting ...", self.id) +        self.socket.connect(INITIAL_BACKEND_ADDR) +        self.poller = zmq.Poller() +        self.poller.register(self.socket, zmq.POLLIN) +        self.monitor.start() + +        while self.running: +            logger.debug("ServerWorker '%s' waiting for a message ...", self.id) +            if not self.connected or stop_event.is_set(): +                self.socket.send_multipart([b"READY"])  # Router needs worker identity, hence this +                time.sleep(1)  # Wait a bit before trying to connect again + +            sockets = dict(self.poller.poll(1000)) +            if self.socket in sockets: +                self.connected = True +                msg = self.socket.recv_multipart() +                logger.debug("ServerWorker '%s' received message: %r", self.id, msg) +                filename = None +                # At the moment we don't expect any other message than a video message: +                if len(msg) == 4:  # This is a message with start/stop directive and no video data. +                    logger.debug("Received start/stop directive: %r", msg) +                    client_id, camera_id, metadata = msg[0], msg[1], msg[2] +                    try: +                        directive, timestamp = metadata.decode("utf-8").split(":") +                        filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4" +                        logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename) +                    except ValueError: +                        logger.error("Invalid metadata format: %r", metadata) +                        continue +                    if directive == "start": +                        if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):  # New client or new camera +                            q = Queue() +                            logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id) +                            video_worker = VideoWorker(filename, q) +                            video_worker.start() +                            self.video_threads[client_id][camera_id] = video_worker +                    elif directive == "stop": +                        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +                            logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id) +                            self.video_threads[client_id][camera_id].queue.put(None)  # Sentinel to stop the thread +                            del self.video_threads[client_id][camera_id] +                            logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id) +                    else: +                        logger.error("Unknown directive: '%s'", directive) +                elif len(msg) == 3:  # This is a video message with data only +                    logger.debug("Received video message with data only: %r", msg) +                    client_id, camera_id, msg = msg[0], msg[1], msg[2] +                    if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +                        self.video_threads[client_id][camera_id].queue.put(msg) +                    else: +                        logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) +                        logger.error("Available video threads keys: %r", self.video_threads.keys()) +                        logger.error("Available video threads values: %r", self.video_threads.values()) +                elif len(msg) == 2:  # This is a ping message from client +                    logger.debug("Received ping message: %r", msg) +                    client_id, camera_id = msg[0], msg[1] +                    if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +                        # Respond with a pong message +                        try: +                            self.socket.send_multipart([client_id, camera_id, b'pong']) +                            logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) +                        except zmq.ZMQError as exc: +                            logger.error("Failed to send pong: %r", exc) + +                else: +                    logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg) +                try: +                    self.socket.send_multipart([client_id, camera_id, b'pong']) +                    logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id) +                except zmq.ZMQError as exc: +                    logger.error("Failed to send pong: %r", exc) +            else: +                logger.debug("No message received, polling again ...") +                time.sleep(5) + +        self.monitor.stop() +        self.monitor.join() +        for camera_thread in self.video_threads.values(): +            for thread in camera_thread.values(): +                thread.queue.put(None)  # Sentinel to unblock queue.get() +                thread.stop() +                thread.join() + +    def send_control_message(self, client_id, camera_id, message): +        """Send control message to a specific client and camera.""" +        if client_id in self.video_threads and camera_id in self.video_threads[client_id]: +            self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message]) +            logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message) +        else: +            logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id) + +    def stop(self): +        logger.info("ServerWorker '%s' exiting ...", self.id) +        self.running = False + + +class VideoWorker(Thread): +    """Class for video threads.""" + +    def __init__(self, filename, queue): +        super().__init__(daemon=True) +        self.filename = filename +        self.queue = queue +        self.live = True + +    def run(self): +        if os.path.exists(self.filename): +            logger.warning("File '%s' already exists, overwriting ...", self.filename) +        with open(self.filename, 'wb') as f: +            logger.info("VideoWorker started writing to file: %s", self.filename) +            while self.live: +                # Simulate video processing +                frame = self.queue.get()  # Get frame from queue +                logger.debug("Processing ('writing to a file') frame for camera: %r", frame) +                if frame is None: +                    logger.info("VideoWorker received stop signal, exiting ...") +                    break +                f.write(frame + b'\n')  # Write frame to file + + +    def stop(self): +        logger.info("VideoWorker '' exiting ...") +        self.live = False + +def signal_handler(sig, frame): +    worker.stop() + +if __name__ == "__main__": +    logger.info("Starting up ...") + +    signal.signal(signal.SIGINT, signal_handler) +    signal.signal(signal.SIGTERM, signal_handler) + +    worker = ServerWorker("worker-task") +    worker.start() + +    worker.join() | 
