aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorerg <uinarf@autistici.org>2023-02-28 20:09:47 +0100
committererg <uinarf@autistici.org>2023-02-28 20:09:47 +0100
commit7e51b88785ecde452ffc29445a78492e5a11710e (patch)
tree0f7bc5ee77c428dafef44c05df6f93f37802e47f
parentd6970e81e607f27bee9323d10d1476621de3416c (diff)
downloadPi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.tar.gz
Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.tar.bz2
Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.zip
Completely revorked
-rw-r--r--.main.py.swpbin16384 -> 0 bytes
-rw-r--r--PID.py244
-rw-r--r--Responder.db0
-rw-r--r--main.py457
-rw-r--r--shroom_daemon (renamed from shroom_daemon.sh)16
5 files changed, 568 insertions, 149 deletions
diff --git a/.main.py.swp b/.main.py.swp
deleted file mode 100644
index fcf7ed4..0000000
--- a/.main.py.swp
+++ /dev/null
Binary files differ
diff --git a/PID.py b/PID.py
new file mode 100644
index 0000000..db66c3f
--- /dev/null
+++ b/PID.py
@@ -0,0 +1,244 @@
+import time
+import warnings
+
+
+def _clamp(value, limits):
+ lower, upper = limits
+ if value is None:
+ return None
+ elif (upper is not None) and (value > upper):
+ return upper
+ elif (lower is not None) and (value < lower):
+ return lower
+ return value
+
+
+try:
+ # Get monotonic time to ensure that time deltas are always positive
+ _current_time = time.monotonic
+except AttributeError:
+ # time.monotonic() not available (using python < 3.3), fallback to time.time()
+ _current_time = time.time
+ warnings.warn('time.monotonic() not available in python < 3.3, using time.time() as fallback')
+
+
+class PID(object):
+ """A simple PID controller."""
+
+ def __init__(
+ self,
+ Kp=1.0,
+ Ki=0.0,
+ Kd=0.0,
+ setpoint=0,
+ sample_time=0.01,
+ output_limits=(None, None),
+ auto_mode=True,
+ proportional_on_measurement=False,
+ error_map=None,
+ ):
+ """
+ Initialize a new PID controller.
+
+ :param Kp: The value for the proportional gain Kp
+ :param Ki: The value for the integral gain Ki
+ :param Kd: The value for the derivative gain Kd
+ :param setpoint: The initial setpoint that the PID will try to achieve
+ :param sample_time: The time in seconds which the controller should wait before generating
+ a new output value. The PID works best when it is constantly called (eg. during a
+ loop), but with a sample time set so that the time difference between each update is
+ (close to) constant. If set to None, the PID will compute a new output value every time
+ it is called.
+ :param output_limits: The initial output limits to use, given as an iterable with 2
+ elements, for example: (lower, upper). The output will never go below the lower limit
+ or above the upper limit. Either of the limits can also be set to None to have no limit
+ in that direction. Setting output limits also avoids integral windup, since the
+ integral term will never be allowed to grow outside of the limits.
+ :param auto_mode: Whether the controller should be enabled (auto mode) or not (manual mode)
+ :param proportional_on_measurement: Whether the proportional term should be calculated on
+ the input directly rather than on the error (which is the traditional way). Using
+ proportional-on-measurement avoids overshoot for some types of systems.
+ :param error_map: Function to transform the error value in another constrained value.
+ """
+ self.Kp, self.Ki, self.Kd = Kp, Ki, Kd
+ self.setpoint = setpoint
+ self.sample_time = sample_time
+
+ self._min_output, self._max_output = None, None
+ self._auto_mode = auto_mode
+ self.proportional_on_measurement = proportional_on_measurement
+ self.error_map = error_map
+
+ self._proportional = 0
+ self._integral = 0
+ self._derivative = 0
+
+ self._last_time = None
+ self._last_output = None
+ self._last_input = None
+
+ self.output_limits = output_limits
+ self.reset()
+
+ def __call__(self, input_, dt=None):
+ """
+ Update the PID controller.
+
+ Call the PID controller with *input_* and calculate and return a control output if
+ sample_time seconds has passed since the last update. If no new output is calculated,
+ return the previous output instead (or None if no value has been calculated yet).
+
+ :param dt: If set, uses this value for timestep instead of real time. This can be used in
+ simulations when simulation time is different from real time.
+ """
+ if not self.auto_mode:
+ return self._last_output
+
+ now = _current_time()
+ if dt is None:
+ dt = now - self._last_time if (now - self._last_time) else 1e-16
+ elif dt <= 0:
+ raise ValueError('dt has negative value {}, must be positive'.format(dt))
+
+ if self.sample_time is not None and dt < self.sample_time and self._last_output is not None:
+ # Only update every sample_time seconds
+ return self._last_output
+
+ # Compute error terms
+ error = self.setpoint - input_
+ d_input = input_ - (self._last_input if (self._last_input is not None) else input_)
+
+ # Check if must map the error
+ if self.error_map is not None:
+ error = self.error_map(error)
+
+ # Compute the proportional term
+ if not self.proportional_on_measurement:
+ # Regular proportional-on-error, simply set the proportional term
+ self._proportional = self.Kp * error
+ else:
+ # Add the proportional error on measurement to error_sum
+ self._proportional -= self.Kp * d_input
+
+ # Compute integral and derivative terms
+ self._integral += self.Ki * error * dt
+ self._integral = _clamp(self._integral, self.output_limits) # Avoid integral windup
+
+ self._derivative = -self.Kd * d_input / dt
+
+ # Compute final output
+ output = self._proportional + self._integral + self._derivative
+ output = _clamp(output, self.output_limits)
+
+ # Keep track of state
+ self._last_output = output
+ self._last_input = input_
+ self._last_time = now
+
+ return output
+
+ def __repr__(self):
+ return (
+ '{self.__class__.__name__}('
+ 'Kp={self.Kp!r}, Ki={self.Ki!r}, Kd={self.Kd!r}, '
+ 'setpoint={self.setpoint!r}, sample_time={self.sample_time!r}, '
+ 'output_limits={self.output_limits!r}, auto_mode={self.auto_mode!r}, '
+ 'proportional_on_measurement={self.proportional_on_measurement!r},'
+ 'error_map={self.error_map!r}'
+ ')'
+ ).format(self=self)
+
+ @property
+ def components(self):
+ """
+ The P-, I- and D-terms from the last computation as separate components as a tuple. Useful
+ for visualizing what the controller is doing or when tuning hard-to-tune systems.
+ """
+ return self._proportional, self._integral, self._derivative
+
+ @property
+ def tunings(self):
+ """The tunings used by the controller as a tuple: (Kp, Ki, Kd)."""
+ return self.Kp, self.Ki, self.Kd
+
+ @tunings.setter
+ def tunings(self, tunings):
+ """Set the PID tunings."""
+ self.Kp, self.Ki, self.Kd = tunings
+
+ @property
+ def auto_mode(self):
+ """Whether the controller is currently enabled (in auto mode) or not."""
+ return self._auto_mode
+
+ @auto_mode.setter
+ def auto_mode(self, enabled):
+ """Enable or disable the PID controller."""
+ self.set_auto_mode(enabled)
+
+ def set_auto_mode(self, enabled, last_output=None):
+ """
+ Enable or disable the PID controller, optionally setting the last output value.
+
+ This is useful if some system has been manually controlled and if the PID should take over.
+ In that case, disable the PID by setting auto mode to False and later when the PID should
+ be turned back on, pass the last output variable (the control variable) and it will be set
+ as the starting I-term when the PID is set to auto mode.
+
+ :param enabled: Whether auto mode should be enabled, True or False
+ :param last_output: The last output, or the control variable, that the PID should start
+ from when going from manual mode to auto mode. Has no effect if the PID is already in
+ auto mode.
+ """
+ if enabled and not self._auto_mode:
+ # Switching from manual mode to auto, reset
+ self.reset()
+
+ self._integral = last_output if (last_output is not None) else 0
+ self._integral = _clamp(self._integral, self.output_limits)
+
+ self._auto_mode = enabled
+
+ @property
+ def output_limits(self):
+ """
+ The current output limits as a 2-tuple: (lower, upper).
+
+ See also the *output_limits* parameter in :meth:`PID.__init__`.
+ """
+ return self._min_output, self._max_output
+
+ @output_limits.setter
+ def output_limits(self, limits):
+ """Set the output limits."""
+ if limits is None:
+ self._min_output, self._max_output = None, None
+ return
+
+ min_output, max_output = limits
+
+ if (None not in limits) and (max_output < min_output):
+ raise ValueError('lower limit must be less than upper limit')
+
+ self._min_output = min_output
+ self._max_output = max_output
+
+ self._integral = _clamp(self._integral, self.output_limits)
+ self._last_output = _clamp(self._last_output, self.output_limits)
+
+ def reset(self):
+ """
+ Reset the PID controller internals.
+
+ This sets each term to 0 as well as clearing the integral, the last output and the last
+ input (derivative calculation).
+ """
+ self._proportional = 0
+ self._integral = 0
+ self._derivative = 0
+
+ self._integral = _clamp(self._integral, self.output_limits)
+
+ self._last_time = _current_time()
+ self._last_output = None
+ self._last_input = None
diff --git a/Responder.db b/Responder.db
deleted file mode 100644
index e69de29..0000000
--- a/Responder.db
+++ /dev/null
diff --git a/main.py b/main.py
index aa80fce..8f22694 100644
--- a/main.py
+++ b/main.py
@@ -1,171 +1,342 @@
-#!/usr/bin/env python3
+#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-Module managing temperature in the shroombox.
+Module controling temperature with PID setting a MOSFET.
"""
-from os import getpid
-import pigpio
-from math import isnan
-import logging
-import time
-from simple_pid import PID
-
__author__ = "Franek Ɓazarewicz-Muradyan"
__licence__ = "GPL"
__version__ = "0.0.1"
__status__ = "Proof of concept"
-# **********************
-# Hardware Settings
-# **********************
-
-# One wire temperature sensor settings
-w1_sensor_id = '28-0517c1b121ff'
-w1_sensor_f = f'/sys/devices/w1_bus_master1/{w1_sensor_id}/w1_slave'
-# Heater settings
-relay_pin = 5
-mosfet_pin = 22
-
-# **********************
-# Software Settings
-# **********************
-
-timeformat = '%Y-%M-%d %H:%M:%S'
-temperature_min = 20
-temperature_max = 24
-temperature_target = 20
-read_retry = 3
-logname = '/var/log/pipi/shroombox.log'
-data_file = '/var/log/pipi/shroombox.csv'
-
-# **********************
-# PID Settings
-# **********************
-Kp = 1
-Ki = 0.2
-Kd = 0
-sample_time = 10
-
-# **********************
-# Code
-# **********************
-
-
-logging.basicConfig(
- filename=logname,
- encoding='utf-8',
- filemode='a',
- format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', datefmt='%H:%M:%S',
- level=logging.DEBUG,
-)
+#############################################
+# Flow #
+#############################################
+
+# start logging, run sanity checks ->
+# turn on relay ->
+# enter a loop of given interval ->
+# check temperature ->
+# try to write temperature log ->
+# calculate MOSFET value with PID >
+# set calculated value on MOSFET
+
+
+import os
+import sys
+import time
+import signal
+import logging
+import io
+
+from pydantic import BaseSettings
+try:
+ from PID import PID
+ import pigpio
+except ImportError as exception:
+ print(f'Import error: {exception}')
+ sys.exit()
+
+
+#############################################
+# Settings #
+#############################################
+
+# User:
+USERNAME = os.getlogin()
+
+# Files:
+
+LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log'
+DATA_FOLDER = f'/var/log/{USERNAME}'
+DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv'
+CONFIG_FILE = ".env"
+
+# Best effort to detect 1-wire sensor: errors out if multiple detected
+W1_SENSOR_ID = '28-0517c1b121ff'
+
+if not W1_SENSOR_ID:
+ for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'):
+ if len(filenames) >= 2:
+ print('Multiple 1-wire sensors connected, set rom by hand. Aborting.')
+ sys.exit()
+ elif not filenames:
+ print('No 1-wire sensors detected, aborting.')
+ sys.exit()
+ else:
+ W1_SENSOR_ID = filenames[0]
+W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave'
+RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model'
+
+# Other settings:
+
+TIMEFORMAT = '%Y-%M-%d %H:%M:%S'
+
+
+#############################################
+# Sanity checks #
+#############################################
+
+
+def is_raspberrypi():
+ try:
+ with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model:
+ if 'raspberry pi' in model.read().lower():
+ return True
+ except Exception:
+ pass
+ return False
+
+
+if not is_raspberrypi():
+ print("This is not a raspberry pi, exiting")
+ sys.exit()
+
+if not os.path.exists(DATA_FOLDER):
+ try:
+ os.makedirs(DATA_FOLDER)
+ except OSError as exception:
+ print(f"Couldn't create data folder: {exception}")
+ sys.exit()
+try:
+ logging.basicConfig(
+ filename=LOGNAME,
+ encoding='utf-8',
+ filemode='a',
+ format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s',
+ datefmt='%H:%M:%S',
+ level=logging.INFO,
+ )
+except PermissionError as exception:
+ print(f"Couldn't open logging directory: {exception}")
+ sys.exit()
logging.info("Starting shroombox")
logger = logging.getLogger()
-def read_temp(
+#############################################
+# Code #
+#############################################
+
+def write_default_config(filename):
+ """
+ Write default config file.
+ """
+ _settings = {
+ 'Kp': 1,
+ 'Ki': 0,
+ 'Kd': 0,
+ 'target_temperature': 24,
+ 'sample_time': 10,
+ 'relay_pin': 5,
+ 'mosfet_pin': 22,
+ 'read_frequency_sec': 15,
+ 'read_retry': 3,
+ }
+ with open(filename, 'w', encoding='UTF-8') as conf:
+ for key, val in _settings:
+ conf.write(f"{key}={val}")
+
+
+# If config file does not exist, create it with default values
+if not os.path.exists(CONFIG_FILE):
+ try:
+ write_default_config(CONFIG_FILE)
+ logger.info("Created .env config file with default values")
+ except Exception as exc:
+ print(f"Failed writing .env file: {exc}. Aborting.")
+ sys.exit()
+
+
+class ShroomboxSettings(BaseSettings):
+ """
+ Shroombox settings loaded from .env file.
+ """
+ kp: float = 0
+ ki: float = 0
+ kd: float = 0
+ target_temperature: float = 24
+ sample_time: int = 15
+ relay_pin: int
+ mosfet_pin: int
+ read_retry: int = 3
+ read_frequency_sec: int = 15
+
+ class Config:
+ env_file = '.env'
+ case_sensitive = True
+
+
+try:
+ settings = ShroomboxSettings()
+except Exception as exception:
+ print(f"Failed to read settings: {exception}")
+ sys.exit()
+
+
+class ShroomboxManager:
+ """
+ Shroombox class.
+ """
+
+ def __init__(self):
+ self.current_temperature = float('NaN')
+ self.gpio = pigpio.pi()
+ # Is below a good idea?
+ self.relay_switch(False)
+ # gpio_status: on is 0, off is 1
+ self.gpio_status = self.gpio.read(settings.relay_pin)
+
+ def __del__(self):
+ self.relay_switch(True)
+
+ def callback(self):
+ """
+ Main callback function.
+ :return: None
+ """
+ while True:
+ # Get current temperature
+ temp = self.read()
+ # Try to write to CSV file
+ self.write(temp)
+ # Get PID value for MOSFET
+ val = self.temp_control(temp)
+ # Set MOSFET to PID value
+ self.mosfet_set(val)
+ time.sleep(settings.read_fqcy)
+
+ def read(
+ self,
retry: int = 0,
-) -> float:
- # If sensor is physically disconected, opening file will fail
- temperature = float('NaN')
- if retry > read_retry:
+ ) -> float:
+ """
+ Function reading temperature data from one wire sensor.
+ Returns float('NaN') on read failure.
+ :param retry: int
+ :return: float
+ """
+ temperature = float('NaN')
+ if retry > settings.read_retry:
+ return temperature
+ try:
+ with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor:
+ _ok = sensor.readline()[-4:].strip('\n')
+ if _ok == 'YES':
+ try:
+ temperature = round(int(sensor.readline().split('=')[1])/1000, 1)
+ except IndexError: # TODO: What other exceptions?
+ return temperature
+ else:
+ retry += 1
+ self.read(retry=retry)
+ except FileNotFoundError:
+ message = 'Sensor file not found - is sensor physically disconnected?'
+ logger.error(message)
return temperature
- with open(w1_sensor_f, 'r') as sensor:
- ok = sensor.readline()[-4:].strip('\n')
- if ok == 'YES':
+
+ @staticmethod
+ def write(
+ datum: float,
+ ) -> bool:
+ """
+ Method writing data point to csv file.
+ CSV file header: timestamp,temperature
+ :param datum: float
+ :return: bool
+ """
+ result = True
+ if not os.path.exists(DATA_FILE):
try:
- temperature = round(int(sensor.readline().split('=')[1])/1000, 1)
- except IndexError: # TODO: What other exceptions?
- return temperature
- else:
- retry += 1
- read_temp(retry=retry)
- return temperature
-
-
-def heater_on(
- temp: float,
- on: bool = False,
-) -> bool:
- status = pio.read(relay_pin)
- timestamp = time.strftime(timeformat)
- # turning it off
- if not on:
- if status == 1:
- return False
- else:
+ os.makedirs(DATA_FILE)
+ with open(DATA_FILE, 'w', encoding='UTF-8') as _file:
+ header = "timestamp,temperature"
+ _file.write(header)
+ result = False
+ except OSError as exc:
+ logger.error("Couldn't create path %DATA_FILE %exc")
+ result = True
+ try:
+ with open(DATA_FILE, 'a', encoding='UTF-8') as _file:
+ timestamp = time.strftime(TIMEFORMAT)
+ data_point = f'{timestamp},{datum}'
+ _file.write(data_point)
+ result = False
+ except FileNotFoundError as exc:
+ result = True
+ logger.error('Data file not found: %exc')
+ return result
+
+ def relay_switch(
+ self,
+ on: bool,
+ ) -> None:
+ """
+ Turn relay on or off.
+ :param on: bool
+ :return:
+ """
+ if on:
+ logger.info('Turning on relay %time_now()')
try:
- pio.write(relay_pin, 1)
- message = f"Turning off, temp: {temp}, {timestamp}"
- logger.info(message)
- return False
- except: # TODO: documentation doesn't talk about exceptions on write
- return True
- # turning it on
- if on:
- if status == 0:
- return False
+ self.gpio.write(settings.relay_pin, 0)
+ except Exception as exc:
+ logger.warning('Failed to turn on relay: %exc')
else:
+ logger.info('Turning off relay %time_now()')
try:
- pio.write(relay_pin, 0)
- message = f"Turning on, temp: {temp}, {timestamp}"
- logger.info(message)
- return False
- except:
- return True
+ self.gpio.write(settings.relay_pin, 1)
+ except Exception as exc:
+ logger.warning('Failed to turn off relay: %exc')
+ @staticmethod
+ def temp_control(
+ current_temperature: float,
+ ) -> int:
+ """
+ Calculate value to pass to mosfet.
+ :param current_temperature: float
+ :return: int
+ """
+ pid = PID(
+ settings.kp,
+ settings.ki,
+ settings.kd,
+ setpoint=settings.target_temperature,
+ )
+ pid.output_limits = (0, 255)
+ pid.sample_time = settings.sample_time
+ return pid(current_temperature)
-def temp_control(
- current_temperature: float
-) -> int:
- pid = PID(Kp, Ki, Kd, setpoint=temperature_target)
- pid.output_limits = (0, 255)
- pid.sample_time = sample_time
- return pid(current_temperature)
+ def mosfet_set(
+ self,
+ value,
+ ) -> bool:
+ """
+ Set mosfet value.
+ :param value: int
+ :return: bool
+ """
+ result = True
+ logger.debug(f'Changing pin {settings.mosfet_pin} to {value}')
+ try:
+ self.gpio.write(settings.mosfet_pin, value)
+ result = False
+ except Exception as exc:
+ logger.warning('Failed to set mosfet value: %exc.')
+ result = True
+ return result
-def main_alt():
- temp = read_temp_alt()
- timestamp = time.strftime(timeformat)
- print(temp)
- with open(data_file, 'a') as dat:
- dat.write(temp)
- frequency = temp_control(temp)
- try:
- pio.set_PWM_frequency(mosfet_pin, frequency)
- except (
- pio.PI_BAD_USER_GPIO,
- pio.PI_NOT_PERMITTED,
- ) as exc:
- logger.warning(f"Failed to set GPIO PWM frequency: {exc} {timestamp}")
-
-
-def main():
- temp = read_temp()
- timestamp = time.strftime(timeformat)
- print(temp)
- if temp <= temperature_min:
- # if temp < temperature_min - 5:
- # print(f"Heater doesn't work, battery empty?")
- # logger.warning(f"Heater doesn't work, battery empty?")
- # state = heater_on(temp, on=False)
- state = heater_on(temp, on=True)
- if state != 0:
- logger.warning(f"heater_on() method returned {state}, {timestamp}")
- elif temp >= temperature_max:
- state = heater_on(temp, on=False)
- if state != 0:
- logger.warning(f"heater_on() method returned {state} {timestamp}")
- elif isnan(temp):
- state = heater_on(temp, on=False)
- logging.warning(f"Cannot read temperature, check your connections! {timestamp}")
+def time_now():
+ """Set time to TIMEFORMAT."""
+ return time.strftime(TIMEFORMAT)
if __name__ == '__main__':
- with open('/run/shroombox.pid', 'w') as f:
- f.write(str(getpid()))
+ controller = ShroomboxManager()
+ signal.signal(signal.SIGHUP, settings.ShroomboxSettings)
while True:
- pio = pigpio.pi()
- main()
- time.sleep(30)
+ controller.callback()
+ time.sleep(settings.read_frequency_sec)
diff --git a/shroom_daemon.sh b/shroom_daemon
index adf6712..708589c 100644
--- a/shroom_daemon.sh
+++ b/shroom_daemon
@@ -2,14 +2,18 @@
# Copyright 2023 erg_samowzbudnik
# Distributed under the terms of the GNU General Public Licence v2
-GROUP=$(id -g)
+# Set username you want process run with. Should be your regular user
+USERNAME="pipi"
+GROUP=$(id -g ${USERNAME})
supervisor="supervise-daemon"
command_args_foreground="--foreground"
# Could also get dir to store pid file from XDG_RUNTIME_DIR
-pidfile="/run/user/${UID}/${RC_SVCNAME}.pid"
+pidfile="/run/${RC_SVCNAME}.pid"
+# pidfile="/run/user/${GROUP}/${RC_SVCNAME}.pid"
extra_started_commands="reload"
-command_user="${USER}:${GROUP}"
-command="python main.py"
+command_user="${USERNAME}:${USERNAME}"
+# command_user="${USER}:${GROUP}"
+command="python /usr/local/sbin/pi_relay.py"
description="Daemon for shroombox"
depend() {
@@ -19,7 +23,7 @@ depend() {
reload() {
ebegin "Reloading ${RC_SVCNAME}configuration"
- start-stop-daemon --exec $command --signal HUP
- ${supervisor} ${RC_SVCNAME} --signal HUP --pidfile "${pidfile}"
+ start-stop-daemon --exec "$command" --signal HUP
+ ${supervisor} "${RC_SVCNAME}" --signal HUP --pidfile "${pidfile}"
eend $?
}