aboutsummaryrefslogtreecommitdiff
path: root/webserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'webserver.py')
-rw-r--r--webserver.py524
1 files changed, 524 insertions, 0 deletions
diff --git a/webserver.py b/webserver.py
new file mode 100644
index 0000000..41d1d30
--- /dev/null
+++ b/webserver.py
@@ -0,0 +1,524 @@
+#!/usr/bin/env python
+"""
+Module serving video from zmq to a webserver.
+"""
+__author__ = "Franoosh Corporation"
+
+import os
+from collections import defaultdict
+import json
+import logging
+import asyncio
+import datetime
+from contextlib import asynccontextmanager
+import zmq
+import zmq.asyncio
+import uvicorn
+from fastapi import (
+ FastAPI,
+ Request,
+ HTTPException,
+ WebSocket,
+ WebSocketDisconnect,
+ templating,
+)
+from fastapi.responses import HTMLResponse, FileResponse
+from fastapi.staticfiles import StaticFiles
+from typing import AsyncGenerator
+
+from helpers import (
+ CustomLoggingFormatter,
+ DIRECTIVES,
+ MAX_CAMERA_NAME_LENGTH,
+)
+
+
+CWD = os.getcwd()
+CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
+CLIENTS_DICT = defaultdict(list) # CLIENTS_DICT[client_id] = [camera_id1, camera_id2, ...]
+BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
+LOGDIR = 'logs'
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+LOGLEVEL = logging.INFO
+# LOGLEVEL = logging.DEBUG
+# This should come from the config file:
+VIDEO_DIR = os.path.join(CWD, "videos")
+
+HOST = "127.0.0.1"
+ZMQ_PORT = "9979"
+WEB_PORT = "8008"
+CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}"
+WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}"
+TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+# Track websocket connections by (client_id, camera_id):
+ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
+ws_queues = defaultdict(dict)
+ctrl_msg_que = asyncio.Queue()
+# Create ZMQ context and socket:
+zmq_context = zmq.asyncio.Context()
+zmq_socket = zmq_context.socket(zmq.DEALER)
+# Connect to ZMQ backend:
+zmq_socket.connect(WEB_BACKEND_ADDR)
+
+
+async def zmq_bridge():
+ """
+ Bridge between ZMQ backend and websocket clients.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ poll = zmq.asyncio.Poller()
+ poll.register(zmq_socket, zmq.POLLIN)
+ while True:
+ try:
+ sockets = dict(await poll.poll(100))
+ if zmq_socket in sockets:
+ logger.debug("Receiving message from ZMQ backend ...")
+ topic, frame_data = await zmq_socket.recv_multipart()
+ # messages can also now contain directives, like 'new_camera' and 'removed_camera' with empty frame_data
+ # Send those to a control queue to be handled appropriately
+ client_id, camera_id = topic.decode('utf-8').split(':', 1) # <-- strings from now on
+ # client_id, camera_id = topic.split(b':', 1)
+ if not camera_id in CLIENTS_DICT[client_id]:
+ CLIENTS_DICT[client_id].append(camera_id)
+ queue = ws_queues.get(client_id, {}).get(camera_id)
+ if queue:
+ if queue.full():
+ logger.debug(
+ "WebsSocket queue full for '/ws/%r/%r', discarding oldest frame.",
+ client_id,
+ camera_id
+ )
+ _ = queue.get_nowait() # Discard oldest frame
+ await queue.put(frame_data)
+ else:
+ logger.debug("Creating new websocket queue for '/ws/%r/%r'.", client_id, camera_id)
+ ws_queues[client_id][camera_id] = asyncio.Queue(maxsize=10)
+ if not ctrl_msg_que.empty():
+ client_id, camera_id, command, args_list = None, None, None, None
+ try:
+ client_id, camera_id, command, args_list = await ctrl_msg_que.get()
+ except ValueError as ve:
+ logger.error("Invalid control message on queue: %r", ve)
+ if client_id and camera_id and command and args_list is not None:
+ try:
+ zmq_socket.send_multipart([
+ client_id,
+ camera_id,
+ command,
+ ] + args_list, flags=zmq.NOBLOCK)
+ logger.info("Sent control command.")
+ except zmq.Again:
+ logger.error(
+ "ZMQ socket busy, could not send control command '%r' with args: %r for '/ws/%r/%r' to backend.",
+ command,
+ args_list,
+ client_id,
+ camera_id
+ )
+ except Exception as e:
+ logger.error("Error in ZMQ bridge: %r", e)
+
+@asynccontextmanager
+async def lifespan(app: FastAPI) -> AsyncGenerator:
+ """
+ Create lifespan context for FastAPI app.
+
+ Parameters
+ ----------
+ app : FastAPI
+ The FastAPI application instance.
+
+ Returns
+ -------
+ AsyncGenerator
+ Yields control to the application context.
+ """
+ asyncio.create_task(zmq_bridge())
+ yield
+
+_app = FastAPI(lifespan=lifespan)
+_app.mount("/static", StaticFiles(directory="static"), name="static")
+templates = templating.Jinja2Templates(directory='templates')
+
+@_app.get("/")
+async def main_route(request: Request) -> HTMLResponse:
+ """
+ Main route serving the index page.
+
+ Parameters
+ ----------
+ request : Request
+ The incoming HTTP request.
+ """
+ logger.debug("Main route visited")
+ return templates.TemplateResponse(
+ "main.html",
+ {
+ "request": request,
+ "clients": CLIENTS_DICT,
+ }
+ )
+
+@_app.get("/clients/{client_id}", response_class=HTMLResponse)
+async def client_route(request: Request, client_id: str) -> HTMLResponse:
+ """
+ Serve a particular client page.
+
+ Parameters
+ ----------
+ request : Request
+ The incoming HTTP request.
+ client_id : str
+ The client ID to serve.
+ """
+ logger.debug("Checking client_id: %s in clients_dict: %r.", client_id, CLIENTS_DICT)
+ if not client_id in CLIENTS_DICT:
+ raise HTTPException(status_code=404, detail="No such client ID.")
+ return templates.TemplateResponse(
+ "client.html",
+ {
+ "request": request,
+ "client_id": client_id,
+ "camera_ids": CLIENTS_DICT[client_id],
+ "client_videos": list_video_files(client_id),
+ },
+ )
+
+@_app.websocket("/ws/{client_id}/{camera_id}")
+async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> None:
+ """
+ Serve a particular camera page.
+
+ Parameters
+ ----------
+ websocket : WebSocket
+ The incoming WebSocket connection.
+ client_id : str
+ The client ID.
+ camera_id : str
+ The camera ID.
+ """
+ logger.info("Accepting websocket connection for '/ws/%s/%s'.", client_id, camera_id)
+ await websocket.accept()
+ ws_connections[client_id][camera_id] = {'ws': websocket}
+ queue = ws_queues[client_id][camera_id]
+
+ async def send_frames():
+ while True:
+ logger.debug("Getting from queue frame for '/ws/%s/%s'.", client_id, camera_id)
+ try:
+ # Don't wait indefinitely to allow checking for client disconnection:
+ frame_data = queue.get_nowait()
+ except asyncio.QueueEmpty:
+ await asyncio.sleep(0.1)
+ continue
+ try:
+ logger.debug("Sending frame to '/ws/%s/%s'.", client_id, camera_id)
+ await websocket.send_bytes(frame_data)
+ logger.debug("Sent frame to '/ws/%s/%s'.", client_id, camera_id)
+ # This exception is raised when client disconnects:
+ except Exception as exc:
+ logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc)
+ ws_connections[client_id].pop(camera_id, None)
+ break
+
+ async def receive_control():
+ while True:
+ try:
+ data = await websocket.receive_text()
+ logger.info("Received control message from '/ws/%s/%s': %s", client_id, camera_id, data)
+ # Handle control messages from the client:
+ frontend_message = json.loads(data)
+ for command, args in frontend_message.items():
+ if validate_directive(command, args, client_id, camera_id):
+ args_list = [str(arg).encode('utf-8') for arg in args]
+ ctrl_msg_que.put_nowait((client_id, camera_id, command, args_list))
+ logger.info(
+ "Put control command '%s' with args: %r for '/ws/%s/%s' on queue to backend.",
+ command,
+ args_list,
+ client_id,
+ camera_id
+ )
+ except json.JSONDecodeError:
+ logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data)
+ except WebSocketDisconnect:
+ logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id)
+ ws_connections[client_id].pop(camera_id, None)
+ break
+ except Exception as exc:
+ logger.warning("Error receiving control message: %r", exc)
+
+ send_task = asyncio.create_task(send_frames())
+ receive_task = asyncio.create_task(receive_control())
+ try:
+ await asyncio.gather(send_task, receive_task)
+ finally:
+ send_task.cancel()
+ receive_task.cancel()
+ ws_connections[client_id].pop(camera_id, None)
+ ws_queues[client_id].pop(camera_id, None)
+ logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id)
+
+@_app.get("/videos/{client_id}/{camera_id}/{filename}")
+async def serve_video(client_id: str, camera_id: str, filename: str) -> FileResponse:
+ """
+ Route serving video files.
+
+ Parameters
+ ----------
+ client_id : str
+ The client ID.
+ camera_id : str
+ The camera ID.
+ filename : str
+ The video filename.
+ """
+ video_path = os.path.join(VIDEO_DIR, client_id, camera_id, filename)
+
+ if not os.path.exists(video_path):
+ raise HTTPException(status_code=404, detail="Video not found")
+
+ logger.debug("Serving video file: '%s', video_path")
+
+ return FileResponse(video_path, media_type="video/ogg")
+
+def list_video_files(client_id: str) -> dict:
+ """
+ Return a dictionary of recorded video files for a given client.
+ {<camera_id>: [(<filename>, <file url>, <date>)]}
+
+ Parameters
+ ----------
+ client_id : str
+ The client ID.
+
+ Returns
+ -------
+ dict
+ Dictionary of recorded video files for the client.
+ """
+ client_videos = {}
+ client_dir = os.path.join(VIDEO_DIR, client_id)
+ if os.path.exists(client_dir):
+ for (_, dirnames, _) in os.walk(client_dir):
+ for camera_dir in dirnames:
+ client_videos[camera_dir] = []
+ camera_path = os.path.join(client_dir, camera_dir)
+ for (_, _, filenames) in os.walk(camera_path):
+ for filename in filenames:
+ try:
+ filepath = os.path.join(VIDEO_DIR, camera_path, filename)
+ timestamp = datetime.datetime.strftime(
+ datetime.datetime.fromtimestamp(
+ os.path.getmtime(filepath)
+ ),
+ TIME_FORMAT)
+ except Exception as exc:
+ logger.warning("Could not determine creation time for file %r: %r", filename, exc)
+ timestamp = 'unknown'
+ video_url = f"/videos/{client_id}/{camera_dir}/{filename}"
+ client_videos[camera_dir].append((filename, video_url, timestamp))
+ return client_videos
+
+def validate_directive(command: str, args: list, client_id: str, camera_id: str) -> bool:
+ """
+ Validate if the directive is known.
+
+ Parameters
+ ----------
+ command : str
+ The directive command.
+ args : list
+ The arguments for the directive.
+ client_id : str
+ The client ID.
+ camera_id : str
+ The camera ID.
+
+ Returns
+ -------
+ bool
+ True if the directive is valid, False otherwise.
+ """
+ retval = False
+ # First, check if command is known:
+ if command not in DIRECTIVES:
+ logger.warning("Unknown directive received: %s", command)
+ return retval
+ if len(args) != 1:
+ logger.error("Directive '%s' requires exactly one argument.", command)
+ return retval
+ # Validate arguments passed from the frontend:
+ match command:
+ case 'modify_camera_name':
+ retval = validate_camera_name(args, client_id, camera_id)
+ case 'modify_camera_threshold':
+ retval = validate_camera_threshold(args)
+ case 'modify_camera_grace_pd':
+ retval = validate_camera_grace_period(args)
+ case 'modify_camera_address':
+ retval = validate_camera_address(args)
+ # case 'add_camera':
+ # retval = validate_add_camera(args, client_id)
+ case 'remove_camera':
+ retval = True # No specific validation needed
+ return retval
+
+def validate_camera_name(name, client_id, camera_id) -> bool:
+ """
+ Validate camera name to avoid problematic characters and whitespace.
+
+ Parameters
+ ----------
+ name : str
+ The new camera name.
+ client_id : str
+ The client ID.
+ camera_id : str
+ The old camera ID.
+
+ Returns
+ -------
+ bool
+ True if the camera name is valid, False otherwise.
+ """
+ retval = True
+ new_name = name # TODO: check if sending old name is needed if we have camera_id
+ old_name = camera_id
+ if old_name not in CLIENTS_DICT[client_id]:
+ logger.error("Old camera name does not exist.")
+ retval = False
+ if new_name in CLIENTS_DICT[client_id]:
+ logger.error("New camera name already exists.")
+ retval = False
+ if not new_name or any(char in new_name for char in ':') or any(char.isspace() for char in new_name):
+ logger.error("Camera name cannot be empty or contain spaces or colons.")
+ retval = False
+ if len(new_name) > MAX_CAMERA_NAME_LENGTH:
+ logger.error("Camera name is too long (max %s characters).", MAX_CAMERA_NAME_LENGTH)
+ retval = False
+ return retval
+
+def validate_camera_threshold(threshold) -> bool:
+ """
+ Validate contour size threshold.
+
+ Parameters
+ ----------
+ threshold : str
+ The contour size threshold.
+
+ Returns
+ -------
+ bool
+ True if the contour size threshold is valid, False otherwise.
+ """
+ retval = True
+ try:
+ value = int(threshold)
+ if value <= 0:
+ logger.error("Contour size threshold must be positive: %r.", threshold)
+ retval = False
+ except ValueError as exc:
+ logger.error("Invalid contour size threshold: %r, exception: %r", threshold, exc)
+ retval = False
+ return retval
+
+def validate_camera_grace_period(grace_period) -> bool:
+ """
+ Validate movement grace period.
+
+ Parameters
+ ----------
+ grace_period : str
+ The movement grace period.
+
+ Returns
+ -------
+ bool
+ True if the movement grace period is valid, False otherwise.
+ """
+ retval = True
+ try:
+ value = int(grace_period)
+ if value < 0:
+ logger.error("Movement grace period cannot be negative: %r.", grace_period)
+ retval = False
+ except ValueError as exc:
+ logger.error("Invalid movement grace period: %r, exception: ", grace_period, exc)
+ retval = False
+ return retval
+
+def validate_camera_address(address) -> bool:
+ """
+ Validate camera address.
+
+ Parameters
+ ----------
+ address : str
+ The camera address.
+
+ Returns
+ -------
+ bool
+ True if the camera address is valid, False otherwise.
+ """
+ retval = True
+ if not address or any(char.isspace() for char in address):
+ logger.error("Camera address %r cannot be empty.", address)
+ retval = False
+ if not address.startswith('/dev/video'):
+ logger.error("Camera address must start with '/dev/video': %r.", address)
+ retval = False
+ return retval
+
+def validate_remove_camera(name, client_id) -> bool:
+ """
+ Validate remove camera directive.
+
+ Parameters
+ ----------
+ name : str
+ The camera name to remove.
+ client_id : str
+ The client ID.
+
+ Returns
+ -------
+ bool
+ True if the camera name is valid, False otherwise.
+ """
+ retval = True
+ if name not in CLIENTS_DICT[client_id]:
+ logger.error("Camera name %r does not exist.", client_id)
+ retval = False
+ return retval
+
+if __name__ == "__main__":
+ uvicorn.run(
+ _app,
+ port=8007,
+ host='127.0.0.1',
+ log_level='info',
+ )