Die Daten vom Revolution Pi empfangen wir nun per MQTT. Diese wollen wir aber natürlich auch speichern um sie auszuwerten.
3) Datenspeicherung auf dem Server
Für die Speicherung der Datenmassen verwenden wir InfluxDB, eine nicht relationale Datenbank für Zeitserien.
Um InfluxDB auf unserem Cloud-Server zu installieren, binden wir die InfluxData-Repositories ein. Damit können wir das System auch auf dem aktuellen Stand halten.
Repository von InfluxDB einbinden und installieren
# Schlüssel des InfluxDB Repositories in apt integrieren pi@raspberrypi:~ $ sudo /bin/bash -c 'wget -O - https://repos.influxdata.com/influxdb.key | apt-key add -' # InfluxDB Repository zum apt hinzufügen pi@raspberrypi:~ $ sudo /bin/bash -c 'echo "deb https://repos.influxdata.com/debian/ stretch stable" > /etc/apt/sources.list.d/influx.list' # Alle Repositories neu laden pi@raspberrypi:~ $ sudo apt-get update # InfluxDB installieren pi@raspberrypi:~ $ sudo apt-get install influxdb influxdb-client
Mit Abschluss der Installation wird der InfluxDB Server bereits ausgeführt und ist verfügbar.
Wir werden gleich ein Programm erstellen, welches die Daten von MQTT in die InfluxDB schreibt. Unsere Visualisierung wird später ebenfalls auf unserem Cloud-Sever installiert und ließt die Daten aus der InfluxDB.
Da ein Zugriff auf die Datenbank nur vom lokalen System aus passiert, binden wir die Dienste vom InfluxDB Server an localhost
.
Dafür bearbeiten wir die Datei /etc/influxdb/influxdb.conf
:
pi@raspberrypi:~ $ sudo nano /etc/influxdb/influxdb.conf (...) # Change this option to true to disable reporting. reporting-disabled = true # --- EINFÜGEN --- # Bind address to use for the RPC service for backup and restore. bind-address = "127.0.0.1:8088" # --- ENDE EINFÜGEN --- ### ### [meta] (...) ### ### [admin] ### ### Controls the availability of the built-in, web-based admin interface. If HTTPS is ### enabled for the admin interface, HTTPS must also be enabled on the [http] service. ### [admin] enabled = true bind-address = "127.0.0.1:8083" https-enabled = false https-certificate = "/etc/ssl/influxdb.pem" ### ### [http] ### ### Controls how the HTTP endpoints are configured. These are the primary ### mechanism for getting data into and out of InfluxDB. ### [http] enabled = true # bind-address = ":8086" # Ändern zu bind-address = "127.0.0.1:8086" (...)
Wir setzen die bind-address
von allen Diensten auf 127.0.0.1:
und speichern diese Datei. Danach starten wir den InfluxDB Server neu sudo systemctl restart influxdb
.
Das Pythonprogramm auf dem Cloud-Server
Alles was wir nun noch benötigen ist ein Programm um die MQTT-Daten in die InfluxDB zu schreiben.
Auf unserem Cloud-Server installieren wir dafür das Python3 Modul für InfluxDB und das, aus dem vorherigen Blogeintrag bekannte, Paho.mqtt Modul:
# InfluxDB Modul für Python3 installieren pi@raspberrypi:~ $ sudo apt-get install python3-influxdb # Sollte pip3 noch nicht installiert sein, muss dies jetzt gemacht werden pi@raspberrypi:~ $ sudo apt-get install python3-pip # paho-mqtt über pip3 installieren, da es nicht in den Repositories ist pi@raspberrypi:~ $ sudo pip3 install paho-mqtt Downloading/unpacking paho-mqtt Downloading paho-mqtt-1.3.1.tar.gz (80kB): 80kB downloaded Running setup.py (path:/tmp/pip-build-t3xxgmr2/paho-mqtt/setup.py) egg_info for package paho-mqtt Installing collected packages: paho-mqtt Running setup.py install for paho-mqtt Successfully installed paho-mqtt Cleaning up...
LittleMqttToInflux – Da ist die Magie
#!/usr/bin/python3 # -*- coding: utf-8 -*- """Main program.""" __author__ = "Sven Sager" __copyright__ = "Copyright (C) 2018 Sven Sager" __license__ = "GPLv3" __version__ = "0.1.1" import logging import signal from influxdb import InfluxDBClient from os.path import join from paho.mqtt.client import Client from ssl import CERT_NONE from threading import Event logger = logging.getLogger() # !!! Nur zum TESTEN, auf INFO belassen, sonst WARNING !!! logging.basicConfig(level=logging.INFO) # logging.basicConfig(level=logging.WARNING) class LittleMqttToInflux(): """Main program class.""" def __init__(self): """Init LittleMqttToInflux class.""" # System variables self._evt_exit = Event() self._idb = None self._mqtt_rc = -1 # -------------------------------------------------- # TODO: Diese Werte an eigenen Server anpassen! # Basetopic muss hier OHNE '/revpi01' angegeben werden! Es handelt # sich um die unterste MQTT-Ebene, dies wird unser Datanbankname. self.basetopic = "datensammlung" # MQTT Einstellungen self.mqtthost = "127.0.0.1" self.mqttuser = "systemreader" self.mqttpassword = "test" self.mqttport = 1883 self.mqtttls = False # Influx Einstellungen self.influxdatabase = self.basetopic self.influxserver = "127.0.0.1" self.influxport = 8086 # -------------------------------------------------- # Catch signales signal.signal(signal.SIGINT, self._sigexit) signal.signal(signal.SIGTERM, self._sigexit) def _sigexit(self, signum, sigframe): """Signal handler to clean and exit program.""" self.stop() def mqtt_connect(self, client, userdata, flags, rc): """Wird ausgeloest bei connect/reconnect zum Broker.""" # Verbindungs rc merken self._mqtt_rc = rc if rc > 0: return # Basetopic subscriben client.subscribe(join(self.basetopic, "#")) def mqtt_message(self, client, userdata, msg): """Wird immer ausgeloest wenn eine MQTT-Nachricht kommt.""" # Topics aufsplitten. Dabei entsteht eine Liste, deren Einträge wir # wie folgt interpretieren: # Topic: datensammlung/revpi01/io/temp_heater # [ datenbankname , measurement, text , sensorname ] # ["datensammlung", "revpi01" , "io" , "temp_heater"] topics = msg.topic.split("/") # Wenn es nicht genau 4 Einträge gibt, verwerfen wir den Datensatz if len(topics) != 4: return # Die Daten stehen als bytes() in msg.payload. Diese versuchen wir # IMMER in float() zu wandeln. try: value = float(msg.payload) except Exception: # Datensatz verwerfen return # Directory für das Schreiben in InfluxDB erstellen dataset = { "measurement": topics[1], "tags": { "sensorname": topics[3], }, "fields": { "value": value } } # Directory in InfluxDB schreiben try: self._idb.write_points([dataset]) logger.info("geschrieben: {0} {1}".format(topics[3], value)) except Exception: logger.error("Fehler beim Schreiben in InfluxDB") def start(self): """main program with loop.""" # InfluxDB einrichten self._idb = InfluxDBClient( database=self.influxdatabase, host=self.influxserver, port=self.influxport ) # Datenbank anlegen, sollte sie nicht existieren try: self._idb.create_database(self.basetopic) except Exception: logger.error("Kann Datenbank nicht erzeugen") raise RuntimeError("Kann Datenbank nicht erzeugen") # MQTT aktivieren mqtt = Client() if self.mqttuser != "": mqtt.username_pw_set(self.mqttuser, self.mqttpassword) if self.mqtttls: mqtt.tls_set(cert_reqs=CERT_NONE) mqtt.tls_insecure_set(True) mqtt.on_connect = self.mqtt_connect mqtt.on_message = self.mqtt_message # mainloop while not self._evt_exit.is_set(): # Erstes MQTT Verbinden versuchen if self._mqtt_rc == -1: try: mqtt.connect(self.mqtthost, self.mqttport, 60) except Exception: logger.error("mqtt connect error") else: mqtt.loop_start() # Wait (never use sleep) self._evt_exit.wait(1) # MQTT trennen mqtt.loop_stop() def stop(self): """Stop mainloop.""" self._evt_exit.set() if __name__ == "__main__": root = LittleMqttToInflux() root.start()
Dieses Programm verbindet sich mit der InfluxDB und dem MQTT Broker. Es erstellt ein subscribe
auf alle Topics, die mit datensammlung
beginnen. Wenn weitere Revolution Pis mit anderen Namen revpi02
, revpi03
an die Cloud angehängt werden, werden auch deren Daten verarbeitet.
Wenn man eigene Programme geschrieben hat oder Sensoren besitzt, die MQTT sprechen, kann man diese Werte natürlich auch verarbeiten. Wichtig ist nur ein eindeutiger Name: datensammlung/mein_device01
Das Programm kann direkt auf den Cloud-Server übertragen und in dem Terminal gestartet werden. Die Datenverarbeitung ist damit aktiv. Wenn der Revlolution Pi bereits Daten sendet, erhalten wir bei jeder Übertragung (alle 10 Sekunden) diese Ausgabe:
pi@raspberrypi:~ $ python3 littlemqtttoinflux.py INFO:root:geschrieben: door_corridor 1.0 INFO:root:geschrieben: door_workshop 1.0 INFO:root:geschrieben: temp_heater 228.0 INFO:root:geschrieben: temp_outside 378.0
Landen die Daten wirklich in der Datenbank?
Über das Kommandozeilenprogramm von InfluxDB können wir prüfen, ob bereits Daten in unserer Datenbank zu sehen sind:
pi@raspberrypi:~ $ influx Visit https://enterprise.influxdata.com to register for updates, InfluxDB server management, and monitoring. Connected to http://localhost:8086 version 1.0.2 InfluxDB shell version: 1.0.2 > show databases name: databases --------------- name _internal datensammlung > use datensammlung Using database datensammlung > show measurements name: measurements ------------------ name revpi01 > select * from revpi01 limit 8 name: revpi01 ------------- time sensorname value 1524230645850060456 door_corridor 1 1524230645874043893 door_workshop 1 1524230645921271654 temp_heater 228 1524230645942686029 temp_outside 341 1524230655864807327 door_corridor 1 1524230655896276077 door_workshop 1 1524230655942289983 temp_heater 227 1524230655979327691 temp_outside 341
Wir prüfen mit show databases
ob unsere Datenbank datensammlung
vorhanden ist. Danach wechseln wir mit use datensammlung
in Diese und zeigen alle Measurements mit show measurement
an. Dort sollte unser revpi01
existieren, dessen Daten wir auslesen:
select * from revpi01 limit 8
Das Pythonprogramm zu einem Dienst machen
Wir wollen nun natürlich, dass unser LittleMqttToInflux Programm mit dem System startet. Sonst würden wir ja keine Daten in unsere Datenbank bekommen.
Wir legen die Datei littlemqtttoinflux.py
in das /usr/local/bin
Verzeichnis ab und markieren es als ausführbar. Vorher aber bitte oben in der Datei level=logging.INFO
auf level=logging.WARNING
ändern!
# Programm verschieben pi@raspberrypi:~ $ sudo mv littlemqtttoinflux.py /usr/local/bin # Programm ausführbar markieren pi@raspberrypi:~ $ sudo chmod +x /usr/local/bin/littlemqtttoinflux.py
Für Systemd erstellen wir nun eine .service Datei für unser Programm:
# Systemd .service Datei anlegen und füllen pi@raspberrypi:~ $ sudo nano /etc/systemd/system/mqttinflux.service [Unit] Description=MQTT-Werte in InfluxDB schreiben After=network.target influxd.service [Service] ExecStart=/usr/local/bin/littlemqtttoinflux.py Restart=on-failure RestartSec=15 [Install] WantedBy=multi-user.target # Systemd neu laden pi@raspberrypi:~ $ sudo systemctl daemon-reload # mqttinflux.service bei jedem Systemstart starten pi@raspberrypi:~ $ sudo systemctl enable mqttinflux # mqttinflux.service jetzt starten pi@raspberrypi:~ $ sudo systemctl start mqttinflux # Prüfen ob mqttinflux.service läuft pi@raspberrypi:~ $ sudo systemctl status mqttinflux ● mqttinflux.service - MQTT-Werte in InfluxDB schreiben Loaded: loaded (/etc/systemd/system/mqttinflux.service; disabled; vendor preset: enabled) Active: active (running) since Fri 2018-04-20 17:14:05 CEST; 7s ago Main PID: 18410 (littlemqtttoinf) CGroup: /system.slice/mqttinflux.service └─18410 /usr/bin/python3 /usr/local/bin/littlemqtttoinflux.py Apr 20 17:14:05 raspberrypi systemd[1]: Started MQTT-Werte in InfluxDB schreiben.
Das LittleMqttToInflux Programm wird nun automatisch mit dem System gestartet.
So geht es weiter
Da wir jetzt Daten in unsere Datenbank bekommen, wollen wir diese auch grafisch anzeigen! Im nächsten Blogeintrag zeigen wir wie…