aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client.py15
-rw-r--r--helpers.py10
-rw-r--r--router.py7
-rw-r--r--webserver.py5
-rw-r--r--worker.py26
5 files changed, 49 insertions, 14 deletions
diff --git a/client.py b/client.py
index 2c1b372..4b1c6a9 100644
--- a/client.py
+++ b/client.py
@@ -34,6 +34,7 @@ from helpers import (
timestamp_to_bytes,
bytes_to_timestamp,
write_yappi_stats,
+ auth_service,
)
###################################################
@@ -69,6 +70,7 @@ FPS = 30
# Maybe serial number from /proc/cpuinfo.
# In bytes.
CLIENT_ID = b"client_001"
+CERTIFICATE_DIRECTORY = 'certs'
#####################################################
# End defaults and constants #
@@ -373,7 +375,7 @@ class ClientTask(Thread):
Watch for new camera devices using inotify
"""
- def __init__(self, _id, context: zmq.Context=None) -> None:
+ def __init__(self, _id, context: zmq.Context) -> None:
"""
Parameters
----------
@@ -383,7 +385,7 @@ class ClientTask(Thread):
ZMQ context, by default None
"""
super().__init__(daemon=True)
- self.context = context or zmq.Context.instance()
+ self.context = context
self.socket = self.context.socket(zmq.DEALER)
self.identity = self.socket.identity = _id # <-- bytes
self.poll = zmq.Poller()
@@ -1049,7 +1051,14 @@ if __name__ == "__main__":
logger.info("Starting up ...")
- client = ClientTask(get_serial_number())
+ context = zmq.Context()
+ try:
+ auth_service(context, cert_dir=CERTIFICATE_DIRECTORY)
+ except Exception as exc:
+ logger.error("Error configuring authentication: %r", exc)
+ exit(1)
+
+ client = ClientTask(get_serial_number(), context)
client.start()
client.join()
diff --git a/helpers.py b/helpers.py
index 449b275..8a47415 100644
--- a/helpers.py
+++ b/helpers.py
@@ -15,6 +15,8 @@ import subprocess
import struct
import datetime
import cv2
+import zmq
+import zmq.auth
# Known directives for camera configuration:
DIRECTIVES = (
@@ -262,3 +264,11 @@ def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool:
return True
return False
+
+
+def auth_service(context, cert_dir):
+ """Instantiate zmq authenticator."""
+ auth = zmq.auth.Authenticator(context)
+ auth.start()
+ auth.configure_curve(location=cert_dir)
+ zmq.auth.load_certificates(cert_dir)
diff --git a/router.py b/router.py
index ce733b9..3c92826 100644
--- a/router.py
+++ b/router.py
@@ -15,7 +15,10 @@ import random
from collections import defaultdict, deque
import zmq
-from helpers import CustomLoggingFormatter
+from helpers import (
+ CustomLoggingFormatter,
+ auth_service,
+)
# TODO: add configparser
@@ -38,6 +41,7 @@ if not os.path.exists(LOGDID):
BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}"
LOGFILE = os.path.join(LOGDID, f"{BASENAME}.log")
LOGLEVEL = logging.INFO
+CERTIFICATE_DIRECTORY = 'certs'
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
@@ -67,6 +71,7 @@ def custom_proxy() -> None:
None
"""
context = zmq.Context.instance()
+ auth_service(context, CERTIFICATE_DIRECTORY)
poller = zmq.Poller()
# Set up frontend:
diff --git a/webserver.py b/webserver.py
index a2b9b7a..4c45692 100644
--- a/webserver.py
+++ b/webserver.py
@@ -30,6 +30,7 @@ from helpers import (
CustomLoggingFormatter,
DIRECTIVES,
MAX_CAMERA_NAME_LENGTH,
+ auth_service,
)
@@ -49,6 +50,7 @@ WEB_PORT = "8008"
CTRL_BACKEND_ADDR = f"tcp://{HOST}:{ZMQ_PORT}"
WEB_BACKEND_ADDR = f"tcp://{HOST}:{WEB_PORT}"
TIME_FORMAT = '%Y_%m_%d-%H_%M_%S'
+CERTIFICATE_DIR = 'certs'
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
@@ -68,6 +70,9 @@ ws_queues = defaultdict(dict)
ctrl_msg_que = asyncio.Queue()
# Create ZMQ context and socket:
zmq_context = zmq.asyncio.Context()
+# Authenticate context:
+auth_service(zmq_context, CERTIFICATE_DIR)
+
zmq_socket = zmq_context.socket(zmq.DEALER)
# Connect to ZMQ backend:
zmq_socket.connect(WEB_BACKEND_ADDR)
diff --git a/worker.py b/worker.py
index cd47345..c3dc9d6 100644
--- a/worker.py
+++ b/worker.py
@@ -27,6 +27,7 @@ from helpers import (
CustomLoggingFormatter,
bytes_to_str,
TIME_FORMAT_STRING,
+ auth_service,
)
@@ -57,6 +58,7 @@ TMP_DIR = os.path.join(CWD, "tmp")
CLIENTS_JSON_FILE = os.path.join(CWD, 'clients.json')
# This should come from the config file:
VIDEO_DIR = os.path.join(CWD, "videos")
+CERTIFICATE_DIR = 'certs'
# Other:
@@ -177,8 +179,8 @@ class ServerWorker(Thread):
----------
identity : bytes
Worker identity
- context : zmq.Context, optional
- ZMQ context (default is None, which creates a new context)
+ context : zmq.Context
+ ZMQ context
Methods
-------
@@ -189,7 +191,7 @@ class ServerWorker(Thread):
run()
Main loop of the worker.
"""
- def __init__(self, identity, context=None) -> None:
+ def __init__(self, identity, context) -> None:
"""
Docstring for __init__
@@ -197,11 +199,11 @@ class ServerWorker(Thread):
----------
identity : bytes
Worker identity
- context : zmq.Context, optional
- ZMQ context (default is None, which creates a new context)
+ context : zmq.Context
+ ZMQ context
"""
super().__init__(daemon=True)
- self.context = context or zmq.Context.instance()
+ self.context = context
self.socket = self.context.socket(zmq.DEALER)
self.id = self.socket.identity = identity
self.monitor = MonitorTask(self.socket)
@@ -241,7 +243,7 @@ class ServerWorker(Thread):
if client_id not in self.video_threads or not self.video_threads[client_id].get(camera_id):
q = Queue()
logger.debug("Starting new video thread for client %r, camera %r", client_id, camera_id)
- video_worker = VideoWorker(client_id, camera_id, filename, q)
+ video_worker = VideoWorker(client_id, camera_id, filename, q, self.context)
video_worker.start()
self.video_threads[client_id][camera_id] = video_worker
@@ -490,7 +492,7 @@ class VideoWorker(Thread):
Stop thread
"""
- def __init__(self, client_id, camera_id, filename, queue) -> None:
+ def __init__(self, client_id, camera_id, filename, queue, context) -> None:
"""
Docstring for __init__
@@ -504,9 +506,11 @@ class VideoWorker(Thread):
Filename to write video to
queue : Queue
Queue to receive video frames
+ context : zmq.Context
+ ZMQ context
"""
super().__init__(daemon=True)
- self.context = zmq.Context()
+ self.context = context
self.context.setsockopt(zmq.LINGER, 0)
self.client_id = client_id
self.camera_id = camera_id
@@ -625,7 +629,9 @@ if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
- worker = ServerWorker(b"worker-task")
+ context = zmq.Context()
+ auth_service(context, CERTIFICATE_DIR)
+ worker = ServerWorker(b"worker-task", context)
worker.start()
worker.join()