diff options
Diffstat (limited to 'mqtt_to_rrd.py')
-rw-r--r-- | mqtt_to_rrd.py | 160 |
1 files changed, 160 insertions, 0 deletions
diff --git a/mqtt_to_rrd.py b/mqtt_to_rrd.py new file mode 100644 index 0000000..70818eb --- /dev/null +++ b/mqtt_to_rrd.py @@ -0,0 +1,160 @@ +""" +Script reading from mqtt broker and writing to rrd database. +""" +######################################################### +# IMPORTS # +######################################################### + +import os +import ssl +from configparser import ConfigParser +from time import time +import rrdtool +from paho.mqtt.client import Client as MQTTClient + +######################################################### +# SETTINGS # +######################################################### + +CONFIG_FILE = 'settings.cfg' +USER = os.getlogin() +RRD_DATABASE = f"/home/{USER}/w1_database.rrd" + +######################################################### +# CODE # +######################################################### + + +SSL_SET = None +CERT_DIR = None +CA_CERT = None +CERT_FILE = None +KEY_FILE = None +ONE_WIRE_UPPER = None +ONE_WIRE_LOWER = None +HOSTNAME = None +PORT = None +KEEPALIVE = None +DATA_LENGTH_LIMIT = None +TOPIC = None +TIMEFORMAT = None +Y_LIMIT = None +X_LIMIT = None +INTERVAL = None + + +def get_settings(config_file): + """ + Get settings from configparser. + """ + cfg = ConfigParser(interpolation=None) + cfg.read(config_file) + + global SSL_SET + global CERT_DIR + global CA_CERT + global CERT_FILE + global KEY_FILE + global ONE_WIRE_UPPER + global ONE_WIRE_LOWER + global HOSTNAME + global PORT + global KEEPALIVE + global DATA_LENGTH_LIMIT + global TOPIC + global TIMEFORMAT + global Y_LIMIT + global X_LIMIT + global INTERVAL + # SSL certificates: + SSL_SET = cfg.getboolean('Certificates', 'ssl') + CERT_DIR = cfg.get('Certificates', 'certificate_directory') + CA_CERT_PATH = os.path.join( + CERT_DIR, + cfg.get('Certificates', 'ca_crt') + ) + CERT_FILE_PATH = os.path.join( + CERT_DIR, + cfg.get('Certificates', 'cert_file'), + ) + KEY_FILE_PATH = os.path.join( + CERT_DIR, + cfg.get('Certificates', 'key_file') + ) + CA_CERT = CA_CERT_PATH if CA_CERT_PATH else None + CERT_FILE = CERT_FILE_PATH if CERT_FILE_PATH else None + KEY_FILE = KEY_FILE_PATH if KEY_FILE_PATH else None + + # One wire mapping: + ONE_WIRE_UPPER = cfg.get('Sensor_mapping', 'upper') + ONE_WIRE_LOWER = cfg.get('Sensor_mapping', 'lower') + + # MQTT host: + HOSTNAME = cfg.get('Mqtt', 'hostname') + PORT = cfg.getint('Mqtt', 'port') + KEEPALIVE = cfg.getint('Mqtt', 'keepalive') + + DATA_LENGTH_LIMIT = cfg.get('Main', 'data_length_limit') + TOPIC = cfg.get('Main', 'topic') + TIMEFORMAT = cfg.get('Main', 'timeformat') + + # Plot settings: + Y_LIMIT = ( + cfg.getint('Main', 'y_min'), + cfg.getint('Main', 'y_max') + ) + X_LIMIT = cfg.getint('Main', 'x_limit') + INTERVAL = cfg.getint('Main', 'interval') + + +class MyMQTTClient(MQTTClient): + """ + MQTT client class on_message writing to rrd database. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.data_limit = DATA_LENGTH_LIMIT + if SSL_SET: + self.tls_set( + ca_certs=CA_CERT, + certfile=CERT_FILE, + keyfile=KEY_FILE, + cert_reqs=ssl.CERT_REQUIRED, + ) + self.connect(host=HOSTNAME, port=PORT, keepalive=KEEPALIVE) + self.bare_topic = TOPIC.strip("/#") + self.full_topic = f'{self.bare_topic}/{ONE_WIRE_UPPER}' + self.subscribe(self.full_topic) + self.on_message = self.onmessage + self.on_disconnect = self.ondisconnect + + def ondisconnect(self, client): + print("Disconnected from MQTT") + client.loop_stop() + + def on_connect(self, client, userdata, flags, rc): + print("Connected to MQTT, returncode: ", str(rc)) + # Subscribing in on_connect() means that if we lose the connection and + # reconnect then subscriptions will be renewed. + client.subscribe("$SYS/#") + + def onmessage(self, client, userdata, msg): + """ + On message update rrd database. + """ + temperature = msg.payload.decode("utf-8") + if msg.topic == self.full_topic: + now = time() # TODO: pass timestamp in the message body + + rrdtool.update(RRD_DATABASE, '%s:%s' % (now, temperature)) + + +def main(): + """Main loop""" + get_settings(CONFIG_FILE) + client = MyMQTTClient() + client.loop_forever() + + +if __name__ == '__main__': + main() |