#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Program intended to be run as a daemon reading from sensors and writing
the readings to a database.
"""

__author__ = "Franek Łazarewicz-Muradyan"
__copyright__ = "Copyright 2021, Franek Łazarewicz-Muradyan"
__version__ = "0.1"
__status__ = "Beta"
__email__ = "uinarf@autistici.org"

import configparser
import glob
import importlib
import os
import re
import subprocess
import sys
from time import localtime, sleep, strftime

import mysql.connector
import pigpio
import smbus
from mysql.connector import errorcode
# below installed on my system with custom ebuild, available through pip
import pigpio_dht

# Directory of this script, so that the settings files are found even when
# the script is started from a different working directory.
__location__ = os.path.realpath(
    os.path.join(os.getcwd(), os.path.dirname(__file__)))

# here go our settings files
settings_file = 'settings_config_parser.py'
settings_db = 'settings_server.cfg'

# absolute paths of the settings files
settings = os.path.join(__location__, settings_file)
settings_db = os.path.join(__location__, settings_db)

# interpolation is disabled so that '%' (e.g. in the timestamp format)
# is read literally
cfg = configparser.ConfigParser(interpolation=None)
cfg_db = configparser.ConfigParser(interpolation=None)
cfg.read(settings)
cfg_db.read(settings_db)


class Sensor:
    """
    Base class for all sensors: holds the shared settings and knows how to
    store a reading (a dictionary) in the database or in a log file.
    """

    def __init__(self):
        # read default database name
        try:
            self.dtb = cfg_db['mysql']['database']
        except KeyError:
            self.dtb = 'default.dtb'
        # the elusive NaN; needs to be None for the database (SQL NULL)
        self.nan = None
        self.timestamp_format = cfg.get('various', 'timestamp_format')

    def create_connection(self):
        """
        Creates connection to database with credentials provided in
        config file.

        Returns the connection, or None when connecting failed (the error
        is printed). The connection is also stored in self.conn.
        """
        self.conn = None
        params = dict(cfg_db['mysql']) if cfg_db.has_section('mysql') else {}
        try:
            self.conn = mysql.connector.connect(**params)
        except mysql.connector.Error as e:
            if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your username and password.")
            elif e.errno == errorcode.ER_BAD_DB_ERROR and 'database' in params:
                # The configured database does not exist yet: connect to
                # the server only, so that create_table() can create it.
                del params['database']
                try:
                    self.conn = mysql.connector.connect(**params)
                except mysql.connector.Error as retry_error:
                    print(retry_error)
            else:
                print(e)
        return self.conn

    def create_database(self, cursor):
        """
        Creates the database named self.dtb using the given cursor.
        Failure is printed, not raised.
        """
        try:
            cursor.execute(
                f"CREATE DATABASE {self.dtb} DEFAULT CHARACTER SET 'utf8mb4'"
            )
        except mysql.connector.Error as e:
            print(f"Failed creating database: {e}")

    @staticmethod
    def _column_pairs(dictionary):
        """
        Returns a list of (dictionary key, column name) pairs, timestamp
        first.

        Achtung: identifiers cannot start with digits!!!
        Achtung: identifiers cannot contain special characters (-_-)!!!
        """
        pairs = [('timestamp', 'timestamp')]
        for key in dictionary:
            if key == 'timestamp':
                continue
            column = key
            # digit cannot go first
            if column[:1].isdigit():
                column = 'sens' + column
            # strip special characters (',' and ' ' are let through)
            column = re.sub('[^A-Za-z0-9, ]+', '', column)
            pairs.append((key, column))
        return pairs

    def create_table(self, t_name, dictionary):
        """
        Takes table name (w1, light etc) and data in shape of a dictionary
        and creates the database table if it does not exist yet.

        Opens a new connection and leaves it (self.conn) and its cursor
        (self.cur) open for the caller, who is responsible for closing
        them. When connecting fails both are None.
        """
        self.t_name = t_name
        self.data = dictionary

        columns = [column for _, column in self._column_pairs(dictionary)]
        # This: row names, reused by db_write() for the INSERT statement
        self.rows = ', '.join(columns)
        # This: row name + data type; all the sensor data is float, ok?
        rows_tp = ', '.join(
            ['timestamp timestamp'] + [f'{c} float' for c in columns[1:]]
        )

        self.cur = None
        conn = self.create_connection()
        if conn is None:
            return
        self.cur = conn.cursor()

        try:
            self.cur.execute(f"USE {self.dtb}")
        except mysql.connector.Error:
            print(f"Database {self.dtb} does not exist.")
            self.create_database(self.cur)
            try:
                self.cur.execute(f"USE {self.dtb}")
            except mysql.connector.Error as e:
                print(e)

        stmt = f"CREATE TABLE IF NOT EXISTS {self.t_name} ({rows_tp})"
        try:
            self.cur.execute(stmt)
        except mysql.connector.Error as e:
            print(e)

    def db_write(self, t_name, dictionary):
        """
        Write dictionary containing timestamp and sensor values into
        a table in a database. Database errors are printed, not raised.
        """
        self.data = dictionary
        # create_table() opens the one connection used for this write
        self.create_table(t_name, dictionary)
        if self.conn is None or self.cur is None:
            return

        # values in exactly the same order as the column names in self.rows
        vals = [dictionary.get(key) for key, _ in self._column_pairs(dictionary)]
        pl_hd = ', '.join(['%s'] * len(vals))
        sql = f"INSERT INTO {self.t_name} ({self.rows}) VALUES ({pl_hd})"
        try:
            self.cur.execute(sql, vals)
            self.conn.commit()
        except mysql.connector.Error as e:
            print(e)
        finally:
            # always release the connection, also after a failed insert
            self.cur.close()
            self.conn.close()

    def write(self, data, log_file):
        """
        Takes data in form of dictionary and appends it to a log file as
        one line of space separated 'name value' pairs.

        Data in dictionary takes form of sensor name : value pairs. In case
        of sensors returning multiple values each sensor name is followed
        by data type (i.e. 'tsl_0-infrared').
        """
        # Replaced by db_write, kept as a fallback option.
        self.data = data
        self.log_file = log_file
        fields = []
        for name, value in self.data.items():
            fields.extend((str(name), str(value)))
        with open(self.log_file, 'a') as f:
            f.write(' '.join(fields) + '\n')


class Temperature(Sensor):
    """
    Dealing with 1 wire temperature sensors.
    """
    w1_path = '/sys/bus/w1/devices'
    the_line = w1_path + '/28-*/w1_slave'
    # evaluated once, when the class is defined; sorted for a stable order
    sens_files = sorted(glob.glob(the_line))

    def __init__(self):
        """
        Check if w1_gpio and w1_therm kernel modules are loaded.
        """
        # TODO: implement writing warnings and such into a log file
        super().__init__()
        try:
            proc = subprocess.run(['lsmod'], stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        except OSError as e:
            print(f'Could not run lsmod: {e}')
            return
        loaded = proc.stdout.decode(errors='replace')
        if 'w1_gpio' not in loaded:
            print('Module w1-gpio not found.')
        if 'w1_therm' not in loaded:
            print('Module w1-therm not found.')

    def _detected_sensors(self):
        """Returns names of directories below w1_path containing '28-'."""
        return [
            name
            for _root, dirs, _files in os.walk(self.w1_path)
            for name in dirs
            if '28-' in name
        ]

    def check_w1(self):
        """
        Checks consistency of w1 sensor mapping: compares the sensors
        detected on the bus with the columns of the 'temperature' table.

        Needs access to the database; when the database or the table is
        not available (e.g. on the very first run) the error is printed
        and the check is skipped. Exits the program when the number of
        detected sensors differs from the number stored in the database.
        """
        try:
            dtb = cfg_db['mysql']['database']
        except KeyError:
            print("You have not defined database. Check your settings file.")
            sys.exit()

        conn = self.create_connection()
        if conn is None:
            return
        cur = conn.cursor()
        try:
            cur.execute(f"USE {dtb};")
            # only the column names are needed, so fetch no rows
            cur.execute("SELECT * FROM temperature LIMIT 0;")
            # sensors in database: every column but the first (timestamp)
            db_sens_list = [col[0] for col in cur.description][1:]
            cur.fetchall()
        except mysql.connector.Error as e:
            print(e)
            return
        finally:
            cur.close()
            conn.close()

        # actual w1 sensors detected:
        act_sens_list = self._detected_sensors()

        has_mapping = (cfg.has_section('w1_mapping')
                       and len(cfg['w1_mapping']) != 0)
        if not has_mapping:
            print("Info: you have no mapping for w1 temperature sensors "
                    "defined, assigning default mapping. You can reassing them "
                    "in settings file."
            )
            map_list = ['Inside ground', 'Inside', 'Outside ground', 'Outside']
            cfg['w1_mapping'] = dict(zip(db_sens_list, map_list))
            # write back to the same file the settings were read from
            with open(settings, 'w') as f:
                cfg.write(f)
        elif len(db_sens_list) > len(act_sens_list):
            print("Warning: Number of w1 temperature sensors detected is "
                    f"({len(act_sens_list)}), smaller than number of sensors in "
                    f"the database ({len(db_sens_list)}). "
                    "Perhaps a sensor has detached/malfunctioned?"
            )
            sys.exit()
        elif len(db_sens_list) < len(act_sens_list):
            print("Warning: number of w1 temperature sensors detected is "
                    "greater than that in a database."
            )
            sys.exit()

    def _read_sensor(self, path):
        """
        Returns the reading of a single w1_slave file in Celcius as
        a string, or self.nan when the file cannot be read, the CRC check
        failed or the content is malformed.
        """
        try:
            with open(path, 'r') as f:
                crc_line = f.readline()
                data_line = f.readline()
        except OSError:
            return self.nan
        if not crc_line.rstrip().endswith('YES'):
            return self.nan
        try:
            # second line ends with 't=<millidegrees>'
            raw = data_line.split('t=')[1]
            return str(round(float(raw) / 1000, 1))
        except (IndexError, ValueError):
            return self.nan

    def read(self):
        """
        Returns dictionary of available sensor names as keys and readings
        in Celcius as values, plus the 'timestamp' of the read.
        """
        temp_dict = {'timestamp': strftime(self.timestamp_format, localtime())}
        for path in self.sens_files:
            # The name comes from the very path that is read, so that we
            # always know which sensor is which.
            sensor = os.path.basename(os.path.dirname(path))
            temp_dict[sensor] = self._read_sensor(path)
        return temp_dict


class Humidity(Sensor):
    """
    This part deals with a DHT11 temperature and humidity sensor.
    Using pigpio-dht module so not much to do here
    """

    def __init__(self):
        super().__init__()
        self.pin = cfg.getint('hardware_settings', 'dht_pin_1')
        self.dht = pigpio_dht.DHT11(self.pin)

    def read(self, n=0):
        """
        Reading from dht11 sensor. A timed out or invalid read is repeated
        after a short pause; once the retry counter reaches 3 the values
        are reported as self.nan.

        n is the initial value of the retry counter (0 allows 3 retries).
        Returns a dictionary with 'timestamp', 'temp_c' and 'humidity';
        values of a valid read are strings.
        """
        now = strftime(self.timestamp_format, localtime())
        while True:
            try:
                result = self.dht.read()
            except TimeoutError:
                result = None
            if result and result.get('valid'):
                reading = {'timestamp': now}
                reading.update(result)
                reading.pop('temp_f', None)
                reading.pop('valid', None)
                return {key: str(value) for key, value in reading.items()}
            if n >= 3:
                return {'timestamp': now,
                        'temp_c': self.nan,
                        'humidity': self.nan}
            n += 1
            sleep(0.17)


class Light(Sensor):
    """
    This part deals with tsl2561 sensor using smbus.
    """
    # at the moment I find changing gain and exposure time to not be reliable
    # and/or useful. May try to reimplement it at a later date.
    _SMBUSADDR = 1
    tsls = [
            ('_TSLADDR', 0x39),         # default
            ('_TSLADDR_LOW', 0x29),
            ('_TSLADDR_HIGH', 0x49)
    ]
    # controls
    # this first is for on/off only
    _REG_CONTR = 0x00
    # this is for any other commands we'll use here
    _CONTR = 0x01
    # command option
    _CMD = 0x80
    # max reading a channel can hold
    # on exposure 402ms:
    max_read = 65535
    # on exposure 100ms:
    max_read_1 = 37177
    # on exposure 13.7ms:
    max_read_2 = 5047
    # commands
    _ON = 0x03
    _OF = 0x00
    # set gain
    _CMD_GN = 0x81
    # registers
    _regs = {
            '_CH_0': 0xAC,      # visible + infra red
            '_CH_1': 0xAE       # infra red
    }
    # gain and exposure
    _gain = [
            ('_GN_VLOW', 0x00),     # gain 1x, exposure : 13.7ms
            ('_GN_LOW', 0x01),      # gain 1x, exposure : 100ms
            ('_GN_DEF', 0x02),      # gain 1x, exposure : 402ms
            ('_GN_HI', 0x10),       # gain 16x, exposure : 13.7ms
            ('_GN_VHI', 0x11),      # gain 16x, exposure : 100ms
            ('_GN_UHI', 0x12)       # gain 16x, exposure : 402ms
    ]

    def __init__(self, tsl):
        """
        tsl is the index (0, 1 or 2) into the list of sensor addresses.
        Exits the program for any other index.
        """
        super().__init__()
        self.tsl = tsl
        # a negative index would silently wrap around the address list
        if not 0 <= self.tsl <= 2:
            print('We can only adress 3 TSL2561 sensors on this bus, aborting')
            sys.exit()
        # since default gain is always _gain[2][1]
        self.n = 2
        # initiate bus
        self.bus = smbus.SMBus(self._SMBUSADDR)
        # turn on sensor
        self.tsl_addr = self.tsls[self.tsl][1]
        self.bus.write_byte_data(self.tsl_addr, self._REG_CONTR | self._CMD,
                                 self._ON)

    def setGain(self, gain):
        """
        Writes the gain/exposure setting to the sensor.
        To be used from within read function.
        """
        self.gain = gain
        self.bus.write_byte_data(self.tsl_addr, self._CMD_GN | self._CMD, gain)

    def read(self, idx=0):
        """
        Reads both channels of the sensor and returns a dictionary with
        'timestamp', broadband, infrared and calculated lux values.
        When the bus read fails the error is printed and all three values
        are self.nan.
        """
        # Upon testing sensor I've come to the conclusion that all that
        # setting gain and timing gives me nothing, sensor reports limits
        # on those instead. Hence no automatic gain adjustment here.
        self.idx = idx
        try:
            _vals = [self.bus.read_word_data(self.tsl_addr, reg)
                     for reg in self._regs.values()]
        except OSError as e:
            print(e)
            _vals = [self.nan, self.nan]
        lux = self.calculate_lux(_vals)
        now = strftime(self.timestamp_format, localtime())
        reading = {
                'timestamp': now,
                f'tsl_{self.tsl}-Broadband': _vals[0],
                f'tsl_{self.tsl}-Infrared': _vals[1],
                f'tsl_{self.tsl}-Lux': lux
        }
        return reading

    def calculate_lux(self, _vals):
        """
        Calculate lux from tuple 'visible+infrared'/'infrared' as returned by
        read() function. From datasheet for FN package sensor version.

        Returns lux rounded to 4 places, or self.nan when a channel holds
        no reading or is saturated.
        """
        ch0, ch1 = _vals
        if ch0 is self.nan or ch1 is self.nan:
            return self.nan
        if ch0 == 0:
            return 0
        # normally broadband maxes out before infrared which makes visible
        # drop which is clearly wrong hence:
        if ch0 == self.max_read or ch1 == self.max_read:
            return self.nan

        ratio = ch1 / ch0
        # Segment bounds as given for the T/FN/CL package. The datasheet
        # states the first segment for ratio > 0; ratio == 0 (no infrared
        # at all) is treated as part of it here.
        if ratio <= 0.50:
            lux = 0.0304 * ch0 - 0.062 * ch0 * (ratio ** 1.4)
        elif ratio <= 0.61:
            lux = 0.0224 * ch0 - 0.031 * ch1
        elif ratio <= 0.8:
            lux = 0.0128 * ch0 - 0.0153 * ch1
        elif ratio <= 1.3:
            lux = 0.00146 * ch0 - 0.00112 * ch1
        else:
            lux = 0
        return round(lux, 4)


class Rain(Sensor):
    """
    This part deals with the rain gauge: counts the pulses on a gpio pin
    with a pigpio callback.
    """

    def __init__(self):
        super().__init__()
        self.pin = cfg.getint('hardware_settings', 'rain_pin')
        self.pi = pigpio.pi()
        self.pi.set_pull_up_down(self.pin, pigpio.PUD_UP)
        self.rain = self.pi.callback(self.pin)
        self.bucket = cfg.getfloat('hardware_settings', 'bucket_size')

    def read(self):
        """
        Read and return rain data: the number of pulses counted since the
        previous read multiplied by the bucket size, then reset the counter.
        """
        amount = str(round(self.rain.tally() * self.bucket, 4))
        now = strftime(self.timestamp_format, localtime())
        data = {
            'timestamp': now,
            'amount': amount
        }
        self.rain.reset_tally()
        return data


def main():
    """
    Instantiates the sensors and keeps writing their readings into the
    database every 'read_frequency' seconds. Never returns.
    """
    fq = cfg.getint('hardware_settings', 'read_frequency')
    print("Instantiating gpio connection")
    dht = Humidity()
    tmp = Temperature()
    # TODO: instantiate 'tsl2561_number' light sensors instead of one
    tsl = Light(0)
    rain = Rain()
    # TODO: reconnect to pigpiod when the connection is lost
    while True:
        tmp.check_w1()
        tmp.db_write('temperature', tmp.read())
        dht.db_write('humidity', dht.read())
        tsl.db_write('light', tsl.read())
        rain.db_write('rain', rain.read())
        sleep(fq)


if __name__ == '__main__':
    main()