diff options
| author | Franoosh <uinarf@autistici.org> | 2026-01-10 13:03:05 +0100 |
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2026-01-10 13:03:05 +0100 |
| commit | 80e224e8edcc6dace9bb47163e10fbc3eb88025e (patch) | |
| tree | fb317b184401c8780c75f898ad03d9429fe74d5f | |
| parent | 75fadd788a22e9fada10dd99ea4e63730b650d64 (diff) | |
| download | ZeroMQ_Video_Streaming-80e224e8edcc6dace9bb47163e10fbc3eb88025e.tar.gz ZeroMQ_Video_Streaming-80e224e8edcc6dace9bb47163e10fbc3eb88025e.tar.bz2 ZeroMQ_Video_Streaming-80e224e8edcc6dace9bb47163e10fbc3eb88025e.zip | |
Add support for zmq certificates
| -rw-r--r-- | client.py | 15 | ||||
| -rw-r--r-- | helpers.py | 10 | ||||
| -rw-r--r-- | router.py | 7 | ||||
| -rw-r--r-- | webserver.py | 5 | ||||
| -rw-r--r-- | worker.py | 26 |
5 files changed, 49 insertions, 14 deletions
@@ -34,6 +34,7 @@ from helpers import ( timestamp_to_bytes, bytes_to_timestamp, write_yappi_stats, + auth_service, ) ################################################### @@ -69,6 +70,7 @@ FPS = 30 # Maybe serial number from /proc/cpuinfo. # In bytes. CLIENT_ID = b"client_001" +CERTIFICATE_DIRECTORY = 'certs' ##################################################### # End defaults and constants # @@ -373,7 +375,7 @@ class ClientTask(Thread): Watch for new camera devices using inotify """ - def __init__(self, _id, context: zmq.Context=None) -> None: + def __init__(self, _id, context: zmq.Context) -> None: """ Parameters ---------- @@ -383,7 +385,7 @@ class ClientTask(Thread): ZMQ context, by default None """ super().__init__(daemon=True) - self.context = context or zmq.Context.instance() + self.context = context self.socket = self.context.socket(zmq.DEALER) self.identity = self.socket.identity = _id # <-- bytes self.poll = zmq.Poller() @@ -1049,7 +1051,14 @@ if __name__ == "__main__": logger.info("Starting up ...") - client = ClientTask(get_serial_number()) + context = zmq.Context() + try: + auth_service(context, cert_dir=CERTIFICATE_DIRECTORY) + except Exception as exc: + logger.error("Error configuring authentication: %r", exc) + exit(1) + + client = ClientTask(get_serial_number(), context) client.start() client.join() @@ -15,6 +15,8 @@ import subprocess import struct import datetime import cv2 +import zmq +import zmq.auth # Known directives for camera configuration: DIRECTIVES = ( @@ -262,3 +264,11 @@ def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool: return True return False + + +def auth_service(context, cert_dir): + """Instantiate zmq authenticator.""" + auth = zmq.auth.Authenticator(context) + auth.start() + auth.configure_curve(location=cert_dir) + zmq.auth.load_certificates(cert_dir) @@ -15,7 +15,10 @@ import random from collections import defaultdict, deque import zmq -from helpers import CustomLoggingFormatter +from helpers import ( + CustomLoggingFormatter, + auth_service, +) # TODO: add configparser @@ -38,6 +41,7 @@ if not os.path.exists(LOGDID): BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" LOGFILE = os.path.join(LOGDID, f"{BASENAME}.log") LOGLEVEL = logging.INFO +CERTIFICATE_DIRECTORY = 'certs' log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -67,6 +71,7 @@ def custom_proxy() -> None: None """ context = zmq.Context.instance() + auth_service(context, CERTIFICATE_DIRECTORY) poller = zmq.Poller() # Set up frontend: diff --git a/webserver.py b/webserver.py index a2b9b7a..4c45692 100644 --- a/webserver.py +++ b/webserver.py @@ -30,6 +30,7 @@ from helpers import ( CustomLoggingFormatter, DIRECTIVES, MAX_CAMERA_NAME_LENGTH, + auth_service, ) @@ -49,6 +50,7 @@ WEB_PORT = "8008" CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}" WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}" TIME_FORMAT = '%Y_%m_%d-%H_%M_%S' +CERTIFICATE_DIR = 'certs' log_formatter = CustomLoggingFormatter() handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -68,6 +70,9 @@ ws_queues = defaultdict(dict) ctrl_msg_que = asyncio.Queue() # Create ZMQ context and socket: zmq_context = zmq.asyncio.Context() +# Authenticate context: +auth_service(zmq_context, CERTIFICATE_DIR) + zmq_socket = zmq_context.socket(zmq.DEALER) # Connect to ZMQ backend: zmq_socket.connect(WEB_BACKEND_ADDR) @@ -27,6 +27,7 @@ from helpers import ( CustomLoggingFormatter, bytes_to_str, TIME_FORMAT_STRING, + auth_service, ) @@ -57,6 +58,7 @@ TMP_DIR = os.path.join(CWD, "tmp") CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json') # This should come from the config file: VIDEO_DIR = os.path.join(CWD, "videos") +CERTIFICATE_DIR = 'certs' # Other: @@ -177,8 +179,8 @@ class ServerWorker(Thread): ---------- identity : bytes Worker identity - context : zmq.Context, optional - ZMQ context (default is None, which creates a new context) + context : zmq.Context + ZMQ context Methods ------- @@ -189,7 +191,7 @@ class ServerWorker(Thread): run() Main loop of the worker. """ - def __init__(self, identity, context=None) -> None: + def __init__(self, identity, context) -> None: """ Docstring for __init__ @@ -197,11 +199,11 @@ class ServerWorker(Thread): ---------- identity : bytes Worker identity - context : zmq.Context, optional - ZMQ context (default is None, which creates a new context) + context : zmq.Context + ZMQ context """ super().__init__(daemon=True) - self.context = context or zmq.Context.instance() + self.context = context self.socket = self.context.socket(zmq.DEALER) self.id = self.socket.identity = identity self.monitor = MonitorTask(self.socket) @@ -241,7 +243,7 @@ class ServerWorker(Thread): if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id): q = Queue() logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id) - video_worker = VideoWorker(client_id, camera_id, filename, q) + video_worker = VideoWorker(client_id, camera_id, filename, q, self.context) video_worker.start() self.video_threads[client_id][camera_id] = video_worker @@ -490,7 +492,7 @@ class VideoWorker(Thread): Stop thread """ - def __init__(self, client_id, camera_id, filename, queue) -> None: + def __init__(self, client_id, camera_id, filename, queue, context) -> None: """ Docstring for __init__ @@ -504,9 +506,11 @@ class VideoWorker(Thread): Filename to write video to queue : Queue Queue to receive video frames + context : zmq.Context + ZMQ context """ super().__init__(daemon=True) - self.context = zmq.Context() + self.context = context self.context.setsockopt(zmq.LINGER, 0) self.client_id = client_id self.camera_id = camera_id @@ -625,7 +629,9 @@ if __name__ == "__main__": signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - worker = ServerWorker(b"worker-task") + context = zmq.Context() + auth_service(context, CERTIFICATE_DIR) + worker = ServerWorker(b"worker-task", context) worker.start() worker.join() |
