diff options
author | erg <uinarf@autistici.org> | 2023-02-28 20:09:47 +0100 |
---|---|---|
committer | erg <uinarf@autistici.org> | 2023-02-28 20:09:47 +0100 |
commit | 7e51b88785ecde452ffc29445a78492e5a11710e (patch) | |
tree | 0f7bc5ee77c428dafef44c05df6f93f37802e47f /main.py | |
parent | d6970e81e607f27bee9323d10d1476621de3416c (diff) | |
download | Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.tar.gz Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.tar.bz2 Pi_Temp_PID_Control-7e51b88785ecde452ffc29445a78492e5a11710e.zip |
Completely revorked
Diffstat (limited to 'main.py')
-rw-r--r-- | main.py | 457 |
1 files changed, 314 insertions, 143 deletions
@@ -1,171 +1,342 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python # -*- coding: utf-8 -*- """ -Module managing temperature in the shroombox. +Module controling temperature with PID setting a MOSFET. """ -from os import getpid -import pigpio -from math import isnan -import logging -import time -from simple_pid import PID - __author__ = "Franek Ćazarewicz-Muradyan" __licence__ = "GPL" __version__ = "0.0.1" __status__ = "Proof of concept" -# ********************** -# Hardware Settings -# ********************** - -# One wire temperature sensor settings -w1_sensor_id = '28-0517c1b121ff' -w1_sensor_f = f'/sys/devices/w1_bus_master1/{w1_sensor_id}/w1_slave' -# Heater settings -relay_pin = 5 -mosfet_pin = 22 - -# ********************** -# Software Settings -# ********************** - -timeformat = '%Y-%M-%d %H:%M:%S' -temperature_min = 20 -temperature_max = 24 -temperature_target = 20 -read_retry = 3 -logname = '/var/log/pipi/shroombox.log' -data_file = '/var/log/pipi/shroombox.csv' - -# ********************** -# PID Settings -# ********************** -Kp = 1 -Ki = 0.2 -Kd = 0 -sample_time = 10 - -# ********************** -# Code -# ********************** - - -logging.basicConfig( - filename=logname, - encoding='utf-8', - filemode='a', - format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', datefmt='%H:%M:%S', - level=logging.DEBUG, -) +############################################# +# Flow # +############################################# + +# start logging, run sanity checks -> +# turn on relay -> +# enter a loop of given interval -> +# check temperature -> +# try to write temperature log -> +# calculate MOSFET value with PID > +# set calculated value on MOSFET + + +import os +import sys +import time +import signal +import logging +import io + +from pydantic import BaseSettings +try: + from PID import PID + import pigpio +except ImportError as exception: + print(f'Import error: {exception}') + sys.exit() + + +############################################# +# Settings # +############################################# + +# User: +USERNAME = os.getlogin() + +# Files: + +LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log' +DATA_FOLDER = f'/var/log/{USERNAME}' +DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv' +CONFIG_FILE = ".env" + +# Best effort to detect 1-wire sensor: errors out if multiple detected +W1_SENSOR_ID = '28-0517c1b121ff' + +if not W1_SENSOR_ID: + for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'): + if len(filenames) >= 2: + print('Multiple 1-wire sensors connected, set rom by hand. Aborting.') + sys.exit() + elif not filenames: + print('No 1-wire sensors detected, aborting.') + sys.exit() + else: + W1_SENSOR_ID = filenames[0] +W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave' +RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model' + +# Other settings: + +TIMEFORMAT = '%Y-%M-%d %H:%M:%S' + + +############################################# +# Sanity checks # +############################################# + + +def is_raspberrypi(): + try: + with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model: + if 'raspberry pi' in model.read().lower(): + return True + except Exception: + pass + return False + + +if not is_raspberrypi(): + print("This is not a raspberry pi, exiting") + sys.exit() + +if not os.path.exists(DATA_FOLDER): + try: + os.makedirs(DATA_FOLDER) + except OSError as exception: + print(f"Couldn't create data folder: {exception}") + sys.exit() +try: + logging.basicConfig( + filename=LOGNAME, + encoding='utf-8', + filemode='a', + format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=logging.INFO, + ) +except PermissionError as exception: + print(f"Couldn't open logging directory: {exception}") + sys.exit() logging.info("Starting shroombox") logger = logging.getLogger() -def read_temp( +############################################# +# Code # +############################################# + +def write_default_config(filename): + """ + Write default config file. + """ + _settings = { + 'Kp': 1, + 'Ki': 0, + 'Kd': 0, + 'target_temperature': 24, + 'sample_time': 10, + 'relay_pin': 5, + 'mosfet_pin': 22, + 'read_frequency_sec': 15, + 'read_retry': 3, + } + with open(filename, 'w', encoding='UTF-8') as conf: + for key, val in _settings: + conf.write(f"{key}={val}") + + +# If config file does not exist, create it with default values +if not os.path.exists(CONFIG_FILE): + try: + write_default_config(CONFIG_FILE) + logger.info("Created .env config file with default values") + except Exception as exc: + print(f"Failed writing .env file: {exc}. Aborting.") + sys.exit() + + +class ShroomboxSettings(BaseSettings): + """ + Shroombox settings loaded from .env file. + """ + kp: float = 0 + ki: float = 0 + kd: float = 0 + target_temperature: float = 24 + sample_time: int = 15 + relay_pin: int + mosfet_pin: int + read_retry: int = 3 + read_frequency_sec: int = 15 + + class Config: + env_file = '.env' + case_sensitive = True + + +try: + settings = ShroomboxSettings() +except Exception as exception: + print(f"Failed to read settings: {exception}") + sys.exit() + + +class ShroomboxManager: + """ + Shroombox class. + """ + + def __init__(self): + self.current_temperature = float('NaN') + self.gpio = pigpio.pi() + # Is below a good idea? + self.relay_switch(False) + # gpio_status: on is 0, off is 1 + self.gpio_status = self.gpio.read(settings.relay_pin) + + def __del__(self): + self.relay_switch(True) + + def callback(self): + """ + Main callback function. + :return: None + """ + while True: + # Get current temperature + temp = self.read() + # Try to write to CSV file + self.write(temp) + # Get PID value for MOSFET + val = self.temp_control(temp) + # Set MOSFET to PID value + self.mosfet_set(val) + time.sleep(settings.read_fqcy) + + def read( + self, retry: int = 0, -) -> float: - # If sensor is physically disconected, opening file will fail - temperature = float('NaN') - if retry > read_retry: + ) -> float: + """ + Function reading temperature data from one wire sensor. + Returns float('NaN') on read failure. + :param retry: int + :return: float + """ + temperature = float('NaN') + if retry > settings.read_retry: + return temperature + try: + with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor: + _ok = sensor.readline()[-4:].strip('\n') + if _ok == 'YES': + try: + temperature = round(int(sensor.readline().split('=')[1])/1000, 1) + except IndexError: # TODO: What other exceptions? + return temperature + else: + retry += 1 + self.read(retry=retry) + except FileNotFoundError: + message = 'Sensor file not found - is sensor physically disconnected?' + logger.error(message) return temperature - with open(w1_sensor_f, 'r') as sensor: - ok = sensor.readline()[-4:].strip('\n') - if ok == 'YES': + + @staticmethod + def write( + datum: float, + ) -> bool: + """ + Method writing data point to csv file. + CSV file header: timestamp,temperature + :param datum: float + :return: bool + """ + result = True + if not os.path.exists(DATA_FILE): try: - temperature = round(int(sensor.readline().split('=')[1])/1000, 1) - except IndexError: # TODO: What other exceptions? - return temperature - else: - retry += 1 - read_temp(retry=retry) - return temperature - - -def heater_on( - temp: float, - on: bool = False, -) -> bool: - status = pio.read(relay_pin) - timestamp = time.strftime(timeformat) - # turning it off - if not on: - if status == 1: - return False - else: + os.makedirs(DATA_FILE) + with open(DATA_FILE, 'w', encoding='UTF-8') as _file: + header = "timestamp,temperature" + _file.write(header) + result = False + except OSError as exc: + logger.error("Couldn't create path %DATA_FILE %exc") + result = True + try: + with open(DATA_FILE, 'a', encoding='UTF-8') as _file: + timestamp = time.strftime(TIMEFORMAT) + data_point = f'{timestamp},{datum}' + _file.write(data_point) + result = False + except FileNotFoundError as exc: + result = True + logger.error('Data file not found: %exc') + return result + + def relay_switch( + self, + on: bool, + ) -> None: + """ + Turn relay on or off. + :param on: bool + :return: + """ + if on: + logger.info('Turning on relay %time_now()') try: - pio.write(relay_pin, 1) - message = f"Turning off, temp: {temp}, {timestamp}" - logger.info(message) - return False - except: # TODO: documentation doesn't talk about exceptions on write - return True - # turning it on - if on: - if status == 0: - return False + self.gpio.write(settings.relay_pin, 0) + except Exception as exc: + logger.warning('Failed to turn on relay: %exc') else: + logger.info('Turning off relay %time_now()') try: - pio.write(relay_pin, 0) - message = f"Turning on, temp: {temp}, {timestamp}" - logger.info(message) - return False - except: - return True + self.gpio.write(settings.relay_pin, 1) + except Exception as exc: + logger.warning('Failed to turn off relay: %exc') + @staticmethod + def temp_control( + current_temperature: float, + ) -> int: + """ + Calculate value to pass to mosfet. + :param current_temperature: float + :return: int + """ + pid = PID( + settings.kp, + settings.ki, + settings.kd, + setpoint=settings.target_temperature, + ) + pid.output_limits = (0, 255) + pid.sample_time = settings.sample_time + return pid(current_temperature) -def temp_control( - current_temperature: float -) -> int: - pid = PID(Kp, Ki, Kd, setpoint=temperature_target) - pid.output_limits = (0, 255) - pid.sample_time = sample_time - return pid(current_temperature) + def mosfet_set( + self, + value, + ) -> bool: + """ + Set mosfet value. + :param value: int + :return: bool + """ + result = True + logger.debug(f'Changing pin {settings.mosfet_pin} to {value}') + try: + self.gpio.write(settings.mosfet_pin, value) + result = False + except Exception as exc: + logger.warning('Failed to set mosfet value: %exc.') + result = True + return result -def main_alt(): - temp = read_temp_alt() - timestamp = time.strftime(timeformat) - print(temp) - with open(data_file, 'a') as dat: - dat.write(temp) - frequency = temp_control(temp) - try: - pio.set_PWM_frequency(mosfet_pin, frequency) - except ( - pio.PI_BAD_USER_GPIO, - pio.PI_NOT_PERMITTED, - ) as exc: - logger.warning(f"Failed to set GPIO PWM frequency: {exc} {timestamp}") - - -def main(): - temp = read_temp() - timestamp = time.strftime(timeformat) - print(temp) - if temp <= temperature_min: - # if temp < temperature_min - 5: - # print(f"Heater doesn't work, battery empty?") - # logger.warning(f"Heater doesn't work, battery empty?") - # state = heater_on(temp, on=False) - state = heater_on(temp, on=True) - if state != 0: - logger.warning(f"heater_on() method returned {state}, {timestamp}") - elif temp >= temperature_max: - state = heater_on(temp, on=False) - if state != 0: - logger.warning(f"heater_on() method returned {state} {timestamp}") - elif isnan(temp): - state = heater_on(temp, on=False) - logging.warning(f"Cannot read temperature, check your connections! {timestamp}") +def time_now(): + """Set time to TIMEFORMAT.""" + return time.strftime(TIMEFORMAT) if __name__ == '__main__': - with open('/run/shroombox.pid', 'w') as f: - f.write(str(getpid())) + controller = ShroomboxManager() + signal.signal(signal.SIGHUP, settings.ShroomboxSettings) while True: - pio = pigpio.pi() - main() - time.sleep(30) + controller.callback() + time.sleep(settings.read_frequency_sec) |