diff options
Diffstat (limited to 'router.py')
| -rw-r--r-- | router.py | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/router.py b/router.py new file mode 100644 index 0000000..ce733b9 --- /dev/null +++ b/router.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python + +""" +A module containing zeromq router for video streaming +from a client and sending control messages from server. +""" + +__author__ = "Franoosh Corporation" + +import sys +import os +import signal +import logging +import random +from collections import defaultdict, deque +import zmq + +from helpers import CustomLoggingFormatter + +# TODO: add configparser + +IP_FRONTEND = '127.0.0.1' +IP_BACKEND = '127.0.0.1' + +PORT_FRONTEND = "5569" # This is a client (producer) +PORT_BACKEND = "9979" + +FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" # This is a client (producer) +BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}" + +LOGDID = 'logs' +if not os.path.exists(LOGDID): + try: + os.makedirs(LOGDID) + except Exception as exc: + print(f"Could not create log directory: {exc}") + sys.exit() +BASENAME = f"{os.path.splitext(os.path.basename(__file__))[0]}" +LOGFILE = os.path.join(LOGDID, f"{BASENAME}.log") +LOGLEVEL = logging.INFO + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( + filename=LOGFILE, + datefmt='%Y-%m-%d %I:%M:%S', + level=LOGLEVEL, +) + + +RUNNING = True + +def custom_proxy() -> None: + """ + A custom zmq proxy to route messages between clients and workers. + + Parameters + ---------- + None + + Returns + ------- + None + """ + context = zmq.Context.instance() + poller = zmq.Poller() + + # Set up frontend: + frontend_socket = context.socket(zmq.ROUTER) + frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) + frontend_socket.bind(FRONTEND_ADDR) + poller.register(frontend_socket, zmq.POLLIN) + + # Set up backend: + backend_socket = context.socket(zmq.ROUTER) + backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) + backend_socket.bind(BACKEND_ADDR) + poller.register(backend_socket, zmq.POLLIN) + + awailable_workers = [] + client_worker_dict = {} + # There are three pending messages dictionaries: + # 1. pending_messages_no_worker[client_id] - messages that are pending for a client + # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker + # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker + # To assure delivey of the entire video we need to store messages from start to end until they are delivered + # to the client. + # If a worker is not available for certain amount of time, we will reassign it to another worker. + # For now, there is only one worker. Remember to implement this later. + # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker. + + pending_messages_no_worker = defaultdict(deque) # Messages that are pending for a client with no worker assigned + pending_messages_client_worker = defaultdict(lambda: defaultdict(deque)) + pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) + + while RUNNING: + # Poll both frontend and backend sockets: + events = dict(poller.poll(10)) + # If message from client: + if frontend_socket in events: + # Receive message from client: + msg = frontend_socket.recv_multipart() + logger.debug("Received message.") + client_id, content = msg[0], msg[1:] + # Check if client is already connected to worker: + try: + # Get worker ID for this client from the client-worker dictionary: + worker_id = client_worker_dict[client_id] + # Check if there are any pending messages for this worker and send them first: + while pending_messages_client_worker[client_id][worker_id]: + # In order to take a peek at the size of all messages, perhaps check out: + # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ + logger.debug( + "There are '%d' pending messages from client %r for worker %r", + len(pending_messages_client_worker[client_id][worker_id]), + client_id, + worker_id, + ) + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + try: + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # At last send new message to worker: + else: + try: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *content]) + except Exception as e: + logger.error("Failed to send message to backend, error: %s", e) + # Store message for later delivery: + # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? + pending_messages_client_worker[client_id][worker_id].appendleft(content) + # If client is not connected to any worker: + except KeyError: + logger.info("Received ID from client: %r.", client_id) + # Check if there are available workers. + if awailable_workers: + # Assign random worker to client: + worker_id = random.choice(awailable_workers) + # Update client-worker dictionary: + client_worker_dict[client_id] = worker_id + # Check if there are any pending messages for this client: + if pending_messages_no_worker[client_id]: + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(msg) + logger.debug("Moved pending messages for client '%r' to worker '%r'", client_id, worker_id) + # Check if there are any pending messages for this worker: + while pending_messages_client_worker[client_id][worker_id]: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + try: + logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id) + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e) + # Re-queue the message: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + break + # If there are still pending messages for this worker: + if pending_messages_client_worker[client_id][worker_id]: + logger.debug("There are still pending messages for client %r and worker %r, storing message for later delivery.", client_id, worker_id) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(content) + else: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + try: + logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) + # At last, send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *content]) + except Exception as e: + logger.error("Failed to send message for client %r to backend, requeing. Error: %r", client_id, e) + # Store message for later delivery: + pending_messages_client_worker[client_id][worker_id].appendleft(msg) + + else: + # Store message for later delivery: + pending_messages_no_worker[client_id].append(content) + logger.debug("No available workers, storing client %r with no worker assigned", client_id) + + if backend_socket in events: + _msg = backend_socket.recv_multipart() + # These messages should be safe to log as they are short strings or bytes. + logger.debug("Received from worker: %r", _msg) + if len(_msg) == 2: # Worker is sending its identity: + worker_id, msg = _msg[0], _msg[1:] + logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) + logger.info("Received ID from worker: %r.", worker_id) + # Add worker to available workers list: + awailable_workers.append(worker_id) + for client, worker in client_worker_dict.items(): + # If client has no worker assigned, assign it a new worker: + if worker is None: + client_worker_dict[client] = worker + logger.debug("Assigning worker %r to client %r", worker, client) + if pending_messages_no_worker: + # If there are pending messages for clients with no worker assigned: + for client_id, messages in pending_messages_no_worker.items(): + if messages: + # Move messages to pending messages for this worker: + pending_messages_client_worker[client_id][worker_id].extendleft(messages) + # Clear pending messages for this client: + pending_messages_no_worker[client_id].clear() + logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) + try: + # Get the first message for this client and worker: + pending_msg = pending_messages_client_worker[client_id][worker_id].pop() + logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) + # Send the message to worker: + backend_socket.send_multipart([worker_id, client_id, *pending_msg]) + except Exception as e: + pending_messages_client_worker[client_id][worker_id].append(pending_msg) + logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e) + + else: # Worker is sending a message to client: + # worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] + worker_id, client_id, *msg = _msg + logger.debug("Received message from worker %r for client %r: %r", worker_id, client_id, msg) + # First check if there are any pending messages for this worker: + while pending_messages_worker_client[worker_id][client_id]: + pending_msg = pending_messages_worker_client[worker_id][client_id].pop() + try: + logger.debug("Sending pending message %r from %r to client %r", pending_msg, client_id, worker_id) + backend_socket.send_multipart([client_id, *pending_msg]) + except Exception as e: + # It is safe to log the message content as it is a short string or bytes. + logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + # Re-queue the message: + pending_messages_worker_client[worker_id][client_id].append(pending_msg) + # Break to avoid infinite loop: + break + # If no pending messages for this client, send new message to the client : + if not pending_messages_worker_client[worker_id][client_id]: + logger.debug("Sending message %r to client %r", msg, client_id) + try: + frontend_socket.send_multipart([client_id, *msg]) + logger.debug("Sent message %r to client %r", msg, client_id) + except Exception as e: + pending_messages_worker_client[worker_id][client_id].appendleft(msg) + logger.error("Could not deliver message: %r from worker: %r to client: %r", msg, worker_id, client_id) + +def signal_handler(sig, frame) -> None: + global RUNNING + logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name) + RUNNING = False + + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("Starting up ...") + + custom_proxy() |
