diff options
| author | Franoosh <uinarf@autistici.org> | 2026-01-10 14:16:42 +0100 |
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2026-01-10 14:16:42 +0100 |
| commit | 632fdc7b31dc11ed478f7371676a09a2145eaba4 (patch) | |
| tree | 68d8b3096f89640f3065a89cbfb2f6f528bc8fec | |
| parent | 80e224e8edcc6dace9bb47163e10fbc3eb88025e (diff) | |
| download | ZeroMQ_Video_Streaming-master.tar.gz ZeroMQ_Video_Streaming-master.tar.bz2 ZeroMQ_Video_Streaming-master.zip | |
| -rw-r--r-- | webserver.py | 117 |
1 files changed, 75 insertions, 42 deletions
diff --git a/webserver.py b/webserver.py index 4c45692..7fa2e3c 100644 --- a/webserver.py +++ b/webserver.py @@ -68,6 +68,7 @@ logging.basicConfig( ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket ws_queues = defaultdict(dict) ctrl_msg_que = asyncio.Queue() +shutdown_event = asyncio.Event() # Create ZMQ context and socket: zmq_context = zmq.asyncio.Context() # Authenticate context: @@ -92,7 +93,7 @@ async def zmq_bridge(): """ poll = zmq.asyncio.Poller() poll.register(zmq_socket, zmq.POLLIN) - while True: + while not shutdown_event.is_set(): try: sockets = dict(await poll.poll(100)) if zmq_socket in sockets: @@ -139,8 +140,12 @@ async def zmq_bridge(): client_id, camera_id ) + except asyncio.CancelledError: + logger.info("ZMQ bridge received cancellation signal") + raise except Exception as e: logger.error("Error in ZMQ bridge: %r", e) + logger.info("ZMQ bridge shutting down") @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator: @@ -157,8 +162,20 @@ async def lifespan(app: FastAPI) -> AsyncGenerator: AsyncGenerator Yields control to the application context. """ - asyncio.create_task(zmq_bridge()) - yield + zmq_bridge_task = asyncio.create_task(zmq_bridge()) + try: + yield + finally: + logger.info("Initiating graceful shutdown") + shutdown_event.set() + if zmq_bridge_task and not zmq_bridge_task.done(): + try: + await asyncio.wait_for(zmq_bridge_task, timeout=5.0) + except asyncio.TimeoutError: + logger.warning("ZMQ bridge did not shut down within timeout") + zmq_bridge_task.cancel() + except Exception as e: + logger.error("Error waiting for ZMQ bridge shutdown: %r", e) _app = FastAPI(lifespan=lifespan) _app.mount("/static", StaticFiles(directory="static"), name="static") @@ -228,55 +245,71 @@ async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) -> queue = ws_queues[client_id][camera_id] async def send_frames(): - while True: - try: - # Don't wait indefinitely to allow checking for client disconnection: - frame_data = queue.get_nowait() - except asyncio.QueueEmpty: - await asyncio.sleep(0.1) - continue - try: - await websocket.send_bytes(frame_data) - # This exception is raised when client disconnects: - except Exception as exc: - logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc) - ws_connections[client_id].pop(camera_id, None) - break + try: + while True: + try: + # Don't wait indefinitely to allow checking for client disconnection: + frame_data = queue.get_nowait() + except asyncio.QueueEmpty: + await asyncio.sleep(0.1) + continue + try: + await websocket.send_bytes(frame_data) + # This exception is raised when client disconnects: + except Exception as exc: + logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc) + ws_connections[client_id].pop(camera_id, None) + break + except asyncio.CancelledError: + logger.debug("send_frames task cancelled for '/ws/%s/%s'", client_id, camera_id) + raise async def receive_control(): - while True: - try: - data = await websocket.receive_text() - logger.info("Received control message: %r from client_id: '%s', camera_id: '%s'.", data, client_id, camera_id) - # Handle control messages from the client: - frontend_message = json.loads(data) - for command, args in frontend_message.items(): - if validate_directive(command, args, client_id, camera_id): - args_list = [str(arg).encode('utf-8') for arg in args] - ctrl_msg_que.put_nowait((client_id.encode('utf-8'), camera_id.encode('utf-8'), command.encode('utf-8'), args_list)) - logger.info( - "Put control command %r with args: %r for client_id: %r, camera_id: %r on queue to backend.", - command, - args_list, - client_id, - camera_id - ) - except json.JSONDecodeError: - logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) - except WebSocketDisconnect: - logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) - ws_connections[client_id].pop(camera_id, None) - break - except Exception as exc: - logger.warning("Error receiving control message: %r", exc) + try: + while True: + try: + data = await websocket.receive_text() + logger.info("Received control message: %r from client_id: '%s', camera_id: '%s'.", data, client_id, camera_id) + # Handle control messages from the client: + frontend_message = json.loads(data) + for command, args in frontend_message.items(): + if validate_directive(command, args, client_id, camera_id): + args_list = [str(arg).encode('utf-8') for arg in args] + ctrl_msg_que.put_nowait((client_id.encode('utf-8'), camera_id.encode('utf-8'), command.encode('utf-8'), args_list)) + logger.info( + "Put control command %r with args: %r for client_id: %r, camera_id: %r on queue to backend.", + command, + args_list, + client_id, + camera_id + ) + except json.JSONDecodeError: + logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data) + except WebSocketDisconnect: + logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id) + ws_connections[client_id].pop(camera_id, None) + break + except asyncio.CancelledError: + logger.debug("receive_control task cancelled for '/ws/%s/%s'", client_id, camera_id) + raise + except Exception as exc: + logger.warning("Error receiving control message: %r", exc) send_task = asyncio.create_task(send_frames()) receive_task = asyncio.create_task(receive_control()) try: await asyncio.gather(send_task, receive_task) + except asyncio.CancelledError: + logger.debug("camera_route cancelled for '/ws/%s/%s'", client_id, camera_id) + except Exception as exc: + logger.error("Error in camera_route for '/ws/%s/%s': %r", client_id, camera_id, exc) finally: send_task.cancel() receive_task.cancel() + try: + await asyncio.gather(send_task, receive_task, return_exceptions=True) + except Exception: + pass ws_connections[client_id].pop(camera_id, None) ws_queues[client_id].pop(camera_id, None) logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id) |
