#!/usr/bin/env python
"""
A module containing zeromq router for video streaming
from a client and sending control messages from server.
"""

__author__ = "Franoosh Corporation"

import os
import signal
import logging
import random
from collections import defaultdict, deque
import zmq

from helpers import CustomLoggingFormatter

# TODO: add configparser

ADDR_FRONTEND = 'tcp://localhost'
ADDR_BACKEND = 'tcp://localhost'

PORT_FRONTEND = "5569"
PORT_BACKEND = "9979"

INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}"
INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}"

LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
LOGLEVEL = logging.DEBUG

log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
# NOTE(review): the root logger already has a handler at this point, so per the
# logging documentation this basicConfig() call should be a no-op. It is kept
# so the module-level configuration stays exactly as it was.
logging.basicConfig(
    filename=LOGFILE,
    datefmt='%Y-%m-%d %I:%M:%S',
    level=LOGLEVEL,
)

# Cleared by signal_handler() to make the proxy loop exit.
RUNNING = True

# How long a single poll waits, in milliseconds, before RUNNING is re-checked.
_POLL_TIMEOUT_MS = 1000


class _RouterState:
    """Mutable routing tables shared by the frontend and backend handlers.

    All queues are FIFO deques: new messages are appended on the right and the
    oldest message is taken from the left.

    To assure delivery of an entire video, messages are stored from start to
    end until they have been handed over to the receiving socket.

    TODO: if a worker is unavailable for a certain amount of time, its clients
    (including half-delivered videos) should be reassigned to another worker.
    """

    def __init__(self):
        # Identities of workers that have announced themselves.
        self.available_workers = []
        # client_id -> worker_id assigned to that client.
        self.client_worker = {}
        # client_id -> messages received while no worker was available.
        self.pending_no_worker = defaultdict(deque)
        # client_id -> worker_id -> messages waiting to be sent to the worker.
        self.pending_client_worker = defaultdict(lambda: defaultdict(deque))
        # worker_id -> client_id -> messages waiting to be sent to the client.
        self.pending_worker_client = defaultdict(lambda: defaultdict(deque))


def _flush_queue(socket, route, queue):
    """Send the queued messages through *socket*, oldest first.

    Args:
        socket: ROUTER socket used for sending.
        route: Sequence of routing frames placed in front of every message.
        queue: Deque of messages, each a list of frames.

    Returns:
        True when the queue has been drained, False when a send failed. A
        message is removed from the queue only after it was sent, so on
        failure it stays at the head and ordering is preserved for the retry.
    """
    while queue:
        pending_msg = queue[0]
        logger.debug("Sending pending message %r via route %r", pending_msg, route)
        try:
            socket.send_multipart([*route, *pending_msg])
        except zmq.ZMQError as exc:
            logger.error(
                "Failed to send pending message: %r via route %r. Error: %r",
                pending_msg, route, exc,
            )
            return False
        queue.popleft()
    return True


def _handle_client_message(frames, backend_socket, state):
    """Route one multipart message received from a client to its worker.

    Args:
        frames: Frames as returned by the frontend ROUTER socket; the first
            frame is the client identity, the rest is the payload.
        backend_socket: ROUTER socket the workers are connected to.
        state: The shared _RouterState.
    """
    logger.debug("Received message: %r", frames)
    client_id, content = frames[0], frames[1:]

    worker_id = state.client_worker.get(client_id)
    if worker_id is None:
        logger.info("Received ID from client: %r.", client_id)
        logger.debug(
            "Client '%s' is not connected to any worker, checking available workers",
            client_id,
        )
        if not state.available_workers:
            state.pending_no_worker[client_id].append(content)
            logger.debug(
                "No available workers, storing client '%s' with no worker assigned",
                client_id,
            )
            return

        worker_id = random.choice(state.available_workers)
        state.client_worker[client_id] = worker_id
        # Move whatever was stored while no worker existed in front of the
        # new message so the worker receives everything in arrival order.
        backlog = state.pending_no_worker.pop(client_id, None)
        if backlog:
            state.pending_client_worker[client_id][worker_id].extend(backlog)
            logger.debug(
                "Moved pending messages for client '%s' to worker '%s'",
                client_id, worker_id,
            )

    queue = state.pending_client_worker[client_id][worker_id]
    # Only the payload is queued; the routing frames are added when sending.
    queue.append(content)
    _flush_queue(backend_socket, (worker_id, client_id), queue)


