Die Daten vom Revolution Pi empfangen wir nun per MQTT. Diese wollen wir aber natürlich auch speichern um sie auszuwerten.
3) Datenspeicherung auf dem Server
Für die Speicherung der Datenmassen verwenden wir InfluxDB, eine nicht relationale Datenbank für Zeitserien.
Um InfluxDB auf unserem Cloud-Server zu installieren, binden wir die InfluxData-Repositories ein. Damit können wir das System auch auf dem aktuellen Stand halten.
Repository von InfluxDB einbinden und installieren
# Schlüssel des InfluxDB Repositories in apt integrieren pi@raspberrypi:~ $ sudo /bin/bash -c 'wget -O - https://repos.influxdata.com/influxdb.key | apt-key add -' # InfluxDB Repository zum apt hinzufügen pi@raspberrypi:~ $ sudo /bin/bash -c 'echo "deb https://repos.influxdata.com/debian/ stretch stable" > /etc/apt/sources.list.d/influx.list' # Alle Repositories neu laden pi@raspberrypi:~ $ sudo apt-get update # InfluxDB installieren pi@raspberrypi:~ $ sudo apt-get install influxdb influxdb-client
Mit Abschluss der Installation wird der InfluxDB Server bereits ausgeführt und ist verfügbar.
Wir werden gleich ein Programm erstellen, welches die Daten von MQTT in die InfluxDB schreibt. Unsere Visualisierung wird später ebenfalls auf unserem Cloud-Sever installiert und ließt die Daten aus der InfluxDB.
Da ein Zugriff auf die Datenbank nur vom lokalen System aus passiert, binden wir die Dienste vom InfluxDB Server an localhost.
Dafür bearbeiten wir die Datei /etc/influxdb/influxdb.conf:
pi@raspberrypi:~ $ sudo nano /etc/influxdb/influxdb.conf (...) # Change this option to true to disable reporting. reporting-disabled = true # --- EINFÜGEN --- # Bind address to use for the RPC service for backup and restore. bind-address = "127.0.0.1:8088" # --- ENDE EINFÜGEN --- ### ### [meta] (...) ### ### [admin] ### ### Controls the availability of the built-in, web-based admin interface. If HTTPS is ### enabled for the admin interface, HTTPS must also be enabled on the [http] service. ### [admin] enabled = true bind-address = "127.0.0.1:8083" https-enabled = false https-certificate = "/etc/ssl/influxdb.pem" ### ### [http] ### ### Controls how the HTTP endpoints are configured. These are the primary ### mechanism for getting data into and out of InfluxDB. ### [http] enabled = true # bind-address = ":8086" # Ändern zu bind-address = "127.0.0.1:8086" (...)
Wir setzen die bind-address von allen Diensten auf 127.0.0.1: und speichern diese Datei. Danach starten wir den InfluxDB Server neu sudo systemctl restart influxdb.
Das Pythonprogramm auf dem Cloud-Server
Alles was wir nun noch benötigen ist ein Programm um die MQTT-Daten in die InfluxDB zu schreiben.
Auf unserem Cloud-Server installieren wir dafür das Python3 Modul für InfluxDB und das, aus dem vorherigen Blogeintrag bekannte, Paho.mqtt Modul:
# InfluxDB Modul für Python3 installieren pi@raspberrypi:~ $ sudo apt-get install python3-influxdb # Sollte pip3 noch nicht installiert sein, muss dies jetzt gemacht werden pi@raspberrypi:~ $ sudo apt-get install python3-pip # paho-mqtt über pip3 installieren, da es nicht in den Repositories ist pi@raspberrypi:~ $ sudo pip3 install paho-mqtt Downloading/unpacking paho-mqtt Downloading paho-mqtt-1.3.1.tar.gz (80kB): 80kB downloaded Running setup.py (path:/tmp/pip-build-t3xxgmr2/paho-mqtt/setup.py) egg_info for package paho-mqtt Installing collected packages: paho-mqtt Running setup.py install for paho-mqtt Successfully installed paho-mqtt Cleaning up...
LittleMqttToInflux – Da ist die Magie
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Main program."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "GPLv3"
__version__ = "0.1.1"
import logging
import signal
from influxdb import InfluxDBClient
from os.path import join
from paho.mqtt.client import Client
from ssl import CERT_NONE
from threading import Event
logger = logging.getLogger()
# !!! Nur zum TESTEN, auf INFO belassen, sonst WARNING !!!
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.WARNING)
class LittleMqttToInflux():
"""Main program class."""
def __init__(self):
"""Init LittleMqttToInflux class."""
# System variables
self._evt_exit = Event()
self._idb = None
self._mqtt_rc = -1
# --------------------------------------------------
# TODO: Diese Werte an eigenen Server anpassen!
# Basetopic muss hier OHNE '/revpi01' angegeben werden! Es handelt
# sich um die unterste MQTT-Ebene, dies wird unser Datanbankname.
self.basetopic = "datensammlung"
# MQTT Einstellungen
self.mqtthost = "127.0.0.1"
self.mqttuser = "systemreader"
self.mqttpassword = "test"
self.mqttport = 1883
self.mqtttls = False
# Influx Einstellungen
self.influxdatabase = self.basetopic
self.influxserver = "127.0.0.1"
self.influxport = 8086
# --------------------------------------------------
# Catch signales
signal.signal(signal.SIGINT, self._sigexit)
signal.signal(signal.SIGTERM, self._sigexit)
def _sigexit(self, signum, sigframe):
"""Signal handler to clean and exit program."""
self.stop()
def mqtt_connect(self, client, userdata, flags, rc):
"""Wird ausgeloest bei connect/reconnect zum Broker."""
# Verbindungs rc merken
self._mqtt_rc = rc
if rc > 0:
return
# Basetopic subscriben
client.subscribe(join(self.basetopic, "#"))
def mqtt_message(self, client, userdata, msg):
"""Wird immer ausgeloest wenn eine MQTT-Nachricht kommt."""
# Topics aufsplitten. Dabei entsteht eine Liste, deren Einträge wir
# wie folgt interpretieren:
# Topic: datensammlung/revpi01/io/temp_heater
# [ datenbankname , measurement, text , sensorname ]
# ["datensammlung", "revpi01" , "io" , "temp_heater"]
topics = msg.topic.split("/")
# Wenn es nicht genau 4 Einträge gibt, verwerfen wir den Datensatz
if len(topics) != 4:
return
# Die Daten stehen als bytes() in msg.payload. Diese versuchen wir
# IMMER in float() zu wandeln.
try:
value = float(msg.payload)
except Exception:
# Datensatz verwerfen
return
# Directory für das Schreiben in InfluxDB erstellen
dataset = {
"measurement": topics[1],
"tags": {
"sensorname": topics[3],
},
"fields": {
"value": value
}
}
# Directory in InfluxDB schreiben
try:
self._idb.write_points([dataset])
logger.info("geschrieben: {0} {1}".format(topics[3], value))
except Exception:
logger.error("Fehler beim Schreiben in InfluxDB")
def start(self):
"""main program with loop."""
# InfluxDB einrichten
self._idb = InfluxDBClient(
database=self.influxdatabase,
host=self.influxserver,
port=self.influxport
)
# Datenbank anlegen, sollte sie nicht existieren
try:
self._idb.create_database(self.basetopic)
except Exception:
logger.error("Kann Datenbank nicht erzeugen")
raise RuntimeError("Kann Datenbank nicht erzeugen")
# MQTT aktivieren
mqtt = Client()
if self.mqttuser != "":
mqtt.username_pw_set(self.mqttuser, self.mqttpassword)
if self.mqtttls:
mqtt.tls_set(cert_reqs=CERT_NONE)
mqtt.tls_insecure_set(True)
mqtt.on_connect = self.mqtt_connect
mqtt.on_message = self.mqtt_message
# mainloop
while not self._evt_exit.is_set():
# Erstes MQTT Verbinden versuchen
if self._mqtt_rc == -1:
try:
mqtt.connect(self.mqtthost, self.mqttport, 60)
except Exception:
logger.error("mqtt connect error")
else:
mqtt.loop_start()
# Wait (never use sleep)
self._evt_exit.wait(1)
# MQTT trennen
mqtt.loop_stop()
def stop(self):
"""Stop mainloop."""
self._evt_exit.set()
if __name__ == "__main__":
root = LittleMqttToInflux()
root.start()
Dieses Programm verbindet sich mit der InfluxDB und dem MQTT Broker. Es erstellt ein subscribe auf alle Topics, die mit datensammlung beginnen. Wenn weitere Revolution Pis mit anderen Namen revpi02, revpi03 an die Cloud angehängt werden, werden auch deren Daten verarbeitet.
Wenn man eigene Programme geschrieben hat oder Sensoren besitzt, die MQTT sprechen, kann man diese Werte natürlich auch verarbeiten. Wichtig ist nur ein eindeutiger Name: datensammlung/mein_device01
Das Programm kann direkt auf den Cloud-Server übertragen und in dem Terminal gestartet werden. Die Datenverarbeitung ist damit aktiv. Wenn der Revlolution Pi bereits Daten sendet, erhalten wir bei jeder Übertragung (alle 10 Sekunden) diese Ausgabe:
pi@raspberrypi:~ $ python3 littlemqtttoinflux.py INFO:root:geschrieben: door_corridor 1.0 INFO:root:geschrieben: door_workshop 1.0 INFO:root:geschrieben: temp_heater 228.0 INFO:root:geschrieben: temp_outside 378.0
Landen die Daten wirklich in der Datenbank?
Über das Kommandozeilenprogramm von InfluxDB können wir prüfen, ob bereits Daten in unserer Datenbank zu sehen sind:
pi@raspberrypi:~ $ influx Visit https://enterprise.influxdata.com to register for updates, InfluxDB server management, and monitoring. Connected to http://localhost:8086 version 1.0.2 InfluxDB shell version: 1.0.2 > show databases name: databases --------------- name _internal datensammlung > use datensammlung Using database datensammlung > show measurements name: measurements ------------------ name revpi01 > select * from revpi01 limit 8 name: revpi01 ------------- time sensorname value 1524230645850060456 door_corridor 1 1524230645874043893 door_workshop 1 1524230645921271654 temp_heater 228 1524230645942686029 temp_outside 341 1524230655864807327 door_corridor 1 1524230655896276077 door_workshop 1 1524230655942289983 temp_heater 227 1524230655979327691 temp_outside 341
Wir prüfen mit show databases ob unsere Datenbank datensammlung vorhanden ist. Danach wechseln wir mit use datensammlung in Diese und zeigen alle Measurements mit show measurement an. Dort sollte unser revpi01 existieren, dessen Daten wir auslesen:
select * from revpi01 limit 8
Das Pythonprogramm zu einem Dienst machen
Wir wollen nun natürlich, dass unser LittleMqttToInflux Programm mit dem System startet. Sonst würden wir ja keine Daten in unsere Datenbank bekommen.
Wir legen die Datei littlemqtttoinflux.py in das /usr/local/bin Verzeichnis ab und markieren es als ausführbar. Vorher aber bitte oben in der Datei level=logging.INFO auf level=logging.WARNING ändern!
# Programm verschieben pi@raspberrypi:~ $ sudo mv littlemqtttoinflux.py /usr/local/bin # Programm ausführbar markieren pi@raspberrypi:~ $ sudo chmod +x /usr/local/bin/littlemqtttoinflux.py
Für Systemd erstellen wir nun eine .service Datei für unser Programm:
# Systemd .service Datei anlegen und füllen
pi@raspberrypi:~ $ sudo nano /etc/systemd/system/mqttinflux.service
[Unit]
Description=MQTT-Werte in InfluxDB schreiben
After=network.target influxd.service
[Service]
ExecStart=/usr/local/bin/littlemqtttoinflux.py
Restart=on-failure
RestartSec=15
[Install]
WantedBy=multi-user.target
# Systemd neu laden
pi@raspberrypi:~ $ sudo systemctl daemon-reload
# mqttinflux.service bei jedem Systemstart starten
pi@raspberrypi:~ $ sudo systemctl enable mqttinflux
# mqttinflux.service jetzt starten
pi@raspberrypi:~ $ sudo systemctl start mqttinflux
# Prüfen ob mqttinflux.service läuft
pi@raspberrypi:~ $ sudo systemctl status mqttinflux
● mqttinflux.service - MQTT-Werte in InfluxDB schreiben
Loaded: loaded (/etc/systemd/system/mqttinflux.service; disabled; vendor preset: enabled)
Active: active (running) since Fri 2018-04-20 17:14:05 CEST; 7s ago
Main PID: 18410 (littlemqtttoinf)
CGroup: /system.slice/mqttinflux.service
└─18410 /usr/bin/python3 /usr/local/bin/littlemqtttoinflux.py
Apr 20 17:14:05 raspberrypi systemd[1]: Started MQTT-Werte in InfluxDB schreiben.
Das LittleMqttToInflux Programm wird nun automatisch mit dem System gestartet.
So geht es weiter
Da wir jetzt Daten in unsere Datenbank bekommen, wollen wir diese auch grafisch anzeigen! Im nächsten Blogeintrag zeigen wir wie…
