diff options
Diffstat (limited to 'main.py')
| -rw-r--r-- | main.py | 342 | 
1 files changed, 0 insertions, 342 deletions
| diff --git a/main.py b/main.py deleted file mode 100644 index 8f22694..0000000 --- a/main.py +++ /dev/null @@ -1,342 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" -Module controling temperature with PID setting a MOSFET. -""" - -__author__ = "Franek Ćazarewicz-Muradyan" -__licence__ = "GPL" -__version__ = "0.0.1" -__status__ = "Proof of concept" - -############################################# -#                   Flow                    # -############################################# - -# start logging, run sanity checks -> -# turn on relay -> -# enter a loop of given interval -> -# check temperature -> -# try to write temperature log -> -# calculate MOSFET value with PID > -# set calculated value on MOSFET - - -import os -import sys -import time -import signal -import logging -import io - -from pydantic import BaseSettings -try: -    from PID import PID -    import pigpio -except ImportError as exception: -    print(f'Import error: {exception}') -    sys.exit() - - -############################################# -#                   Settings                # -############################################# - -# User: -USERNAME = os.getlogin() - -# Files: - -LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log' -DATA_FOLDER = f'/var/log/{USERNAME}' -DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv' -CONFIG_FILE = ".env" - -# Best effort to detect 1-wire sensor: errors out if multiple detected -W1_SENSOR_ID = '28-0517c1b121ff' - -if not W1_SENSOR_ID: -    for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'): -        if len(filenames) >= 2: -            print('Multiple 1-wire sensors connected, set rom by hand. Aborting.') -            sys.exit() -        elif not filenames: -            print('No 1-wire sensors detected, aborting.') -            sys.exit() -        else: -            W1_SENSOR_ID = filenames[0] -W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave' -RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model' - -# Other settings: - -TIMEFORMAT = '%Y-%M-%d %H:%M:%S' - - -############################################# -#               Sanity checks               # -############################################# - - -def is_raspberrypi(): -    try: -        with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model: -            if 'raspberry pi' in model.read().lower(): -                return True -    except Exception: -        pass -    return False - - -if not is_raspberrypi(): -    print("This is not a raspberry pi, exiting") -    sys.exit() - -if not os.path.exists(DATA_FOLDER): -    try: -        os.makedirs(DATA_FOLDER) -    except OSError as exception: -        print(f"Couldn't create data folder: {exception}") -        sys.exit() -try: -    logging.basicConfig( -        filename=LOGNAME, -        encoding='utf-8', -        filemode='a', -        format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', -        datefmt='%H:%M:%S', -        level=logging.INFO, -    ) -except PermissionError as exception: -    print(f"Couldn't open logging directory: {exception}") -    sys.exit() -logging.info("Starting shroombox") -logger = logging.getLogger() - - -############################################# -#                   Code                    # -############################################# - -def write_default_config(filename): -    """ -    Write default config file. -    """ -    _settings = { -        'Kp': 1, -        'Ki': 0, -        'Kd': 0, -        'target_temperature': 24, -        'sample_time': 10, -        'relay_pin': 5, -        'mosfet_pin': 22, -        'read_frequency_sec': 15, -        'read_retry': 3, -    } -    with open(filename, 'w', encoding='UTF-8') as conf: -        for key, val in _settings: -            conf.write(f"{key}={val}") - - -# If config file does not exist, create it with default values -if not os.path.exists(CONFIG_FILE): -    try: -        write_default_config(CONFIG_FILE) -        logger.info("Created .env config file with default values") -    except Exception as exc: -        print(f"Failed writing .env file: {exc}. Aborting.") -        sys.exit() - - -class ShroomboxSettings(BaseSettings): -    """ -    Shroombox settings loaded from .env file. -    """ -    kp: float = 0 -    ki: float = 0 -    kd: float = 0 -    target_temperature: float = 24 -    sample_time: int = 15 -    relay_pin: int -    mosfet_pin: int -    read_retry: int = 3 -    read_frequency_sec: int = 15 - -    class Config: -        env_file = '.env' -        case_sensitive = True - - -try: -    settings = ShroomboxSettings() -except Exception as exception: -    print(f"Failed to read settings: {exception}") -    sys.exit() - - -class ShroomboxManager: -    """ -    Shroombox class. -    """ - -    def __init__(self): -        self.current_temperature = float('NaN') -        self.gpio = pigpio.pi() -        # Is below a good idea? -        self.relay_switch(False) -        # gpio_status: on is 0, off is 1 -        self.gpio_status = self.gpio.read(settings.relay_pin) - -    def __del__(self): -        self.relay_switch(True) - -    def callback(self): -        """ -        Main callback function. -        :return: None -        """ -        while True: -            # Get current temperature -            temp = self.read() -            # Try to write to CSV file -            self.write(temp) -            # Get PID value for MOSFET -            val = self.temp_control(temp) -            # Set MOSFET to PID value -            self.mosfet_set(val) -            time.sleep(settings.read_fqcy) - -    def read( -        self, -        retry: int = 0, -    ) -> float: -        """ -        Function reading temperature data from one wire sensor. -        Returns float('NaN') on read failure. -        :param retry: int -        :return: float -        """ -        temperature = float('NaN') -        if retry > settings.read_retry: -            return temperature -        try: -            with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor: -                _ok = sensor.readline()[-4:].strip('\n') -                if _ok == 'YES': -                    try: -                        temperature = round(int(sensor.readline().split('=')[1])/1000, 1) -                    except IndexError:  # TODO: What other exceptions? -                        return temperature -                else: -                    retry += 1 -                    self.read(retry=retry) -        except FileNotFoundError: -            message = 'Sensor file not found - is sensor physically disconnected?' -            logger.error(message) -        return temperature - -    @staticmethod -    def write( -            datum: float, -    ) -> bool: -        """ -        Method writing data point to csv file. -        CSV file header: timestamp,temperature -        :param datum: float -        :return: bool -        """ -        result = True -        if not os.path.exists(DATA_FILE): -            try: -                os.makedirs(DATA_FILE) -                with open(DATA_FILE, 'w', encoding='UTF-8') as _file: -                    header = "timestamp,temperature" -                    _file.write(header) -                    result = False -            except OSError as exc: -                logger.error("Couldn't create path %DATA_FILE %exc") -                result = True -        try: -            with open(DATA_FILE, 'a', encoding='UTF-8') as _file: -                timestamp = time.strftime(TIMEFORMAT) -                data_point = f'{timestamp},{datum}' -                _file.write(data_point) -                result = False -        except FileNotFoundError as exc: -            result = True -            logger.error('Data file not found: %exc') -        return result - -    def relay_switch( -            self, -            on: bool, -    ) -> None: -        """ -        Turn relay on or off. -        :param on: bool -        :return: -        """ -        if on: -            logger.info('Turning on relay %time_now()') -            try: -                self.gpio.write(settings.relay_pin, 0) -            except Exception as exc: -                logger.warning('Failed to turn on relay: %exc') -        else: -            logger.info('Turning off relay %time_now()') -            try: -                self.gpio.write(settings.relay_pin, 1) -            except Exception as exc: -                logger.warning('Failed to turn off relay: %exc') - -    @staticmethod -    def temp_control( -            current_temperature: float, -    ) -> int: -        """ -        Calculate value to pass to mosfet. -        :param current_temperature: float -        :return: int -        """ -        pid = PID( -            settings.kp, -            settings.ki, -            settings.kd, -            setpoint=settings.target_temperature, -        ) -        pid.output_limits = (0, 255) -        pid.sample_time = settings.sample_time -        return pid(current_temperature) - -    def mosfet_set( -            self, -            value, -    ) -> bool: -        """ -        Set mosfet value. -        :param value: int -        :return: bool -        """ -        result = True -        logger.debug(f'Changing pin {settings.mosfet_pin} to {value}') -        try: -            self.gpio.write(settings.mosfet_pin, value) -            result = False -        except Exception as exc: -            logger.warning('Failed to set mosfet value: %exc.') -            result = True -        return result - - -def time_now(): -    """Set time to TIMEFORMAT.""" -    return time.strftime(TIMEFORMAT) - - -if __name__ == '__main__': -    controller = ShroomboxManager() -    signal.signal(signal.SIGHUP, settings.ShroomboxSettings) -    while True: -        controller.callback() -        time.sleep(settings.read_frequency_sec) | 
