diff options
Diffstat (limited to 'client.py')
| -rw-r--r-- | client.py | 176 | 
1 files changed, 176 insertions, 0 deletions
| diff --git a/client.py b/client.py new file mode 100644 index 0000000..e9dce4f --- /dev/null +++ b/client.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +""" +A module containing client for streaming video to a zmq router. +""" + +__author__ = "Franoosh Corporation" + +from os import path +from threading import Thread, Event +from queue import Queue +import time +import datetime +import signal +import logging +from configparser import ConfigParser +import traceback +import zmq + +from helpers import CustomLoggingFormatter + +ROUTER_ADDRESS = "tcp://localhost" +PORT = "5569" +ADDRESS = f"{ROUTER_ADDRESS}:{PORT}" + +CONFIG_FILE = "client.cfg" +LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log" +LOGLEVEL = logging.DEBUG +CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'} +SLEEP_TIME = 5 + + +stop_event = Event() + +log_formatter = CustomLoggingFormatter() +handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a') +handler.setFormatter(log_formatter) +logging.root.addHandler(handler) +logging.root.setLevel(LOGLEVEL) +logger = logging.getLogger(__name__) +logging.basicConfig( +    filename=LOGFILE, +    datefmt='%Y-%m-%d %I:%M:%S', +    level=LOGLEVEL, +) + +def read_config(conf_file): +    """Read config file and return as dictionary.""" +    cfg = ConfigParser() +    cfg.read(conf_file) + +    return {key: dict(cfg.items(key)) for key in cfg.sections()} + + +class ClientVideo(Thread): +    """Class for sending video stream""" +    def __init__(self, _id, device, queue): +        super().__init__(daemon=True) +        self.identity = _id +        self.device = device +        self.queue = queue +        self.live = True + +    def run(self): +        """Replace with actual video streaming logic.""" +        ping_no = 0 +        while self.live and not stop_event.is_set(): +            try: +                # Four parts required to start/stop video stream, three parts for ping: +                if ping_no == 0:  # Start video stream +                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") +                    metadata = f"start:{timestamp}" +                    logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata) +                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] +                    self.queue.put(message) +                    text = f"ping-{ping_no}" +                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] +                elif ping_no >= 5:  # Stop video stream +                    logger.debug("ClientVideo '%s' sending stop signal", self.identity) +                    timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") +                    metadata = f"stop:{timestamp}" +                    logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'') +                    message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b''] +                    self.live = False +                else:  # Send ping +                    text = f"ping-{ping_no}" +                    message = [self.identity.encode('utf-8'), text.encode('utf-8')] +                logger.debug("ClientVideo '%s' sending message: %r", self.identity, message) +                self.queue.put(message) +                ping_no += 1 +            except Exception as exc: +                logger.error("ClientVideo: socket error: %r", exc) +            time.sleep(SLEEP_TIME) + +        logger.info("ClientVideo '%s' closing socket ...", self.identity) + +    def stop(self): +        logger.info("Client video '%s' exiting ...", self.identity) +        self.live = False + + +class ClientTask(Thread): +    """Main Client Task with logic, +    message handling and setup.""" + +    def __init__(self, _id, context=None): +        super().__init__(daemon=True) +        self.identity = _id +        self.context = context or zmq.Context.instance() +        self.socket = self.context.socket(zmq.DEALER) +        self.socket.identity = self.identity.encode("utf-8") +        self.poll = zmq.Poller() +        self.poll.register(self.socket, zmq.POLLIN) +        self.video_threads = [] +        self.running = True +        self.connected = False + +    def run(self): +        self.socket.connect(ADDRESS) +        logger.debug("Client '%s' connected to %s", self.identity, ADDRESS) +        logger.debug("starting video threads ...") +        q = Queue() +        for _id, device in CAMERAS.items(): +            vid_cl = ClientVideo(_id, device, q) +            vid_cl.start() +            self.video_threads.append(vid_cl) +        while self.running: +            try: +                sockets = dict(self.poll.poll(1000)) +                if self.socket in sockets: +                    try: +                        message = self.socket.recv_multipart() +                        logger.debug("Client '%s' received message: %r", self.identity, message) +                    except Exception: +                        logger.error("Failed to receive message: %r", traceback.format_exc()) +                if q.qsize() > 0: +                    frame = q.get() +                    if frame is not None: +                        logger.debug("Processing frame: %r", frame) +                        self.socket.send_multipart(frame) +            except zmq.ZMQError as exc: +                logger.error("Client '%s': socket error: %r", self.identity, exc) + +        logger.info("Closing socket ...") +        self.socket.close() +        logger.debug("Terminating context ...") +        self.context.term() + +    def stop(self): +        """Stop task""" +        logger.info("ClientTask cleaning up ...") +        for thread in self.video_threads: +            logger.info("Cleaning u video thread ...") +            thread.stop() +            thread.join() +        self.video_threads = [] +        logger.info("Client task exiting ...") +        self.running = False + + +def signal_handler(sig, frame): +    logger.info("Received signal handler '%r', stopping client ...", sig) +    client.stop() + + +if __name__ == "__main__": + +    signal.signal(signal.SIGINT, signal_handler) +    signal.signal(signal.SIGTERM, signal_handler) + +    logger.info("Starting up ...") +    client = ClientTask("client_task") + +    client.start() + +    client.join() | 
