#!/usr/bin/env python
"""
A module containing client for streaming video to a zmq router.

The module starts one ``ClientTask`` thread that owns a ZeroMQ DEALER socket
and one ``ClientVideo`` producer thread per entry in ``CAMERAS``.  Producers
push multipart messages onto a shared queue; the task forwards them to the
router and logs anything the router sends back.
"""

__author__ = "Franoosh Corporation"

from os import path
from threading import Thread, Event
from queue import Queue
import time
import datetime
import signal
import logging
from configparser import ConfigParser
import traceback
import zmq

from helpers import CustomLoggingFormatter

ROUTER_ADDRESS = "tcp://localhost"
PORT = "5569"
ADDRESS = f"{ROUTER_ADDRESS}:{PORT}"

CONFIG_FILE = "client.cfg"
LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log"
LOGLEVEL = logging.DEBUG
CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'}
# Seconds a video thread waits between two consecutive messages.
SLEEP_TIME = 5
# Upper bound (milliseconds) for flushing unsent messages when the socket is
# closed, so that terminating the context cannot block indefinitely.
LINGER_MS = 1000

# Global kill switch checked by every video thread on each loop iteration.
stop_event = Event()

log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
# NOTE: a logging.basicConfig(...) call used to follow here.  basicConfig does
# nothing once the root logger has a handler, so it was dead code and has been
# dropped; the file handler configured above is the effective configuration.


def read_config(conf_file):
    """Read config file and return as dictionary.

    Args:
        conf_file: Path of an INI-style file understood by ``ConfigParser``.

    Returns:
        A dict mapping each section name to a dict of that section's
        ``option -> value`` pairs.  ``ConfigParser.read`` ignores files it
        cannot open, in which case the result is empty.
    """
    cfg = ConfigParser()
    cfg.read(conf_file)
    return {key: dict(cfg.items(key)) for key in cfg.sections()}


class ClientVideo(Thread):
    """Class for sending video stream.

    Placeholder producer: instead of real frames it queues a ``start``
    message, a series of ``ping-N`` messages and finally a ``stop`` message.

    Args:
        _id: Name of the camera; sent as the first frame of every message.
        device: Device path of the camera (stored, not used by this stub).
        queue: Queue onto which the multipart messages (lists of ``bytes``)
            are put for the owning task to send.
    """

    def __init__(self, _id, device, queue):
        super().__init__(daemon=True)
        self.identity = _id
        self.device = device
        self.queue = queue
        self.live = True
        # Set by stop() so that a thread waiting between messages wakes up
        # immediately instead of sleeping out the whole SLEEP_TIME.
        self._wakeup = Event()

    @staticmethod
    def _timestamp():
        """Return the current local time formatted as ``YYYYmmdd-HHMMSS``."""
        return datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

    def run(self):
        """Replace with actual video streaming logic.

        Queues one message per iteration (two on the first one) and then
        waits up to ``SLEEP_TIME`` seconds.  The loop ends after the stop
        message was queued, when ``stop()`` is called, or when the module
        level ``stop_event`` is set.
        """
        ping_no = 0
        while self.live and not stop_event.is_set():
            try:
                identity = self.identity.encode('utf-8')
                # Four parts required to start/stop video stream, three parts for ping:
                if ping_no == 0:
                    # Start video stream: announce it, then send the first ping.
                    metadata = f"start:{self._timestamp()}"
                    logger.debug(
                        "ClientVideo '%s' sending metadata: '%s'",
                        self.identity, metadata,
                    )
                    self.queue.put([identity, metadata.encode('utf-8'), b''])
                    text = f"ping-{ping_no}"
                    message = [identity, text.encode('utf-8')]
                elif ping_no >= 5:
                    # Stop video stream; this is the last message of the thread.
                    logger.debug("ClientVideo '%s' sending stop signal", self.identity)
                    metadata = f"stop:{self._timestamp()}"
                    logger.debug(
                        "ClientVideo '%s' sending metadata: '%s' and ping: %r",
                        self.identity, metadata, b'',
                    )
                    message = [identity, metadata.encode('utf-8'), b'']
                    self.live = False
                else:
                    # Send ping
                    text = f"ping-{ping_no}"
                    message = [identity, text.encode('utf-8')]
                logger.debug(
                    "ClientVideo '%s' sending message: %r", self.identity, message
                )
                self.queue.put(message)
                ping_no += 1
            except Exception as exc:
                logger.error("ClientVideo: socket error: %r", exc)
            # Interruptible replacement for time.sleep(SLEEP_TIME): returns
            # early as soon as stop() sets the wake-up event.
            self._wakeup.wait(SLEEP_TIME)
        logger.info("ClientVideo '%s' closing socket ...", self.identity)

    def stop(self):
        """Ask the thread to finish and wake it if it is currently waiting."""
        logger.info("Client video '%s' exiting ...", self.identity)
        self.live = False
        self._wakeup.set()


