aboutsummaryrefslogtreecommitdiff
path: root/main.py
diff options
context:
space:
mode:
authorerg <uinarf@autistici.org>2023-02-28 20:09:47 +0100
committererg <uinarf@autistici.org>2023-02-28 20:09:47 +0100
commit7e51b88785ecde452ffc29445a78492e5a11710e (patch)
tree0f7bc5ee77c428dafef44c05df6f93f37802e47f /main.py
parentd6970e81e607f27bee9323d10d1476621de3416c (diff)
downloadPi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.tar.gz
Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.tar.bz2
Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.zip
Completely revorked
Diffstat (limited to 'main.py')
-rw-r--r--main.py457
1 files changed, 314 insertions, 143 deletions
diff --git a/main.py b/main.py
index aa80fce..8f22694 100644
--- a/main.py
+++ b/main.py
@@ -1,171 +1,342 @@
-#!/usr/bin/env python3
+#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-Module managing temperature in the shroombox.
+Module controling temperature with PID setting a MOSFET.
"""
-from os import getpid
-import pigpio
-from math import isnan
-import logging
-import time
-from simple_pid import PID
-
__author__ = "Franek Ɓazarewicz-Muradyan"
__licence__ = "GPL"
__version__ = "0.0.1"
__status__ = "Proof of concept"
-# **********************
-# Hardware Settings
-# **********************
-
-# One wire temperature sensor settings
-w1_sensor_id = '28-0517c1b121ff'
-w1_sensor_f = f'/sys/devices/w1_bus_master1/{w1_sensor_id}/w1_slave'
-# Heater settings
-relay_pin = 5
-mosfet_pin = 22
-
-# **********************
-# Software Settings
-# **********************
-
-timeformat = '%Y-%M-%d %H:%M:%S'
-temperature_min = 20
-temperature_max = 24
-temperature_target = 20
-read_retry = 3
-logname = '/var/log/pipi/shroombox.log'
-data_file = '/var/log/pipi/shroombox.csv'
-
-# **********************
-# PID Settings
-# **********************
-Kp = 1
-Ki = 0.2
-Kd = 0
-sample_time = 10
-
-# **********************
-# Code
-# **********************
-
-
-logging.basicConfig(
- filename=logname,
- encoding='utf-8',
- filemode='a',
- format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', datefmt='%H:%M:%S',
- level=logging.DEBUG,
-)
+#############################################
+# Flow #
+#############################################
+
+# start logging, run sanity checks ->
+# turn on relay ->
+# enter a loop of given interval ->
+# check temperature ->
+# try to write temperature log ->
+# calculate MOSFET value with PID >
+# set calculated value on MOSFET
+
+
+import os
+import sys
+import time
+import signal
+import logging
+import io
+
+from pydantic import BaseSettings
+try:
+ from PID import PID
+ import pigpio
+except ImportError as exception:
+ print(f'Import error: {exception}')
+ sys.exit()
+
+
+#############################################
+# Settings #
+#############################################
+
+# User:
+USERNAME = os.getlogin()
+
+# Files:
+
+LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log'
+DATA_FOLDER = f'/var/log/{USERNAME}'
+DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv'
+CONFIG_FILE = ".env"
+
+# Best effort to detect 1-wire sensor: errors out if multiple detected
+W1_SENSOR_ID = '28-0517c1b121ff'
+
+if not W1_SENSOR_ID:
+ for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'):
+ if len(filenames) >= 2:
+ print('Multiple 1-wire sensors connected, set rom by hand. Aborting.')
+ sys.exit()
+ elif not filenames:
+ print('No 1-wire sensors detected, aborting.')
+ sys.exit()
+ else:
+ W1_SENSOR_ID = filenames[0]
+W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave'
+RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model'
+
+# Other settings:
+
+TIMEFORMAT = '%Y-%M-%d %H:%M:%S'
+
+
+#############################################
+# Sanity checks #
+#############################################
+
+
+def is_raspberrypi():
+ try:
+ with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model:
+ if 'raspberry pi' in model.read().lower():
+ return True
+ except Exception:
+ pass
+ return False
+
+
+if not is_raspberrypi():
+ print("This is not a raspberry pi, exiting")
+ sys.exit()
+
+if not os.path.exists(DATA_FOLDER):
+ try:
+ os.makedirs(DATA_FOLDER)
+ except OSError as exception:
+ print(f"Couldn't create data folder: {exception}")
+ sys.exit()
+try:
+ logging.basicConfig(
+ filename=LOGNAME,
+ encoding='utf-8',
+ filemode='a',
+ format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s',
+ datefmt='%H:%M:%S',
+ level=logging.INFO,
+ )
+except PermissionError as exception:
+ print(f"Couldn't open logging directory: {exception}")
+ sys.exit()
logging.info("Starting shroombox")
logger = logging.getLogger()
-def read_temp(
+#############################################
+# Code #
+#############################################
+
+def write_default_config(filename):
+ """
+ Write default config file.
+ """
+ _settings = {
+ 'Kp': 1,
+ 'Ki': 0,
+ 'Kd': 0,
+ 'target_temperature': 24,
+ 'sample_time': 10,
+ 'relay_pin': 5,
+ 'mosfet_pin': 22,
+ 'read_frequency_sec': 15,
+ 'read_retry': 3,
+ }
+ with open(filename, 'w', encoding='UTF-8') as conf:
+ for key, val in _settings:
+ conf.write(f"{key}={val}")
+
+
+# If config file does not exist, create it with default values
+if not os.path.exists(CONFIG_FILE):
+ try:
+ write_default_config(CONFIG_FILE)
+ logger.info("Created .env config file with default values")
+ except Exception as exc:
+ print(f"Failed writing .env file: {exc}. Aborting.")
+ sys.exit()
+
+
+class ShroomboxSettings(BaseSettings):
+ """
+ Shroombox settings loaded from .env file.
+ """
+ kp: float = 0
+ ki: float = 0
+ kd: float = 0
+ target_temperature: float = 24
+ sample_time: int = 15
+ relay_pin: int
+ mosfet_pin: int
+ read_retry: int = 3
+ read_frequency_sec: int = 15
+
+ class Config:
+ env_file = '.env'
+ case_sensitive = True
+
+
+try:
+ settings = ShroomboxSettings()
+except Exception as exception:
+ print(f"Failed to read settings: {exception}")
+ sys.exit()
+
+
+class ShroomboxManager:
+ """
+ Shroombox class.
+ """
+
+ def __init__(self):
+ self.current_temperature = float('NaN')
+ self.gpio = pigpio.pi()
+ # Is below a good idea?
+ self.relay_switch(False)
+ # gpio_status: on is 0, off is 1
+ self.gpio_status = self.gpio.read(settings.relay_pin)
+
+ def __del__(self):
+ self.relay_switch(True)
+
+ def callback(self):
+ """
+ Main callback function.
+ :return: None
+ """
+ while True:
+ # Get current temperature
+ temp = self.read()
+ # Try to write to CSV file
+ self.write(temp)
+ # Get PID value for MOSFET
+ val = self.temp_control(temp)
+ # Set MOSFET to PID value
+ self.mosfet_set(val)
+ time.sleep(settings.read_fqcy)
+
+ def read(
+ self,
retry: int = 0,
-) -> float:
- # If sensor is physically disconected, opening file will fail
- temperature = float('NaN')
- if retry > read_retry:
+ ) -> float:
+ """
+ Function reading temperature data from one wire sensor.
+ Returns float('NaN') on read failure.
+ :param retry: int
+ :return: float
+ """
+ temperature = float('NaN')
+ if retry > settings.read_retry:
+ return temperature
+ try:
+ with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor:
+ _ok = sensor.readline()[-4:].strip('\n')
+ if _ok == 'YES':
+ try:
+ temperature = round(int(sensor.readline().split('=')[1])/1000, 1)
+ except IndexError: # TODO: What other exceptions?
+ return temperature
+ else:
+ retry += 1
+ self.read(retry=retry)
+ except FileNotFoundError:
+ message = 'Sensor file not found - is sensor physically disconnected?'
+ logger.error(message)
return temperature
- with open(w1_sensor_f, 'r') as sensor:
- ok = sensor.readline()[-4:].strip('\n')
- if ok == 'YES':
+
+ @staticmethod
+ def write(
+ datum: float,
+ ) -> bool:
+ """
+ Method writing data point to csv file.
+ CSV file header: timestamp,temperature
+ :param datum: float
+ :return: bool
+ """
+ result = True
+ if not os.path.exists(DATA_FILE):
try:
- temperature = round(int(sensor.readline().split('=')[1])/1000, 1)
- except IndexError: # TODO: What other exceptions?
- return temperature
- else:
- retry += 1
- read_temp(retry=retry)
- return temperature
-
-
-def heater_on(
- temp: float,
- on: bool = False,
-) -> bool:
- status = pio.read(relay_pin)
- timestamp = time.strftime(timeformat)
- # turning it off
- if not on:
- if status == 1:
- return False
- else:
+ os.makedirs(DATA_FILE)
+ with open(DATA_FILE, 'w', encoding='UTF-8') as _file:
+ header = "timestamp,temperature"
+ _file.write(header)
+ result = False
+ except OSError as exc:
+ logger.error("Couldn't create path %DATA_FILE %exc")
+ result = True
+ try:
+ with open(DATA_FILE, 'a', encoding='UTF-8') as _file:
+ timestamp = time.strftime(TIMEFORMAT)
+ data_point = f'{timestamp},{datum}'
+ _file.write(data_point)
+ result = False
+ except FileNotFoundError as exc:
+ result = True
+ logger.error('Data file not found: %exc')
+ return result
+
+ def relay_switch(
+ self,
+ on: bool,
+ ) -> None:
+ """
+ Turn relay on or off.
+ :param on: bool
+ :return:
+ """
+ if on:
+ logger.info('Turning on relay %time_now()')
try:
- pio.write(relay_pin, 1)
- message = f"Turning off, temp: {temp}, {timestamp}"
- logger.info(message)
- return False
- except: # TODO: documentation doesn't talk about exceptions on write
- return True
- # turning it on
- if on:
- if status == 0:
- return False
+ self.gpio.write(settings.relay_pin, 0)
+ except Exception as exc:
+ logger.warning('Failed to turn on relay: %exc')
else:
+ logger.info('Turning off relay %time_now()')
try:
- pio.write(relay_pin, 0)
- message = f"Turning on, temp: {temp}, {timestamp}"
- logger.info(message)
- return False
- except:
- return True
+ self.gpio.write(settings.relay_pin, 1)
+ except Exception as exc:
+ logger.warning('Failed to turn off relay: %exc')
+ @staticmethod
+ def temp_control(
+ current_temperature: float,
+ ) -> int:
+ """
+ Calculate value to pass to mosfet.
+ :param current_temperature: float
+ :return: int
+ """
+ pid = PID(
+ settings.kp,
+ settings.ki,
+ settings.kd,
+ setpoint=settings.target_temperature,
+ )
+ pid.output_limits = (0, 255)
+ pid.sample_time = settings.sample_time
+ return pid(current_temperature)
-def temp_control(
- current_temperature: float
-) -> int:
- pid = PID(Kp, Ki, Kd, setpoint=temperature_target)
- pid.output_limits = (0, 255)
- pid.sample_time = sample_time
- return pid(current_temperature)
+ def mosfet_set(
+ self,
+ value,
+ ) -> bool:
+ """
+ Set mosfet value.
+ :param value: int
+ :return: bool
+ """
+ result = True
+ logger.debug(f'Changing pin {settings.mosfet_pin} to {value}')
+ try:
+ self.gpio.write(settings.mosfet_pin, value)
+ result = False
+ except Exception as exc:
+ logger.warning('Failed to set mosfet value: %exc.')
+ result = True
+ return result
-def main_alt():
- temp = read_temp_alt()
- timestamp = time.strftime(timeformat)
- print(temp)
- with open(data_file, 'a') as dat:
- dat.write(temp)
- frequency = temp_control(temp)
- try:
- pio.set_PWM_frequency(mosfet_pin, frequency)
- except (
- pio.PI_BAD_USER_GPIO,
- pio.PI_NOT_PERMITTED,
- ) as exc:
- logger.warning(f"Failed to set GPIO PWM frequency: {exc} {timestamp}")
-
-
-def main():
- temp = read_temp()
- timestamp = time.strftime(timeformat)
- print(temp)
- if temp <= temperature_min:
- # if temp < temperature_min - 5:
- # print(f"Heater doesn't work, battery empty?")
- # logger.warning(f"Heater doesn't work, battery empty?")
- # state = heater_on(temp, on=False)
- state = heater_on(temp, on=True)
- if state != 0:
- logger.warning(f"heater_on() method returned {state}, {timestamp}")
- elif temp >= temperature_max:
- state = heater_on(temp, on=False)
- if state != 0:
- logger.warning(f"heater_on() method returned {state} {timestamp}")
- elif isnan(temp):
- state = heater_on(temp, on=False)
- logging.warning(f"Cannot read temperature, check your connections! {timestamp}")
+def time_now():
+ """Set time to TIMEFORMAT."""
+ return time.strftime(TIMEFORMAT)
if __name__ == '__main__':
- with open('/run/shroombox.pid', 'w') as f:
- f.write(str(getpid()))
+ controller = ShroomboxManager()
+ signal.signal(signal.SIGHUP, settings.ShroomboxSettings)
while True:
- pio = pigpio.pi()
- main()
- time.sleep(30)
+ controller.callback()
+ time.sleep(settings.read_frequency_sec)