1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
#!/usr/bin/env python
"""
A module containing client for streaming video to a zmq router.
"""
__author__ = "Franoosh Corporation"
from os import path
from threading import Thread, Event
from queue import Queue
import time
import datetime
import signal
import logging
from configparser import ConfigParser
import traceback
import zmq
from helpers import CustomLoggingFormatter
ROUTER_ADDRESS = "tcp://localhost"
PORT = "5569"
ADDRESS = f"{ROUTER_ADDRESS}:{PORT}"
CONFIG_FILE = "client.cfg"
LOGFILE = f"{path.splitext(path.basename(__file__))[0]}.log"
LOGLEVEL = logging.DEBUG
CAMERAS = {'front_camera': '/dev/video0', 'back_camera': '/dev/video1'}
SLEEP_TIME = 5
stop_event = Event()
log_formatter = CustomLoggingFormatter()
handler = logging.FileHandler(LOGFILE, encoding='utf-8', mode='a')
handler.setFormatter(log_formatter)
logging.root.addHandler(handler)
logging.root.setLevel(LOGLEVEL)
logger = logging.getLogger(__name__)
logging.basicConfig(
filename=LOGFILE,
datefmt='%Y-%m-%d %I:%M:%S',
level=LOGLEVEL,
)
def read_config(conf_file):
"""Read config file and return as dictionary."""
cfg = ConfigParser()
cfg.read(conf_file)
return {key: dict(cfg.items(key)) for key in cfg.sections()}
class ClientVideo(Thread):
"""Class for sending video stream"""
def __init__(self, _id, device, queue):
super().__init__(daemon=True)
self.identity = _id
self.device = device
self.queue = queue
self.live = True
def run(self):
"""Replace with actual video streaming logic."""
ping_no = 0
while self.live and not stop_event.is_set():
try:
# Four parts required to start/stop video stream, three parts for ping:
if ping_no == 0: # Start video stream
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
metadata = f"start:{timestamp}"
logger.debug("ClientVideo '%s' sending metadata: '%s'", self.identity, metadata)
message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
self.queue.put(message)
text = f"ping-{ping_no}"
message = [self.identity.encode('utf-8'), text.encode('utf-8')]
elif ping_no >= 5: # Stop video stream
logger.debug("ClientVideo '%s' sending stop signal", self.identity)
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
metadata = f"stop:{timestamp}"
logger.debug("ClientVideo '%s' sending metadata: '%s' and ping: %r", self.identity, metadata, b'')
message = [self.identity.encode('utf-8'), metadata.encode('utf-8'), b'']
self.live = False
else: # Send ping
text = f"ping-{ping_no}"
message = [self.identity.encode('utf-8'), text.encode('utf-8')]
logger.debug("ClientVideo '%s' sending message: %r", self.identity, message)
self.queue.put(message)
ping_no += 1
except Exception as exc:
logger.error("ClientVideo: socket error: %r", exc)
time.sleep(SLEEP_TIME)
logger.info("ClientVideo '%s' closing socket ...", self.identity)
def stop(self):
logger.info("Client video '%s' exiting ...", self.identity)
self.live = False
class ClientTask(Thread):
"""Main Client Task with logic,
message handling and setup."""
def __init__(self, _id, context=None):
super().__init__(daemon=True)
self.identity = _id
self.context = context or zmq.Context.instance()
self.socket = self.context.socket(zmq.DEALER)
self.socket.identity = self.identity.encode("utf-8")
self.poll = zmq.Poller()
self.poll.register(self.socket, zmq.POLLIN)
self.video_threads = []
self.running = True
self.connected = False
def run(self):
self.socket.connect(ADDRESS)
logger.debug("Client '%s' connected to %s", self.identity, ADDRESS)
logger.debug("starting video threads ...")
q = Queue()
for _id, device in CAMERAS.items():
vid_cl = ClientVideo(_id, device, q)
vid_cl.start()
self.video_threads.append(vid_cl)
while self.running:
try:
sockets = dict(self.poll.poll(1000))
if self.socket in sockets:
try:
message = self.socket.recv_multipart()
logger.debug("Client '%s' received message: %r", self.identity, message)
except Exception:
logger.error("Failed to receive message: %r", traceback.format_exc())
if q.qsize() > 0:
frame = q.get()
if frame is not None:
logger.debug("Processing frame: %r", frame)
self.socket.send_multipart(frame)
except zmq.ZMQError as exc:
logger.error("Client '%s': socket error: %r", self.identity, exc)
logger.info("Closing socket ...")
self.socket.close()
logger.debug("Terminating context ...")
self.context.term()
def stop(self):
"""Stop task"""
logger.info("ClientTask cleaning up ...")
for thread in self.video_threads:
logger.info("Cleaning u video thread ...")
thread.stop()
thread.join()
self.video_threads = []
logger.info("Client task exiting ...")
self.running = False
def signal_handler(sig, frame):
logger.info("Received signal handler '%r', stopping client ...", sig)
client.stop()
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
logger.info("Starting up ...")
client = ClientTask("client_task")
client.start()
client.join()
|