From efe863451d901d8a3931c851be42b0d6b7370034 Mon Sep 17 00:00:00 2001 From: erg Date: Wed, 1 Mar 2023 09:21:43 +0100 Subject: Changed file names --- pi_temp_pid.py | 342 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 342 insertions(+) create mode 100644 pi_temp_pid.py (limited to 'pi_temp_pid.py') diff --git a/pi_temp_pid.py b/pi_temp_pid.py new file mode 100644 index 0000000..8f22694 --- /dev/null +++ b/pi_temp_pid.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Module controling temperature with PID setting a MOSFET. +""" + +__author__ = "Franek Ɓazarewicz-Muradyan" +__licence__ = "GPL" +__version__ = "0.0.1" +__status__ = "Proof of concept" + +############################################# +# Flow # +############################################# + +# start logging, run sanity checks -> +# turn on relay -> +# enter a loop of given interval -> +# check temperature -> +# try to write temperature log -> +# calculate MOSFET value with PID > +# set calculated value on MOSFET + + +import os +import sys +import time +import signal +import logging +import io + +from pydantic import BaseSettings +try: + from PID import PID + import pigpio +except ImportError as exception: + print(f'Import error: {exception}') + sys.exit() + + +############################################# +# Settings # +############################################# + +# User: +USERNAME = os.getlogin() + +# Files: + +LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log' +DATA_FOLDER = f'/var/log/{USERNAME}' +DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv' +CONFIG_FILE = ".env" + +# Best effort to detect 1-wire sensor: errors out if multiple detected +W1_SENSOR_ID = '28-0517c1b121ff' + +if not W1_SENSOR_ID: + for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'): + if len(filenames) >= 2: + print('Multiple 1-wire sensors connected, set rom by hand. Aborting.') + sys.exit() + elif not filenames: + print('No 1-wire sensors detected, aborting.') + sys.exit() + else: + W1_SENSOR_ID = filenames[0] +W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave' +RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model' + +# Other settings: + +TIMEFORMAT = '%Y-%M-%d %H:%M:%S' + + +############################################# +# Sanity checks # +############################################# + + +def is_raspberrypi(): + try: + with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model: + if 'raspberry pi' in model.read().lower(): + return True + except Exception: + pass + return False + + +if not is_raspberrypi(): + print("This is not a raspberry pi, exiting") + sys.exit() + +if not os.path.exists(DATA_FOLDER): + try: + os.makedirs(DATA_FOLDER) + except OSError as exception: + print(f"Couldn't create data folder: {exception}") + sys.exit() +try: + logging.basicConfig( + filename=LOGNAME, + encoding='utf-8', + filemode='a', + format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=logging.INFO, + ) +except PermissionError as exception: + print(f"Couldn't open logging directory: {exception}") + sys.exit() +logging.info("Starting shroombox") +logger = logging.getLogger() + + +############################################# +# Code # +############################################# + +def write_default_config(filename): + """ + Write default config file. + """ + _settings = { + 'Kp': 1, + 'Ki': 0, + 'Kd': 0, + 'target_temperature': 24, + 'sample_time': 10, + 'relay_pin': 5, + 'mosfet_pin': 22, + 'read_frequency_sec': 15, + 'read_retry': 3, + } + with open(filename, 'w', encoding='UTF-8') as conf: + for key, val in _settings: + conf.write(f"{key}={val}") + + +# If config file does not exist, create it with default values +if not os.path.exists(CONFIG_FILE): + try: + write_default_config(CONFIG_FILE) + logger.info("Created .env config file with default values") + except Exception as exc: + print(f"Failed writing .env file: {exc}. Aborting.") + sys.exit() + + +class ShroomboxSettings(BaseSettings): + """ + Shroombox settings loaded from .env file. + """ + kp: float = 0 + ki: float = 0 + kd: float = 0 + target_temperature: float = 24 + sample_time: int = 15 + relay_pin: int + mosfet_pin: int + read_retry: int = 3 + read_frequency_sec: int = 15 + + class Config: + env_file = '.env' + case_sensitive = True + + +try: + settings = ShroomboxSettings() +except Exception as exception: + print(f"Failed to read settings: {exception}") + sys.exit() + + +class ShroomboxManager: + """ + Shroombox class. + """ + + def __init__(self): + self.current_temperature = float('NaN') + self.gpio = pigpio.pi() + # Is below a good idea? + self.relay_switch(False) + # gpio_status: on is 0, off is 1 + self.gpio_status = self.gpio.read(settings.relay_pin) + + def __del__(self): + self.relay_switch(True) + + def callback(self): + """ + Main callback function. + :return: None + """ + while True: + # Get current temperature + temp = self.read() + # Try to write to CSV file + self.write(temp) + # Get PID value for MOSFET + val = self.temp_control(temp) + # Set MOSFET to PID value + self.mosfet_set(val) + time.sleep(settings.read_fqcy) + + def read( + self, + retry: int = 0, + ) -> float: + """ + Function reading temperature data from one wire sensor. + Returns float('NaN') on read failure. + :param retry: int + :return: float + """ + temperature = float('NaN') + if retry > settings.read_retry: + return temperature + try: + with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor: + _ok = sensor.readline()[-4:].strip('\n') + if _ok == 'YES': + try: + temperature = round(int(sensor.readline().split('=')[1])/1000, 1) + except IndexError: # TODO: What other exceptions? + return temperature + else: + retry += 1 + self.read(retry=retry) + except FileNotFoundError: + message = 'Sensor file not found - is sensor physically disconnected?' + logger.error(message) + return temperature + + @staticmethod + def write( + datum: float, + ) -> bool: + """ + Method writing data point to csv file. + CSV file header: timestamp,temperature + :param datum: float + :return: bool + """ + result = True + if not os.path.exists(DATA_FILE): + try: + os.makedirs(DATA_FILE) + with open(DATA_FILE, 'w', encoding='UTF-8') as _file: + header = "timestamp,temperature" + _file.write(header) + result = False + except OSError as exc: + logger.error("Couldn't create path %DATA_FILE %exc") + result = True + try: + with open(DATA_FILE, 'a', encoding='UTF-8') as _file: + timestamp = time.strftime(TIMEFORMAT) + data_point = f'{timestamp},{datum}' + _file.write(data_point) + result = False + except FileNotFoundError as exc: + result = True + logger.error('Data file not found: %exc') + return result + + def relay_switch( + self, + on: bool, + ) -> None: + """ + Turn relay on or off. + :param on: bool + :return: + """ + if on: + logger.info('Turning on relay %time_now()') + try: + self.gpio.write(settings.relay_pin, 0) + except Exception as exc: + logger.warning('Failed to turn on relay: %exc') + else: + logger.info('Turning off relay %time_now()') + try: + self.gpio.write(settings.relay_pin, 1) + except Exception as exc: + logger.warning('Failed to turn off relay: %exc') + + @staticmethod + def temp_control( + current_temperature: float, + ) -> int: + """ + Calculate value to pass to mosfet. + :param current_temperature: float + :return: int + """ + pid = PID( + settings.kp, + settings.ki, + settings.kd, + setpoint=settings.target_temperature, + ) + pid.output_limits = (0, 255) + pid.sample_time = settings.sample_time + return pid(current_temperature) + + def mosfet_set( + self, + value, + ) -> bool: + """ + Set mosfet value. + :param value: int + :return: bool + """ + result = True + logger.debug(f'Changing pin {settings.mosfet_pin} to {value}') + try: + self.gpio.write(settings.mosfet_pin, value) + result = False + except Exception as exc: + logger.warning('Failed to set mosfet value: %exc.') + result = True + return result + + +def time_now(): + """Set time to TIMEFORMAT.""" + return time.strftime(TIMEFORMAT) + + +if __name__ == '__main__': + controller = ShroomboxManager() + signal.signal(signal.SIGHUP, settings.ShroomboxSettings) + while True: + controller.callback() + time.sleep(settings.read_frequency_sec) -- cgit v1.2.3-65-gdbad