diff options
| author | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 | 
|---|---|---|
| committer | Franoosh <uinarf@autistici.org> | 2025-10-15 14:42:29 +0200 | 
| commit | 70beed73465ab27449a59c62043d94c16efe00c5 (patch) | |
| tree | 9f4b5d6a72711af4641b01169da5d20a2228ef64 /router.py | |
| parent | 68bd1bd052a7cd6438b92cb1059ef5e58b8d022c (diff) | |
| download | ZeroMQ_Video_Streaming-added_video.tar.gz ZeroMQ_Video_Streaming-added_video.tar.bz2 ZeroMQ_Video_Streaming-added_video.zip | |
Added camera support. Movement recognition and video streaming. Web server and frontend. Work in progress. To be fixed: frontend reloading information about client and a page after major changes like camera name or address change, camera removal. Add certificates before testing on actual distributed hardware. Add user login logic.added_video
Diffstat (limited to 'router.py')
| -rw-r--r-- | router.py | 160 | 
1 files changed, 99 insertions, 61 deletions
| @@ -18,17 +18,17 @@ from helpers import CustomLoggingFormatter  # TODO: add configparser -ADDR_FRONTEND = 'tcp://localhost' -ADDR_BACKEND = 'tcp://localhost' +IP_FRONTEND = '127.0.0.1' +IP_BACKEND = '127.0.0.1'  PORT_FRONTEND = "5569"  PORT_BACKEND = "9979" -INITIAL_FRONTEND_ADDR = f"{ADDR_FRONTEND}:{PORT_FRONTEND}" -INITIAL_BACKEND_ADDR = f"{ADDR_BACKEND}:{PORT_BACKEND}" +FRONTEND_ADDR = f"tcp://{IP_FRONTEND}:{PORT_FRONTEND}" +BACKEND_ADDR = f"tcp://{IP_BACKEND}:{PORT_BACKEND}"  LOGFILE = f"{os.path.splitext(os.path.basename(__file__))[0]}.log" -LOGLEVEL = logging.DEBUG +LOGLEVEL = logging.INFO  log_formatter = CustomLoggingFormatter()  handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') @@ -52,13 +52,13 @@ def custom_proxy():      # Set up frontend:      frontend_socket = context.socket(zmq.ROUTER)      frontend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) -    frontend_socket.bind(INITIAL_FRONTEND_ADDR) +    frontend_socket.bind(FRONTEND_ADDR)      poller.register(frontend_socket, zmq.POLLIN)      # Set up backend:      backend_socket = context.socket(zmq.ROUTER)      backend_socket.setsockopt(zmq.ROUTER_MANDATORY, 1) -    backend_socket.bind(INITIAL_BACKEND_ADDR) +    backend_socket.bind(BACKEND_ADDR)      poller.register(backend_socket, zmq.POLLIN)      awailable_workers = [] @@ -78,101 +78,137 @@ def custom_proxy():      pending_messages_worker_client = defaultdict(lambda: defaultdict(deque))      while RUNNING: -        events = dict(poller.poll(1000))  # Poll both frontend and backend sockets: -        if frontend_socket in events:  # If message from client: -            msg = frontend_socket.recv_multipart()  # Receive message from client: -            logger.debug("Received message: %r", msg) +        # Poll both frontend and backend sockets: +        events = dict(poller.poll(1000)) +        # If message from client: +        if frontend_socket in events: +            # Receive message from client: +            msg = frontend_socket.recv_multipart() +            logger.debug("Received message.")              client_id, content = msg[0], msg[1:] -            if not client_id in client_worker_dict:  # If client is not in client-worker dictionary: -                logger.info("Received ID from client: %r.", client_id)              # Check if client is already connected to worker:              try: -                worker_id = client_worker_dict[client_id]  # Get worker ID for this client from the client-worker dictionary -                while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker and send them first: -                    logger.debug("There are pending messages from client %r for worker %r", client_id, worker_id) +                # Get worker ID for this client from the client-worker dictionary: +                worker_id = client_worker_dict[client_id] +                # Check if there are any pending messages for this worker and send them first: +                while pending_messages_client_worker[client_id][worker_id]: +                    # In order to take a peek at the size of all messages, perhaps check out: +                    # https://web.archive.org/web/20240804164028/https://code.activestate.com/recipes/546530/ +                    logger.debug( +                        "There are '%d' pending messages from client %r for worker %r", +                        len(pending_messages_client_worker[client_id][worker_id]), +                        client_id, +                        worker_id, +                    )                      pending_msg = pending_messages_client_worker[client_id][worker_id].pop()                      try: -                        logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                        logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)                          backend_socket.send_multipart([worker_id, client_id, *pending_msg])                      except Exception as e: -                        logger.error("Failed to send pending message: %r to worker: %r, error: %r", pending_msg, worker_id, e) -                        pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message -                        break # Break to avoid infinite loop -                if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: -                    pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                        logger.error("Failed to send pending message to worker: %r, error: %r", worker_id, e) +                        # Re-queue the message: +                        pending_messages_client_worker[client_id][worker_id].append(pending_msg) +                        # Break to avoid infinite loop: +                        break +                # If there are still pending messages for this worker: +                if pending_messages_client_worker[client_id][worker_id]: +                    # Store message for later delivery: +                    pending_messages_client_worker[client_id][worker_id].appendleft(content) +                # At last send new message to worker:                  else:                      try: -                        logger.debug("No more pending messages for client %r, sending new message to worker %r: %r", client_id, worker_id, msg) -                        backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                        logger.debug("No more pending messages for client %r, sending new message to worker %r.", client_id, worker_id) +                        backend_socket.send_multipart([worker_id, client_id, *content])                      except Exception as e: -                        logger.error("Failed to send message to backend: %r, error: %s", msg, e) -                        pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery -            except KeyError:  # If client is not connected to any worker: -                logger.debug("Client '%s' is not connected to any worker, checking available workers", client_id) -                if awailable_workers:  # If there are available workers: -                    worker_id = random.choice(awailable_workers)  # Assign random worker to client -                    client_worker_dict[client_id] = worker_id # Update client-worker dictionary -                    if pending_messages_no_worker[client_id]:  # Check if there are any pending messages for this client: -                        pending_messages_client_worker[client_id][worker_id].extendleft(msg)  # Move messages to pending messages for this worker +                        logger.error("Failed to send message to backend, error: %s", e) +                        # Store message for later delivery: +                        # pending_messages_client_worker[client_id][worker_id].appendleft(msg) <- FIXME: this is the bug, no? +                        pending_messages_client_worker[client_id][worker_id].appendleft(content) +            # If client is not connected to any worker: +            except KeyError: +                logger.info("Received ID from client: %r.", client_id) +                # Check if there are available workers. +                if awailable_workers: +                    # Assign random worker to client: +                    worker_id = random.choice(awailable_workers) +                    # Update client-worker dictionary: +                    client_worker_dict[client_id] = worker_id +                    # Check if there are any pending messages for this client: +                    if pending_messages_no_worker[client_id]: +                        # Move messages to pending messages for this worker: +                        pending_messages_client_worker[client_id][worker_id].extendleft(msg)                          logger.debug("Moved pending messages for client '%s' to worker '%s'", client_id, worker_id) -                    while pending_messages_client_worker[client_id][worker_id]:  # Check if there are any pending messages for this worker: +                    # Check if there are any pending messages for this worker: +                    while pending_messages_client_worker[client_id][worker_id]:                          pending_msg = pending_messages_client_worker[client_id][worker_id].pop()                          try: -                            logger.debug("Sending pending message %r for client %r to worker %r", pending_msg, client_id, worker_id) +                            logger.debug("Sending pending message for client %r to worker %r", client_id, worker_id)                              backend_socket.send_multipart([worker_id, client_id, *pending_msg])                          except Exception as e: -                            logger.error("Failed to send pending message: %r to worker: %r. Error: %s", pending_msg, worker_id, e) -                            pending_messages_client_worker[client_id][worker_id].append(pending_msg)  # Re-queue the message +                            logger.error("Failed to send pending message to worker: %r. Error: %s", worker_id, e) +                            # Re-queue the message: +                            pending_messages_client_worker[client_id][worker_id].append(pending_msg)                              break -                    if pending_messages_client_worker[client_id][worker_id]:  # If there are still pending messages for this worker: -                        logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery: %r", client_id, worker_id, content) -                        pending_messages_client_worker[client_id][worker_id].appendleft(content)  # Store message for later delivery +                    # If there are still pending messages for this worker: +                    if pending_messages_client_worker[client_id][worker_id]: +                        logger.debug("There are still pending messages for client '%s' and worker '%s', storing message for later delivery.", client_id, worker_id) +                        # Store message for later delivery: +                        pending_messages_client_worker[client_id][worker_id].appendleft(content)                      else: -                        logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) +                        logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id)                          try: -                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s': %r", client_id, worker_id, msg) -                            backend_socket.send_multipart([worker_id, client_id, *content]) # At last send the message to worker: +                            logger.debug("No more pending messages for client '%s', sending new message to worker '%s'.", client_id, worker_id) +                            # At last, send the message to worker: +                            backend_socket.send_multipart([worker_id, client_id, *content])                          except Exception as e: -                            logger.error("Failed to send message %r for client %r to backend, requeing. Error: %s", msg, client_id, e) -                            pending_messages_client_worker[client_id][worker_id].appendleft(msg)  # Store message for later delivery +                            logger.error("Failed to send message for client %r to backend, requeing. Error: %s", client_id, e) +                            # Store message for later delivery: +                            pending_messages_client_worker[client_id][worker_id].appendleft(msg) -                else:  # If no workers are available, assign a new worker:   -                    pending_messages_no_worker[client_id].append(content)  # Store message for later delivery +                else: +                    # Store message for later delivery: +                    pending_messages_no_worker[client_id].append(content)                      logger.debug("No available workers, storing client '%s' with no worker assigned", client_id)          if backend_socket in events:              _msg = backend_socket.recv_multipart() +            # These messages should be safe to log as they are short strings or bytes.              logger.debug("Received from worker: %r", _msg)              if len(_msg) == 2: # Worker is sending its identity:                  worker_id, msg = _msg[0], _msg[1:]                  logger.debug("Received ID from worker: %r, message: %r", worker_id, msg)                  logger.info("Received ID from worker: %r.", worker_id) -                awailable_workers.append(worker_id)  # Add worker to available workers list +                # Add worker to available workers list: +                awailable_workers.append(worker_id)                  for client, worker in client_worker_dict.items(): -                    if worker is None:  # If client has no worker assigned: +                    # If client has no worker assigned, assign it a new worker: +                    if worker is None:                          client_worker_dict[client] = worker                          logger.debug("Assigning worker %r to client %r", worker, client)                  if pending_messages_no_worker:                      # If there are pending messages for clients with no worker assigned:                      for client_id, messages in pending_messages_no_worker.items():                          if messages: -                            pending_messages_client_worker[client_id][worker_id].extendleft(messages)  # Move messages to pending messages for this worker -                            pending_messages_no_worker[client_id].clear()  # Clear pending messages for this client +                            # Move messages to pending messages for this worker: +                            pending_messages_client_worker[client_id][worker_id].extendleft(messages) +                            # Clear pending messages for this client: +                            pending_messages_no_worker[client_id].clear()                              logger.debug("Moved pending messages for client %r to worker %r", client_id, worker_id)                              try: -                                pending_msg = pending_messages_client_worker[client_id][worker_id].pop()  # Get the first message for this client and worker -                                logger.debug( -                                    "Sending pending message %r to worker %r for client '%r'", pending_msg, worker_id, client_id) -                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])  # Send the message to worker +                                # Get the first message for this client and worker: +                                pending_msg = pending_messages_client_worker[client_id][worker_id].pop() +                                logger.debug("Sending pending message to worker %r for client '%r'", worker_id, client_id) +                                # Send the message to worker: +                                backend_socket.send_multipart([worker_id, client_id, *pending_msg])                              except Exception as e:                                  pending_messages_client_worker[client_id][worker_id].append(pending_msg) -                                logger.error("Failed to send pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) +                                logger.error("Failed to send pending message to worker: %r. Error: %r", worker_id, e)              else:  # Worker is sending a message to client:                  worker_id, client_id, msg = _msg[0], _msg[1], _msg[2:]                  logger.debug("Received message from worker '%s' for client '%s': %r", worker_id, client_id, msg)                  # First check if there are any pending messages for this worker: -                while pending_messages_worker_client[worker_id][client_id]:  # Check if there are any pending messages for this worker: +                while pending_messages_worker_client[worker_id][client_id]:                      pending_msg = pending_messages_worker_client[worker_id][client_id].pop()                      try:                          logger.debug("Sending pending message %r from %r to client '%r'", pending_msg, client_id, worker_id) @@ -180,10 +216,12 @@ def custom_proxy():                      except Exception as e:                          # It is safe to log the message content as it is a short string or bytes.                          logger.error("Could not deliver pending message: %r to worker: %r. Error: %r", pending_msg, worker_id, e) -                        pending_messages_worker_client[worker_id][client_id].append(pending_msg)  # Re-queue the message   -                        break  # Break to avoid infinite loop -                # Now send the message to the client if no pending messages for this client: -                if not pending_messages_worker_client[worker_id][client_id]:  # If there are no pending messages for this worker: +                        # Re-queue the message: +                        pending_messages_worker_client[worker_id][client_id].append(pending_msg) +                        # Break to avoid infinite loop: +                        break +                # If no pending messages for this client, send new message to the client : +                if not pending_messages_worker_client[worker_id][client_id]:                      logger.debug("Sending message %r to client '%s'", msg, client_id)                      try:                          frontend_socket.send_multipart([client_id, *msg]) | 
