# path: root/router.py
# blob: 61b2f72cdbae31185cb7e829b991245101abb596
#!/usr/bin/env python

"""
A module containing zeromq router for video streaming
from a client and sending control messages from server.
"""

__author__ = "Franoosh Corporation"

import os
import signal
import logging
import random
from collections import defaultdict, deque
import zmq

from helpers import CustomLoggingFormatter

# TODO: add configparser

# Host part of the endpoints the two ROUTER sockets bind to.
ADDR_FRONTEND = 'tcp://localhost'
ADDR_BACKEND = 'tcp://localhost'

# TCP ports, kept as strings because they are only ever joined into endpoints.
PORT_FRONTEND = "5569"
PORT_BACKEND = "9979"

# Full "<address>:<port>" endpoints used when binding the sockets.
INITIAL_FRONTEND_ADDR = ":".join((ADDR_FRONTEND, PORT_FRONTEND))
INITIAL_BACKEND_ADDR = ":".join((ADDR_BACKEND, PORT_BACKEND))

# Log file is named after this script: "<script name without extension>.log".
LOGFILE = os.path.splitext(os.path.basename(__file__))[0] + ".log"
LOGLEVEL = logging.DEBUG

# Route every log record through the root logger into LOGFILE (appending,
# UTF-8), formatted by the project's CustomLoggingFormatter.
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
# NOTE: a logging.basicConfig(filename=..., datefmt=..., level=...) call used
# to follow here. basicConfig() does nothing once the root logger already has
# a handler, so it never applied its datefmt or opened a second file handler;
# it was removed to stop suggesting otherwise. Timestamp formatting is
# entirely up to CustomLoggingFormatter.


RUNNING = True


def _drain_queue(sock, route, queue, peer):
    """Send every payload waiting in *queue* through *sock*, oldest first.

    Args:
        sock: ROUTER socket used for sending.
        route: Frames placed in front of each payload; the first one is the
            identity of the destination peer.
        queue: ``deque`` of payloads (each a list of frames), oldest on the
            left.
        peer: Kind of destination ("worker" or "client"), used in log text.

    Returns:
        True if the queue is empty afterwards. False if a send failed; the
        failed payload and everything queued behind it stay in place, in
        order, for a later retry.
    """
    while queue:
        payload = queue[0]  # Peek only: drop it once the send has succeeded.
        logger.debug("Sending message %r to %s %r", payload, peer, route[0])
        try:
            sock.send_multipart([*route, *payload])
        except zmq.ZMQError as err:
            logger.error(
                "Failed to send message %r to %s %r, keeping it queued. Error: %r",
                payload, peer, route[0], err)
            return False
        queue.popleft()
    return True


