diff options
| -rw-r--r-- | .main.py.swp | bin | 16384 -> 0 bytes | |||
| -rw-r--r-- | PID.py | 244 | ||||
| -rw-r--r-- | Responder.db | 0 | ||||
| -rw-r--r-- | main.py | 457 | ||||
| -rw-r--r-- | shroom_daemon (renamed from shroom_daemon.sh) | 16 | 
5 files changed, 568 insertions, 149 deletions
| diff --git a/.main.py.swp b/.main.py.swpBinary files differ deleted file mode 100644 index fcf7ed4..0000000 --- a/.main.py.swp +++ /dev/null @@ -0,0 +1,244 @@ +import time +import warnings + + +def _clamp(value, limits): +    lower, upper = limits +    if value is None: +        return None +    elif (upper is not None) and (value > upper): +        return upper +    elif (lower is not None) and (value < lower): +        return lower +    return value + + +try: +    # Get monotonic time to ensure that time deltas are always positive +    _current_time = time.monotonic +except AttributeError: +    # time.monotonic() not available (using python < 3.3), fallback to time.time() +    _current_time = time.time +    warnings.warn('time.monotonic() not available in python < 3.3, using time.time() as fallback') + + +class PID(object): +    """A simple PID controller.""" + +    def __init__( +        self, +        Kp=1.0, +        Ki=0.0, +        Kd=0.0, +        setpoint=0, +        sample_time=0.01, +        output_limits=(None, None), +        auto_mode=True, +        proportional_on_measurement=False, +        error_map=None, +    ): +        """ +        Initialize a new PID controller. + +        :param Kp: The value for the proportional gain Kp +        :param Ki: The value for the integral gain Ki +        :param Kd: The value for the derivative gain Kd +        :param setpoint: The initial setpoint that the PID will try to achieve +        :param sample_time: The time in seconds which the controller should wait before generating +            a new output value. The PID works best when it is constantly called (eg. during a +            loop), but with a sample time set so that the time difference between each update is +            (close to) constant. If set to None, the PID will compute a new output value every time +            it is called. +        :param output_limits: The initial output limits to use, given as an iterable with 2 +            elements, for example: (lower, upper). The output will never go below the lower limit +            or above the upper limit. Either of the limits can also be set to None to have no limit +            in that direction. Setting output limits also avoids integral windup, since the +            integral term will never be allowed to grow outside of the limits. +        :param auto_mode: Whether the controller should be enabled (auto mode) or not (manual mode) +        :param proportional_on_measurement: Whether the proportional term should be calculated on +            the input directly rather than on the error (which is the traditional way). Using +            proportional-on-measurement avoids overshoot for some types of systems. +        :param error_map: Function to transform the error value in another constrained value. +        """ +        self.Kp, self.Ki, self.Kd = Kp, Ki, Kd +        self.setpoint = setpoint +        self.sample_time = sample_time + +        self._min_output, self._max_output = None, None +        self._auto_mode = auto_mode +        self.proportional_on_measurement = proportional_on_measurement +        self.error_map = error_map + +        self._proportional = 0 +        self._integral = 0 +        self._derivative = 0 + +        self._last_time = None +        self._last_output = None +        self._last_input = None + +        self.output_limits = output_limits +        self.reset() + +    def __call__(self, input_, dt=None): +        """ +        Update the PID controller. + +        Call the PID controller with *input_* and calculate and return a control output if +        sample_time seconds has passed since the last update. If no new output is calculated, +        return the previous output instead (or None if no value has been calculated yet). + +        :param dt: If set, uses this value for timestep instead of real time. This can be used in +            simulations when simulation time is different from real time. +        """ +        if not self.auto_mode: +            return self._last_output + +        now = _current_time() +        if dt is None: +            dt = now - self._last_time if (now - self._last_time) else 1e-16 +        elif dt <= 0: +            raise ValueError('dt has negative value {}, must be positive'.format(dt)) + +        if self.sample_time is not None and dt < self.sample_time and self._last_output is not None: +            # Only update every sample_time seconds +            return self._last_output + +        # Compute error terms +        error = self.setpoint - input_ +        d_input = input_ - (self._last_input if (self._last_input is not None) else input_) + +        # Check if must map the error +        if self.error_map is not None: +            error = self.error_map(error) + +        # Compute the proportional term +        if not self.proportional_on_measurement: +            # Regular proportional-on-error, simply set the proportional term +            self._proportional = self.Kp * error +        else: +            # Add the proportional error on measurement to error_sum +            self._proportional -= self.Kp * d_input + +        # Compute integral and derivative terms +        self._integral += self.Ki * error * dt +        self._integral = _clamp(self._integral, self.output_limits)  # Avoid integral windup + +        self._derivative = -self.Kd * d_input / dt + +        # Compute final output +        output = self._proportional + self._integral + self._derivative +        output = _clamp(output, self.output_limits) + +        # Keep track of state +        self._last_output = output +        self._last_input = input_ +        self._last_time = now + +        return output + +    def __repr__(self): +        return ( +            '{self.__class__.__name__}(' +            'Kp={self.Kp!r}, Ki={self.Ki!r}, Kd={self.Kd!r}, ' +            'setpoint={self.setpoint!r}, sample_time={self.sample_time!r}, ' +            'output_limits={self.output_limits!r}, auto_mode={self.auto_mode!r}, ' +            'proportional_on_measurement={self.proportional_on_measurement!r},' +            'error_map={self.error_map!r}' +            ')' +        ).format(self=self) + +    @property +    def components(self): +        """ +        The P-, I- and D-terms from the last computation as separate components as a tuple. Useful +        for visualizing what the controller is doing or when tuning hard-to-tune systems. +        """ +        return self._proportional, self._integral, self._derivative + +    @property +    def tunings(self): +        """The tunings used by the controller as a tuple: (Kp, Ki, Kd).""" +        return self.Kp, self.Ki, self.Kd + +    @tunings.setter +    def tunings(self, tunings): +        """Set the PID tunings.""" +        self.Kp, self.Ki, self.Kd = tunings + +    @property +    def auto_mode(self): +        """Whether the controller is currently enabled (in auto mode) or not.""" +        return self._auto_mode + +    @auto_mode.setter +    def auto_mode(self, enabled): +        """Enable or disable the PID controller.""" +        self.set_auto_mode(enabled) + +    def set_auto_mode(self, enabled, last_output=None): +        """ +        Enable or disable the PID controller, optionally setting the last output value. + +        This is useful if some system has been manually controlled and if the PID should take over. +        In that case, disable the PID by setting auto mode to False and later when the PID should +        be turned back on, pass the last output variable (the control variable) and it will be set +        as the starting I-term when the PID is set to auto mode. + +        :param enabled: Whether auto mode should be enabled, True or False +        :param last_output: The last output, or the control variable, that the PID should start +            from when going from manual mode to auto mode. Has no effect if the PID is already in +            auto mode. +        """ +        if enabled and not self._auto_mode: +            # Switching from manual mode to auto, reset +            self.reset() + +            self._integral = last_output if (last_output is not None) else 0 +            self._integral = _clamp(self._integral, self.output_limits) + +        self._auto_mode = enabled + +    @property +    def output_limits(self): +        """ +        The current output limits as a 2-tuple: (lower, upper). + +        See also the *output_limits* parameter in :meth:`PID.__init__`. +        """ +        return self._min_output, self._max_output + +    @output_limits.setter +    def output_limits(self, limits): +        """Set the output limits.""" +        if limits is None: +            self._min_output, self._max_output = None, None +            return + +        min_output, max_output = limits + +        if (None not in limits) and (max_output < min_output): +            raise ValueError('lower limit must be less than upper limit') + +        self._min_output = min_output +        self._max_output = max_output + +        self._integral = _clamp(self._integral, self.output_limits) +        self._last_output = _clamp(self._last_output, self.output_limits) + +    def reset(self): +        """ +        Reset the PID controller internals. + +        This sets each term to 0 as well as clearing the integral, the last output and the last +        input (derivative calculation). +        """ +        self._proportional = 0 +        self._integral = 0 +        self._derivative = 0 + +        self._integral = _clamp(self._integral, self.output_limits) + +        self._last_time = _current_time() +        self._last_output = None +        self._last_input = None diff --git a/Responder.db b/Responder.db deleted file mode 100644 index e69de29..0000000 --- a/Responder.db +++ /dev/null @@ -1,171 +1,342 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python  # -*- coding: utf-8 -*-  """ -Module managing temperature in the shroombox. +Module controling temperature with PID setting a MOSFET.  """ -from os import getpid -import pigpio -from math import isnan -import logging -import time -from simple_pid import PID -  __author__ = "Franek Ćazarewicz-Muradyan"  __licence__ = "GPL"  __version__ = "0.0.1"  __status__ = "Proof of concept" -# ********************** -# Hardware Settings -# ********************** - -# One wire temperature sensor settings -w1_sensor_id = '28-0517c1b121ff' -w1_sensor_f = f'/sys/devices/w1_bus_master1/{w1_sensor_id}/w1_slave' -# Heater settings -relay_pin = 5 -mosfet_pin = 22 - -# ********************** -# Software Settings -# ********************** - -timeformat = '%Y-%M-%d %H:%M:%S' -temperature_min = 20 -temperature_max = 24 -temperature_target = 20 -read_retry = 3 -logname = '/var/log/pipi/shroombox.log' -data_file = '/var/log/pipi/shroombox.csv' - -# ********************** -# PID Settings -# ********************** -Kp = 1 -Ki = 0.2 -Kd = 0 -sample_time = 10 - -# ********************** -# Code -# ********************** - - -logging.basicConfig( -    filename=logname, -    encoding='utf-8', -    filemode='a', -    format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', datefmt='%H:%M:%S', -    level=logging.DEBUG, -) +############################################# +#                   Flow                    # +############################################# + +# start logging, run sanity checks -> +# turn on relay -> +# enter a loop of given interval -> +# check temperature -> +# try to write temperature log -> +# calculate MOSFET value with PID > +# set calculated value on MOSFET + + +import os +import sys +import time +import signal +import logging +import io + +from pydantic import BaseSettings +try: +    from PID import PID +    import pigpio +except ImportError as exception: +    print(f'Import error: {exception}') +    sys.exit() + + +############################################# +#                   Settings                # +############################################# + +# User: +USERNAME = os.getlogin() + +# Files: + +LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log' +DATA_FOLDER = f'/var/log/{USERNAME}' +DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv' +CONFIG_FILE = ".env" + +# Best effort to detect 1-wire sensor: errors out if multiple detected +W1_SENSOR_ID = '28-0517c1b121ff' + +if not W1_SENSOR_ID: +    for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'): +        if len(filenames) >= 2: +            print('Multiple 1-wire sensors connected, set rom by hand. Aborting.') +            sys.exit() +        elif not filenames: +            print('No 1-wire sensors detected, aborting.') +            sys.exit() +        else: +            W1_SENSOR_ID = filenames[0] +W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave' +RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model' + +# Other settings: + +TIMEFORMAT = '%Y-%M-%d %H:%M:%S' + + +############################################# +#               Sanity checks               # +############################################# + + +def is_raspberrypi(): +    try: +        with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model: +            if 'raspberry pi' in model.read().lower(): +                return True +    except Exception: +        pass +    return False + + +if not is_raspberrypi(): +    print("This is not a raspberry pi, exiting") +    sys.exit() + +if not os.path.exists(DATA_FOLDER): +    try: +        os.makedirs(DATA_FOLDER) +    except OSError as exception: +        print(f"Couldn't create data folder: {exception}") +        sys.exit() +try: +    logging.basicConfig( +        filename=LOGNAME, +        encoding='utf-8', +        filemode='a', +        format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', +        datefmt='%H:%M:%S', +        level=logging.INFO, +    ) +except PermissionError as exception: +    print(f"Couldn't open logging directory: {exception}") +    sys.exit()  logging.info("Starting shroombox")  logger = logging.getLogger() -def read_temp( +############################################# +#                   Code                    # +############################################# + +def write_default_config(filename): +    """ +    Write default config file. +    """ +    _settings = { +        'Kp': 1, +        'Ki': 0, +        'Kd': 0, +        'target_temperature': 24, +        'sample_time': 10, +        'relay_pin': 5, +        'mosfet_pin': 22, +        'read_frequency_sec': 15, +        'read_retry': 3, +    } +    with open(filename, 'w', encoding='UTF-8') as conf: +        for key, val in _settings: +            conf.write(f"{key}={val}") + + +# If config file does not exist, create it with default values +if not os.path.exists(CONFIG_FILE): +    try: +        write_default_config(CONFIG_FILE) +        logger.info("Created .env config file with default values") +    except Exception as exc: +        print(f"Failed writing .env file: {exc}. Aborting.") +        sys.exit() + + +class ShroomboxSettings(BaseSettings): +    """ +    Shroombox settings loaded from .env file. +    """ +    kp: float = 0 +    ki: float = 0 +    kd: float = 0 +    target_temperature: float = 24 +    sample_time: int = 15 +    relay_pin: int +    mosfet_pin: int +    read_retry: int = 3 +    read_frequency_sec: int = 15 + +    class Config: +        env_file = '.env' +        case_sensitive = True + + +try: +    settings = ShroomboxSettings() +except Exception as exception: +    print(f"Failed to read settings: {exception}") +    sys.exit() + + +class ShroomboxManager: +    """ +    Shroombox class. +    """ + +    def __init__(self): +        self.current_temperature = float('NaN') +        self.gpio = pigpio.pi() +        # Is below a good idea? +        self.relay_switch(False) +        # gpio_status: on is 0, off is 1 +        self.gpio_status = self.gpio.read(settings.relay_pin) + +    def __del__(self): +        self.relay_switch(True) + +    def callback(self): +        """ +        Main callback function. +        :return: None +        """ +        while True: +            # Get current temperature +            temp = self.read() +            # Try to write to CSV file +            self.write(temp) +            # Get PID value for MOSFET +            val = self.temp_control(temp) +            # Set MOSFET to PID value +            self.mosfet_set(val) +            time.sleep(settings.read_fqcy) + +    def read( +        self,          retry: int = 0, -) -> float: -    # If sensor is physically disconected, opening file will fail -    temperature = float('NaN') -    if retry > read_retry: +    ) -> float: +        """ +        Function reading temperature data from one wire sensor. +        Returns float('NaN') on read failure. +        :param retry: int +        :return: float +        """ +        temperature = float('NaN') +        if retry > settings.read_retry: +            return temperature +        try: +            with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor: +                _ok = sensor.readline()[-4:].strip('\n') +                if _ok == 'YES': +                    try: +                        temperature = round(int(sensor.readline().split('=')[1])/1000, 1) +                    except IndexError:  # TODO: What other exceptions? +                        return temperature +                else: +                    retry += 1 +                    self.read(retry=retry) +        except FileNotFoundError: +            message = 'Sensor file not found - is sensor physically disconnected?' +            logger.error(message)          return temperature -    with open(w1_sensor_f, 'r') as sensor: -        ok = sensor.readline()[-4:].strip('\n') -        if ok == 'YES': + +    @staticmethod +    def write( +            datum: float, +    ) -> bool: +        """ +        Method writing data point to csv file. +        CSV file header: timestamp,temperature +        :param datum: float +        :return: bool +        """ +        result = True +        if not os.path.exists(DATA_FILE):              try: -                temperature = round(int(sensor.readline().split('=')[1])/1000, 1) -            except IndexError:  # TODO: What other exceptions? -                return temperature -        else: -            retry += 1 -            read_temp(retry=retry) -    return temperature - - -def heater_on( -        temp: float, -        on: bool = False, -) -> bool: -    status = pio.read(relay_pin) -    timestamp = time.strftime(timeformat) -    # turning it off -    if not on: -        if status == 1: -            return False -        else: +                os.makedirs(DATA_FILE) +                with open(DATA_FILE, 'w', encoding='UTF-8') as _file: +                    header = "timestamp,temperature" +                    _file.write(header) +                    result = False +            except OSError as exc: +                logger.error("Couldn't create path %DATA_FILE %exc") +                result = True +        try: +            with open(DATA_FILE, 'a', encoding='UTF-8') as _file: +                timestamp = time.strftime(TIMEFORMAT) +                data_point = f'{timestamp},{datum}' +                _file.write(data_point) +                result = False +        except FileNotFoundError as exc: +            result = True +            logger.error('Data file not found: %exc') +        return result + +    def relay_switch( +            self, +            on: bool, +    ) -> None: +        """ +        Turn relay on or off. +        :param on: bool +        :return: +        """ +        if on: +            logger.info('Turning on relay %time_now()')              try: -                pio.write(relay_pin, 1) -                message = f"Turning off, temp: {temp}, {timestamp}" -                logger.info(message) -                return False -            except:  # TODO: documentation doesn't talk about exceptions on write -                return True -    # turning it on -    if on: -        if status == 0: -            return False +                self.gpio.write(settings.relay_pin, 0) +            except Exception as exc: +                logger.warning('Failed to turn on relay: %exc')          else: +            logger.info('Turning off relay %time_now()')              try: -                pio.write(relay_pin, 0) -                message = f"Turning on, temp: {temp}, {timestamp}" -                logger.info(message) -                return False -            except: -                return True +                self.gpio.write(settings.relay_pin, 1) +            except Exception as exc: +                logger.warning('Failed to turn off relay: %exc') +    @staticmethod +    def temp_control( +            current_temperature: float, +    ) -> int: +        """ +        Calculate value to pass to mosfet. +        :param current_temperature: float +        :return: int +        """ +        pid = PID( +            settings.kp, +            settings.ki, +            settings.kd, +            setpoint=settings.target_temperature, +        ) +        pid.output_limits = (0, 255) +        pid.sample_time = settings.sample_time +        return pid(current_temperature) -def temp_control( -        current_temperature: float -) -> int: -    pid = PID(Kp, Ki, Kd, setpoint=temperature_target) -    pid.output_limits = (0, 255) -    pid.sample_time = sample_time -    return pid(current_temperature) +    def mosfet_set( +            self, +            value, +    ) -> bool: +        """ +        Set mosfet value. +        :param value: int +        :return: bool +        """ +        result = True +        logger.debug(f'Changing pin {settings.mosfet_pin} to {value}') +        try: +            self.gpio.write(settings.mosfet_pin, value) +            result = False +        except Exception as exc: +            logger.warning('Failed to set mosfet value: %exc.') +            result = True +        return result -def main_alt(): -    temp = read_temp_alt() -    timestamp = time.strftime(timeformat) -    print(temp) -    with open(data_file, 'a') as dat: -        dat.write(temp) -    frequency = temp_control(temp) -    try: -        pio.set_PWM_frequency(mosfet_pin, frequency) -    except ( -        pio.PI_BAD_USER_GPIO, -        pio.PI_NOT_PERMITTED, -    ) as exc: -        logger.warning(f"Failed to set GPIO PWM frequency: {exc} {timestamp}") - - -def main(): -    temp = read_temp() -    timestamp = time.strftime(timeformat) -    print(temp) -    if temp <= temperature_min: -        # if temp < temperature_min - 5: -        #     print(f"Heater doesn't work, battery empty?") -        #     logger.warning(f"Heater doesn't work, battery empty?") -        #     state = heater_on(temp, on=False) -        state = heater_on(temp, on=True) -        if state != 0: -            logger.warning(f"heater_on() method returned {state}, {timestamp}") -    elif temp >= temperature_max: -        state = heater_on(temp, on=False) -        if state != 0: -            logger.warning(f"heater_on() method returned {state} {timestamp}") -    elif isnan(temp): -        state = heater_on(temp, on=False) -        logging.warning(f"Cannot read temperature, check your connections! {timestamp}") +def time_now(): +    """Set time to TIMEFORMAT.""" +    return time.strftime(TIMEFORMAT)  if __name__ == '__main__': -    with open('/run/shroombox.pid', 'w') as f: -        f.write(str(getpid())) +    controller = ShroomboxManager() +    signal.signal(signal.SIGHUP, settings.ShroomboxSettings)      while True: -        pio = pigpio.pi() -        main() -        time.sleep(30) +        controller.callback() +        time.sleep(settings.read_frequency_sec) diff --git a/shroom_daemon.sh b/shroom_daemon index adf6712..708589c 100644 --- a/shroom_daemon.sh +++ b/shroom_daemon @@ -2,14 +2,18 @@  # Copyright 2023 erg_samowzbudnik  # Distributed under the terms of the GNU General Public Licence v2 -GROUP=$(id -g) +# Set username you want process run with. Should be your regular user +USERNAME="pipi" +GROUP=$(id -g ${USERNAME})  supervisor="supervise-daemon"  command_args_foreground="--foreground"  # Could also get dir to store pid file from XDG_RUNTIME_DIR -pidfile="/run/user/${UID}/${RC_SVCNAME}.pid" +pidfile="/run/${RC_SVCNAME}.pid" +# pidfile="/run/user/${GROUP}/${RC_SVCNAME}.pid"  extra_started_commands="reload" -command_user="${USER}:${GROUP}" -command="python main.py" +command_user="${USERNAME}:${USERNAME}" +# command_user="${USER}:${GROUP}" +command="python /usr/local/sbin/pi_relay.py"  description="Daemon for shroombox"  depend() { @@ -19,7 +23,7 @@ depend() {  reload() {      ebegin "Reloading ${RC_SVCNAME}configuration" -    start-stop-daemon --exec $command --signal HUP -    ${supervisor} ${RC_SVCNAME} --signal HUP --pidfile "${pidfile}" +    start-stop-daemon --exec "$command" --signal HUP +    ${supervisor} "${RC_SVCNAME}" --signal HUP --pidfile "${pidfile}"      eend $?  } | 
