diff options
Diffstat (limited to 'webserver.py')
| -rw-r--r-- | webserver.py | 524 |
1 files changed, 524 insertions, 0 deletions
diff --git a/webserver.py b/webserver.py new file mode 100644 index 0000000..41d1d30 --- /dev/null +++ b/webserver.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python +""" +Module serving video from zmq to a webserver. +""" +__author__ = "Franoosh Corporation" + +import os +from collections import defaultdict +import json +import logging +import asyncio +import datetime +from contextlib import asynccontextmanager +import zmq +import zmq.asyncio +import uvicorn +from fastapi import ( + FastAPI, + Request, + HTTPException, + WebSocket, + WebSocketDisconnect, + templating, +) +from fastapi.responses import HTMLResponse, FileResponse +from fastapi.staticfiles import StaticFiles +from typing import AsyncGenerator + +from helpers import ( + CustomLoggingFormatter, + DIRECTIVES, + MAX_CAMERA_NAME_LENGTH, +) + + +CWD = os.getcwd() +CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') +CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...] +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGDIR = 'logs' +LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log") +LOGLEVEL = logging.INFO +# LOGLEVEL = logging.DEBUG +# This should come from the config file: +VIDEO_DIR = os.path.join(CWD, "videos") + +HOST = "127.0.0.1" +ZMQ_PORT = "9979" +WEB_PORT = "8008" +CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}" +WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}" +TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + +# Track websocket connections by (client_id, camera_id): +ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket +ws_queues = defaultdict(dict) +ctrl_msg_que = asyncio.Queue() +# Create ZMQ context and socket: +zmq_context = zmq.asyncio.Context() +zmq_socket = zmq_context.socket(zmq.DEALER) +# Connect to ZMQ backend: +zmq_socket.connect(WEB_BACKEND_ADDR) + + +async def zmq_bridge(): + """ + Bridge between ZMQ backend and websocket clients. + + Parameters + ---------- + None + + Returns + ------- + None + """ + poll = zmq.asyncio.Poller() + poll.register(zmq_socket, zmq.POLLIN) + while True: + try: + sockets = dict(await poll.poll(100)) + if zmq_socket in sockets: + logger.debug("Receiving message from ZMQ backend ...") + topic, frame_data = await zmq_socket.recv_multipart() + # messages can also now contain directives, like 'new_camera' and 'removed_camera' with empty frame_data + # Send those to a control queue to be handled appropriately + client_id, camera_id = topic.decode('utf-8').split(':', 1) # <-- strings from now on + # client_id, camera_id = topic.split(b':', 1) + if not camera_id in CLIENTS_DICT[client_id]: + CLIENTS_DICT[client_id].append(camera_id) + queue = ws_queues.get(client_id, {}).get(camera_id) + if queue: + if queue.full(): + logger.debug( + "WebsSocket queue full for '/ws/%r/%r', discarding oldest frame.", + client_id, + camera_id + ) + _ = queue.get_nowait() # Discard oldest frame + await queue.put(frame_data) + else: + logger.debug("Creating new websocket queue for '/ws/%r/%r'.", client_id, camera_id) + ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10) + if not ctrl_msg_que.empty(): + client_id, camera_id, command, args_list = None, None, None, None + try: + client_id, camera_id, command, args_list = await ctrl_msg_que.get() + except ValueError as ve: + logger.error("Invalid control message on queue: %r", ve) + if client_id and camera_id and command and args_list is not None: + try: + zmq_socket.send_multipart([ + client_id, + camera_id, + command, + ] + args_list, flags=zmq.NOBLOCK) + logger.info("Sent control command.") + except zmq.Again: + logger.error( + "ZMQ socket busy, could not send control command '%r' with args: %r for '/ws/%r/%r' to backend.", + command, + args_list, + client_id, + camera_id + ) + except Exception as e: + logger.error("Error in ZMQ bridge: %r", e) + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator: + """ + Create lifespan context for FastAPI app. + + Parameters + ---------- + app : FastAPI + The FastAPI application instance. + + Returns + ------- + AsyncGenerator + Yields control to the application context. + """ + asyncio.create_task(zmq_bridge()) + yield + +_app = FastAPI(lifespan=lifespan) +_app.mount("/static", StaticFiles(directory="static"), name="static") +templates = templating.Jinja2Templates(directory='templates') + +@_app.get("/") +async def main_route(request: Request) -> HTMLResponse: + """ + Main route serving the index page. + + Parameters + ---------- + request : Request + The incoming HTTP request. + """ + logger.debug("Main route visited") + return templates.TemplateResponse( + "main.html", + { + "request": request, + "clients": CLIENTS_DICT, + } + ) + +@_app.get("/clients/{client_id}", response_class=HTMLResponse) +async def client_route(request: Request, client_id: str) -> HTMLResponse: + """ + Serve a particular client page. + + Parameters + ---------- + request : Request + The incoming HTTP request. + client_id : str + The client ID to serve. + """ + logger.debug("Checking client_id: %s in clients_dict: %r.", client_id, CLIENTS_DICT) + if not client_id in CLIENTS_DICT: + raise HTTPException(status_code=404, detail="No such client ID.") + return templates.TemplateResponse( + "client.html", + { + "request": request, + "client_id": client_id, + "camera_ids": CLIENTS_DICT[client_id], + "client_videos": list_video_files(client_id), + }, + ) + +@_app.websocket("/ws/{client_id}/{camera_id}") +async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> None: + """ + Serve a particular camera page. + + Parameters + ---------- + websocket : WebSocket + The incoming WebSocket connection. + client_id : str + The client ID. + camera_id : str + The camera ID. + """ + logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id) + await websocket.accept() + ws_connections[client_id][camera_id] = {'ws': websocket} + queue = ws_queues[client_id][camera_id] + + async def send_frames(): + while True: + logger.debug("Getting from queue frame for '/ws/%s/%s'.", client_id, camera_id) + try: + # Don't wait indefinitely to allow checking for client disconnection: + frame_data = queue.get_nowait() + except asyncio.QueueEmpty: + await asyncio.sleep(0.1) + continue + try: + logger.debug("Sending frame to '/ws/%s/%s'.", client_id, camera_id) + await websocket.send_bytes(frame_data) + logger.debug("Sent frame to '/ws/%s/%s'.", client_id, camera_id) + # This exception is raised when client disconnects: + except Exception as exc: + logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc) + ws_connections[client_id].pop(camera_id, None) + break + + async def receive_control(): + while True: + try: + data = await websocket.receive_text() + logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data) + # Handle control messages from the client: + frontend_message = json.loads(data) + for command, args in frontend_message.items(): + if validate_directive(command, args, client_id, camera_id): + args_list = [str(arg).encode('utf-8') for arg in args] + ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list)) + logger.info( + "Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.", + command, + args_list, + client_id, + camera_id + ) + except json.JSONDecodeError: + logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) + except WebSocketDisconnect: + logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) + ws_connections[client_id].pop(camera_id, None) + break + except Exception as exc: + logger.warning("Error receiving control message: %r", exc) + + send_task = asyncio.create_task(send_frames()) + receive_task = asyncio.create_task(receive_control()) + try: + await asyncio.gather(send_task, receive_task) + finally: + send_task.cancel() + receive_task.cancel() + ws_connections[client_id].pop(camera_id, None) + ws_queues[client_id].pop(camera_id, None) + logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id) + +@_app.get("/videos/{client_id}/{camera_id}/{filename}") +async def serve_video(client_id: str, camera_id: str, filename: str) -> FileResponse: + """ + Route serving video files. + + Parameters + ---------- + client_id : str + The client ID. + camera_id : str + The camera ID. + filename : str + The video filename. + """ + video_path = os.path.join(VIDEO_DIR, client_id, camera_id, filename) + + if not os.path.exists(video_path): + raise HTTPException(status_code=404, detail="Video not found") + + logger.debug("Serving video file: '%s', video_path") + + return FileResponse(video_path, media_type="video/ogg") + +def list_video_files(client_id: str) -> dict: + """ + Return a dictionary of recorded video files for a given client. + {<camera_id>: [(<filename>, <file url>, <date>)]} + + Parameters + ---------- + client_id : str + The client ID. + + Returns + ------- + dict + Dictionary of recorded video files for the client. + """ + client_videos = {} + client_dir = os.path.join(VIDEO_DIR, client_id) + if os.path.exists(client_dir): + for (_, dirnames, _) in os.walk(client_dir): + for camera_dir in dirnames: + client_videos[camera_dir] = [] + camera_path = os.path.join(client_dir, camera_dir) + for (_, _, filenames) in os.walk(camera_path): + for filename in filenames: + try: + filepath = os.path.join(VIDEO_DIR, camera_path, filename) + timestamp = datetime.datetime.strftime( + datetime.datetime.fromtimestamp( + os.path.getmtime(filepath) + ), + TIME_FORMAT) + except Exception as exc: + logger.warning("Could not determine creation time for file %r: %r", filename, exc) + timestamp = 'unknown' + video_url = f"/videos/{client_id}/{camera_dir}/{filename}" + client_videos[camera_dir].append((filename, video_url, timestamp)) + return client_videos + +def validate_directive(command: str, args: list, client_id: str, camera_id: str) -> bool: + """ + Validate if the directive is known. + + Parameters + ---------- + command : str + The directive command. + args : list + The arguments for the directive. + client_id : str + The client ID. + camera_id : str + The camera ID. + + Returns + ------- + bool + True if the directive is valid, False otherwise. + """ + retval = False + # First, check if command is known: + if command not in DIRECTIVES: + logger.warning("Unknown directive received: %s", command) + return retval + if len(args) != 1: + logger.error("Directive '%s' requires exactly one argument.", command) + return retval + # Validate arguments passed from the frontend: + match command: + case 'modify_camera_name': + retval = validate_camera_name(args, client_id, camera_id) + case 'modify_camera_threshold': + retval = validate_camera_threshold(args) + case 'modify_camera_grace_pd': + retval = validate_camera_grace_period(args) + case 'modify_camera_address': + retval = validate_camera_address(args) + # case 'add_camera': + # retval = validate_add_camera(args, client_id) + case 'remove_camera': + retval = True # No specific validation needed + return retval + +def validate_camera_name(name, client_id, camera_id) -> bool: + """ + Validate camera name to avoid problematic characters and whitespace. + + Parameters + ---------- + name : str + The new camera name. + client_id : str + The client ID. + camera_id : str + The old camera ID. + + Returns + ------- + bool + True if the camera name is valid, False otherwise. + """ + retval = True + new_name = name # TODO: check if sending old name is needed if we have camera_id + old_name = camera_id + if old_name not in CLIENTS_DICT[client_id]: + logger.error("Old camera name does not exist.") + retval = False + if new_name in CLIENTS_DICT[client_id]: + logger.error("New camera name already exists.") + retval = False + if not new_name or any(char in new_name for char in ':') or any(char.isspace() for char in new_name): + logger.error("Camera name cannot be empty or contain spaces or colons.") + retval = False + if len(new_name) > MAX_CAMERA_NAME_LENGTH: + logger.error("Camera name is too long (max %s characters).", MAX_CAMERA_NAME_LENGTH) + retval = False + return retval + +def validate_camera_threshold(threshold) -> bool: + """ + Validate contour size threshold. + + Parameters + ---------- + threshold : str + The contour size threshold. + + Returns + ------- + bool + True if the contour size threshold is valid, False otherwise. + """ + retval = True + try: + value = int(threshold) + if value <= 0: + logger.error("Contour size threshold must be positive: %r.", threshold) + retval = False + except ValueError as exc: + logger.error("Invalid contour size threshold: %r, exception: %r", threshold, exc) + retval = False + return retval + +def validate_camera_grace_period(grace_period) -> bool: + """ + Validate movement grace period. + + Parameters + ---------- + grace_period : str + The movement grace period. + + Returns + ------- + bool + True if the movement grace period is valid, False otherwise. + """ + retval = True + try: + value = int(grace_period) + if value < 0: + logger.error("Movement grace period cannot be negative: %r.", grace_period) + retval = False + except ValueError as exc: + logger.error("Invalid movement grace period: %r, exception: ", grace_period, exc) + retval = False + return retval + +def validate_camera_address(address) -> bool: + """ + Validate camera address. + + Parameters + ---------- + address : str + The camera address. + + Returns + ------- + bool + True if the camera address is valid, False otherwise. + """ + retval = True + if not address or any(char.isspace() for char in address): + logger.error("Camera address %r cannot be empty.", address) + retval = False + if not address.startswith('/dev/video'): + logger.error("Camera address must start with '/dev/video': %r.", address) + retval = False + return retval + +def validate_remove_camera(name, client_id) -> bool: + """ + Validate remove camera directive. + + Parameters + ---------- + name : str + The camera name to remove. + client_id : str + The client ID. + + Returns + ------- + bool + True if the camera name is valid, False otherwise. + """ + retval = True + if name not in CLIENTS_DICT[client_id]: + logger.error("Camera name %r does not exist.", client_id) + retval = False + return retval + +if __name__ == "__main__": + uvicorn.run( + _app, + port=8007, + host='127.0.0.1', + log_level='info', + ) |
