"""Script reading from mqtt broker and writing to rrd database."""

#########################################################
# IMPORTS                                               #
#########################################################
import getpass
import os
import ssl
from configparser import ConfigParser
from time import time

import rrdtool
from paho.mqtt.client import Client as MQTTClient

#########################################################
# SETTINGS                                              #
#########################################################
CONFIG_FILE = 'settings.cfg'


def _current_user():
    """Return the name of the user running the script.

    ``os.getlogin()`` raises ``OSError`` when the process has no controlling
    terminal (e.g. started as a service or from cron); in that case fall back
    to ``getpass.getuser()``.
    """
    try:
        return os.getlogin()
    except OSError:
        return getpass.getuser()


USER = _current_user()
RRD_DATABASE = f"/home/{USER}/w1_database.rrd"

#########################################################
# CODE                                                  #
#########################################################
# Module-level settings; all of them are populated by get_settings().
SSL_SET = None
CERT_DIR = None
CA_CERT = None
CERT_FILE = None
KEY_FILE = None
ONE_WIRE_UPPER = None
ONE_WIRE_LOWER = None
HOSTNAME = None
PORT = None
KEEPALIVE = None
DATA_LENGTH_LIMIT = None
TOPIC = None
TIMEFORMAT = None
Y_LIMIT = None
X_LIMIT = None
INTERVAL = None


def _cert_path(cert_dir, file_name):
    """Return ``file_name`` joined onto ``cert_dir``, or None if it is empty.

    The check is done on the configured file name, not on the joined path:
    ``os.path.join(cert_dir, '')`` yields ``cert_dir + '/'``, which is truthy
    and would hide the fact that no file was configured.
    """
    return os.path.join(cert_dir, file_name) if file_name else None


def get_settings(config_file):
    """Read ``config_file`` and store its values in the module-level settings.

    The sections read are ``Certificates``, ``Sensor_mapping``, ``Mqtt`` and
    ``Main``. Nothing is returned; the values are written to the globals
    declared below.

    Raises:
        configparser.Error: if a required section or option is missing
            (this includes the case where ``config_file`` cannot be read,
            because ``ConfigParser.read`` silently skips unreadable files).
        ValueError: if a boolean or integer option cannot be converted.
    """
    global SSL_SET, CERT_DIR, CA_CERT, CERT_FILE, KEY_FILE
    global ONE_WIRE_UPPER, ONE_WIRE_LOWER
    global HOSTNAME, PORT, KEEPALIVE
    global DATA_LENGTH_LIMIT, TOPIC, TIMEFORMAT
    global Y_LIMIT, X_LIMIT, INTERVAL

    cfg = ConfigParser(interpolation=None)
    cfg.read(config_file)

    # SSL certificates:
    SSL_SET = cfg.getboolean('Certificates', 'ssl')
    CERT_DIR = cfg.get('Certificates', 'certificate_directory')
    CA_CERT = _cert_path(CERT_DIR, cfg.get('Certificates', 'ca_crt'))
    CERT_FILE = _cert_path(CERT_DIR, cfg.get('Certificates', 'cert_file'))
    KEY_FILE = _cert_path(CERT_DIR, cfg.get('Certificates', 'key_file'))

    # One wire mapping:
    ONE_WIRE_UPPER = cfg.get('Sensor_mapping', 'upper')
    ONE_WIRE_LOWER = cfg.get('Sensor_mapping', 'lower')

    # MQTT host:
    HOSTNAME = cfg.get('Mqtt', 'hostname')
    PORT = cfg.getint('Mqtt', 'port')
    KEEPALIVE = cfg.getint('Mqtt', 'keepalive')

    # NOTE: data_length_limit is read as a string, unlike the integer
    # options below.
    DATA_LENGTH_LIMIT = cfg.get('Main', 'data_length_limit')
    TOPIC = cfg.get('Main', 'topic')
    TIMEFORMAT = cfg.get('Main', 'timeformat')

    # Plot settings (not used by this script):
    Y_LIMIT = (
        cfg.getint('Main', 'y_min'),
        cfg.getint('Main', 'y_max'),
    )
    X_LIMIT = cfg.getint('Main', 'x_limit')
    INTERVAL = cfg.getint('Main', 'interval')


class MyMQTTClient(MQTTClient):
    """MQTT client whose message callback writes to the rrd database.

    The constructor reads the module-level settings, so get_settings() must
    have been called before an instance is created. Creating an instance
    connects to the broker immediately.
    """

    def __init__(self, *args, **kwargs):
        """Configure callbacks and TLS, then connect to the broker.

        All arguments are passed through unchanged to the paho ``Client``
        constructor.
        """
        super().__init__(*args, **kwargs)
        self.data_limit = DATA_LENGTH_LIMIT

        # NOTE: str.strip("/#") removes every leading and trailing '/' and
        # '#' character, so a leading slash in the configured topic is
        # dropped as well.
        self.bare_topic = TOPIC.strip("/#")
        self.full_topic = f'{self.bare_topic}/{ONE_WIRE_UPPER}'

        # Callbacks and topics are set up before connecting; the actual
        # subscription to full_topic is issued from on_connect().
        self.on_message = self.onmessage
        self.on_disconnect = self.ondisconnect

        if SSL_SET:
            self.tls_set(
                ca_certs=CA_CERT,
                certfile=CERT_FILE,
                keyfile=KEY_FILE,
                cert_reqs=ssl.CERT_REQUIRED,
            )
        self.connect(host=HOSTNAME, port=PORT, keepalive=KEEPALIVE)

    def ondisconnect(self, client, userdata=None, rc=None):
        """Report the disconnect and ask the client to stop its loop.

        Registered as the ``on_disconnect`` callback, which paho invokes as
        ``(client, userdata, rc)``; ``userdata`` and ``rc`` are accepted so
        that the call does not fail, but they are not used.
        """
        print("Disconnected from MQTT")
        client.loop_stop()

    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result and (re)subscribe to the topics."""
        print("Connected to MQTT, returncode: ", str(rc))
        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        client.subscribe(self.full_topic)
        client.subscribe("$SYS/#")

    def onmessage(self, client, userdata, msg):
        """Write the payload of a message on ``full_topic`` to the rrd database.

        Messages on any other topic are ignored.
        """
        if msg.topic != self.full_topic:
            return
        # Decode only messages we actually handle, so an undecodable payload
        # on an unrelated topic cannot raise here.
        temperature = msg.payload.decode("utf-8")
        now = time()  # TODO: pass timestamp in the message body
        rrdtool.update(RRD_DATABASE, f'{now}:{temperature}')


def main():
    """Load the settings, connect to the broker and process messages forever."""
    get_settings(CONFIG_FILE)
    client = MyMQTTClient()
    client.loop_forever()


if __name__ == '__main__':
    main()