diff options
author | Franoosh <uinarf@autistici.org> | 2025-07-25 17:13:38 +0200 |
---|---|---|
committer | Franoosh <uinarf@autistici.org> | 2025-07-25 17:13:38 +0200 |
commit | 5bf209098ff501a67e380a025dd44985453ad63c (patch) | |
tree | 933b276a72ef308b2868f9d8a1f36bdcdeeaefb4 /router.py | |
download | ZeroMQ_Video_Streaming-5bf209098ff501a67e380a025dd44985453ad63c.tar.gz ZeroMQ_Video_Streaming-5bf209098ff501a67e380a025dd44985453ad63c.tar.bz2 ZeroMQ_Video_Streaming-5bf209098ff501a67e380a025dd44985453ad63c.zip |
Initial commit. Proof of concept message passing between client <-> router <-> worker with rudimentary caching (refs: HEAD, master)
Diffstat (limited to 'router.py')
-rw-r--r-- | router.py | 207 |
1 files changed, 207 insertions, 0 deletions
#!/usr/bin/env python

"""
ZeroMQ ROUTER broker for video streaming.

Relays multipart messages between clients (frontend ROUTER) and workers
(backend ROUTER).  Because delivery of the whole video must be guaranteed,
every message that cannot be sent immediately is buffered in a per-pair
FIFO queue and replayed in order once the peer becomes reachable.
"""

__author__ = "Franoosh Corporation"

import os
import signal
import logging
import random
from collections import defaultdict, deque

import zmq

from helpers import CustomLoggingFormatter

# TODO: add configparser

ADDR_FRONTEND = 'tcp://localhost'
ADDR_BACKEND = 'tcp://localhost'

PORT_FRONTEND = "5569"
PORT_BACKEND = "9979"

INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}"
INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}"

LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log"
LOGLEVEL = logging.DEBUG

# Configure the root logger exactly once.  The original code additionally
# called logging.basicConfig(), which attached a SECOND handler to the root
# logger and duplicated every record in the log file.
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)


RUNNING = True


def _flush_pending(socket, queue, routing_frames):
    """Send queued message bodies oldest-first until empty or a send fails.

    socket         -- zmq socket to send on.
    queue          -- deque of message bodies (lists of frames); oldest at
                      the left (enqueue with append(), dequeue with popleft()).
    routing_frames -- identity frames to prepend, e.g. [worker_id, client_id].

    Returns True when the queue was fully drained.  On failure the message is
    put back at the FRONT of the queue (ordering preserved) and False is
    returned; with ROUTER_MANDATORY set, an unreachable peer raises
    zmq.ZMQError instead of silently dropping the message.
    """
    while queue:
        body = queue.popleft()
        try:
            socket.send_multipart([*routing_frames, *body])
            logger.debug("Sent %r with routing %r", body, routing_frames)
        except zmq.ZMQError as exc:
            logger.error("Failed to send pending message %r (routing %r): %s",
                         body, routing_frames, exc)
            queue.appendleft(body)  # re-queue in original position
            return False
    return True


def custom_proxy():
    """Run the broker loop until RUNNING is cleared by a signal handler.

    Routing state:
      available_workers -- identities of workers that have registered.
      client_worker_dict -- client identity -> assigned worker identity.

    Message buffers (all FIFO deques):
      pending_messages_no_worker[client]          -- client traffic received
                                                     before any worker existed.
      pending_messages_client_worker[client][worker] -- client->worker backlog.
      pending_messages_worker_client[worker][client] -- worker->client backlog.
    """
    context = zmq.Context.instance()
    poller = zmq.Poller()

    # Frontend: clients connect here.
    frontend_socket = context.socket(zmq.ROUTER)
    frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
    frontend_socket.bind(INITIAL_FRONTEND_ADDR)
    poller.register(frontend_socket, zmq.POLLIN)

    # Backend: workers connect here.
    backend_socket = context.socket(zmq.ROUTER)
    backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
    backend_socket.bind(INITIAL_BACKEND_ADDR)
    poller.register(backend_socket, zmq.POLLIN)

    available_workers = []
    client_worker_dict = {}

    # NOTE(review): if a worker stays unavailable too long its clients should
    # be reassigned to another worker; with a single worker for now this is
    # deliberately left unimplemented.
    pending_messages_no_worker = defaultdict(deque)
    pending_messages_client_worker = defaultdict(lambda: defaultdict(deque))
    pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))

    while RUNNING:
        events = dict(poller.poll(1000))  # poll both sockets, 1 s timeout

        if frontend_socket in events:  # message from a client
            msg = frontend_socket.recv_multipart()
            logger.debug("Received message: %r", msg)
            client_id, content = msg[0], msg[1:]

            if client_id in client_worker_dict:
                worker_id = client_worker_dict[client_id]
                queue = pending_messages_client_worker[client_id][worker_id]
                # Enqueue behind any backlog so ordering is preserved, then
                # try to drain everything to the worker.
                queue.append(content)
                _flush_pending(backend_socket, queue, [worker_id, client_id])
            else:
                logger.info("Received ID from client: %r.", client_id)
                if available_workers:
                    worker_id = random.choice(available_workers)
                    client_worker_dict[client_id] = worker_id
                    queue = pending_messages_client_worker[client_id][worker_id]
                    # Fixed: move the messages queued while no worker existed
                    # (the original extendleft()-ed the CURRENT message's
                    # frames and never cleared the no-worker backlog).
                    queue.extend(pending_messages_no_worker.pop(client_id, ()))
                    queue.append(content)
                    _flush_pending(backend_socket, queue,
                                   [worker_id, client_id])
                else:
                    logger.debug("No available workers, storing client '%s' "
                                 "with no worker assigned", client_id)
                    pending_messages_no_worker[client_id].append(content)

        if backend_socket in events:  # message from a worker
            frames = backend_socket.recv_multipart()
            logger.debug("Received from worker: %r", frames)

            if len(frames) == 2:
                # Worker registration: [worker_id, greeting].
                worker_id = frames[0]
                logger.info("Received ID from worker: %r.", worker_id)
                available_workers.append(worker_id)
                # Adopt clients that were waiting for a worker and drain
                # their backlog.  (Fixed: the original re-assigned the same
                # None value instead of the new worker_id.)
                for client_id in list(pending_messages_no_worker):
                    backlog = pending_messages_no_worker[client_id]
                    if not backlog:
                        continue
                    client_worker_dict.setdefault(client_id, worker_id)
                    assigned = client_worker_dict[client_id]
                    queue = pending_messages_client_worker[client_id][assigned]
                    queue.extend(backlog)
                    backlog.clear()
                    logger.debug("Moved pending messages for client %r to "
                                 "worker %r", client_id, assigned)
                    _flush_pending(backend_socket, queue,
                                   [assigned, client_id])
            else:
                # Worker payload for a client: [worker_id, client_id, *body].
                worker_id, client_id, payload = frames[0], frames[1], frames[2:]
                logger.debug("Received message from worker '%s' for client "
                             "'%s': %r", worker_id, client_id, payload)
                queue = pending_messages_worker_client[worker_id][client_id]
                # Fixed two defects: the original replayed the backlog on the
                # BACKEND socket (wrong direction) and silently dropped the
                # new payload whenever older messages were still queued.
                queue.append(payload)
                _flush_pending(frontend_socket, queue, [client_id])


def signal_handler(sig, frame):
    """Clear RUNNING so custom_proxy() exits after the current poll cycle."""
    global RUNNING
    logger.info("Received signal handler %r, stopping ...",
                signal.Signals(sig).name)
    RUNNING = False


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    logger.info("Starting up ...")

    custom_proxy()