def _register_worker(worker_id, backend_socket, state):
    """Record a worker and hand it every client that is waiting for one.

    Args:
        worker_id: Identity frame of the worker that announced itself.
        backend_socket: ROUTER socket the workers are connected to.
        state: The shared _RouterState.
    """
    logger.info("Received ID from worker: %r.", worker_id)
    # A worker may announce itself more than once; keep a single entry so the
    # random assignment is not biased towards it.
    if worker_id not in state.available_workers:
        state.available_workers.append(worker_id)

    # Iterate over a snapshot because entries are removed while looping.
    for client_id in list(state.pending_no_worker):
        backlog = state.pending_no_worker.pop(client_id)
        if not backlog:
            continue
        # Remember the assignment so later messages follow the backlog to
        # the same worker.
        state.client_worker[client_id] = worker_id
        logger.debug("Assigning worker %r to client %r", worker_id, client_id)
        queue = state.pending_client_worker[client_id][worker_id]
        queue.extend(backlog)
        logger.debug(
            "Moved pending messages for client %r to worker %r",
            client_id, worker_id,
        )
        _flush_queue(backend_socket, (worker_id, client_id), queue)


def _handle_worker_message(frames, frontend_socket, backend_socket, state):
    """Process one multipart message received from a worker.

    A two-frame message is treated as the worker announcing its identity;
    anything longer is ``[worker_id, client_id, *payload]`` and the payload is
    forwarded to the client through the frontend socket.

    Args:
        frames: Frames as returned by the backend ROUTER socket.
        frontend_socket: ROUTER socket the clients are connected to.
        backend_socket: ROUTER socket the workers are connected to.
        state: The shared _RouterState.
    """
    logger.debug("Received from worker: %r", frames)
    if len(frames) == 2:
        worker_id, msg = frames[0], frames[1:]
        logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
        _register_worker(worker_id, backend_socket, state)
        return

    worker_id, client_id, msg = frames[0], frames[1], frames[2:]
    logger.debug(
        "Received message from worker '%s' for client '%s': %r",
        worker_id, client_id, msg,
    )
    queue = state.pending_worker_client[worker_id][client_id]
    # Queue behind any backlog so the message is neither reordered nor lost
    # when an earlier message still cannot be delivered.
    queue.append(msg)
    if not _flush_queue(frontend_socket, (client_id,), queue):
        logger.error(
            "Could not deliver message: %r from worker: %r to client: '%r'",
            msg, worker_id, client_id,
        )


def custom_proxy():
    """Run the client <-> worker proxy until RUNNING becomes false.

    Binds a ROUTER socket for clients (frontend) and one for workers
    (backend), then polls both and forwards messages between them, keeping
    undeliverable messages queued for a later retry. Both sockets are closed
    when the loop ends or an exception propagates.
    """
    context = zmq.Context.instance()
    poller = zmq.Poller()
    frontend_socket = context.socket(zmq.ROUTER)
    backend_socket = context.socket(zmq.ROUTER)
    state = _RouterState()

    try:
        # Set up frontend. ROUTER_MANDATORY makes sends to an unroutable
        # identity raise instead of being dropped silently.
        frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
        frontend_socket.bind(INITIAL_FRONTEND_ADDR)
        poller.register(frontend_socket, zmq.POLLIN)

        # Set up backend:
        backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
        backend_socket.bind(INITIAL_BACKEND_ADDR)
        poller.register(backend_socket, zmq.POLLIN)

        while RUNNING:
            events = dict(poller.poll(_POLL_TIMEOUT_MS))

            if frontend_socket in events:
                _handle_client_message(
                    frontend_socket.recv_multipart(), backend_socket, state
                )

            if backend_socket in events:
                _handle_worker_message(
                    backend_socket.recv_multipart(),
                    frontend_socket,
                    backend_socket,
                    state,
                )
    finally:
        # linger=0 so that unsent messages do not keep the process from
        # exiting.
        frontend_socket.close(linger=0)
        backend_socket.close(linger=0)


def signal_handler(sig, frame):
    """Ask the proxy loop to stop; installed for SIGINT and SIGTERM.

    Args:
        sig: Number of the received signal.
        frame: Current stack frame (unused).
    """
    global RUNNING
    logger.info("Received signal handler %r, stopping ...", signal.Signals(sig).name)
    RUNNING = False


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    logger.info("Starting up ...")

    custom_proxy()