aboutsummaryrefslogtreecommitdiff
path: root/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'worker.py')
-rw-r--r--worker.py95
1 files changed, 51 insertions, 44 deletions
diff --git a/worker.py b/worker.py
index 26ea8aa..cd47345 100644
--- a/worker.py
+++ b/worker.py
@@ -5,6 +5,8 @@ via router from clients. Able to send control messages to clients.
Needs libzmq and pyzmq with 'drafts' support.
"""
+__author__ = "Franoosh Corporation"
+
import os
import sys
import struct
@@ -27,13 +29,11 @@ from helpers import (
TIME_FORMAT_STRING,
)
-__author__ = "Franoosh Corporation"
# Various constants: # TODO: put them in a config
HOST = "127.0.0.1"
ZMQPORT = "9979"
-# WSPORT = "8000"
WSPORT = "8008"
ZMQ_BACKEND_ADDR = f"tcp://{HOST}:{ZMQPORT}"
WEB_BACKEND_ADDR = f"tcp://{HOST}:{WSPORT}"
@@ -92,7 +92,7 @@ class MonitorTask(Thread):
----------
socket : zmq.Socket
The zmq socket to monitor.
-
+
Methods
-------
run()
@@ -192,7 +192,7 @@ class ServerWorker(Thread):
def __init__(self, identity, context=None) -> None:
"""
Docstring for __init__
-
+
Parameters
----------
identity : bytes
@@ -211,7 +211,7 @@ class ServerWorker(Thread):
self.connected = False
self.running = True
- def start_client(self, client_id, camera_id, filename) -> None:
+ def start_client(self, client_id: bytes, camera_id: bytes, timestamp: bytes) -> None:
"""
Start a video thread for new client_id and camera_id.
@@ -221,14 +221,23 @@ class ServerWorker(Thread):
Client ID
camera_id : bytes
Camera ID
- filename : str
- Filename to write video to
+ timestamp : bytes
+ Timestamp of the action
Returns
-------
None
"""
- # New client or new camera for existing client:
+ update_clients(client_id, camera_id)
+
+ timestamp_string = datetime.datetime.fromtimestamp(
+ struct.unpack('d', timestamp)[0]
+ ).strftime(TIME_FORMAT_STRING)
+ logger.debug("Timestamp string: %r", timestamp_string)
+
+ camera_id_string = camera_id.decode('utf-8')
+ filename = f"{camera_id_string}-{timestamp_string}.mp4"
+
if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):
q = Queue()
logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id)
@@ -286,20 +295,12 @@ class ServerWorker(Thread):
"Received metadata directive: %r for client: %r, camera: %r, timestamp: %r",
directive, client_id, camera_id, timestamp
)
- update_clients(client_id, camera_id)
- if directive in (b'start', b'stop'):
- camera_id_string = camera_id.decode('utf-8')
- timestamp_string = datetime.datetime.fromtimestamp(
- struct.unpack('d', timestamp)[0]
- ).strftime(TIME_FORMAT_STRING)
- filename = f"{camera_id_string}-{timestamp_string}.mp4"
- logger.debug("Directive: %r, Timestamp: %r, Filename: %r", directive, timestamp, filename)
- if directive == b"start":
- self.start_client(client_id, camera_id, filename)
- elif directive == b"stop":
- self.stop_client(client_id, camera_id)
- else:
- logger.error("Unknown directive: %r", directive)
+ if directive == b"start":
+ self.start_client(client_id, camera_id, timestamp)
+ elif directive == b"stop":
+ self.stop_client(client_id, camera_id)
+ else:
+ logger.error("Unknown directive: %r", directive)
# elif directive == b'rename':
# old_name = new_name = timestamp = None
# try:
@@ -336,29 +337,27 @@ class ServerWorker(Thread):
# self.web_sock.send_multipart([client_id, camera_id, directive, data], flags=zmq.NOBLOCK)
# except Exception as exc:
# logger.error("Sending camera metadata update to websocket failed: %r", exc)
- else:
- logger.error("Unknown directive: %r", directive)
return False
def handle_video_message(self, msg: list) -> None:
logger.debug("Received video message with data only.")
client_id, camera_id, *content = msg
- if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
- self.video_threads[client_id][camera_id].queue.put(content[0])
- # Send only [client_id, camera_id, jpeg_bytes] to the webserver:
- # zmq subsciber can subscribe to a topic defined by the first
- # part of the multipart message, so in order to allow for a
- # per camera subscription, we need to join client_id and camera_id
- topic = b':'.join([client_id, camera_id])
- try:
- self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK)
- except Exception as exc:
- logger.error("Sending message to websocket failed: %r", exc)
- else:
- logger.error("No video thread found for client %r, camera %r", client_id, camera_id)
- logger.debug("Available video threads keys: %r", self.video_threads.keys())
- logger.debug("Available video threads values: %r", self.video_threads.values())
+ if client_id not in self.video_threads or camera_id not in self.video_threads[client_id]:
+ # After crashing worker will not receive start message again.
+ logger.error("No video thread found for client %r, camera %r, creating", client_id, camera_id)
+ self.start_client(client_id, camera_id, struct.pack('d', time.time()))
+
+ self.video_threads[client_id][camera_id].queue.put(content[0])
+ # Send only [client_id, camera_id, jpeg_bytes] to the webserver:
+ # zmq subsciber can subscribe to a topic defined by the first
+ # part of the multipart message, so in order to allow for a
+ # per camera subscription, we need to join client_id and camera_id
+ topic = b':'.join([client_id, camera_id])
+ try:
+ self.web_sock.send_multipart([topic, content[0]], flags=zmq.NOBLOCK)
+ except Exception as exc:
+ logger.error("Sending message to websocket failed: %r", exc)
def run(self) -> None:
"""
@@ -394,7 +393,6 @@ class ServerWorker(Thread):
self.connected = True
msg = self.socket.recv_multipart()
logger.debug("ServerWorker %r received message of length: %d.", self.id, len(msg))
- filename = None
# At the moment we don't expect any other message than a
# start/stop message (of length 4) and a video message:
if len(msg) == 4: # This is a message with start/stop/rename directive and no video data.
@@ -407,10 +405,18 @@ class ServerWorker(Thread):
logger.debug("No message received, polling again ...")
time.sleep(0.1)
if self.web_sock in sockets:
- frontend_msg = self.web_sock.recv_multipart()
- logger.info("Received message from frontend: %r", frontend_msg)
- self.socket.send_multipart(frontend_msg)
- logger.info("Forwarded message to backend: %r", frontend_msg)
+ try:
+ frontend_msg = self.web_sock.recv_multipart()
+ logger.info("Received message from frontend: %r", frontend_msg)
+ except Exception as exc:
+ logger.error("Receiving message from frontend failed: %r", exc)
+ continue
+ try:
+ self.socket.send_multipart(frontend_msg)
+ except Exception as exc:
+ logger.error("Forwarding message to client failed: %r", exc)
+ continue
+ logger.info("Forwarded message to client: %r", frontend_msg)
self.monitor.stop()
self.monitor.join()
@@ -571,6 +577,7 @@ class VideoWorker(Thread):
out.release()
logger.info("VideoWorker finished writing to file: %s", self.filename)
+
def signal_handler(sig, frame) -> None:
"""
Docstring for signal_handler