aboutsummaryrefslogtreecommitdiff
path: root/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'helpers.py')
-rw-r--r--helpers.py256
1 files changed, 256 insertions, 0 deletions
diff --git a/helpers.py b/helpers.py
new file mode 100644
index 0000000..a40b060
--- /dev/null
+++ b/helpers.py
@@ -0,0 +1,256 @@
+#!/usr/bin/env python
+
+"""
+Helper functions for zmq video messaging.
+"""
+
+__author__ = "Franoosh Corporation"
+
+
+import os
+import io
+import pstats
+import logging
+import subprocess
+import cv2
+import struct
+import datetime
+
+# Known directives for camera configuration:
+DIRECTIVES = (
+ 'modify_camera_name',
+ 'modify_camera_threshold',
+ 'modify_camera_grace_pd',
+ 'add_camera',
+ 'remove_camera',
+)
+MAX_CAMERA_NAME_LENGTH = 256
+TIME_FORMAT_STRING = '%Y-%m-%d %H:%M:%S.%f'
+LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')
+
+class CustomLoggingFormatter(logging.Formatter):
+ """Custom logging formatter"""
+ debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s'
+ info_fmt = 'INFO: %(asctime)s %(message)s'
+ warning_fmt = 'WARNING: %(asctime)s %(message)s'
+ error_fmt = 'ERROR: line: %(lineno)d, %(asctime)s %(message)s'
+ critical_fmt = 'CRITICAL: line: %(lineno)d, %(asctime)s %(message)s'
+
+ def __init__(self):
+ super().__init__(
+ fmt="%(levelno)d: %s(asctime)s %(message)s",
+ datefmt=None,
+ )
+
+ def format(self, record):
+ orig_fmt = self._style._fmt
+ if record.levelno == logging.DEBUG:
+ self._style._fmt = CustomLoggingFormatter.debug_fmt
+ elif record.levelno == logging.INFO:
+ self._style._fmt = CustomLoggingFormatter.info_fmt
+ elif record.levelno == logging.WARNING:
+ self._style._fmt = CustomLoggingFormatter.warning_fmt
+ elif record.levelno == logging.ERROR:
+ self._style._fmt = CustomLoggingFormatter.error_fmt
+ elif record.levelno == logging.CRITICAL:
+ self._style._fmt = CustomLoggingFormatter.critical_fmt
+
+ result = logging.Formatter.format(self, record)
+ self._style._fmt = orig_fmt
+
+ return result
+
+def process_frame(frame, detect_width=320):
+ """Process frame for contour detection."""
+ try:
+ height, width = frame.shape[:2]
+ if width > detect_width:
+ scaling_factor = detect_width / float(width)
+ small_frame = cv2.resize(frame, (detect_width, int(height * scaling_factor)))
+ else:
+ scaling_factor = 1.0
+ small_frame = frame
+ # Convert to grayscale:
+ gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
+ # Apply Gaussian blur:
+ blurred = cv2.GaussianBlur(gray, (21, 21), 0)
+ except Exception as exc:
+ raise RuntimeError(f"Error processing frame: {exc}")
+
+ return blurred, scaling_factor
+
+
+def timestamp_to_bytes(timestamp):
+ """Convert timestamp to bytes."""
+ return struct.pack('d', timestamp)
+
+def bytes_to_timestamp(byte_data):
+ """Convert bytes to timestamp."""
+ return struct.unpack('d', byte_data)[0]
+
+def compute_contours(sample_frames):
+ """Compute contours between two frames"""
+ all_contours = []
+ frame_0, frame_1 = sample_frames
+ frame_delta = cv2.absdiff(frame_0, frame_1)
+ threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
+ threshold = cv2.dilate(threshold, None, iterations=2)
+ # contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+ contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+ all_contours.extend(contours)
+
+ return all_contours
+
+def scale_contours(contours, scaling_factor):
+ """Scale contours by the given scaling factor."""
+ if scaling_factor == 1.0:
+ return contours
+
+ scaled_contours = []
+ for contour in contours:
+ scaled_contour = (contour * (1.0 / scaling_factor)).astype(int)
+ scaled_contours.append(scaled_contour)
+
+ return scaled_contours
+
+def draw_contours(frame, contours, min_contour_area=500):
+ """Draw contours on the frame."""
+ for contour in contours:
+ if cv2.contourArea(contour) > min_contour_area:
+ (x, y, w, h) = cv2.boundingRect(contour)
+ cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+
+ return frame
+
+def detect_movement(contours, min_area=500):
+ """Detect movement based on contours found from frame diff."""
+ for contour in contours:
+ if cv2.contourArea(contour) >= min_area:
+ return True
+ return False
+
+def get_available_cameras():
+ """
+ Get list of available camera devices.
+ At the moment it does not work. At all. It is useless.
+ """
+ proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]]
+ verified_devices = []
+ for device in candidate_devices:
+ cap = cv2.VideoCapture(device)
+ if cap.isOpened():
+ verified_devices.append(device)
+ cap.release()
+ return verified_devices
+
+def bytes_to_str(obj):
+ """Recursively convert bytes to strings in dicts and lists."""
+ if isinstance(obj, bytes):
+ return obj.decode('utf-8')
+ elif isinstance(obj, dict):
+ return {bytes_to_str(k): bytes_to_str(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [bytes_to_str(item) for item in obj]
+ else:
+ return obj
+
+def str_to_bytes(obj):
+ """Recursively convert strings to bytes in dicts and lists."""
+ if isinstance(obj, str):
+ return obj.encode('utf-8')
+ elif isinstance(obj, dict):
+ return {str_to_bytes(k): str_to_bytes(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [str_to_bytes(item) for item in obj]
+ else:
+ return obj
+
+def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool:
+ """
+ Function writing yaapi stats to .pstat files and
+ converting stats to readable .txt files
+
+ Parameters
+ ----------
+ yappi_instance : yappi
+ Yappi instance
+
+ Returns
+ -------
+ bool
+ True on error, False on success
+ """
+ logger = logging.getLogger(__name__)
+ threads = yappi_instance.get_thread_stats()
+ # combined text output:
+ combined_out = []
+
+ time_now = datetime.datetime.strftime(datetime.datetime.now(), TIME_FORMAT_STRING)
+ subdir = f"yaapi_{time_now}"
+ yaapi_dir = os.path.join(logdir, 'yaapi', subdir)
+ try:
+ os.makedirs(yaapi_dir)
+ except Exception as exc:
+ print("Couldn't create directory for yaapi stats: %r", exc)
+ return True
+
+ logfile = os.path.join(yaapi_dir, "yaapi.log")
+ main_yaapi_png = os.path.join(yaapi_dir, "yaapi.png")
+ gprof2dot_cmd = ["gprof2dot", "-f", "pstats"]
+ dot_cmd = ["dot", "-Tpng"]
+ try:
+ for thread in threads:
+ func_stats = yappi_instance.get_func_stats(ctx_id=thread.id)
+ yappi_instance.get_func_stats().save('profile.callgrind', type="callgrind") # For possible future use
+ thread_filename_base = f"{logfile}.thread{thread.id}"
+ pstat_filename = f"{thread_filename_base}.pstat"
+ txt_filename = f"{thread_filename_base}.txt"
+ dot_filename = f"{thread_filename_base}.dot"
+ png_filename = f"{thread_filename_base}.png"
+
+ # save pstat (can be opened with pstats or profiling tools)
+ func_stats.save(pstat_filename, type="pstat")
+
+ # convert pstat to readable text using pstats
+ sio = io.StringIO()
+ ps = pstats.Stats(pstat_filename, stream=sio)
+ ps.sort_stats("tottime")
+ ps.print_stats()
+
+ text = sio.getvalue()
+ combined_out.append(f"--- Thread {thread.id} ({thread.name}) ---\n{text}")
+
+ # also write per-thread text file
+ with open(txt_filename, "w", encoding='utf-8') as f:
+ f.write(text)
+
+ # generate call graph png using gprof2dot and dot
+ _gprof2dot_cmd = gprof2dot_cmd + [pstat_filename, "-o", dot_filename]
+ _dot_cmd = dot_cmd + [dot_filename, "-o", png_filename]
+ try:
+ gprof2dot_proc = subprocess.Popen(_gprof2dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gprof2dot_stdout, gprof2dot_stderr = gprof2dot_proc.communicate()
+ if gprof2dot_proc.returncode != 0:
+ logger.error("Error generating gprof2dot for thread %r: %r", thread.id, gprof2dot_stderr.decode('utf-8'))
+ continue
+ except Exception as exc:
+ logger.error("Exception generating gprof2dot for thread %r: %r", thread.id, exc)
+ continue
+ try:
+ dot_proc = subprocess.Popen(_dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ gprof2dot_proc.stdout.close() # Allow gprof2dot_proc to receive a SIGPIPE if dot_proc exits.
+ out, err = dot_proc.communicate()
+ if dot_proc.returncode != 0:
+ logger.error("Error generating call graph PNG for thread %r: %r", thread.id, err.decode('utf-8'))
+ except Exception as exc:
+ logger.error("Exception generating call graph PNG for thread %r: %r", thread.id, exc)
+
+ # write combined text file
+ with open('.'.join((logfile, "txt")), "w", encoding='utf-8') as f:
+ f.write("\n\n".join(combined_out))
+ except Exception as exc:
+ return True
+
+ return False \ No newline at end of file