class ClientTask(Thread):
    """Main Client Task with logic, message handling and setup.

    Args:
        _id: Identity string; also used (UTF-8 encoded) as the DEALER socket
            identity.
        context: Optional ``zmq.Context``.  When omitted, the process-wide
            ``zmq.Context.instance()`` is used.  The context is terminated
            when ``run()`` finishes.
    """

    def __init__(self, _id, context=None):
        super().__init__(daemon=True)
        self.identity = _id
        self.context = context or zmq.Context.instance()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.identity = self.identity.encode("utf-8")
        self.poll = zmq.Poller()
        self.poll.register(self.socket, zmq.POLLIN)
        self.video_threads = []
        self.running = True
        self.connected = False

    def run(self):
        """Connect to the router, start the video threads and pump messages.

        Each iteration polls the socket for up to one second, logs an
        incoming message if there is one, and forwards at most one queued
        frame to the router.  The socket and the context are always released
        on exit, even if an unexpected exception leaves the loop.
        """
        try:
            self.socket.connect(ADDRESS)
            logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
            logger.debug("starting video threads ...")
            q = Queue()
            for _id, device in CAMERAS.items():
                vid_cl = ClientVideo(_id, device, q)
                vid_cl.start()
                self.video_threads.append(vid_cl)
            while self.running:
                try:
                    sockets = dict(self.poll.poll(1000))
                    if self.socket in sockets:
                        try:
                            message = self.socket.recv_multipart()
                            logger.debug(
                                "Client '%s' received message: %r",
                                self.identity, message,
                            )
                        except Exception:
                            logger.error(
                                "Failed to receive message: %r",
                                traceback.format_exc(),
                            )
                    # This thread is the only consumer of q, so a non-empty
                    # queue guarantees that get() returns without blocking.
                    if q.qsize() > 0:
                        frame = q.get()
                        if frame is not None:
                            logger.debug("Processing frame: %r", frame)
                            self.socket.send_multipart(frame)
                except zmq.ZMQError as exc:
                    logger.error(
                        "Client '%s': socket error: %r", self.identity, exc
                    )
        finally:
            logger.info("Closing socket ...")
            # Bounded linger: with the default setting, context.term() may
            # block for as long as queued messages cannot be delivered
            # (e.g. the router is not reachable).
            self.socket.close(linger=LINGER_MS)
            logger.debug("Terminating context ...")
            self.context.term()

    def stop(self):
        """Stop task.

        Stops and joins every video thread first, then clears the flag that
        keeps ``run()`` looping; ``run()`` notices it after its current poll.
        """
        logger.info("ClientTask cleaning up ...")
        for thread in self.video_threads:
            logger.info("Cleaning u video thread ...")
            thread.stop()
            thread.join()
        self.video_threads = []
        logger.info("Client task exiting ...")
        self.running = False


def signal_handler(sig, frame):
    """Handle SIGINT/SIGTERM by stopping the module-level ``client`` task.

    Args:
        sig: Number of the received signal.
        frame: Current stack frame (unused).
    """
    logger.info("Received signal handler '%r', stopping client ...", sig)
    # `client` only exists once the __main__ block has created it; a signal
    # delivered before that must not raise NameError.
    task = globals().get("client")
    if task is not None:
        task.stop()


if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    logger.info("Starting up ...")
    client = ClientTask("client_task")

    client.start()
    client.join()