Daten auf Cloud-Server speichern

Die Daten vom Revolution Pi empfangen wir nun per MQTT. Diese wollen wir aber natürlich auch speichern um sie auszuwerten.

3) Datenspeicherung auf dem Server

Für die Speicherung der Datenmassen verwenden wir InfluxDB, eine nicht relationale Datenbank für Zeitserien.

Um InfluxDB auf unserem Cloud-Server zu installieren, binden wir die InfluxData-Repositories ein. Damit können wir das System auch auf dem aktuellen Stand halten.

Repository von InfluxDB einbinden und installieren
# Schlüssel des InfluxDB Repositories in apt integrieren
pi@raspberrypi:~ $ sudo /bin/bash -c 'wget -O - https://repos.influxdata.com/influxdb.key | apt-key add -'

# InfluxDB Repository zum apt hinzufügen
pi@raspberrypi:~ $ sudo /bin/bash -c 'echo "deb https://repos.influxdata.com/debian/ stretch stable" > /etc/apt/sources.list.d/influx.list'

# Alle Repositories neu laden
pi@raspberrypi:~ $ sudo apt-get update

# InfluxDB installieren
pi@raspberrypi:~ $ sudo apt-get install influxdb influxdb-client

Mit Abschluss der Installation wird der InfluxDB Server bereits ausgeführt und ist verfügbar.

Wir werden gleich ein Programm erstellen, welches die Daten von MQTT in die InfluxDB schreibt.  Unsere Visualisierung wird später ebenfalls auf unserem Cloud-Sever installiert und ließt die Daten aus der InfluxDB.
Da ein Zugriff auf die Datenbank nur vom lokalen System aus passiert, binden wir die Dienste vom InfluxDB Server an localhost.
Dafür bearbeiten wir die Datei /etc/influxdb/influxdb.conf:

pi@raspberrypi:~ $ sudo nano /etc/influxdb/influxdb.conf

(...)
# Change this option to true to disable reporting.
reporting-disabled = true

# --- EINFÜGEN ---
# Bind address to use for the RPC service for backup and restore.
bind-address = "127.0.0.1:8088"
# --- ENDE EINFÜGEN ---

###
### [meta]

(...)

### 
### [admin]
###  
### Controls the availability of the built-in, web-based admin interface. If HTTPS is
### enabled for the admin interface, HTTPS must also be enabled on the [http] service.
###  

[admin]
  enabled = true
  bind-address = "127.0.0.1:8083"
  https-enabled = false
  https-certificate = "/etc/ssl/influxdb.pem"

### 
### [http]
### 
### Controls how the HTTP endpoints are configured. These are the primary
### mechanism for getting data into and out of InfluxDB.
###

[http]
  enabled = true
  # bind-address = ":8086"
  # Ändern zu
  bind-address = "127.0.0.1:8086"
(...)

Wir setzen die bind-address von allen Diensten auf 127.0.0.1: und speichern diese Datei. Danach starten wir den InfluxDB Server neu sudo systemctl restart influxdb.

Das Pythonprogramm auf dem Cloud-Server

Alles was wir nun noch benötigen ist ein Programm um die MQTT-Daten in die InfluxDB zu schreiben.

Auf unserem Cloud-Server installieren wir dafür das Python3 Modul für InfluxDB und das, aus dem vorherigen Blogeintrag bekannte, Paho.mqtt Modul:

# InfluxDB Modul für Python3 installieren
pi@raspberrypi:~ $ sudo apt-get install python3-influxdb

# Sollte pip3 noch nicht installiert sein, muss dies jetzt gemacht werden
pi@raspberrypi:~ $ sudo apt-get install python3-pip

# paho-mqtt über pip3 installieren, da es nicht in den Repositories ist
pi@raspberrypi:~ $ sudo pip3 install paho-mqtt
Downloading/unpacking paho-mqtt
 Downloading paho-mqtt-1.3.1.tar.gz (80kB): 80kB downloaded
 Running setup.py (path:/tmp/pip-build-t3xxgmr2/paho-mqtt/setup.py) egg_info for package paho-mqtt
 
Installing collected packages: paho-mqtt
 Running setup.py install for paho-mqtt
 
