aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFranoosh <uinarf@autistici.org>2026-01-10 14:16:42 +0100
committerFranoosh <uinarf@autistici.org>2026-01-10 14:16:42 +0100
commit632fdc7b31dc11ed478f7371676a09a2145eaba4 (patch)
tree68d8b3096f89640f3065a89cbfb2f6f528bc8fec
parent80e224e8edcc6dace9bb47163e10fbc3eb88025e (diff)
downloadZeroMQ_Video_Streaming-master.tar.gz
ZeroMQ_Video_Streaming-master.tar.bz2
ZeroMQ_Video_Streaming-master.zip
Fix webserver.py hanging on SIGTERM/SIGINT.HEADmaster
-rw-r--r--webserver.py117
1 files changed, 75 insertions, 42 deletions
diff --git a/webserver.py b/webserver.py
index 4c45692..7fa2e3c 100644
--- a/webserver.py
+++ b/webserver.py
@@ -68,6 +68,7 @@ logging.basicConfig(
ws_connections = defaultdict(dict) # ws_connections[client_id][camera_id] = websocket
ws_queues = defaultdict(dict)
ctrl_msg_que = asyncio.Queue()
+shutdown_event = asyncio.Event()
# Create ZMQ context and socket:
zmq_context = zmq.asyncio.Context()
# Authenticate context:
@@ -92,7 +93,7 @@ async def zmq_bridge():
"""
poll = zmq.asyncio.Poller()
poll.register(zmq_socket, zmq.POLLIN)
- while True:
+ while not shutdown_event.is_set():
try:
sockets = dict(await poll.poll(100))
if zmq_socket in sockets:
@@ -139,8 +140,12 @@ async def zmq_bridge():
client_id,
camera_id
)
+ except asyncio.CancelledError:
+ logger.info("ZMQ bridge received cancellation signal")
+ raise
except Exception as e:
logger.error("Error in ZMQ bridge: %r", e)
+ logger.info("ZMQ bridge shutting down")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator:
@@ -157,8 +162,20 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
AsyncGenerator
Yields control to the application context.
"""
- asyncio.create_task(zmq_bridge())
- yield
+ zmq_bridge_task = asyncio.create_task(zmq_bridge())
+ try:
+ yield
+ finally:
+ logger.info("Initiating graceful shutdown")
+ shutdown_event.set()
+ if zmq_bridge_task and not zmq_bridge_task.done():
+ try:
+ await asyncio.wait_for(zmq_bridge_task, timeout=5.0)
+ except asyncio.TimeoutError:
+ logger.warning("ZMQ bridge did not shut down within timeout")
+ zmq_bridge_task.cancel()
+ except Exception as e:
+ logger.error("Error waiting for ZMQ bridge shutdown: %r", e)
_app = FastAPI(lifespan=lifespan)
_app.mount("/static", StaticFiles(directory="static"), name="static")
@@ -228,55 +245,71 @@ async def camera_route(websocket: WebSocket, client_id: str, camera_id: str) ->
queue = ws_queues[client_id][camera_id]
async def send_frames():
- while True:
- try:
- # Don't wait indefinitely to allow checking for client disconnection:
- frame_data = queue.get_nowait()
- except asyncio.QueueEmpty:
- await asyncio.sleep(0.1)
- continue
- try:
- await websocket.send_bytes(frame_data)
- # This exception is raised when client disconnects:
- except Exception as exc:
- logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc)
- ws_connections[client_id].pop(camera_id, None)
- break
+ try:
+ while True:
+ try:
+ # Don't wait indefinitely to allow checking for client disconnection:
+ frame_data = queue.get_nowait()
+ except asyncio.QueueEmpty:
+ await asyncio.sleep(0.1)
+ continue
+ try:
+ await websocket.send_bytes(frame_data)
+ # This exception is raised when client disconnects:
+ except Exception as exc:
+ logger.warning("Error sending frame to '/ws/%s/%s': %r", client_id, camera_id, exc)
+ ws_connections[client_id].pop(camera_id, None)
+ break
+ except asyncio.CancelledError:
+ logger.debug("send_frames task cancelled for '/ws/%s/%s'", client_id, camera_id)
+ raise
async def receive_control():
- while True:
- try:
- data = await websocket.receive_text()
- logger.info("Received control message: %r from client_id: '%s', camera_id: '%s'.", data, client_id, camera_id)
- # Handle control messages from the client:
- frontend_message = json.loads(data)
- for command, args in frontend_message.items():
- if validate_directive(command, args, client_id, camera_id):
- args_list = [str(arg).encode('utf-8') for arg in args]
- ctrl_msg_que.put_nowait((client_id.encode('utf-8'), camera_id.encode('utf-8'), command.encode('utf-8'), args_list))
- logger.info(
- "Put control command %r with args: %r for client_id: %r, camera_id: %r on queue to backend.",
- command,
- args_list,
- client_id,
- camera_id
- )
- except json.JSONDecodeError:
- logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data)
- except WebSocketDisconnect:
- logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id)
- ws_connections[client_id].pop(camera_id, None)
- break
- except Exception as exc:
- logger.warning("Error receiving control message: %r", exc)
+ try:
+ while True:
+ try:
+ data = await websocket.receive_text()
+ logger.info("Received control message: %r from client_id: '%s', camera_id: '%s'.", data, client_id, camera_id)
+ # Handle control messages from the client:
+ frontend_message = json.loads(data)
+ for command, args in frontend_message.items():
+ if validate_directive(command, args, client_id, camera_id):
+ args_list = [str(arg).encode('utf-8') for arg in args]
+ ctrl_msg_que.put_nowait((client_id.encode('utf-8'), camera_id.encode('utf-8'), command.encode('utf-8'), args_list))
+ logger.info(
+ "Put control command %r with args: %r for client_id: %r, camera_id: %r on queue to backend.",
+ command,
+ args_list,
+ client_id,
+ camera_id
+ )
+ except json.JSONDecodeError:
+ logger.warning("Received invalid JSON from '/ws/%s/%s': %s", client_id, camera_id, data)
+ except WebSocketDisconnect:
+ logger.info("WebSocket disconnected for '/ws/%s/%s'.", client_id, camera_id)
+ ws_connections[client_id].pop(camera_id, None)
+ break
+ except asyncio.CancelledError:
+ logger.debug("receive_control task cancelled for '/ws/%s/%s'", client_id, camera_id)
+ raise
+ except Exception as exc:
+ logger.warning("Error receiving control message: %r", exc)
send_task = asyncio.create_task(send_frames())
receive_task = asyncio.create_task(receive_control())
try:
await asyncio.gather(send_task, receive_task)
+ except asyncio.CancelledError:
+ logger.debug("camera_route cancelled for '/ws/%s/%s'", client_id, camera_id)
+ except Exception as exc:
+ logger.error("Error in camera_route for '/ws/%s/%s': %r", client_id, camera_id, exc)
finally:
send_task.cancel()
receive_task.cancel()
+ try:
+ await asyncio.gather(send_task, receive_task, return_exceptions=True)
+ except Exception:
+ pass
ws_connections[client_id].pop(camera_id, None)
ws_queues[client_id].pop(camera_id, None)
logger.info("Cleaned up websocket connection for '/ws/%s/%s'.", client_id, camera_id)