def custom_proxy():
    """Relay messages between clients (frontend) and workers (backend).

    Two ROUTER sockets are bound, one per side, and polled until the
    module-level ``RUNNING`` flag becomes false (checked at least once per
    second because of the poll timeout).

    Client -> worker: the first frame received on the frontend is the client
    identity. A client without a worker is assigned a random registered
    worker; if none is registered yet its messages are stored until one
    registers. Messages are forwarded as ``[worker_id, client_id, *content]``.

    Worker -> client: a backend message with exactly one frame after the
    worker identity is treated as the worker registering itself. Any other
    message is ``[worker_id, client_id, *payload]`` and the payload is
    forwarded to that client on the frontend.

    Messages that cannot be sent stay queued per (client, worker) pair and
    are retried, in arrival order, before any newer message of that pair.
    Queues live in memory only and are lost when the function returns.
    """
    context = zmq.Context.instance()
    poller = zmq.Poller()

    frontend_socket = context.socket(zmq.ROUTER)
    backend_socket = context.socket(zmq.ROUTER)
    try:
        # ROUTER_MANDATORY makes a send to an unknown/disconnected identity
        # raise instead of silently dropping the message, which is what lets
        # us keep undelivered messages queued.
        # Set up frontend:
        frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
        frontend_socket.bind(INITIAL_FRONTEND_ADDR)
        poller.register(frontend_socket, zmq.POLLIN)

        # Set up backend:
        backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
        backend_socket.bind(INITIAL_BACKEND_ADDR)
        poller.register(backend_socket, zmq.POLLIN)

        available_workers = []
        client_worker_dict = {}
        # There are three pending-message stores; every queue holds payloads
        # oldest-first (append on the right, send from the left):
        # 1. pending_messages_no_worker[client_id] - from a client that has no worker yet
        # 2. pending_messages_client_worker[client_id][worker_id] - from a client to its worker
        # 3. pending_messages_worker_client[worker_id][client_id] - from a worker to a client
        # To assure delivery of an entire video, messages are kept from start
        # to end until they have been handed to the socket.
        # TODO: if a worker stays unavailable for some time, reassign its
        # clients (and any half-delivered video) to another worker. For now
        # there is only one worker.
        pending_messages_no_worker = defaultdict(deque)
        pending_messages_client_worker = defaultdict(lambda: defaultdict(deque))
        pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))

        while RUNNING:
            events = dict(poller.poll(1000))  # Poll both sockets, 1 s timeout.

            if frontend_socket in events:  # Message from a client.
                msg = frontend_socket.recv_multipart()
                logger.debug("Received message: %r", msg)
                client_id, content = msg[0], msg[1:]
                # A missing entry and a None entry both mean "no worker yet".
                worker_id = client_worker_dict.get(client_id)
                if worker_id is None:
                    logger.info("Received ID from client: %r.", client_id)
                    logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id)
                    if available_workers:
                        worker_id = random.choice(available_workers)
                        client_worker_dict[client_id] = worker_id
                        # Move what was stored while no worker existed in
                        # front of the new message so order is preserved.
                        backlog = pending_messages_no_worker.pop(client_id, None)
                        if backlog:
                            pending_messages_client_worker[client_id][worker_id].extend(backlog)
                            logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id)
                    else:
                        pending_messages_no_worker[client_id].append(content)
                        logger.debug("No available workers, storing client '%s' with no worker assigned", client_id)
                if worker_id is not None:
                    # Queue only the payload (never the identity frame), then
                    # flush: older messages go first and nothing is lost when
                    # a send fails.
                    queue = pending_messages_client_worker[client_id][worker_id]
                    queue.append(content)
                    _drain_queue(backend_socket, [worker_id, client_id], queue, "worker")

            if backend_socket in events:  # Message from a worker.
                _msg = backend_socket.recv_multipart()
                logger.debug("Received from worker: %r", _msg)
                worker_id = _msg[0]
                if len(_msg) == 2:  # Worker is sending its identity.
                    logger.debug("Received ID from worker: %r, message: %r", worker_id, _msg[1:])
                    logger.info("Received ID from worker: %r.", worker_id)
                    # A worker that registers again must not be listed twice,
                    # otherwise random.choice would favour it.
                    if worker_id not in available_workers:
                        available_workers.append(worker_id)
                    # Only values change here, so iterating items() is safe.
                    for client, worker in client_worker_dict.items():
                        if worker is None:
                            client_worker_dict[client] = worker_id
                            logger.debug("Assigning worker %r to client %r", worker_id, client)
                    # Hand every client that was waiting for a worker to this
                    # one, and remember the assignment so later messages of
                    # the same client follow the same route.
                    for client_id in list(pending_messages_no_worker):
                        backlog = pending_messages_no_worker.pop(client_id)
                        if not backlog:
                            continue
                        client_worker_dict[client_id] = worker_id
                        pending_messages_client_worker[client_id][worker_id].extend(backlog)
                        logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)
                    # Flush everything queued for this worker, including
                    # messages whose earlier send attempt failed.
                    for client_id, assigned in client_worker_dict.items():
                        if assigned == worker_id:
                            _drain_queue(
                                backend_socket,
                                [worker_id, client_id],
                                pending_messages_client_worker[client_id][worker_id],
                                "worker")
                else:  # Worker is sending a message to a client.
                    client_id, payload = _msg[1], _msg[2:]
                    logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, payload)
                    # Replies travel on the frontend socket; queue first so a
                    # failed send keeps the message instead of dropping it.
                    queue = pending_messages_worker_client[worker_id][client_id]
                    queue.append(payload)
                    _drain_queue(frontend_socket, [client_id], queue, "client")
    finally:
        # linger=0: do not block shutdown on messages still buffered in zmq.
        frontend_socket.close(linger=0)
        backend_socket.close(linger=0)

def signal_handler(sig, frame):
    """Log the received signal and clear ``RUNNING`` so the proxy loop exits.

    Args:
        sig: Number of the signal that was delivered.
        frame: Current stack frame supplied by the signal machinery (unused).
    """
    global RUNNING
    signal_name = signal.Signals(sig).name
    logger.info("Received signal handler %r, stopping ...", signal_name)
    RUNNING = False


if __name__ == "__main__":
    # Interrupt and termination requests share one handler, which clears the
    # RUNNING flag instead of killing the process outright.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal_handler)

    logger.info("Starting up ...")

    custom_proxy()