diff options
Diffstat (limited to 'router.py')
| -rw-r--r-- | router.py | 207 | 
1 files changed, 207 insertions, 0 deletions
| diff --git a/router.py b/router.py new file mode 100644 index 0000000..61b2f72 --- /dev/null +++ b/router.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python + +""" +A module containing zeromq router for video streaming +from a client and sending control messages from server. +""" + +__author__ = "Franoosh Corporation" + +import os +import signal +import logging +import random +from collections import defaultdict, deque +import zmq + +from helpers import CustomLoggingFormatter + +# TODO: add configparser + +ADDR_FRONTEND = 'tcp://localhost' +ADDR_BACKEND = 'tcp://localhost' + +PORT_FRONTEND = "5569" +PORT_BACKEND = "9979" + +INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" +INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" + +LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + + +RUNNING = True + +def custom_proxy(): +    context = zmq.Context.instance() +    poller = zmq.Poller() + +    # Set up frontend: +    frontend_socket = context.socket(zmq.ROUTER) +    frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) +    frontend_socket.bind(INITIAL_FRONTEND_ADDR) +    poller.register(frontend_socket, zmq.POLLIN) + +    # Set up backend: +    backend_socket = context.socket(zmq.ROUTER) +    backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) +    backend_socket.bind(INITIAL_BACKEND_ADDR) +    poller.register(backend_socket, zmq.POLLIN) + +    awailable_workers = [] +    client_worker_dict = {} +    # There are three pending messages dictionaries: +    # 1. pending_messages_no_worker[client_id] - messages that are pending for a client +    # 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker +    # 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker +    # To assure delivey of the entire video we need to store messages from start to end until they are delivered +    # to the client. +    # If a worker is not available for certain amount of time, we will reassign it to another worker. +    # For now, there is only one worker. Remember to implement this later. +    # Complex case: half delivered video to the worker, worker becomes unavailable and we need to reassign it to another worker. + +    pending_messages_no_worker = defaultdict(deque)  # Messages that are pending for a client with no worker assigned +    pending_messages_client_worker = defaultdict(lambda: defaultdict(deque)) +    pending_messages_worker_client = defaultdict(lambda: defaultdict(deque)) + +    while RUNNING: +        events = dict(poller.poll(1000))  # Poll both frontend and backend sockets: +        if frontend_socket in events:  # If message from client: +            msg = frontend_socket.recv_multipart()  # Receive message from client: +            logger.debug("Received message: %r", msg) +            client_id, content = msg[0], msg[1:] +            if not client_id in client_worker_dict:  # If client is not in client-worker dictionary: +                logger.info("Received ID from client: %r.", client_id) +            # Check if client is already connected to worker: +            try: +                worker_id = client_worker_dict[client_id]  # Get worker ID for this client from the client-worker dictionary +                while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker and send them first: +                    logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) +                    pending_msg = pending_messages_client_worker[client_id][worker_id].pop() +                    try: +                        logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                        backend_socket.send_multipart([worker_id, client_id, *pending_msg]) +                    except Exception as e: +                        logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e) +                        pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message +                        break # Break to avoid infinite loop +                if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: +                    pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                else: +                    try: +                        logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) +                        backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                    except Exception as e: +                        logger.error("Failed to send message to backend: %r, error: %s", msg, e) +                        pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery +            except KeyError:  # If client is not connected to any worker: +                logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) +                if awailable_workers:  # If there are available workers: +                    worker_id = random.choice(awailable_workers)  # Assign random worker to client +                    client_worker_dict[client_id] = worker_id # Update client-worker dictionary +                    if pending_messages_no_worker[client_id]:  # Check if there are any pending messages for this client: +                        pending_messages_client_worker[client_id][worker_id].extendleft(msg)  # Move messages to pending messages for this worker +                        logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) +                    while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker: +                        pending_msg = pending_messages_client_worker[client_id][worker_id].pop() +                        try: +                            logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                            backend_socket.send_multipart([worker_id, client_id, *pending_msg]) +                        except Exception as e: +                            logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) +                            pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message +                            break +                    if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: +                        logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) +                        pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                    else: +                        logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) +                        try: +                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) +                            backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                        except Exception as e: +                            logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) +                            pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery + +                else:  # If no workers are available, assign a new worker:   +                    pending_messages_no_worker[client_id].append(content)  # Store message for later delivery +                    logger.debug("No available workers, storing client '%s' with no worker assigned", client_id) + +        if backend_socket in events: +            _msg = backend_socket.recv_multipart() +            logger.debug("Received from worker: %r", _msg) +            if len(_msg) == 2: # Worker is sending its identity: +                worker_id, msg = _msg[0], _msg[1:] +                logger.debug("Received ID from worker: %r, message: %r", worker_id, msg) +                logger.info("Received ID from worker: %r.", worker_id) +                awailable_workers.append(worker_id)  # Add worker to available workers list +                for client, worker in client_worker_dict.items(): +                    if worker is None:  # If client has no worker assigned: +                        client_worker_dict[client] = worker +                        logger.debug("Assigning worker %r to client %r", worker, client) +                if pending_messages_no_worker: +                    # If there are pending messages for clients with no worker assigned: +                    for client_id, messages in pending_messages_no_worker.items(): +                        if messages: +                            pending_messages_client_worker[client_id][worker_id].extendleft(messages)  # Move messages to pending messages for this worker +                            pending_messages_no_worker[client_id].clear()  # Clear pending messages for this client +                            logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id) +                            try: +                                pending_msg = pending_messages_client_worker[client_id][worker_id].pop()  # Get the first message for this client and worker +                                logger.debug( +                                    "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) +                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])  # Send the message to worker +                            except Exception as e: +                                pending_messages_client_worker[client_id][worker_id].append(pending_msg) +                                logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) + +            else:  # Worker is sending a message to client: +                worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:] +                logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg) +                # First check if there are any pending messages for this worker: +                while pending_messages_worker_client[worker_id][client_id]:  # Check if there are any pending messages for this worker: +                    pending_msg = pending_messages_worker_client[worker_id][client_id].pop() +                    try: +                        logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) +                        backend_socket.send_multipart([client_id, *pending_msg]) +                    except Exception as e: +                        # It is safe to log the message content as it is a short string or bytes. +                        logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) +                        pending_messages_worker_client[worker_id][client_id].append(pending_msg)  # Re-queue the message   +                        break  # Break to avoid infinite loop +                # Now send the message to the client if no pending messages for this client: +                if not pending_messages_worker_client[worker_id][client_id]:  # If there are no pending messages for this worker: +                    logger.debug("Sending message %r to client '%s'", msg, client_id) +                    try: +                        frontend_socket.send_multipart([client_id, *msg]) +                        logger.debug("Sent message %r to client '%r'", msg, client_id) +                    except Exception as e: +                        pending_messages_worker_client[worker_id][client_id].appendleft(msg) +                        logger.error("Could not deliver message: %r from worker: %r to client: '%r'", msg, worker_id, client_id) + +def signal_handler(sig, frame): +    global RUNNING +    logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name) +    RUNNING = False + + +if __name__ == "__main__": +    signal.signal(signal.SIGINT, signal_handler) +    signal.signal(signal.SIGTERM, signal_handler) + +    logger.info("Starting up ...") + +    custom_proxy() | 
