aboutsummaryrefslogtreecommitdiff
path: root/pi_temp_pid.py
diff options
context:
space:
mode:
Diffstat (limited to 'pi_temp_pid.py')
-rw-r--r--pi_temp_pid.py342
1 files changed, 342 insertions, 0 deletions
diff --git a/pi_temp_pid.py b/pi_temp_pid.py
new file mode 100644
index 0000000..8f22694
--- /dev/null
+++ b/pi_temp_pid.py
@@ -0,0 +1,342 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+Module controling temperature with PID setting a MOSFET.
+"""
+
+__author__ = "Franek Ɓazarewicz-Muradyan"
+__licence__ = "GPL"
+__version__ = "0.0.1"
+__status__ = "Proof of concept"
+
+#############################################
+# Flow #
+#############################################
+
+# start logging, run sanity checks ->
+# turn on relay ->
+# enter a loop of given interval ->
+# check temperature ->
+# try to write temperature log ->
+# calculate MOSFET value with PID >
+# set calculated value on MOSFET
+
+
+import os
+import sys
+import time
+import signal
+import logging
+import io
+
+from pydantic import BaseSettings
+try:
+ from PID import PID
+ import pigpio
+except ImportError as exception:
+ print(f'Import error: {exception}')
+ sys.exit()
+
+
+#############################################
+# Settings #
+#############################################
+
+# User:
+USERNAME = os.getlogin()
+
+# Files:
+
+LOGNAME = f'/var/log/{USERNAME}/shroombox_pid.log'
+DATA_FOLDER = f'/var/log/{USERNAME}'
+DATA_FILE = f'{DATA_FOLDER}/shroombox_pid.csv'
+CONFIG_FILE = ".env"
+
+# Best effort to detect 1-wire sensor: errors out if multiple detected
+W1_SENSOR_ID = '28-0517c1b121ff'
+
+if not W1_SENSOR_ID:
+ for pathname, dirname, filenames in os.walk('/sys/devices/w1_bus_master1/'):
+ if len(filenames) >= 2:
+ print('Multiple 1-wire sensors connected, set rom by hand. Aborting.')
+ sys.exit()
+ elif not filenames:
+ print('No 1-wire sensors detected, aborting.')
+ sys.exit()
+ else:
+ W1_SENSOR_ID = filenames[0]
+W1_SENSOR_FILE = f'/sys/devices/w1_bus_master1/{W1_SENSOR_ID}/w1_slave'
+RPI_MODEL_FILE = '/sys/firmware/devicetree/base/model'
+
+# Other settings:
+
+TIMEFORMAT = '%Y-%M-%d %H:%M:%S'
+
+
+#############################################
+# Sanity checks #
+#############################################
+
+
+def is_raspberrypi():
+ try:
+ with io.open(RPI_MODEL_FILE, 'r', encoding='UTF-8') as model:
+ if 'raspberry pi' in model.read().lower():
+ return True
+ except Exception:
+ pass
+ return False
+
+
+if not is_raspberrypi():
+ print("This is not a raspberry pi, exiting")
+ sys.exit()
+
+if not os.path.exists(DATA_FOLDER):
+ try:
+ os.makedirs(DATA_FOLDER)
+ except OSError as exception:
+ print(f"Couldn't create data folder: {exception}")
+ sys.exit()
+try:
+ logging.basicConfig(
+ filename=LOGNAME,
+ encoding='utf-8',
+ filemode='a',
+ format='%(asctime)s,%(msecs)d %(name)s%(levelname)s %(message)s',
+ datefmt='%H:%M:%S',
+ level=logging.INFO,
+ )
+except PermissionError as exception:
+ print(f"Couldn't open logging directory: {exception}")
+ sys.exit()
+logging.info("Starting shroombox")
+logger = logging.getLogger()
+
+
+#############################################
+# Code #
+#############################################
+
+def write_default_config(filename):
+ """
+ Write default config file.
+ """
+ _settings = {
+ 'Kp': 1,
+ 'Ki': 0,
+ 'Kd': 0,
+ 'target_temperature': 24,
+ 'sample_time': 10,
+ 'relay_pin': 5,
+ 'mosfet_pin': 22,
+ 'read_frequency_sec': 15,
+ 'read_retry': 3,
+ }
+ with open(filename, 'w', encoding='UTF-8') as conf:
+ for key, val in _settings:
+ conf.write(f"{key}={val}")
+
+
+# If config file does not exist, create it with default values
+if not os.path.exists(CONFIG_FILE):
+ try:
+ write_default_config(CONFIG_FILE)
+ logger.info("Created .env config file with default values")
+ except Exception as exc:
+ print(f"Failed writing .env file: {exc}. Aborting.")
+ sys.exit()
+
+
+class ShroomboxSettings(BaseSettings):
+ """
+ Shroombox settings loaded from .env file.
+ """
+ kp: float = 0
+ ki: float = 0
+ kd: float = 0
+ target_temperature: float = 24
+ sample_time: int = 15
+ relay_pin: int
+ mosfet_pin: int
+ read_retry: int = 3
+ read_frequency_sec: int = 15
+
+ class Config:
+ env_file = '.env'
+ case_sensitive = True
+
+
+try:
+ settings = ShroomboxSettings()
+except Exception as exception:
+ print(f"Failed to read settings: {exception}")
+ sys.exit()
+
+
+class ShroomboxManager:
+ """
+ Shroombox class.
+ """
+
+ def __init__(self):
+ self.current_temperature = float('NaN')
+ self.gpio = pigpio.pi()
+ # Is below a good idea?
+ self.relay_switch(False)
+ # gpio_status: on is 0, off is 1
+ self.gpio_status = self.gpio.read(settings.relay_pin)
+
+ def __del__(self):
+ self.relay_switch(True)
+
+ def callback(self):
+ """
+ Main callback function.
+ :return: None
+ """
+ while True:
+ # Get current temperature
+ temp = self.read()
+ # Try to write to CSV file
+ self.write(temp)
+ # Get PID value for MOSFET
+ val = self.temp_control(temp)
+ # Set MOSFET to PID value
+ self.mosfet_set(val)
+ time.sleep(settings.read_fqcy)
+
+ def read(
+ self,
+ retry: int = 0,
+ ) -> float:
+ """
+ Function reading temperature data from one wire sensor.
+ Returns float('NaN') on read failure.
+ :param retry: int
+ :return: float
+ """
+ temperature = float('NaN')
+ if retry > settings.read_retry:
+ return temperature
+ try:
+ with open(W1_SENSOR_FILE, 'r', encoding='UTF-8') as sensor:
+ _ok = sensor.readline()[-4:].strip('\n')
+ if _ok == 'YES':
+ try:
+ temperature = round(int(sensor.readline().split('=')[1])/1000, 1)
+ except IndexError: # TODO: What other exceptions?
+ return temperature
+ else:
+ retry += 1
+ self.read(retry=retry)
+ except FileNotFoundError:
+ message = 'Sensor file not found - is sensor physically disconnected?'
+ logger.error(message)
+ return temperature
+
+ @staticmethod
+ def write(
+ datum: float,
+ ) -> bool:
+ """
+ Method writing data point to csv file.
+ CSV file header: timestamp,temperature
+ :param datum: float
+ :return: bool
+ """
+ result = True
+ if not os.path.exists(DATA_FILE):
+ try:
+ os.makedirs(DATA_FILE)
+ with open(DATA_FILE, 'w', encoding='UTF-8') as _file:
+ header = "timestamp,temperature"
+ _file.write(header)
+ result = False
+ except OSError as exc:
+ logger.error("Couldn't create path %DATA_FILE %exc")
+ result = True
+ try:
+ with open(DATA_FILE, 'a', encoding='UTF-8') as _file:
+ timestamp = time.strftime(TIMEFORMAT)
+ data_point = f'{timestamp},{datum}'
+ _file.write(data_point)
+ result = False
+ except FileNotFoundError as exc:
+ result = True
+ logger.error('Data file not found: %exc')
+ return result
+
+ def relay_switch(
+ self,
+ on: bool,
+ ) -> None:
+ """
+ Turn relay on or off.
+ :param on: bool
+ :return:
+ """
+ if on:
+ logger.info('Turning on relay %time_now()')
+ try:
+ self.gpio.write(settings.relay_pin, 0)
+ except Exception as exc:
+ logger.warning('Failed to turn on relay: %exc')
+ else:
+ logger.info('Turning off relay %time_now()')
+ try:
+ self.gpio.write(settings.relay_pin, 1)
+ except Exception as exc:
+ logger.warning('Failed to turn off relay: %exc')
+
+ @staticmethod
+ def temp_control(
+ current_temperature: float,
+ ) -> int:
+ """
+ Calculate value to pass to mosfet.
+ :param current_temperature: float
+ :return: int
+ """
+ pid = PID(
+ settings.kp,
+ settings.ki,
+ settings.kd,
+ setpoint=settings.target_temperature,
+ )
+ pid.output_limits = (0, 255)
+ pid.sample_time = settings.sample_time
+ return pid(current_temperature)
+
+ def mosfet_set(
+ self,
+ value,
+ ) -> bool:
+ """
+ Set mosfet value.
+ :param value: int
+ :return: bool
+ """
+ result = True
+ logger.debug(f'Changing pin {settings.mosfet_pin} to {value}')
+ try:
+ self.gpio.write(settings.mosfet_pin, value)
+ result = False
+ except Exception as exc:
+ logger.warning('Failed to set mosfet value: %exc.')
+ result = True
+ return result
+
+
+def time_now():
+ """Set time to TIMEFORMAT."""
+ return time.strftime(TIMEFORMAT)
+
+
+if __name__ == '__main__':
+ controller = ShroomboxManager()
+ signal.signal(signal.SIGHUP, settings.ShroomboxSettings)
+ while True:
+ controller.callback()
+ time.sleep(settings.read_frequency_sec)