#!/usr/bin/env python """ Module serving video from zmq to a webserver. """ __author__ = "Franoosh Corporation" import os from collections import defaultdict import json import logging import asyncio import datetime from contextlib import asynccontextmanager import zmq import zmq.asyncio import uvicorn from fastapi import ( FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect, templating, ) from fastapi.responses import HTMLResponse, FileResponse from fastapi.staticfiles import StaticFiles from typing import AsyncGenerator from helpers import ( CustomLoggingFormatter, DIRECTIVES, MAX_CAMERA_NAME_LENGTH, auth_service, ) CWD = os.getcwd() CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...] BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" LOGDIR = 'logs' LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") LOGLEVEL = logging.INFO # This should come from the config file: VIDEO_DIR = os.path.join(CWD, "videos") HOST = "127.0.0.1" ZMQ_PORT = "9979" WEB_PORT = "8008" CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}" WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}" TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' CERTIFICATE_DIR = 'certs' log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') handler.setFormatter(log_formatter) logging.root.addHandler(handler) logging.root.setLevel(LOGLEVEL) logger = logging.getLogger(__name__) logging.basicConfig( filename=LOGFILE, datefmt='%Y-%m-%d %I:%M:%S', level=LOGLEVEL, ) # Track websocket connections by (client_id, camera_id): ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket ws_queues = defaultdict(dict) ctrl_msg_que = asyncio.Queue() shutdown_event = asyncio.Event() # Create ZMQ context and socket: zmq_context = zmq.asyncio.Context() # Authenticate context: auth_service(zmq_context, CERTIFICATE_DIR) zmq_socket = zmq_context.socket(zmq.DEALER) # Connect to ZMQ backend: zmq_socket.connect(WEB_BACKEND_ADDR) async def zmq_bridge(): """ Bridge between ZMQ backend and websocket clients. Parameters ---------- None Returns ------- None """ poll = zmq.asyncio.Poller() poll.register(zmq_socket, zmq.POLLIN) while not shutdown_event.is_set(): try: sockets = dict(await poll.poll(100)) if zmq_socket in sockets: logger.debug("Receiving message from ZMQ backend ...") topic, frame_data = await zmq_socket.recv_multipart() # messages can also now contain directives, like 'new_camera' and 'removed_camera' with empty frame_data # Send those to a control queue to be handled appropriately client_id, camera_id = topic.decode('utf-8').split(':', 1) # <-- strings from now on # client_id, camera_id = topic.split(b':', 1) if not camera_id in CLIENTS_DICT[client_id]: CLIENTS_DICT[client_id].append(camera_id) queue = ws_queues.get(client_id, {}).get(camera_id) if queue: if queue.full(): logger.debug( "WebsSocket queue full for '/ws/%r/%r', discarding oldest frame.", client_id, camera_id ) _ = queue.get_nowait() # Discard oldest frame await queue.put(frame_data) else: logger.debug("Creating new websocket queue for '/ws/%r/%r'.", client_id, camera_id) ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10) if not ctrl_msg_que.empty(): client_id, camera_id, command, args_list = None, None, None, None try: client_id, camera_id, command, args_list = await ctrl_msg_que.get() except ValueError as ve: logger.error("Invalid control message on queue: %r", ve) if client_id and camera_id and command and args_list is not None: try: zmq_socket.send_multipart([ client_id, camera_id, command, ] + args_list, flags=zmq.NOBLOCK) logger.info("Sent control command.") except zmq.Again: logger.error( "ZMQ socket busy, could not send control command '%r' with args: %r for '/ws/%r/%r' to backend.", command, args_list, client_id, camera_id ) except asyncio.CancelledError: logger.info("ZMQ bridge received cancellation signal") raise except Exception as e: logger.error("Error in ZMQ bridge: %r", e) logger.info("ZMQ bridge shutting down") @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator: """ Create lifespan context for FastAPI app. Parameters ---------- app : FastAPI The FastAPI application instance. Returns ------- AsyncGenerator Yields control to the application context. """ zmq_bridge_task = asyncio.create_task(zmq_bridge()) try: yield finally: logger.info("Initiating graceful shutdown") shutdown_event.set() if zmq_bridge_task and not zmq_bridge_task.done(): try: await asyncio.wait_for(zmq_bridge_task, timeout=5.0) except asyncio.TimeoutError: logger.warning("ZMQ bridge did not shut down within timeout") zmq_bridge_task.cancel() except Exception as e: logger.error("Error waiting for ZMQ bridge shutdown: %r", e) _app = FastAPI(lifespan=lifespan) _app.mount("/static", StaticFiles(directory="static"), name="static") templates = templating.Jinja2Templates(directory='templates') @_app.get("/") async def main_route(request: Request) -> HTMLResponse: """ Main route serving the index page. Parameters ---------- request : Request The incoming HTTP request. """ logger.debug("Main route visited") return templates.TemplateResponse( "main.html", { "request": request, "clients": CLIENTS_DICT, } ) @_app.get("/clients/{client_id}", response_class=HTMLResponse) async def client_route(request: Request, client_id: str) -> HTMLResponse: """ Serve a particular client page. Parameters ---------- request : Request The incoming HTTP request. client_id : str The client ID to serve. """ logger.debug("Checking client_id: %s in clients_dict: %r.", client_id, CLIENTS_DICT) if not client_id in CLIENTS_DICT: raise HTTPException(status_code=404, detail="No such client ID.") return templates.TemplateResponse( "client.html", { "request": request, "client_id": client_id, "camera_ids": CLIENTS_DICT[client_id], "client_videos": list_video_files(client_id), }, ) @_app.websocket("/ws/{client_id}/{camera_id}") async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> None: """ Serve a particular camera page. Parameters ---------- websocket : WebSocket The incoming WebSocket connection. client_id : str The client ID. camera_id : str The camera ID. """ logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) await websocket.accept() ws_connections[client_id][camera_id] = {'ws': websocket} queue = ws_queues[client_id][camera_id] async def send_frames(): try: while True: try: # Don't wait indefinitely to allow checking for client disconnection: frame_data = queue.get_nowait() except asyncio.QueueEmpty: await asyncio.sleep(0.1) continue try: await websocket.send_bytes(frame_data) # This exception is raised when client disconnects: except Exception as exc: logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc) ws_connections[client_id].pop(camera_id, None) break except asyncio.CancelledError: logger.debug("send_frames task cancelled for '/ws/%s/%s'", client_id, camera_id) raise async def receive_control(): try: while True: try: data = await websocket.receive_text() logger.info("Received control message: %r from client_id: '%s', camera_id: '%s'.", data, client_id, camera_id) # Handle control messages from the client: frontend_message = json.loads(data) for command, args in frontend_message.items(): if validate_directive(command, args, client_id, camera_id): args_list = [str(arg).encode('utf-8') for arg in args] ctrl_msg_que.put_nowait((client_id.encode('utf-8'), camera_id.encode('utf-8'), command.encode('utf-8'), args_list)) logger.info( "Put control command %r with args: %r for client_id: %r, camera_id: %r on queue to backend.", command, args_list, client_id, camera_id ) except json.JSONDecodeError: logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) except WebSocketDisconnect: logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) ws_connections[client_id].pop(camera_id, None) break except asyncio.CancelledError: logger.debug("receive_control task cancelled for '/ws/%s/%s'", client_id, camera_id) raise except Exception as exc: logger.warning("Error receiving control message: %r", exc) send_task = asyncio.create_task(send_frames()) receive_task = asyncio.create_task(receive_control()) try: await asyncio.gather(send_task, receive_task) except asyncio.CancelledError: logger.debug("camera_route cancelled for '/ws/%s/%s'", client_id, camera_id) except Exception as exc: logger.error("Error in camera_route for '/ws/%s/%s': %r", client_id, camera_id, exc) finally: send_task.cancel() receive_task.cancel() try: await asyncio.gather(send_task, receive_task, return_exceptions=True) except Exception: pass ws_connections[client_id].pop(camera_id, None) ws_queues[client_id].pop(camera_id, None) logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id) @_app.get("/videos/{client_id}/{camera_id}/{filename}") async def serve_video(client_id: str, camera_id: str, filename: str) -> FileResponse: """ Route serving video files. Parameters ---------- client_id : str The client ID. camera_id : str The camera ID. filename : str The video filename. """ video_path = os.path.join(VIDEO_DIR, client_id, camera_id, filename) if not os.path.exists(video_path): raise HTTPException(status_code=404, detail="Video not found") logger.debug("Serving video file: '%s', video_path") return FileResponse(video_path, media_type="video/ogg") def list_video_files(client_id: str) -> dict: """ Return a dictionary of recorded video files for a given client. {: [(, , )]} Parameters ---------- client_id : str The client ID. Returns ------- dict Dictionary of recorded video files for the client. """ client_videos = {} client_dir = os.path.join(VIDEO_DIR, client_id) if os.path.exists(client_dir): for (_, dirnames, _) in os.walk(client_dir): for camera_dir in dirnames: client_videos[camera_dir] = [] camera_path = os.path.join(client_dir, camera_dir) for (_, _, filenames) in os.walk(camera_path): for filename in filenames: try: filepath = os.path.join(VIDEO_DIR, camera_path, filename) timestamp = datetime.datetime.strftime( datetime.datetime.fromtimestamp( os.path.getmtime(filepath) ), TIME_FORMAT) except Exception as exc: logger.warning("Could not determine creation time for file %r: %r", filename, exc) timestamp = 'unknown' video_url = f"/videos/{client_id}/{camera_dir}/{filename}" client_videos[camera_dir].append((filename, video_url, timestamp)) return client_videos def validate_directive(command: str, args: list, client_id: str, camera_id: str) -> bool: """ Validate if the directive is known. Parameters ---------- command : str The directive command. args : list The arguments for the directive. client_id : str The client ID. camera_id : str The camera ID. Returns ------- bool True if the directive is valid, False otherwise. """ retval = False # First, check if command is known: if command not in DIRECTIVES: logger.warning("Unknown directive received: %s", command) return retval if len(args) != 1: logger.error("Directive '%s' requires exactly one argument.", command) return retval # Validate arguments passed from the frontend: match command: case 'modify_camera_name': retval = validate_camera_name(args, client_id, camera_id) case 'modify_camera_threshold': retval = validate_camera_threshold(args) case 'modify_camera_grace_pd': retval = validate_camera_grace_period(args) case 'modify_camera_address': retval = validate_camera_address(args) # case 'add_camera': # retval = validate_add_camera(args, client_id) case 'remove_camera': retval = True # No specific validation needed return retval def validate_camera_name(name, client_id, camera_id) -> bool: """ Validate camera name to avoid problematic characters and whitespace. Parameters ---------- name : str The new camera name. client_id : str The client ID. camera_id : str The old camera ID. Returns ------- bool True if the camera name is valid, False otherwise. """ retval = True new_name = name # TODO: check if sending old name is needed if we have camera_id old_name = camera_id if old_name not in CLIENTS_DICT[client_id]: logger.error("Old camera name does not exist.") retval = False if new_name in CLIENTS_DICT[client_id]: logger.error("New camera name already exists.") retval = False if not new_name or any(char in new_name for char in ':') or any(char.isspace() for char in new_name): logger.error("Camera name cannot be empty or contain spaces or colons.") retval = False if len(new_name) > MAX_CAMERA_NAME_LENGTH: logger.error("Camera name is too long (max %s characters).", MAX_CAMERA_NAME_LENGTH) retval = False return retval def validate_camera_threshold(threshold) -> bool: """ Validate contour size threshold. Parameters ---------- threshold : str The contour size threshold. Returns ------- bool True if the contour size threshold is valid, False otherwise. """ retval = True try: value = int(threshold) if value <= 0: logger.error("Contour size threshold must be positive: %r.", threshold) retval = False except ValueError as exc: logger.error("Invalid contour size threshold: %r, exception: %r", threshold, exc) retval = False return retval def validate_camera_grace_period(grace_period) -> bool: """ Validate movement grace period. Parameters ---------- grace_period : str The movement grace period. Returns ------- bool True if the movement grace period is valid, False otherwise. """ retval = True try: value = int(grace_period) if value < 0: logger.error("Movement grace period cannot be negative: %r.", grace_period) retval = False except ValueError as exc: logger.error("Invalid movement grace period: %r, exception: ", grace_period, exc) retval = False return retval def validate_camera_address(address) -> bool: """ Validate camera address. Parameters ---------- address : str The camera address. Returns ------- bool True if the camera address is valid, False otherwise. """ retval = True if not address or any(char.isspace() for char in address): logger.error("Camera address %r cannot be empty.", address) retval = False if not address.startswith('/dev/video'): logger.error("Camera address must start with '/dev/video': %r.", address) retval = False return retval def validate_remove_camera(name, client_id) -> bool: """ Validate remove camera directive. Parameters ---------- name : str The camera name to remove. client_id : str The client ID. Returns ------- bool True if the camera name is valid, False otherwise. """ retval = True if name not in CLIENTS_DICT[client_id]: logger.error("Camera name %r does not exist.", client_id) retval = False return retval if __name__ == "__main__": uvicorn.run( _app, port=8007, host='127.0.0.1', log_level='info', )