aboutsummaryrefslogtreecommitdiff
path: root/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'worker.py')
-rw-r--r--worker.py235
1 files changed, 235 insertions, 0 deletions
diff --git a/worker.py b/worker.py
new file mode 100644
index 0000000..ad5fe1e
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,235 @@
+#!/usr/bin/env python
+
+__author__ = "Franoosh Corporation"
+
+"""
+A module consisting of a zeromq worker receiving video stream
+via router from clients. Able to send control messages to clients.
+"""
+
+import os
+from threading import Thread, Event
+import time
+import signal
+import logging
+from queue import Queue
+from collections import defaultdict
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+
+ADDR = "tcp://localhost"
+PORT = "9979"
+INITIAL_BACKEND_ADDR = f"{ADDR}:{PORT}"
+LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
+LOGLEVEL = logging.DEBUG
+
+stop_event = Event()
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+class MonitorTask(Thread):
+ """Monitor task"""
+ def __init__(self, socket):
+ super().__init__(daemon=True)
+ self.socket = socket
+ self.running = True
+
+ def run(self):
+ """Monitor connection on the initial socket.
+ This is heartbeat monitoring"""
+ monitor = self.socket.get_monitor_socket()
+ monitor.setsockopt(zmq.RCVTIMEO, 5000)
+ logger.debug("Monitor socket started.")
+ while self.running:
+ try:
+ event, _ = monitor.recv_multipart()
+ # Resolve the received event type:
+ event_type = int.from_bytes(event[:2], "little")
+ if event_type in (zmq.EVENT_CLOSED, zmq.EVENT_DISCONNECTED):
+ logger.warning("Monitor socket: closed | disconnected")
+ stop_event.set()
+ elif event_type == zmq.EVENT_CONNECT_DELAYED:
+ logger.debug("Monitor socket: event connect delayed")
+ elif event_type == zmq.EVENT_CONNECT_RETRIED:
+ logger.debug("Monitor socket: event connect retried")
+ elif event_type in (zmq.EVENT_CONNECTED, zmq.EVENT_HANDSHAKE_SUCCEEDED):
+ logger.debug("Monitor socket: client connected to router, handshake OK.")
+ stop_event.clear()
+ else:
+ logger.warning("Monitor socket: other event: '%r'", event_type)
+ except zmq.Again:
+ logger.debug("Timeout on monitoring socket.")
+ except Exception as exc: # W: Catching too general exception Exception
+ logger.error("Other exception on monitoring socket: %r", exc)
+
+ # monitor.close()
+
+ def stop(self):
+ """Stop thread"""
+ logger.info("Stopping monitor thread ...")
+ self.running = False
+
+
+class ServerWorker(Thread):
+ """ServerWorker"""
+ def __init__(self, identity, context=None):
+ super().__init__(daemon=True)
+ self.id = identity
+ self.context = context or zmq.Context.instance()
+ self.socket = self.context.socket(zmq.DEALER)
+ self.socket.identity = self.id.encode("utf-8")
+ self.monitor = MonitorTask(self.socket)
+ self.video_threads = defaultdict(lambda: defaultdict(dict))
+ self.connected = False
+ self.running = True
+
+ def run(self):
+ logger.debug("ServerWorker '%s' starting ...", self.id)
+ self.socket.connect(INITIAL_BACKEND_ADDR)
+ self.poller = zmq.Poller()
+ self.poller.register(self.socket, zmq.POLLIN)
+ self.monitor.start()
+
+ while self.running:
+ logger.debug("ServerWorker '%s' waiting for a message ...", self.id)
+ if not self.connected or stop_event.is_set():
+ self.socket.send_multipart([b"READY"]) # Router needs worker identity, hence this
+ time.sleep(1) # Wait a bit before trying to connect again
+
+ sockets = dict(self.poller.poll(1000))
+ if self.socket in sockets:
+ self.connected = True
+ msg = self.socket.recv_multipart()
+ logger.debug("ServerWorker '%s' received message: %r", self.id, msg)
+ filename = None
+ # At the moment we don't expect any other message than a video message:
+ if len(msg) == 4: # This is a message with start/stop directive and no video data.
+ logger.debug("Received start/stop directive: %r", msg)
+ client_id, camera_id, metadata = msg[0], msg[1], msg[2]
+ try:
+ directive, timestamp = metadata.decode("utf-8").split(":")
+ filename = f"{client_id.decode('utf-8')}_{camera_id.decode('utf-8')}-{timestamp}.mp4"
+ logger.debug("Directive: '%s', Timestamp: '%s', Filename: '%s'", directive, timestamp, filename)
+ except ValueError:
+ logger.error("Invalid metadata format: %r", metadata)
+ continue
+ if directive == "start":
+ if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): # New client or new camera
+ q = Queue()
+ logger.debug("Starting new video thread for client '%s', camera '%s'", client_id, camera_id)
+ video_worker = VideoWorker(filename, q)
+ video_worker.start()
+ self.video_threads[client_id][camera_id] = video_worker
+ elif directive == "stop":
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ logger.debug("Stopping video thread for client '%s', camera '%s'", client_id, camera_id)
+ self.video_threads[client_id][camera_id].queue.put(None) # Sentinel to stop the thread
+ del self.video_threads[client_id][camera_id]
+ logger.info("Stopped video thread for client '%s', camera '%s'", client_id, camera_id)
+ else:
+ logger.error("Unknown directive: '%s'", directive)
+ elif len(msg) == 3: # This is a video message with data only
+ logger.debug("Received video message with data only: %r", msg)
+ client_id, camera_id, msg = msg[0], msg[1], msg[2]
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ self.video_threads[client_id][camera_id].queue.put(msg)
+ else:
+ logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
+ logger.error("Available video threads keys: %r", self.video_threads.keys())
+ logger.error("Available video threads values: %r", self.video_threads.values())
+ elif len(msg) == 2: # This is a ping message from client
+ logger.debug("Received ping message: %r", msg)
+ client_id, camera_id = msg[0], msg[1]
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ # Respond with a pong message
+ try:
+ self.socket.send_multipart([client_id, camera_id, b'pong'])
+ logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
+ except zmq.ZMQError as exc:
+ logger.error("Failed to send pong: %r", exc)
+
+ else:
+ logger.warning("Received a message of not expected length from client: %r message: %r", client_id, msg)
+ try:
+ self.socket.send_multipart([client_id, camera_id, b'pong'])
+ logger.debug("Sent pong to client '%s', camera '%s'", client_id, camera_id)
+ except zmq.ZMQError as exc:
+ logger.error("Failed to send pong: %r", exc)
+ else:
+ logger.debug("No message received, polling again ...")
+ time.sleep(5)
+
+ self.monitor.stop()
+ self.monitor.join()
+ for camera_thread in self.video_threads.values():
+ for thread in camera_thread.values():
+ thread.queue.put(None) # Sentinel to unblock queue.get()
+ thread.stop()
+ thread.join()
+
+ def send_control_message(self, client_id, camera_id, message):
+ """Send control message to a specific client and camera."""
+ if client_id in self.video_threads and camera_id in self.video_threads[client_id]:
+ self.socket.send_multipart([client_id.encode("utf-8"), camera_id.encode("utf-8"), message])
+ logger.debug("Sent control message to client '%s', camera '%s': %r", client_id, camera_id, message)
+ else:
+ logger.error("No video thread found for client '%s', camera '%s'", client_id, camera_id)
+
+ def stop(self):
+ logger.info("ServerWorker '%s' exiting ...", self.id)
+ self.running = False
+
+
+class VideoWorker(Thread):
+ """Class for video threads."""
+
+ def __init__(self, filename, queue):
+ super().__init__(daemon=True)
+ self.filename = filename
+ self.queue = queue
+ self.live = True
+
+ def run(self):
+ if os.path.exists(self.filename):
+ logger.warning("File '%s' already exists, overwriting ...", self.filename)
+ with open(self.filename, 'wb') as f:
+ logger.info("VideoWorker started writing to file: %s", self.filename)
+ while self.live:
+ # Simulate video processing
+ frame = self.queue.get() # Get frame from queue
+ logger.debug("Processing ('writing to a file') frame for camera: %r", frame)
+ if frame is None:
+ logger.info("VideoWorker received stop signal, exiting ...")
+ break
+ f.write(frame + b'\n') # Write frame to file
+
+
+ def stop(self):
+ logger.info("VideoWorker '' exiting ...")
+ self.live = False
+
+def signal_handler(sig, frame):
+ worker.stop()
+
+if __name__ == "__main__":
+ logger.info("Starting up ...")
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ worker = ServerWorker("worker-task")
+ worker.start()
+
+ worker.join()