Successfully installed paho-mqtt
Cleaning up...
LittleMqttToInflux – Da ist die Magie
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Main program."""
__author__ = "Sven Sager"
__copyright__ = "Copyright (C) 2018 Sven Sager"
__license__ = "GPLv3"
__version__ = "0.1.1"

import logging
import signal
from influxdb import InfluxDBClient
from os.path import join
from paho.mqtt.client import Client
from ssl import CERT_NONE
from threading import Event

logger = logging.getLogger()

# !!! Nur zum TESTEN, auf INFO belassen, sonst WARNING !!!
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.WARNING)


class LittleMqttToInflux():

    """Main program class."""

    def __init__(self):
        """Init LittleMqttToInflux class."""
        # System variables
        self._evt_exit = Event()
        self._idb = None
        self._mqtt_rc = -1
        # --------------------------------------------------

        # TODO: Diese Werte an eigenen Server anpassen!

        # Basetopic muss hier OHNE '/revpi01' angegeben werden! Es handelt
        # sich um die unterste MQTT-Ebene, dies wird unser Datanbankname.
        self.basetopic = "datensammlung"

        # MQTT Einstellungen
        self.mqtthost = "127.0.0.1"
        self.mqttuser = "systemreader"
        self.mqttpassword = "test"
        self.mqttport = 1883
        self.mqtttls = False

        # Influx Einstellungen
        self.influxdatabase = self.basetopic
        self.influxserver = "127.0.0.1"
        self.influxport = 8086

        # --------------------------------------------------
        # Catch signales
        signal.signal(signal.SIGINT, self._sigexit)
        signal.signal(signal.SIGTERM, self._sigexit)

    def _sigexit(self, signum, sigframe):
        """Signal handler to clean and exit program."""
        self.stop()

    def mqtt_connect(self, client, userdata, flags, rc):
        """Wird ausgeloest bei connect/reconnect zum Broker."""
        # Verbindungs rc merken
        self._mqtt_rc = rc
        if rc > 0:
            return

        # Basetopic subscriben
        client.subscribe(join(self.basetopic, "#"))

    def mqtt_message(self, client, userdata, msg):
        """Wird immer ausgeloest wenn eine MQTT-Nachricht kommt."""

        # Topics aufsplitten. Dabei entsteht eine Liste, deren Einträge wir
        # wie folgt interpretieren:

        # Topic: datensammlung/revpi01/io/temp_heater
        # [ datenbankname , measurement,  text   ,   sensorname ]
        # ["datensammlung",  "revpi01" ,  "io"   , "temp_heater"]
        topics = msg.topic.split("/")

        # Wenn es nicht genau 4 Einträge gibt, verwerfen wir den Datensatz
        if len(topics) != 4:
            return

        # Die Daten stehen als bytes() in msg.payload. Diese versuchen wir
        # IMMER in float() zu wandeln.
        try:
            value = float(msg.payload)
        except Exception:
            # Datensatz verwerfen
            return

        # Directory für das Schreiben in InfluxDB erstellen
        dataset = {
            "measurement": topics[1],
            "tags": {
                "sensorname": topics[3],
            },
            "fields": {
                "value": value
            }
        }

        # Directory in InfluxDB schreiben
        try:
            self._idb.write_points([dataset])
            logger.info("geschrieben: {0} {1}".format(topics[3], value))
        except Exception:
            logger.error("Fehler beim Schreiben in InfluxDB")

    def start(self):
        """main program with loop."""

        # InfluxDB einrichten
        self._idb = InfluxDBClient(
            database=self.influxdatabase,
            host=self.influxserver,
            port=self.influxport
        )

        # Datenbank anlegen, sollte sie nicht existieren
        try:
            self._idb.create_database(self.basetopic)
        except Exception:
            logger.error("Kann Datenbank nicht erzeugen")
            raise RuntimeError("Kann Datenbank nicht erzeugen")

        # MQTT aktivieren
        mqtt = Client()
        if self.mqttuser != "":
            mqtt.username_pw_set(self.mqttuser, self.mqttpassword)
        if self.mqtttls:
            mqtt.tls_set(cert_reqs=CERT_NONE)
            mqtt.tls_insecure_set(True)

        mqtt.on_connect = self.mqtt_connect
        mqtt.on_message = self.mqtt_message

        # mainloop
        while not self._evt_exit.is_set():

            # Erstes MQTT Verbinden versuchen
            if self._mqtt_rc == -1:
                try:
                    mqtt.connect(self.mqtthost, self.mqttport, 60)
                except Exception:
                    logger.error("mqtt connect error")
                else:
                    mqtt.loop_start()

            # Wait (never use sleep)
            self._evt_exit.wait(1)

        # MQTT trennen
        mqtt.loop_stop()

    def stop(self):
        """Stop mainloop."""
        self._evt_exit.set()


if __name__ == "__main__":
    root = LittleMqttToInflux()
    root.start()

DOWNLOAD

Dieses Programm verbindet sich mit der InfluxDB und dem MQTT Broker. Es erstellt ein subscribe auf alle Topics, die mit datensammlung beginnen. Wenn weitere Revolution Pis mit anderen Namen revpi02, revpi03 an die Cloud angehängt werden, werden auch deren Daten verarbeitet.
Wenn man eigene Programme geschrieben hat oder Sensoren besitzt, die MQTT sprechen, kann man diese Werte natürlich auch verarbeiten. Wichtig ist nur ein eindeutiger Name: datensammlung/mein_device01

Das Programm kann direkt auf den Cloud-Server übertragen und in dem Terminal gestartet werden. Die Datenverarbeitung ist damit aktiv. Wenn der Revlolution Pi bereits Daten sendet, erhalten wir bei jeder Übertragung (alle 10 Sekunden) diese Ausgabe:

pi@raspberrypi:~ $ python3 littlemqtttoinflux.py 
INFO:root:geschrieben: door_corridor 1.0
INFO:root:geschrieben: door_workshop 1.0
INFO:root:geschrieben: temp_heater 228.0
INFO:root:geschrieben: temp_outside 378.0
Landen die Daten wirklich in der Datenbank?

Über das Kommandozeilenprogramm von InfluxDB können wir prüfen, ob bereits Daten in unserer Datenbank zu sehen sind:

pi@raspberrypi:~ $ influx

Visit https://enterprise.influxdata.com to register for updates, InfluxDB server management, and monitoring.
Connected to http://localhost:8086 version 1.0.2
InfluxDB shell version: 1.0.2
> show databases
name: databases
---------------
name
_internal
datensammlung

> use datensammlung
Using database datensammlung
> show measurements
name: measurements
------------------
name
revpi01

> select * from revpi01 limit 8
name: revpi01
-------------
time                    sensorname      value
1524230645850060456     door_corridor   1
1524230645874043893     door_workshop   1
1524230645921271654     temp_heater     228
1524230645942686029     temp_outside    341
1524230655864807327     door_corridor   1
1524230655896276077     door_workshop   1
1524230655942289983     temp_heater     227
1524230655979327691     temp_outside    341

Wir prüfen mit show databases ob unsere Datenbank datensammlung vorhanden ist. Danach wechseln wir mit use datensammlung in Diese und zeigen alle Measurements mit show measurement an. Dort sollte unser revpi01 existieren, dessen Daten wir auslesen:
select * from revpi01 limit 8

Das Pythonprogramm zu einem Dienst machen

Wir wollen nun natürlich, dass unser LittleMqttToInflux Programm mit dem System startet. Sonst würden wir ja keine Daten in unsere Datenbank bekommen.

Wir legen die Datei littlemqtttoinflux.py in das /usr/local/bin Verzeichnis ab und markieren es als ausführbar. Vorher aber bitte oben in der Datei level=logging.INFO auf level=logging.WARNING ändern!

# Programm verschieben
pi@raspberrypi:~ $ sudo mv littlemqtttoinflux.py /usr/local/bin

# Programm ausführbar markieren
pi@raspberrypi:~ $ sudo chmod +x /usr/local/bin/littlemqtttoinflux.py

Für Systemd erstellen wir nun eine .service Datei für unser Programm:

# Systemd .service Datei anlegen und füllen
pi@raspberrypi:~ $ sudo nano /etc/systemd/system/mqttinflux.service

[Unit]
Description=MQTT-Werte in InfluxDB schreiben
After=network.target influxd.service

[Service]
ExecStart=/usr/local/bin/littlemqtttoinflux.py
Restart=on-failure
RestartSec=15

[Install]
WantedBy=multi-user.target


# Systemd neu laden
pi@raspberrypi:~ $ sudo systemctl daemon-reload

# mqttinflux.service bei jedem Systemstart starten
pi@raspberrypi:~ $ sudo systemctl enable mqttinflux

# mqttinflux.service jetzt starten
pi@raspberrypi:~ $ sudo systemctl start mqttinflux

# Prüfen ob mqttinflux.service läuft
pi@raspberrypi:~ $ sudo systemctl status mqttinflux
● mqttinflux.service - MQTT-Werte in InfluxDB schreiben
   Loaded: loaded (/etc/systemd/system/mqttinflux.service; disabled; vendor preset: enabled)
   Active: active (running) since Fri 2018-04-20 17:14:05 CEST; 7s ago
 Main PID: 18410 (littlemqtttoinf)
   CGroup: /system.slice/mqttinflux.service
           └─18410 /usr/bin/python3 /usr/local/bin/littlemqtttoinflux.py

Apr 20 17:14:05 raspberrypi systemd[1]: Started MQTT-Werte in InfluxDB schreiben.

Das LittleMqttToInflux Programm wird nun automatisch mit dem System gestartet.

So geht es weiter

Da wir jetzt Daten in unsere Datenbank bekommen, wollen wir diese auch grafisch anzeigen! Im nächsten Blogeintrag zeigen wir wie…

Weiter >>