Diffstat (limited to 'router.py')
-rw-r--r--  router.py  266
1 file changed, 266 insertions(+), 0 deletions(-)
diff --git a/router.py b/router.py
new file mode 100644
index 0000000..ce733b9
--- /dev/null
+++ b/router.py
@@ -0,0 +1,266 @@
+#!/usr/bin/env python
+
+"""
+A module containing zeromq router for video streaming
+from a client and sending control messages from server.
+"""
+
+__author__ = "Franoosh Corporation"
+
+import sys
+import os
+import signal
+import logging
+import random
+from collections import defaultdict, deque
+import zmq
+
+from helpers import CustomLoggingFormatter
+
+# TODO: add configparser
+
+IP_FRONTEND = '127.0.0.1'
+IP_BACKEND = '127.0.0.1'
+
+PORT_FRONTEND = "5569" # This is a client (producer)
+PORT_BACKEND = "9979"
+
+FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" # This is a client (producer)
+BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"
+
+LOGDIR = 'logs'
+if not os.path.exists(LOGDIR):
+    try:
+        os.makedirs(LOGDIR)
+    except Exception as exc:
+        print(f"Could not create log directory: {exc}")
+        sys.exit(1)
+BASENAME = os.path.splitext(os.path.basename(__file__))[0]
+LOGFILE = os.path.join(LOGDIR, f"{BASENAME}.log")
+LOGLEVEL = logging.INFO
+
+log_formatter = CustomLoggingFormatter()
+handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
+handler.setFormatter(log_formatter)
+logging.root.addHandler(handler)
+logging.root.setLevel(LOGLEVEL)
+logger = logging.getLogger(__name__)
+logging.basicConfig(
+ filename=LOGFILE,
+ datefmt='%Y-%m-%d %I:%M:%S',
+ level=LOGLEVEL,
+)
+
+
+RUNNING = True
+
+def custom_proxy() -> None:
+ """
+ A custom zmq proxy to route messages between clients and workers.
+
+ Parameters
+ ----------
+ None
+
+ Returns
+ -------
+ None
+ """
+ context = zmq.Context.instance()
+ poller = zmq.Poller()
+
+ # Set up frontend:
+ frontend_socket = context.socket(zmq.ROUTER)
+ frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
+ frontend_socket.bind(FRONTEND_ADDR)
+ poller.register(frontend_socket, zmq.POLLIN)
+
+ # Set up backend:
+ backend_socket = context.socket(zmq.ROUTER)
+ backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
+ backend_socket.bind(BACKEND_ADDR)
+ poller.register(backend_socket, zmq.POLLIN)
+
+    available_workers = []
+ client_worker_dict = {}
+    # There are three pending-message dictionaries:
+    # 1. pending_messages_no_worker[client_id] - messages from a client that has no worker assigned yet
+    # 2. pending_messages_client_worker[client_id][worker_id] - messages from a client waiting to be sent to its worker
+    # 3. pending_messages_worker_client[worker_id][client_id] - messages from a worker waiting to be sent to its client
+    # To ensure delivery of the entire video, messages are stored from start to end until they have been delivered.
+    # If a worker is unavailable for a certain amount of time, the client should be reassigned to another worker.
+    # For now there is only one worker; remember to implement reassignment later.
+    # Complex case: the video is half delivered to a worker that becomes unavailable, and the remainder must be
+    # reassigned to another worker.
+
+ pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned
+ pending_messages_client_worker = defaultdict(lambda: defaultdict(deque))
+ pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))
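+    # Queue convention used below: new messages are added with appendleft(), the oldest
+    # message is taken with pop() from the right, and a message that failed to send is
+    # put back with append() so it is retried first on the next pass.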
+
+ while RUNNING:
+ # Poll both frontend and backend sockets:
+ events = dict(poller.poll(10))
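+        # The short 10 ms poll timeout keeps the loop responsive to RUNNING being
+        # cleared by the signal handler at the bottom of the module.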
+ # If message from client:
+ if frontend_socket in events:
+ # Receive message from client:
+ msg = frontend_socket.recv_multipart()
+ logger.debug("Received message.")
+ client_id, content = msg[0], msg[1:]
+ # Check if client is already connected to worker:
+ try:
+ # Get worker ID for this client from the client-worker dictionary:
+ worker_id = client_worker_dict[client_id]
+ # Check if there are any pending messages for this worker and send them first:
+ while pending_messages_client_worker[client_id][worker_id]:
+ # In order to take a peek at the size of all messages, perhaps check out:
+ # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/
+ logger.debug(
+ "There are '%d' pending messages from client %r for worker %r",
+ len(pending_messages_client_worker[client_id][worker_id]),
+ client_id,
+ worker_id,
+ )
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ try:
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ # Break to avoid infinite loop:
+ break
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # At last send new message to worker:
+ else:
+ try:
+ logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *content])
+ except Exception as e:
+ logger.error("Failed to send message to backend, error: %s", e)
+                        # Store only the content frames for later delivery (not the full
+                        # multipart message, which still includes the client ID frame):
+                        pending_messages_client_worker[client_id][worker_id].appendleft(content)
+ # If client is not connected to any worker:
+ except KeyError:
+ logger.info("Received ID from client: %r.", client_id)
+ # Check if there are available workers.
+                if available_workers:
+                    # Assign a random worker to the client:
+                    worker_id = random.choice(available_workers)
+                    # Update the client-worker dictionary:
+                    client_worker_dict[client_id] = worker_id
+                    # Check for pending messages that were queued for this client while no
+                    # worker was available, and move them to this worker's queue:
+                    if pending_messages_no_worker[client_id]:
+                        pending_messages_client_worker[client_id][worker_id].extendleft(
+                            pending_messages_no_worker[client_id]
+                        )
+                        pending_messages_no_worker[client_id].clear()
+                        logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
+ # Check if there are any pending messages for this worker:
+ while pending_messages_client_worker[client_id][worker_id]:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ try:
+ logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
+ # Re-queue the message:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ break
+ # If there are still pending messages for this worker:
+ if pending_messages_client_worker[client_id][worker_id]:
+ logger.debug("There are still pending messages for client %r and worker %r, storing message for later delivery.", client_id, worker_id)
+ # Store message for later delivery:
+ pending_messages_client_worker[client_id][worker_id].appendleft(content)
+                    else:
+                        logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id)
+                        try:
+                            # At last, send the message to the worker:
+                            backend_socket.send_multipart([worker_id, client_id, *content])
+                        except Exception as e:
+                            logger.error("Failed to send message for client %r to backend, requeueing. Error: %r", client_id, e)
+                            # Store only the content frames for later delivery:
+                            pending_messages_client_worker[client_id][worker_id].appendleft(content)
+
+ else:
+ # Store message for later delivery:
+ pending_messages_no_worker[client_id].append(content)
+ logger.debug("No available workers, storing client %r with no worker assigned", client_id)
+
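+        # Messages arriving on the backend are either a worker announcing itself (two frames)
+        # or a reply destined for a client (three or more frames); both cases are handled below.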
+ if backend_socket in events:
+ _msg = backend_socket.recv_multipart()
+ # These messages should be safe to log as they are short strings or bytes.
+ logger.debug("Received from worker: %r", _msg)
+ if len(_msg) == 2: # Worker is sending its identity:
+ worker_id, msg = _msg[0], _msg[1:]
+ logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
+ logger.info("Received ID from worker: %r.", worker_id)
+                # Add the worker to the available workers list:
+                available_workers.append(worker_id)
+                for client, worker in client_worker_dict.items():
+                    # If a client has no worker assigned, assign it the newly registered worker:
+                    if worker is None:
+                        client_worker_dict[client] = worker_id
+                        logger.debug("Assigning worker %r to client %r", worker_id, client)
+                if pending_messages_no_worker:
+                    # If there are pending messages for clients with no worker assigned:
+                    for client_id, messages in pending_messages_no_worker.items():
+                        if messages:
+                            # Assign this worker to the client and move its queued messages
+                            # to the worker's queue:
+                            client_worker_dict[client_id] = worker_id
+                            pending_messages_client_worker[client_id][worker_id].extendleft(messages)
+                            # Clear pending messages for this client:
+                            pending_messages_no_worker[client_id].clear()
+                            logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
+ try:
+ # Get the first message for this client and worker:
+ pending_msg = pending_messages_client_worker[client_id][worker_id].pop()
+ logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id)
+ # Send the message to worker:
+ backend_socket.send_multipart([worker_id, client_id, *pending_msg])
+ except Exception as e:
+ pending_messages_client_worker[client_id][worker_id].append(pending_msg)
+ logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)
+
+ else: # Worker is sending a message to client:
+ # worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]
+ worker_id, client_id, *msg = _msg
+ logger.debug("Received message from worker %r for client %r: %r", worker_id, client_id, msg)
+ # First check if there are any pending messages for this worker:
+                while pending_messages_worker_client[worker_id][client_id]:
+                    pending_msg = pending_messages_worker_client[worker_id][client_id].pop()
+                    try:
+                        logger.debug("Sending pending message %r from worker %r to client %r", pending_msg, worker_id, client_id)
+                        frontend_socket.send_multipart([client_id, *pending_msg])
+                    except Exception as e:
+                        # It is safe to log the message content as it is a short string or bytes.
+                        logger.error("Could not deliver pending message: %r to client: %r. Error: %r", pending_msg, client_id, e)
+                        # Re-queue the message:
+                        pending_messages_worker_client[worker_id][client_id].append(pending_msg)
+                        # Break to avoid an infinite loop:
+                        break
+ # If no pending messages for this client, send new message to the client :
+ if not pending_messages_worker_client[worker_id][client_id]:
+ logger.debug("Sending message %r to client %r", msg, client_id)
+ try:
+ frontend_socket.send_multipart([client_id, *msg])
+ logger.debug("Sent message %r to client %r", msg, client_id)
+                    except Exception as e:
+                        pending_messages_worker_client[worker_id][client_id].appendleft(msg)
+                        logger.error("Could not deliver message: %r from worker: %r to client: %r. Error: %r", msg, worker_id, client_id, e)
+
+def signal_handler(sig, frame) -> None:
+    """Handle SIGINT/SIGTERM by clearing the RUNNING flag so the proxy loop can exit."""
+    global RUNNING
+    logger.info("Received signal %r, stopping ...", signal.Signals(sig).name)
+    RUNNING = False
+
+
+if __name__ == "__main__":
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ logger.info("Starting up ...")
+
+ custom_proxy()