#!/usr/bin/env python
"""
Helper functions for zmq video messaging.
"""

__author__ = "Franoosh Corporation"

import os
import io
import pstats
import logging
import subprocess
import struct
import datetime

import cv2

# Known directives for camera configuration:
DIRECTIVES = (
    'modify_camera_name',
    'modify_camera_threshold',
    'modify_camera_grace_pd',
    'add_camera',
    'remove_camera',
)
MAX_CAMERA_NAME_LENGTH = 256
TIME_FORMAT_STRING = '%Y-%m-%d_%H-%M-%S'
LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs')


class CustomLoggingFormatter(logging.Formatter):
    """Custom logging formatter that picks a format string per log level."""

    debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s'
    info_fmt = 'INFO: %(asctime)s %(message)s'
    warning_fmt = 'WARNING: %(asctime)s %(message)s'
    error_fmt = 'ERROR: line: %(lineno)d, %(asctime)s %(message)s'
    critical_fmt = 'CRITICAL: line: %(lineno)d, %(asctime)s %(message)s'

    def __init__(self):
        super().__init__(
            fmt="%(levelno)d: %(asctime)s %(message)s",
            datefmt=None,
        )

    def format(self, record):
        # Temporarily swap the format string for one matching the record's level.
        orig_fmt = self._style._fmt
        if record.levelno == logging.DEBUG:
            self._style._fmt = CustomLoggingFormatter.debug_fmt
        elif record.levelno == logging.INFO:
            self._style._fmt = CustomLoggingFormatter.info_fmt
        elif record.levelno == logging.WARNING:
            self._style._fmt = CustomLoggingFormatter.warning_fmt
        elif record.levelno == logging.ERROR:
            self._style._fmt = CustomLoggingFormatter.error_fmt
        elif record.levelno == logging.CRITICAL:
            self._style._fmt = CustomLoggingFormatter.critical_fmt
        result = logging.Formatter.format(self, record)
        self._style._fmt = orig_fmt
        return result


def process_frame(frame, detect_width=320):
    """Downscale, grayscale and blur a frame for contour detection."""
    try:
        height, width = frame.shape[:2]
        if width > detect_width:
            scaling_factor = detect_width / float(width)
            small_frame = cv2.resize(frame, (detect_width, int(height * scaling_factor)))
        else:
            scaling_factor = 1.0
            small_frame = frame
        # Convert to grayscale:
        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        # Apply Gaussian blur:
        blurred = cv2.GaussianBlur(gray, (21, 21), 0)
    except Exception as exc:
        raise RuntimeError(f"Error processing frame: {exc}") from exc
    return blurred, scaling_factor


def timestamp_to_bytes(timestamp):
    """Convert timestamp to bytes."""
    return struct.pack('d', timestamp)


def bytes_to_timestamp(byte_data):
    """Convert bytes to timestamp."""
    return struct.unpack('d', byte_data)[0]


def compute_contours(sample_frames):
    """Compute contours of the difference between two frames."""
    all_contours = []
    frame_0, frame_1 = sample_frames
    frame_delta = cv2.absdiff(frame_0, frame_1)
    threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
    threshold = cv2.dilate(threshold, None, iterations=2)
    contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    all_contours.extend(contours)
    return all_contours


def scale_contours(contours, scaling_factor):
    """Scale contours by the given scaling factor."""
    if scaling_factor == 1.0:
        return contours
    scaled_contours = []
    for contour in contours:
        scaled_contour = (contour * (1.0 / scaling_factor)).astype(int)
        scaled_contours.append(scaled_contour)
    return scaled_contours


def draw_contours(frame, contours, min_contour_area=500):
    """Draw bounding boxes around sufficiently large contours."""
    for contour in contours:
        if cv2.contourArea(contour) > min_contour_area:
            (x, y, w, h) = cv2.boundingRect(contour)
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
    return frame


def detect_movement(contours, min_area=500):
    """Detect movement based on contours found from a frame diff."""
    for contour in contours:
        if cv2.contourArea(contour) >= min_area:
            return True
    return False
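

# Illustrative sketch, not part of the original module: one way the frame
# helpers above can be chained for motion detection. It assumes two
# consecutive BGR frames from the same camera; `_example_motion_pipeline`
# is a hypothetical name used only for illustration.
def _example_motion_pipeline(prev_frame, curr_frame):
    """Sketch: run the motion helpers end to end on two consecutive frames."""
    small_prev, scale = process_frame(prev_frame)
    small_curr, _ = process_frame(curr_frame)
    contours = compute_contours((small_prev, small_curr))
    if not detect_movement(contours):
        return curr_frame, False
    # Map contours back to full resolution before drawing bounding boxes:
    annotated = draw_contours(curr_frame, scale_contours(contours, scale))
    return annotated, True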


def get_available_cameras():
    """
    Get a list of available camera devices.

    At the moment it does not work. At all. It is useless.
    """
    proc = subprocess.Popen(['v4l2-ctl', '--list-devices'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]]
    verified_devices = []
    for device in candidate_devices:
        cap = cv2.VideoCapture(device)
        if cap.isOpened():
            verified_devices.append(device)
        cap.release()
    return verified_devices


def bytes_to_str(obj):
    """Recursively convert bytes to strings in dicts and lists."""
    if isinstance(obj, bytes):
        return obj.decode('utf-8')
    if isinstance(obj, dict):
        return {bytes_to_str(k): bytes_to_str(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [bytes_to_str(item) for item in obj]
    return obj


def str_to_bytes(obj):
    """Recursively convert strings to bytes in dicts and lists."""
    if isinstance(obj, str):
        return obj.encode('utf-8')
    if isinstance(obj, dict):
        return {str_to_bytes(k): str_to_bytes(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [str_to_bytes(item) for item in obj]
    return obj
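

# Illustrative sketch, not part of the original module: shows how the byte
# helpers above could be combined into the frames of a multipart camera
# directive message. The (name, directive, timestamp) layout and the
# `_example_*` names are assumptions for illustration only; the real wire
# format used by the zmq messaging code may differ.
def _example_pack_directive(camera_name, directive, timestamp):
    """Sketch: pack a camera directive into a list of byte frames."""
    if directive not in DIRECTIVES:
        raise ValueError(f"Unknown directive: {directive!r}")
    if len(camera_name) > MAX_CAMERA_NAME_LENGTH:
        raise ValueError("Camera name too long")
    return [
        str_to_bytes(camera_name),
        str_to_bytes(directive),
        timestamp_to_bytes(timestamp),
    ]


def _example_unpack_directive(parts):
    """Sketch: reverse of _example_pack_directive()."""
    camera_name, directive, ts_bytes = parts
    return bytes_to_str(camera_name), bytes_to_str(directive), bytes_to_timestamp(ts_bytes)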


def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool:
    """
    Write yappi stats to .pstat files and convert them to readable .txt files.

    Parameters
    ----------
    yappi_instance : module
        The yappi module (used for get_thread_stats()/get_func_stats()).
    logdir : str, optional
        Directory under which the yappi output subdirectory is created.

    Returns
    -------
    bool
        True on error, False on success.
    """
    logger = logging.getLogger(__name__)
    threads = yappi_instance.get_thread_stats()
    # Combined text output:
    combined_out = []
    time_now = datetime.datetime.strftime(datetime.datetime.now(), TIME_FORMAT_STRING)
    subdir = f"yappi_{time_now}"
    yappi_dir = os.path.join(logdir, 'yappi', subdir)
    try:
        os.makedirs(yappi_dir)
    except Exception as exc:
        logger.error("Couldn't create directory for yappi stats: %r", exc)
        return True
    logfile = os.path.join(yappi_dir, "yappi.log")
    main_yappi_png = os.path.join(yappi_dir, "yappi.png")  # currently unused
    gprof2dot_cmd = ["gprof2dot", "-f", "pstats"]
    dot_cmd = ["dot", "-Tpng"]
    try:
        for thread in threads:
            func_stats = yappi_instance.get_func_stats(ctx_id=thread.id)
            # For possible future use:
            yappi_instance.get_func_stats().save('profile.callgrind', type="callgrind")
            thread_filename_base = f"{logfile}.thread{thread.id}"
            pstat_filename = f"{thread_filename_base}.pstat"
            txt_filename = f"{thread_filename_base}.txt"
            dot_filename = f"{thread_filename_base}.dot"
            png_filename = f"{thread_filename_base}.png"
            # Save pstat (can be opened with pstats or profiling tools):
            func_stats.save(pstat_filename, type="pstat")
            # Convert pstat to readable text using pstats:
            sio = io.StringIO()
            ps = pstats.Stats(pstat_filename, stream=sio)
            ps.sort_stats("tottime")
            ps.print_stats()
            text = sio.getvalue()
            combined_out.append(f"--- Thread {thread.id} ({thread.name}) ---\n{text}")
            # Also write a per-thread text file:
            with open(txt_filename, "w", encoding='utf-8') as f:
                f.write(text)
            # Generate a call graph PNG using gprof2dot and dot:
            _gprof2dot_cmd = gprof2dot_cmd + [pstat_filename, "-o", dot_filename]
            _dot_cmd = dot_cmd + [dot_filename, "-o", png_filename]
            try:
                gprof2dot_proc = subprocess.Popen(_gprof2dot_cmd,
                                                  stdout=subprocess.PIPE,
                                                  stderr=subprocess.PIPE)
                gprof2dot_stdout, gprof2dot_stderr = gprof2dot_proc.communicate()
                if gprof2dot_proc.returncode != 0:
                    logger.error("Error generating gprof2dot for thread %r: %r",
                                 thread.id, gprof2dot_stderr.decode('utf-8'))
                    continue
            except Exception as exc:
                logger.error("Exception generating gprof2dot for thread %r: %r",
                             thread.id, exc)
                continue
            try:
                dot_proc = subprocess.Popen(_dot_cmd,
                                            stdout=subprocess.PIPE,
                                            stderr=subprocess.PIPE)
                out, err = dot_proc.communicate()
                if dot_proc.returncode != 0:
                    logger.error("Error generating call graph PNG for thread %r: %r",
                                 thread.id, err.decode('utf-8'))
            except Exception as exc:
                logger.error("Exception generating call graph PNG for thread %r: %r",
                             thread.id, exc)
        # Write the combined text file:
        with open('.'.join((logfile, "txt")), "w", encoding='utf-8') as f:
            f.write("\n\n".join(combined_out))
    except Exception as exc:
        logger.error("Error writing yappi stats: %r", exc)
        return True
    return False
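

# Illustrative sketch, not part of the original module: a typical call
# pattern for write_yappi_stats(). yappi is imported lazily so that
# importing this helpers module does not require it; yappi.start()/stop()
# are standard yappi calls, and the module itself is passed as
# `yappi_instance`, matching how write_yappi_stats() uses it above.
def _example_profile_call(func, *args, **kwargs):
    """Sketch: profile a single call with yappi and dump the stats."""
    import yappi  # local import: optional profiling dependency

    yappi.start()
    try:
        result = func(*args, **kwargs)
    finally:
        yappi.stop()
    if write_yappi_stats(yappi):
        logging.getLogger(__name__).warning("Writing yappi stats reported an error")
    return result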