summaryrefslogtreecommitdiff
path: root/mqtt_to_rrd.py
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt_to_rrd.py')
-rw-r--r--mqtt_to_rrd.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/mqtt_to_rrd.py b/mqtt_to_rrd.py
new file mode 100644
index 0000000..70818eb
--- /dev/null
+++ b/mqtt_to_rrd.py
@@ -0,0 +1,160 @@
+"""
+Script reading from mqtt broker and writing to rrd database.
+"""
+#########################################################
+# IMPORTS #
+#########################################################
+
+import os
+import ssl
+from configparser import ConfigParser
+from time import time
+import rrdtool
+from paho.mqtt.client import Client as MQTTClient
+
+#########################################################
+# SETTINGS #
+#########################################################
+
+CONFIG_FILE = 'settings.cfg'
+USER = os.getlogin()
+RRD_DATABASE = f"/home/{USER}/w1_database.rrd"
+
+#########################################################
+# CODE #
+#########################################################
+
+
+SSL_SET = None
+CERT_DIR = None
+CA_CERT = None
+CERT_FILE = None
+KEY_FILE = None
+ONE_WIRE_UPPER = None
+ONE_WIRE_LOWER = None
+HOSTNAME = None
+PORT = None
+KEEPALIVE = None
+DATA_LENGTH_LIMIT = None
+TOPIC = None
+TIMEFORMAT = None
+Y_LIMIT = None
+X_LIMIT = None
+INTERVAL = None
+
+
+def get_settings(config_file):
+ """
+ Get settings from configparser.
+ """
+ cfg = ConfigParser(interpolation=None)
+ cfg.read(config_file)
+
+ global SSL_SET
+ global CERT_DIR
+ global CA_CERT
+ global CERT_FILE
+ global KEY_FILE
+ global ONE_WIRE_UPPER
+ global ONE_WIRE_LOWER
+ global HOSTNAME
+ global PORT
+ global KEEPALIVE
+ global DATA_LENGTH_LIMIT
+ global TOPIC
+ global TIMEFORMAT
+ global Y_LIMIT
+ global X_LIMIT
+ global INTERVAL
+ # SSL certificates:
+ SSL_SET = cfg.getboolean('Certificates', 'ssl')
+ CERT_DIR = cfg.get('Certificates', 'certificate_directory')
+ CA_CERT_PATH = os.path.join(
+ CERT_DIR,
+ cfg.get('Certificates', 'ca_crt')
+ )
+ CERT_FILE_PATH = os.path.join(
+ CERT_DIR,
+ cfg.get('Certificates', 'cert_file'),
+ )
+ KEY_FILE_PATH = os.path.join(
+ CERT_DIR,
+ cfg.get('Certificates', 'key_file')
+ )
+ CA_CERT = CA_CERT_PATH if CA_CERT_PATH else None
+ CERT_FILE = CERT_FILE_PATH if CERT_FILE_PATH else None
+ KEY_FILE = KEY_FILE_PATH if KEY_FILE_PATH else None
+
+ # One wire mapping:
+ ONE_WIRE_UPPER = cfg.get('Sensor_mapping', 'upper')
+ ONE_WIRE_LOWER = cfg.get('Sensor_mapping', 'lower')
+
+ # MQTT host:
+ HOSTNAME = cfg.get('Mqtt', 'hostname')
+ PORT = cfg.getint('Mqtt', 'port')
+ KEEPALIVE = cfg.getint('Mqtt', 'keepalive')
+
+ DATA_LENGTH_LIMIT = cfg.get('Main', 'data_length_limit')
+ TOPIC = cfg.get('Main', 'topic')
+ TIMEFORMAT = cfg.get('Main', 'timeformat')
+
+ # Plot settings:
+ Y_LIMIT = (
+ cfg.getint('Main', 'y_min'),
+ cfg.getint('Main', 'y_max')
+ )
+ X_LIMIT = cfg.getint('Main', 'x_limit')
+ INTERVAL = cfg.getint('Main', 'interval')
+
+
+class MyMQTTClient(MQTTClient):
+ """
+ MQTT client class on_message writing to rrd database.
+ """
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.data_limit = DATA_LENGTH_LIMIT
+ if SSL_SET:
+ self.tls_set(
+ ca_certs=CA_CERT,
+ certfile=CERT_FILE,
+ keyfile=KEY_FILE,
+ cert_reqs=ssl.CERT_REQUIRED,
+ )
+ self.connect(host=HOSTNAME, port=PORT, keepalive=KEEPALIVE)
+ self.bare_topic = TOPIC.strip("/#")
+ self.full_topic = f'{self.bare_topic}/{ONE_WIRE_UPPER}'
+ self.subscribe(self.full_topic)
+ self.on_message = self.onmessage
+ self.on_disconnect = self.ondisconnect
+
+ def ondisconnect(self, client):
+ print("Disconnected from MQTT")
+ client.loop_stop()
+
+ def on_connect(self, client, userdata, flags, rc):
+ print("Connected to MQTT, returncode: ", str(rc))
+ # Subscribing in on_connect() means that if we lose the connection and
+ # reconnect then subscriptions will be renewed.
+ client.subscribe("$SYS/#")
+
+ def onmessage(self, client, userdata, msg):
+ """
+ On message update rrd database.
+ """
+ temperature = msg.payload.decode("utf-8")
+ if msg.topic == self.full_topic:
+ now = time() # TODO: pass timestamp in the message body
+
+ rrdtool.update(RRD_DATABASE, '%s:%s' % (now, temperature))
+
+
+def main():
+ """Main loop"""
+ get_settings(CONFIG_FILE)
+ client = MyMQTTClient()
+ client.loop_forever()
+
+
+if __name__ == '__main__':
+ main()