aboutsummaryrefslogtreecommitdiff
path: root/router.py
diff options
context:
space:
mode:
Diffstat (limited to 'router.py')
-rw-r--r--router.py17
1 files changed, 8 insertions, 9 deletions
diff --git a/router.py b/router.py
index 3c92826..f76b75f 100644
--- a/router.py
+++ b/router.py
@@ -64,7 +64,6 @@ def custom_proxy() -> None:
Parameters
----------
- None
Returns
-------
@@ -86,13 +85,13 @@ def custom_proxy() -> None:
backend_socket.bind(BACKEND_ADDR)
poller.register(backend_socket, zmq.POLLIN)
- awailable_workers = []
+ available_workers = []
client_worker_dict = {}
# There are three pending messages dictionaries:
# 1. pending_messages_no_worker[client_id] - messages that are pending for a client
# 2. pending_messages_client_worker[client_id][worker_id] - messages that are pending for a client with a specific worker
# 3. pending_messages_worker_client[worker_id][client_id] - messages that are pending for a worker
- # To assure delivey of the entire video we need to store messages from start to end until they are delivered
+ # To assure delivery of the entire video we need to store messages from start to end until they are delivered
# to the client.
# If a worker is not available for certain amount of time, we will reassign it to another worker.
# For now, there is only one worker. Remember to implement this later.
@@ -153,9 +152,9 @@ def custom_proxy() -> None:
except KeyError:
logger.info("Received ID from client: %r.", client_id)
# Check if there are available workers.
- if awailable_workers:
+ if available_workers:
# Assign random worker to client:
- worker_id = random.choice(awailable_workers)
+ worker_id = random.choice(available_workers)
# Update client-worker dictionary:
client_worker_dict[client_id] = worker_id
# Check if there are any pending messages for this client:
@@ -186,7 +185,7 @@ def custom_proxy() -> None:
# At last, send the message to worker:
backend_socket.send_multipart([worker_id, client_id, *content])
except Exception as e:
- logger.error("Failed to send message for client %r to backend, requeing. Error: %r", client_id, e)
+ logger.error("Failed to send message for client %r to backend, requeuing. Error: %r", client_id, e)
# Store message for later delivery:
pending_messages_client_worker[client_id][worker_id].appendleft(msg)
@@ -204,7 +203,7 @@ def custom_proxy() -> None:
logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)
logger.info("Received ID from worker: %r.", worker_id)
# Add worker to available workers list:
- awailable_workers.append(worker_id)
+ available_workers.append(worker_id)
for client, worker in client_worker_dict.items():
# If client has no worker assigned, assign it a new worker:
if worker is None:
@@ -252,9 +251,9 @@ def custom_proxy() -> None:
try:
frontend_socket.send_multipart([client_id, *msg])
logger.debug("Sent message %r to client %r", msg, client_id)
- except Exception as e:
+ except Exception as exc:
pending_messages_worker_client[worker_id][client_id].appendleft(msg)
- logger.error("Could not deliver message: %r from worker: %r to client: %r", msg, worker_id, client_id)
+ logger.error("Could not deliver message: %r from worker: %r to client: %r, exc: %r", msg, worker_id, client_id, exc)
def signal_handler(sig, frame) -> None:
global RUNNING