diff options
Diffstat (limited to 'helpers.py')
| -rw-r--r-- | helpers.py | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..a40b060 --- /dev/null +++ b/helpers.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python + +""" +Helper functions for zmq video messaging. +""" + +__author__ = "Franoosh Corporation" + + +import os +import io +import pstats +import logging +import subprocess +import cv2 +import struct +import datetime + +# Known directives for camera configuration: +DIRECTIVES = ( + 'modify_camera_name', + 'modify_camera_threshold', + 'modify_camera_grace_pd', + 'add_camera', + 'remove_camera', +) +MAX_CAMERA_NAME_LENGTH = 256 +TIME_FORMAT_STRING = '%Y-%m-%d %H:%M:%S.%f' +LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') + +class CustomLoggingFormatter(logging.Formatter): + """Custom logging formatter""" + debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s' + info_fmt = 'INFO: %(asctime)s %(message)s' + warning_fmt = 'WARNING: %(asctime)s %(message)s' + error_fmt = 'ERROR: line: %(lineno)d, %(asctime)s %(message)s' + critical_fmt = 'CRITICAL: line: %(lineno)d, %(asctime)s %(message)s' + + def __init__(self): + super().__init__( + fmt="%(levelno)d: %s(asctime)s %(message)s", + datefmt=None, + ) + + def format(self, record): + orig_fmt = self._style._fmt + if record.levelno == logging.DEBUG: + self._style._fmt = CustomLoggingFormatter.debug_fmt + elif record.levelno == logging.INFO: + self._style._fmt = CustomLoggingFormatter.info_fmt + elif record.levelno == logging.WARNING: + self._style._fmt = CustomLoggingFormatter.warning_fmt + elif record.levelno == logging.ERROR: + self._style._fmt = CustomLoggingFormatter.error_fmt + elif record.levelno == logging.CRITICAL: + self._style._fmt = CustomLoggingFormatter.critical_fmt + + result = logging.Formatter.format(self, record) + self._style._fmt = orig_fmt + + return result + +def process_frame(frame, detect_width=320): + """Process frame for contour detection.""" + try: + height, width = frame.shape[:2] + if width > detect_width: + scaling_factor = detect_width / float(width) + small_frame = cv2.resize(frame, (detect_width, int(height * scaling_factor))) + else: + scaling_factor = 1.0 + small_frame = frame + # Convert to grayscale: + gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY) + # Apply Gaussian blur: + blurred = cv2.GaussianBlur(gray, (21, 21), 0) + except Exception as exc: + raise RuntimeError(f"Error processing frame: {exc}") + + return blurred, scaling_factor + + +def timestamp_to_bytes(timestamp): + """Convert timestamp to bytes.""" + return struct.pack('d', timestamp) + +def bytes_to_timestamp(byte_data): + """Convert bytes to timestamp.""" + return struct.unpack('d', byte_data)[0] + +def compute_contours(sample_frames): + """Compute contours between two frames""" + all_contours = [] + frame_0, frame_1 = sample_frames + frame_delta = cv2.absdiff(frame_0, frame_1) + threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] + threshold = cv2.dilate(threshold, None, iterations=2) + # contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + all_contours.extend(contours) + + return all_contours + +def scale_contours(contours, scaling_factor): + """Scale contours by the given scaling factor.""" + if scaling_factor == 1.0: + return contours + + scaled_contours = [] + for contour in contours: + scaled_contour = (contour * (1.0 / scaling_factor)).astype(int) + scaled_contours.append(scaled_contour) + + return scaled_contours + +def draw_contours(frame, contours, min_contour_area=500): + """Draw contours on the frame.""" + for contour in contours: + if cv2.contourArea(contour) > min_contour_area: + (x, y, w, h) = cv2.boundingRect(contour) + cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) + + return frame + +def detect_movement(contours, min_area=500): + """Detect movement based on contours found from frame diff.""" + for contour in contours: + if cv2.contourArea(contour) >= min_area: + return True + return False + +def get_available_cameras(): + """ + Get list of available camera devices. + At the moment it does not work. At all. It is useless. + """ + proc = subprocess.Popen(['v4l2-ctl', '--list-devices'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = proc.communicate() + candidate_devices = [i.strip() for i in stdout.decode('utf-8').strip().splitlines()[1:]] + verified_devices = [] + for device in candidate_devices: + cap = cv2.VideoCapture(device) + if cap.isOpened(): + verified_devices.append(device) + cap.release() + return verified_devices + +def bytes_to_str(obj): + """Recursively convert bytes to strings in dicts and lists.""" + if isinstance(obj, bytes): + return obj.decode('utf-8') + elif isinstance(obj, dict): + return {bytes_to_str(k): bytes_to_str(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [bytes_to_str(item) for item in obj] + else: + return obj + +def str_to_bytes(obj): + """Recursively convert strings to bytes in dicts and lists.""" + if isinstance(obj, str): + return obj.encode('utf-8') + elif isinstance(obj, dict): + return {str_to_bytes(k): str_to_bytes(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [str_to_bytes(item) for item in obj] + else: + return obj + +def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool: + """ + Function writing yaapi stats to .pstat files and + converting stats to readable .txt files + + Parameters + ---------- + yappi_instance : yappi + Yappi instance + + Returns + ------- + bool + True on error, False on success + """ + logger = logging.getLogger(__name__) + threads = yappi_instance.get_thread_stats() + # combined text output: + combined_out = [] + + time_now = datetime.datetime.strftime(datetime.datetime.now(), TIME_FORMAT_STRING) + subdir = f"yaapi_{time_now}" + yaapi_dir = os.path.join(logdir, 'yaapi', subdir) + try: + os.makedirs(yaapi_dir) + except Exception as exc: + print("Couldn't create directory for yaapi stats: %r", exc) + return True + + logfile = os.path.join(yaapi_dir, "yaapi.log") + main_yaapi_png = os.path.join(yaapi_dir, "yaapi.png") + gprof2dot_cmd = ["gprof2dot", "-f", "pstats"] + dot_cmd = ["dot", "-Tpng"] + try: + for thread in threads: + func_stats = yappi_instance.get_func_stats(ctx_id=thread.id) + yappi_instance.get_func_stats().save('profile.callgrind', type="callgrind") # For possible future use + thread_filename_base = f"{logfile}.thread{thread.id}" + pstat_filename = f"{thread_filename_base}.pstat" + txt_filename = f"{thread_filename_base}.txt" + dot_filename = f"{thread_filename_base}.dot" + png_filename = f"{thread_filename_base}.png" + + # save pstat (can be opened with pstats or profiling tools) + func_stats.save(pstat_filename, type="pstat") + + # convert pstat to readable text using pstats + sio = io.StringIO() + ps = pstats.Stats(pstat_filename, stream=sio) + ps.sort_stats("tottime") + ps.print_stats() + + text = sio.getvalue() + combined_out.append(f"--- Thread {thread.id} ({thread.name}) ---\n{text}") + + # also write per-thread text file + with open(txt_filename, "w", encoding='utf-8') as f: + f.write(text) + + # generate call graph png using gprof2dot and dot + _gprof2dot_cmd = gprof2dot_cmd + [pstat_filename, "-o", dot_filename] + _dot_cmd = dot_cmd + [dot_filename, "-o", png_filename] + try: + gprof2dot_proc = subprocess.Popen(_gprof2dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gprof2dot_stdout, gprof2dot_stderr = gprof2dot_proc.communicate() + if gprof2dot_proc.returncode != 0: + logger.error("Error generating gprof2dot for thread %r: %r", thread.id, gprof2dot_stderr.decode('utf-8')) + continue + except Exception as exc: + logger.error("Exception generating gprof2dot for thread %r: %r", thread.id, exc) + continue + try: + dot_proc = subprocess.Popen(_dot_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gprof2dot_proc.stdout.close() # Allow gprof2dot_proc to receive a SIGPIPE if dot_proc exits. + out, err = dot_proc.communicate() + if dot_proc.returncode != 0: + logger.error("Error generating call graph PNG for thread %r: %r", thread.id, err.decode('utf-8')) + except Exception as exc: + logger.error("Exception generating call graph PNG for thread %r: %r", thread.id, exc) + + # write combined text file + with open('.'.join((logfile, "txt")), "w", encoding='utf-8') as f: + f.write("\n\n".join(combined_out)) + except Exception as exc: + return True + + return False
\ No newline at end of file |
