diff options
Diffstat (limited to 'helpers.py')
| -rw-r--r-- | helpers.py | 32 |
1 files changed, 20 insertions, 12 deletions
@@ -12,9 +12,9 @@ import io import pstats import logging import subprocess -import cv2 import struct import datetime +import cv2 # Known directives for camera configuration: DIRECTIVES = ( @@ -25,9 +25,10 @@ DIRECTIVES = ( 'remove_camera', ) MAX_CAMERA_NAME_LENGTH = 256 -TIME_FORMAT_STRING = '%Y-%m-%d %H:%M:%S.%f' +TIME_FORMAT_STRING = '%Y-%m-%d_%H-%M-%S' LOGDIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'logs') + class CustomLoggingFormatter(logging.Formatter): """Custom logging formatter""" debug_fmt = 'DEBUG: %(filename)s:%(lineno)d %(asctime)s %(message)s' @@ -60,6 +61,7 @@ class CustomLoggingFormatter(logging.Formatter): return result + def process_frame(frame, detect_width=320): """Process frame for contour detection.""" try: @@ -84,10 +86,12 @@ def timestamp_to_bytes(timestamp): """Convert timestamp to bytes.""" return struct.pack('d', timestamp) + def bytes_to_timestamp(byte_data): """Convert bytes to timestamp.""" return struct.unpack('d', byte_data)[0] + def compute_contours(sample_frames): """Compute contours between two frames""" all_contours = [] @@ -95,12 +99,12 @@ def compute_contours(sample_frames): frame_delta = cv2.absdiff(frame_0, frame_1) threshold = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] threshold = cv2.dilate(threshold, None, iterations=2) - # contours, _ = cv2.findContours(threshold.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) all_contours.extend(contours) return all_contours + def scale_contours(contours, scaling_factor): """Scale contours by the given scaling factor.""" if scaling_factor == 1.0: @@ -113,6 +117,7 @@ def scale_contours(contours, scaling_factor): return scaled_contours + def draw_contours(frame, contours, min_contour_area=500): """Draw contours on the frame.""" for contour in contours: @@ -122,6 +127,7 @@ def draw_contours(frame, contours, min_contour_area=500): return frame + def detect_movement(contours, min_area=500): """Detect movement based on contours found from frame diff.""" for contour in contours: @@ -129,6 +135,7 @@ def detect_movement(contours, min_area=500): return True return False + def get_available_cameras(): """ Get list of available camera devices. @@ -145,27 +152,28 @@ def get_available_cameras(): cap.release() return verified_devices + def bytes_to_str(obj): """Recursively convert bytes to strings in dicts and lists.""" if isinstance(obj, bytes): return obj.decode('utf-8') - elif isinstance(obj, dict): + if isinstance(obj, dict): return {bytes_to_str(k): bytes_to_str(v) for k, v in obj.items()} - elif isinstance(obj, list): + if isinstance(obj, list): return [bytes_to_str(item) for item in obj] - else: - return obj + return obj + def str_to_bytes(obj): """Recursively convert strings to bytes in dicts and lists.""" if isinstance(obj, str): return obj.encode('utf-8') - elif isinstance(obj, dict): + if isinstance(obj, dict): return {str_to_bytes(k): str_to_bytes(v) for k, v in obj.items()} - elif isinstance(obj, list): + if isinstance(obj, list): return [str_to_bytes(item) for item in obj] - else: - return obj + return obj + def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool: """ @@ -253,4 +261,4 @@ def write_yappi_stats(yappi_instance, logdir=LOGDIR) -> bool: except Exception as exc: return True - return False
\ No newline at end of file + return False |
