#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Program intended to be run as daemon reading from sensors and writing to
database.

Each sensor family (1-wire temperature, DHT11, TSL2561, rain gauge) is a
subclass of ``Sensor``.  Every subclass exposes ``read()``, which returns a
dictionary starting with a ``'timestamp'`` entry followed by the readings,
and inherits ``db_write()``, which stores such a dictionary in MySQL.
"""

__author__ = "Franek Łazarewicz-Muradyan"
__copyright__ = "Copyright 2021, Franek Łazarewicz-Muradyan"
__version__ = "0.1"
__status__ = "Beta"
__email__ = "uinarf@autistici.org"

import configparser
import glob
import importlib
import os
import re
import signal
import subprocess
import sys
from time import strftime, localtime, sleep

import mysql.connector
from mysql.connector import errorcode
import pigpio
# below installed on my system with custom ebuild, available through pip
import pigpio_dht
import psutil  # optional
import smbus

# Directory this script lives in, so that paths work when the script is run
# from a different directory.
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__)))

# here goes our settings file
settings_file = 'settings_config_parser.cfg'
settings_db = 'settings_server.cfg'

# setting up configparser with absolute paths to both settings files
settings = os.path.join(__location__, settings_file)
settings_db = os.path.join(__location__, settings_db)
cfg = configparser.ConfigParser(interpolation=None)
cfg_db = configparser.ConfigParser(interpolation=None)
cfg.read(settings)
cfg_db.read(settings_db)


class Sensor:
    """
    Base class: configuration shared by all sensors plus database / log
    file output.
    """

    def __init__(self):
        # read default database name
        try:
            self.dtb = cfg_db['mysql']['database']
        except KeyError:
            self.dtb = 'default.dtb'
        # here we'll define the elusive NaN
        # needs to be None for database
        self.nan = None
        self.timestamp_format = cfg.get('various', 'timestamp_format')

    def create_connection(self):
        """
        Creates connection to database with credentials provided in config
        file.

        Returns the connection, or None when connecting failed (the error is
        printed, not raised).  The connection is also kept in ``self.conn``.
        """
        self.conn = None
        try:
            self.conn = mysql.connector.connect(**cfg_db['mysql'])
        except mysql.connector.Error as e:
            if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your username and password.")
            else:
                print(e)
        return self.conn

    def create_database(self, cursor):
        """
        Creates database ``self.dtb`` using the given cursor.  Failure is
        printed, not raised.
        """
        try:
            # MySQL has no character set called 'utf-8'; utf8mb4 is its
            # full UTF-8 implementation.
            cursor.execute(
                f"CREATE DATABASE {self.dtb} DEFAULT CHARACTER SET 'utf8mb4'"
            )
        except mysql.connector.Error as e:
            print(f"Failed creating database: {e}")

    @staticmethod
    def _column_name(key):
        """Turns a dictionary key into a string usable as a column name."""
        # digit cannot go first
        if key[0].isdigit():
            key = 'sens' + key
        # strip of special characters excluding ',' and ' '
        return re.sub('[^A-Za-z0-9, ]+', '', key)

    def create_table(self, t_name, dictionary):
        """
        Takes table name (w1, light etc) and data in shape of a dictionary
        and creates database table if it does not exist yet.

        Opens a new connection (``self.conn``) and cursor (``self.cur``) and
        leaves both open for the caller.  ``self.rows`` is set to the
        comma separated column names matching the dictionary keys.  When no
        connection could be made ``self.cur`` is set to None.
        """
        self.t_name = t_name
        self.data = dictionary

        # Achtung: identifiers cannot start with digits!!!
        # Achtung: identifiers cannot contain special characters (-_-)!!!
        # all the sensor data is float, ok?
        names = [self._column_name(key) for key in self.data
                 if key != 'timestamp']
        # This: row name
        self.rows = ', '.join(['timestamp'] + names)
        # This: row name + data type
        rows_tp = ', '.join(
            ['timestamp timestamp'] + [f'{name} float' for name in names])

        conn = self.create_connection()
        if conn is None:
            # create_connection() already reported the problem
            self.cur = None
            return
        self.cur = conn.cursor()

        try:
            self.cur.execute(f"USE {self.dtb}")
        except mysql.connector.Error:
            print(f"Database {self.dtb} does not exist.")
            self.create_database(self.cur)
            self.cur.execute(f"USE {self.dtb}")

        stmt = (
            f"CREATE TABLE IF NOT EXISTS {self.t_name} ("
            f"{rows_tp}"
            ")"
        )
        try:
            self.cur.execute(stmt)
        except mysql.connector.Error as e:
            print(e)

    def db_write(self, t_name, dictionary):
        """
        Write dictionary containing timestamp and sensor values into a table
        in a database.

        The table is created first if needed.  When the database cannot be
        reached the reading is dropped (the error has been printed by
        create_connection()).
        """
        self.data = dictionary
        # create_table() opens the connection and cursor used below
        self.create_table(t_name, dictionary)
        if self.cur is None:
            return

        vals = list(self.data.values())
        # one placeholder per value
        pl_hd = ','.join(['%s'] * len(vals))
        sql = (
            f"INSERT INTO {self.t_name} ("
            f"{self.rows}) "
            "VALUES ("
            f"{pl_hd}"
            ")"
        )
        try:
            self.cur.execute(sql, vals)
            self.conn.commit()
        except mysql.connector.Error as e:
            print(e)
        finally:
            self.cur.close()
            self.conn.close()

    def write(self, data, log_file):
        """
        Takes data in form of dictionary and appends it to a log file as one
        line of space separated 'name value' pairs.

        Data in dictionary takes form of sensor name : value pairs.
        In case of sensors returning multiple values each sensor name is
        followed by data type (i.e. 'tsl_0-infrared').
        """
        # Since I'm replacing it with db_write it has been abandoned,
        # leaving it here, may get used as a fallback option.
        self.data = data
        self.log_file = log_file
        line = ' '.join(
            f'{name} {value}' for name, value in self.data.items())
        with open(self.log_file, 'a') as f:
            f.write(f'{line}\n')


class Temperature(Sensor):
    """
    Dealing with 1 wire temperature sensors.
    """
    w1_path = '/sys/bus/w1/devices'
    the_line = w1_path + '/28-*/w1_slave'
    # evaluated once, when the class is created
    sens_files = glob.glob(the_line)

    def __init__(self):
        """
        Check if w1_gpio and w1_therm kernel modules are loaded.
        """
        # implement writing warnings and such into a log file
        super().__init__()
        proc = subprocess.run(['lsmod'], stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
        loaded = proc.stdout.decode()
        for module in ('w1_gpio', 'w1_therm'):
            if module not in loaded:
                # write that to a log file
                print(f"Module {module.replace('_', '-')} not found.")

    def check_w1(self):
        """
        Checks consistency of w1 sensor mapping.

        Compares the sensor columns of the 'temperature' table with the
        sensors currently found under ``w1_path``.  Exits the program when
        the numbers differ.  When the settings file has no 'w1_mapping' a
        default one is written to it.  When the database or the table is
        not available yet the check is skipped.
        """
        try:
            dtb = cfg_db['mysql']['database']
        except KeyError:
            print("You have not defined database. Check your settings file.")
            sys.exit(1)

        conn = self.create_connection()
        if conn is None:
            return
        # checking number of sensors in database:
        try:
            cur = conn.cursor()
            try:
                cur.execute(f"USE {dtb};")
                # only the column names are needed, so fetch no rows
                cur.execute("SELECT * FROM temperature LIMIT 0;")
                cur.fetchall()
                # first column is the timestamp
                db_sens_list = [col[0] for col in cur.description][1:]
            finally:
                cur.close()
        except mysql.connector.Error as e:
            # e.g. the table has not been created yet
            print(e)
            return
        finally:
            conn.close()

        # checking actual number of w1 sensors detected:
        act_sens_list = [
            name
            for _, dirs, _ in os.walk(self.w1_path)
            for name in dirs
            if '28-' in name
        ]

        has_mapping = (cfg.has_section('w1_mapping')
                       and len(cfg['w1_mapping']) > 0)
        # checking for no mapping defined in config file:
        if not has_mapping:
            print("Info: you have no mapping for w1 temperature sensors "
                  "defined, assigning default mapping. You can reassing them "
                  "in settings file."
                  )
            map_list = ['Inside ground', 'Inside', 'Outside ground',
                        'Outside']
            cfg['w1_mapping'] = dict(zip(db_sens_list, map_list))
            # write back to the file the settings were read from;
            # configparser does not keep comments when writing
            with open(settings, 'w') as f:
                cfg.write(f)
        elif len(db_sens_list) > len(act_sens_list):
            print("Warning: Number of w1 temperature sensors detected is "
                  f"({len(act_sens_list)}), smaller than number of sensors in "
                  f"the database ({len(db_sens_list)}). "
                  "Perhaps a sensor has detached/malfunctioned?"
                  )
            sys.exit(1)
        elif len(db_sens_list) < len(act_sens_list):
            print("Warning: number of w1 temperature sensors detected is "
                  "greater than that in a database."
                  )
            sys.exit(1)

    def _read_sensor_file(self, path):
        """
        Reads one w1_slave file.  Returns temperature in Celsius rounded to
        one decimal as a string, or ``self.nan`` on any bad read.
        """
        try:
            with open(path, 'r') as f:
                crc_line = f.readline()
                data_line = f.readline()
        except OSError:
            return self.nan
        # first line ends with YES when the checksum matched
        if not crc_line.strip().endswith('YES'):
            return self.nan
        try:
            # value after 't=' is in thousandths of a degree
            return str(round(float(data_line.split('t=')[1]) / 1000, 1))
        except (IndexError, ValueError):
            return self.nan

    def read(self):
        """
        Returns dictionary of available sensor names as keys and readings in
        Celsius as values, preceded by a 'timestamp' entry.
        """
        temp_dict = {
            'timestamp': strftime(self.timestamp_format, localtime())}
        for path in self.sens_files:
            # the sensor name is the directory holding its w1_slave file,
            # so that we know which sensor is which
            sensor = os.path.basename(os.path.dirname(path))
            temp_dict[sensor] = self._read_sensor_file(path)
        return temp_dict


class Humidity(Sensor):
    """
    This part deals with a DHT11 temperature and humidity sensor.
    Using pigpio-dht module so not much to do here
    """
    # additional attempts after the first invalid reading
    _MAX_RETRIES = 3
    # seconds to wait between attempts
    _RETRY_DELAY = 0.17

    def __init__(self):
        super().__init__()
        self.pin = cfg.getint('hardware_settings', 'dht_pin_1')
        self.dht = pigpio_dht.DHT11(self.pin)

    def read(self, n=0):
        """
        Reading from dht11 sensor.  On an invalid read the sensor is read
        again, up to ``_MAX_RETRIES`` more times, then NaN values are
        returned.  A timeout returns NaN values straight away.

        ``n`` is the number of retries already used up.

        Returns dictionary with 'timestamp', 'temp_c' and 'humidity'.
        """
        now = strftime(self.timestamp_format, localtime())
        failed = {'timestamp': now, 'temp_c': self.nan,
                  'humidity': self.nan}
        while True:
            try:
                result = self.dht.read()
            except TimeoutError:
                return failed
            if result.get('valid'):
                return {
                    'timestamp': now,
                    'temp_c': str(result['temp_c']),
                    'humidity': str(result['humidity']),
                }
            if n >= self._MAX_RETRIES:
                return failed
            n += 1
            sleep(self._RETRY_DELAY)


class Light(Sensor):
    """
    This part deals with tsl2561 sensor using smbus.
    """
    # at the moment I find changing gain and exposure time to not be reliable
    # and/or useful. May try to reimplement it at a later date.
    _SMBUSADDR = 1
    tsls = [
        ('_TSLADDR', 0x39),  # default
        ('_TSLADDR_LOW', 0x29),
        ('_TSLADDR_HIGH', 0x49)
    ]
    # controls
    # this first is for on/off only
    _REG_CONTR = 0x00
    # this is for any other commands we'll use here
    _CONTR = 0x01
    # command option
    _CMD = 0x80
    # max reading a channel can hold
    # on exposure 402ms:
    max_read = 65535
    # on exposure 100ms:
    max_read_1 = 37177
    # on exposure 13.7ms:
    max_read_2 = 5047
    # commands
    _ON = 0x03
    _OF = 0x00
    # set gain
    _CMD_GN = 0x81
    # registers
    _regs = {
        '_CH_0': 0xAC,  # visible + infra red
        '_CH_1': 0xAE   # infra red
    }
    # gain and exposure
    _gain = [
        ('_GN_VLOW', 0x00),  # gain 1x, exposure : 13.7ms
        ('_GN_LOW', 0x01),   # gain 1x, exposure : 100ms
        ('_GN_DEF', 0x02),   # gain 1x, exposure : 402ms
        ('_GN_HI', 0x10),    # gain 16x, exposure : 13.7ms
        ('_GN_VHI', 0x11),   # gain 16x, exposure : 100ms
        ('_GN_UHI', 0x12)    # gain 16x, exposure : 402ms
    ]

    def __init__(self, tsl):
        """
        ``tsl`` is the index into ``tsls`` selecting the sensor address.
        Opens the bus and turns the sensor on.
        """
        super().__init__()
        self.tsl = tsl
        if not 0 <= self.tsl < len(self.tsls):
            print('We can only adress 3 TSL2561 sensors on this bus, aborting')
            sys.exit(1)
        # since default gain is always _gain[2][1]
        self.n = 2
        # initiate bus
        self.bus = smbus.SMBus(self._SMBUSADDR)
        # turn on sensor
        self.tsl_addr = self.tsls[self.tsl][1]
        self.bus.write_byte_data(self.tsl_addr, self._REG_CONTR | self._CMD,
                                 self._ON)

    def setGain(self, gain):
        """
        Writes gain/exposure setting (a value from ``_gain``) to the sensor.
        """
        self.gain = gain
        self.bus.write_byte_data(self.tsl_addr, self._CMD_GN | self._CMD,
                                 gain)

    def read(self, idx=0):
        """
        Reads both channels and returns dictionary with 'timestamp',
        broadband, infrared and calculated lux for this sensor.
        """
        # Upon testing sensor I've come to the conclusion that all that
        # setting gain and timing gives me nothing, sensor reports limits
        # on those instead.  Hence no automatic gain/exposure adjustment
        # here; idx is kept for it and otherwise unused.
        self.idx = idx
        _vals = [
            self.bus.read_word_data(self.tsl_addr, self._regs['_CH_0']),
            self.bus.read_word_data(self.tsl_addr, self._regs['_CH_1']),
        ]
        lux = self.calculate_lux(_vals)
        now = strftime(self.timestamp_format, localtime())
        reading = {
            'timestamp': now,
            f'tsl_{self.tsl}-Broadband': _vals[0],
            f'tsl_{self.tsl}-Infrared': _vals[1],
            f'tsl_{self.tsl}-Lux': lux
        }
        return reading

    def calculate_lux(self, _vals):
        """
        Calculate lux from tuple 'visible+infrared'/'infrared' as returned
        by read() function.  From datasheet for FN package sensor version.

        Returns lux rounded to 4 decimals, 0 when broadband is 0, or
        ``self.nan`` when a channel is missing or saturated.
        """
        ch0, ch1 = _vals
        if ch0 is None or ch1 is None:
            return self.nan
        if ch0 == 0:
            return 0
        # normally broadband maxes out before infrared which makes visible
        # drop which is clearly wrong hence:
        if ch0 >= self.max_read or ch1 >= self.max_read:
            return self.nan

        ratio = ch1 / ch0
        # ratio 0 (no infrared at all) belongs to the first segment, whose
        # formula then reduces to 0.0304 * ch0
        if ratio <= 0.50:
            lux = 0.0304 * ch0 - 0.062 * ch0 * (ratio ** 1.4)
        elif ratio <= 0.61:
            lux = 0.0224 * ch0 - 0.031 * ch1
        elif ratio <= 0.80:
            lux = 0.0128 * ch0 - 0.0153 * ch1
        elif ratio <= 1.30:
            lux = 0.00146 * ch0 - 0.00112 * ch1
        else:
            lux = 0
        return round(lux, 4)


class Rain(Sensor):
    """
    Rain gauge counting pulses on a gpio pin.
    """

    def __init__(self):
        super().__init__()
        self.pin = cfg.getint('hardware_settings', 'rain_pin')
        self.pi = pigpio.pi()
        self.pi.set_pull_up_down(self.pin, pigpio.PUD_UP)
        # no edge and no function given: pigpio just counts the pulses
        # NOTE(review): no debouncing is set up here - confirm the gauge
        # does not need it
        self.rain = self.pi.callback(self.pin)
        self.bucket = cfg.getfloat('hardware_settings', 'bucket_size')

    def read(self):
        """
        Read and return rain data: pulses counted since the previous read
        multiplied by bucket size.  Resets the counter.
        """
        amount = str(round(self.rain.tally() * self.bucket, 4))
        now = strftime(self.timestamp_format, localtime())
        data = {
            'timestamp': now,
            'amount': amount
        }
        self.rain.reset_tally()
        return data


def main():
    """
    Instantiates the sensors, then reads them and writes the readings to
    the database every 'read_frequency' seconds, forever.
    """
    fq = cfg.getint('hardware_settings', 'read_frequency')
    print("Instantiating gpio connection")
    dht = Humidity()
    tmp = Temperature()
    # TODO: instantiate as many Light sensors as 'tsl2561_number' in
    # hardware_settings says
    tsl = Light(0)
    rain = Rain()
    # TODO: attempt to reconnect to pigpiod when the connection is lost
    while True:
        tmp.check_w1()
        tmp.db_write('temperature', tmp.read())
        dht.db_write('humidity', dht.read())
        tsl.db_write('light', tsl.read())
        rain.db_write('rain', rain.read())
        sleep(fq)


if __name__ == '__main__':
    main()