#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module controlling temperature with PID setting a MOSFET.

#############################################
#                   Flow                    #
#############################################

start logging, run sanity checks
-> turn on relay
-> enter a loop of given interval
-> check temperature
-> calculate MOSFET value with PID
-> try to write temperature log
-> set calculated value on MOSFET
"""

__author__ = "Franek Ɓazarewicz-Muradyan"
__licence__ = "GPL"
__version__ = "0.0.1"
__status__ = "Proof of concept"

import os
import sys
import time
import math
import signal
import logging
import io
import getpass
from csv import writer

from pydantic import BaseSettings, BaseModel, Field

try:
    from PID import PID
    import pigpio
except ImportError as exception:
    print(f'Import error: {exception}')
    sys.exit()

#############################################
#                 Settings                  #
#############################################

# User:
USERNAME = getpass.getuser()

# Files:
LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log'
DATA_FOLDER = f'/var/log/{USERNAME}'
DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv'
CONFIG_FILE = ".env"

# 1-wire bus directory; every connected sensor shows up as a sub-directory
# holding a `w1_slave` file.
W1_BUS_DIR = '/sys/devices/w1_bus_master1'

# Set the sensor rom by hand here; leave empty to auto-detect.
W1_SENSOR_ID = '28-0517c1b121ff'


def _detect_w1_sensor(bus_dir=W1_BUS_DIR):
    """
    Best effort to detect the 1-wire sensor: aborts unless exactly one is found.

    A sensor is any entry of the bus directory that contains a `w1_slave`
    file. Plain files of the bus master itself are therefore ignored.

    :param bus_dir: str
    :return: str
    """
    try:
        entries = sorted(os.listdir(bus_dir))
    except OSError:
        # Missing/unreadable bus directory is treated as "no sensor".
        entries = []
    sensors = [
        entry for entry in entries
        if os.path.isfile(os.path.join(bus_dir, entry, 'w1_slave'))
    ]
    if len(sensors) >= 2:
        print('Multiple 1-wire sensors connected, set rom by hand. Aborting.')
        sys.exit()
    if not sensors:
        print('No 1-wire sensors detected, aborting.')
        sys.exit()
    return sensors[0]


if not W1_SENSOR_ID:
    W1_SENSOR_ID = _detect_w1_sensor()

W1_SENSOR_FILE = f'{W1_BUS_DIR}/{W1_SENSOR_ID}/w1_slave'
RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model'

# Other settings:
# NOTE: %m is the month; %M (used here previously) is the minute.
TIMEFORMAT = '%Y-%m-%d %H:%M:%S'

#############################################
#               Sanity checks               #
#############################################


def is_raspberrypi():
    """
    Check whether the device-tree model file identifies a Raspberry Pi.

    :return: bool, False also when the model file cannot be read
    """
    try:
        with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model:
            return 'raspberry pi' in model.read().lower()
    except (OSError, UnicodeDecodeError):
        return False


if not is_raspberrypi():
    print("This is not a raspberry pi, exiting")
    sys.exit()

try:
    os.makedirs(DATA_FOLDER, exist_ok=True)
except OSError as exception:
    print(f"Couldn't create data folder: {exception}")
    sys.exit()

try:
    logging.basicConfig(
        filename=LOGNAME,
        encoding='utf-8',
        filemode='a',
        format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s',
        datefmt='%H:%M:%S',
        level=logging.INFO,
    )
except OSError as exception:
    print(f"Couldn't open logging directory: {exception}")
    sys.exit()

logging.info("Starting shroombox")
logger = logging.getLogger()

#############################################
#                   Code                    #
#############################################


def time_now():
    """Return the current local time formatted with TIMEFORMAT."""
    return time.strftime(TIMEFORMAT)


def write_default_config(filename):
    """
    Write default config file, one `key=value` entry per line.

    Keys are lower case because ShroomboxSettings matches them
    case-sensitively against its field names.

    :param filename: str
    :return: None
    """
    _settings = {
        'kp': 1,
        'ki': 0,
        'kd': 0,
        'target_temperature': 24,
        'sample_time': 10,
        'relay_pin': 5,
        'mosfet_pin': 22,
        'read_frequency_sec': 15,
        'read_retry': 3,
    }
    with open(filename, 'w', encoding='UTF-8') as conf:
        conf.writelines(f"{key}={val}\n" for key, val in _settings.items())


# If config file does not exist, create it with default values
if not os.path.exists(CONFIG_FILE):
    try:
        write_default_config(CONFIG_FILE)
        logger.info("Created .env config file with default values")
    except Exception as exc:
        print(f"Failed writing .env file: {exc}. Aborting.")
        sys.exit()


class PidValue(BaseModel):
    """
    Value passed to the MOSFET.

    The lower bound is 0 to agree with the PID output limits (0, 255).
    """
    value: int = Field(..., ge=0, le=255)


class ShroomboxSettings(BaseSettings):
    """
    Shroombox settings loaded from .env file.

    `relay_pin` and `mosfet_pin` have no defaults and must be configured.
    """
    kp: float = 0
    ki: float = 0
    kd: float = 0
    target_temperature: float = 24
    sample_time: int = 15
    relay_pin: int
    mosfet_pin: int
    read_retry: int = 3
    read_frequency_sec: int = 15

    class Config:
        env_file = CONFIG_FILE
        case_sensitive = True


def read_settings(signum=None, frame=None):
    """
    (Re)load settings into the module-level `settings` and return them.

    Usable both as a plain call and as a signal handler: the two optional
    parameters receive the `(signum, frame)` pair the signal machinery passes.

    On the initial load a failure aborts the program. When invoked as a
    signal handler with settings already loaded, a failure is logged and the
    previous settings are kept, so a broken config edit does not stop the
    running controller.

    :param signum: int or None
    :param frame: frame object or None (unused)
    :return: ShroomboxSettings
    """
    global settings
    try:
        settings = ShroomboxSettings()
    except Exception as exception:
        if signum is not None and 'settings' in globals():
            logger.error(
                'Failed to reload settings, keeping previous ones: %s',
                exception,
            )
            return settings
        print(f"Failed to read settings: {exception}")
        sys.exit()
    if signum is not None:
        logger.info('Settings reloaded')
    return settings


settings = read_settings()

# Holds the single PID controller together with the settings it was built
# from, so that integral/derivative state survives between loop iterations.
_PID_CACHE = {}


def _get_pid():
    """
    Return the shared PID controller, rebuilding it when settings changed.

    :return: PID
    """
    key = (
        settings.kp,
        settings.ki,
        settings.kd,
        settings.target_temperature,
        settings.sample_time,
    )
    if _PID_CACHE.get('key') != key:
        _PID_CACHE['pid'] = PID(
            Kp=settings.kp,
            Ki=settings.ki,
            Kd=settings.kd,
            setpoint=settings.target_temperature,
            sample_time=settings.sample_time,
            output_limits=(0, 255),
        )
        _PID_CACHE['key'] = key
    return _PID_CACHE['pid']


class ShroomboxManager:
    """
    Shroombox class.

    Owns the pigpio connection, switches the relay on at construction and
    drives the MOSFET from the PID output inside `callback`.
    """

    def __init__(self):
        self.current_temperature = float('NaN')
        # Set first so that close()/__del__ are safe if connecting fails.
        self.gpio = None
        gpio = pigpio.pi()
        if not gpio.connected:
            logger.error('Could not connect to pigpio daemon')
            raise RuntimeError(
                'Could not connect to pigpio daemon - is pigpiod running?'
            )
        self.gpio = gpio
        self.relay_switch(on=True)
        # gpio_status: on is 0, off is 1
        self.gpio_status = self.gpio.read(settings.relay_pin)

    def __del__(self):
        # Last-resort cleanup only; destructors may run during interpreter
        # shutdown when globals are already gone, so failures are ignored.
        # Callers should use close() explicitly.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """
        Switch the MOSFET and the relay off and release the pigpio connection.

        Safe to call more than once.

        :return: None
        """
        gpio = getattr(self, 'gpio', None)
        if gpio is None:
            return
        self.mosfet_set(0)
        self.relay_switch(on=False)
        try:
            gpio.stop()
        except Exception as exc:
            logger.warning('Failed to stop pigpio connection: %s', exc)
        self.gpio = None

    def callback(self):
        """
        Main callback function.

        Runs the control loop forever: read temperature, compute the MOSFET
        value, log the data point, apply the value, sleep.

        :return: None
        """
        while True:
            # Get current temperature
            temp = self.read()
            self.current_temperature = temp
            # Get PID value for MOSFET
            val = self.temp_control(temp)
            # Try to write to CSV file
            self.write(temp, val)
            # Set MOSFET to PID value
            self.mosfet_set(val)
            time.sleep(settings.read_frequency_sec)

    def read(
            self,
            retry: int = 0,
    ) -> float:
        """
        Function reading temperature data from one wire sensor.

        The first line of the sensor file must end with `YES`; otherwise the
        read is repeated until `retry` exceeds `settings.read_retry`.
        Returns float('NaN') on read failure.

        :param retry: int, number of attempts already used
        :return: float
        """
        nan = float('NaN')
        for _attempt in range(retry, settings.read_retry + 1):
            try:
                with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor:
                    status_line = sensor.readline()
                    data_line = sensor.readline()
            except FileNotFoundError:
                message = 'Sensor file not found - is sensor physically disconnected?'
                logger.error(message)
                return nan
            except OSError as exc:
                logger.error('Failed to read sensor file: %s', exc)
                return nan

            if not status_line.strip().endswith('YES'):
                # Sensor reported an invalid reading, try again.
                continue

            try:
                # Second line ends with `t=<millidegrees>`.
                return round(int(data_line.split('=')[1]) / 1000, 1)
            except (IndexError, ValueError):
                logger.warning('Malformed sensor data: %r', data_line)
                return nan

        logger.warning('No valid sensor reading after retries')
        return nan

    @staticmethod
    def write(
            datum: float,
            kp: float,
    ) -> bool:
        """
        Method writing data point to csv file.

        CSV file header: timestamp,temperature,kp
        The header is written when the file is missing or empty.

        NOTE: the return value follows this class's convention of
        reporting *failure*: True means the write failed, False means success.

        :param datum: float, temperature
        :param kp: float, value stored in the third column
        :return: bool
        """
        try:
            needs_header = (
                not os.path.exists(DATA_FILE)
                or os.path.getsize(DATA_FILE) == 0
            )
            # newline='' is what the csv module expects for its file objects.
            with open(DATA_FILE, 'a', encoding='UTF-8', newline='') as _file:
                wrtr = writer(_file)
                if needs_header:
                    wrtr.writerow(('timestamp', 'temperature', 'kp'))
                wrtr.writerow((time_now(), datum, kp))
        except OSError as exc:
            logger.error("Couldn't write to data file %s: %s", DATA_FILE, exc)
            return True
        return False

    def relay_switch(
            self,
            on: bool = False,
    ) -> None:
        """
        Turn relay on or off

        The relay is active-low: writing 0 turns it on, writing 1 turns it off.
        Failures are logged, not raised.

        :param on: bool
        :return: None
        """
        state = 'on' if on else 'off'
        level = 0 if on else 1
        logger.info('Turning %s relay %s', state, time_now())
        try:
            self.gpio.write(settings.relay_pin, level)
        except Exception as exc:
            logger.warning('Failed to turn %s relay: %s', state, exc)

    @staticmethod
    def temp_control(
            current_temperature: float,
    ) -> int:
        """
        Calculate value to pass to mosfet.

        Uses one shared PID controller so its state carries over between
        calls. A NaN temperature (failed sensor read) yields 0, i.e. output
        off, instead of feeding NaN into the controller.

        :param current_temperature: float
        :return: int, within 0..255
        """
        if math.isnan(current_temperature):
            logger.warning('No valid temperature, setting MOSFET value to 0')
            return 0
        return int(_get_pid()(current_temperature))

    def mosfet_set(
            self,
            value: int = 0,
    ) -> bool:
        """
        Set mosfet value.

        NOTE: returns True when setting the value failed, False on success.

        :param value: int, PWM duty cycle
        :return: bool
        """
        logger.debug('Changing pin %s to %s', settings.mosfet_pin, value)
        try:
            self.gpio.set_PWM_dutycycle(settings.mosfet_pin, value)
        except Exception as exc:
            logger.warning('Failed to set mosfet value: %s', exc)
            return True
        return False


def _handle_sigterm(signum, frame):
    """Turn SIGTERM into a normal exit so cleanup in `finally` runs."""
    sys.exit(0)


if __name__ == '__main__':
    controller = ShroomboxManager()
    signal.signal(signal.SIGHUP, read_settings)
    signal.signal(signal.SIGTERM, _handle_sigterm)
    try:
        # callback() loops forever; it only leaves through an exception.
        controller.callback()
    except KeyboardInterrupt:
        logger.info('Interrupted, shutting down')
    finally:
        controller.close()