Treiber für Virtuelles Device aus piCtory

Wir entwickeln mit RevPiModIODriver() einen Treiber für ein virtuelles Device in piCtory.

In unserem Beispiel sammeln wir Daten vom RevolutionPi und schreiben diese in ein virtuelles Device. Dabei handelt es sich um die Systemzeit in UTC, die prozentuale Festplattenbelegung und die CPU-Temperatur.

Quelltext für RevPiModIO Version 2

# -*- coding: utf-8 -*-
import revpimodio2
import time
from os import statvfs


class MyDriver():

    def __init__(self):
        """Bereitet alles fuer den Treiber vor."""

        # RevPiModIODriver für virtuelles Device an Positon 64 instanziieren
        self.rpi = revpimodio2.RevPiModIODriver(64, autorefresh=True)

        # Signale vom Betriebssystem für "Programm beenden" verwalten
        # oder Strg+C Signal - Der Treiber wird dann sauber beendet
        self.rpi.handlesignalend()

        # RevPiModIODriver Inputs für Daten anlegen
        # Wir sind ein Treiber und können in INPUTS schreiben!!!

        # Input-Byte 0 bis 3 als int verwenden für timestamp
        self.rpi.io.Input_1.replace_io("timestamp", "L")

        # Input-Byte 4 als int verwenden für n% Belegung root mount
        self.rpi.io.Input_5.replace_io("roothdused", "B")

        # Input-Byte 5 - 6 als int für CPU Temperatur * 100 (Centigrad)
        self.rpi.io.Input_6.replace_io("cputemp", "H")

    def cyclefunction(self, cycletools):
        """Diese Funktion wird zyklisch von RevPiModIODriver aufgerufen."""

        # Timestamp - RevPi-Zeit schreiben
        self.rpi.io.timestamp.value = int(time.time())

        # Speicherbelegung des root mounts
        hd = statvfs("/")
        usedspace = int(
            (hd.f_blocks - hd.f_bavail) / hd.f_blocks * 100
        )
        self.rpi.io.roothdused.value = usedspace

        # CPU-Temperatur
        with open("/sys/devices/virtual/thermal/thermal_zone0/temp") as fh:
            tmp = fh.read()
        self.rpi.io.cputemp.value = int(tmp.strip()[:-1])

    def start(self):
        """Startet den Cycleloop, der die cyclefunction zyklisch aufruft."""
        # Programm blockiert hier und vararbeitet zyklisch die Daten.
        # Die Zykluszeit stellen wir auf 1000 Millisekunden ein, das reicht uns
        self.rpi.cycleloop(self.cyclefunction, cycletime=1000)


if __name__ == "__main__":
    root = MyDriver()
    root.start()
Quelltext für RevPiModIO Version 1
import revpimodio
import signal
import time
from os import statvfs
from threading import Event


class MyDriver():

    def __init__(self):
        """Instanziiert den Treiber."""
        self._evt_exit = Event()

        # Einstellungen laden
        self._loadconfig()

        # Signal events
        signal.signal(signal.SIGINT, self._sigexit)
        signal.signal(signal.SIGTERM, self._sigexit)
        signal.signal(signal.SIGHUP, self._loadconfig)

    def _loadconfig(self, signum=None, frame=None):
        """Load/Reload configuration."""
        # RevPiModIODriver instanziieren
        self.rpi = revpimodio.RevPiModIODriver([64])

        # RevPiModIODriver konfigurieren

        # Input-Byte 0 bis 3 als int verwenden für timestamp
        self.rpi.devices[64].reg_out("timestamp", "Input_1_i09", "L")

        # Input-Byte 4 als int verwenden für n% Belegung root mount
        self.rpi.devices[64].reg_out("roothdused", "Input_5_i09", "B")

        # Input-Byte 5 - 6 als int für CPU Temperatur * 100 (Centigrad)
        self.rpi.devices[64].reg_out("cputemp", "Input_6_i09", "H")

        # Nach run() Durchlauf diese Sekunden warten bis erneuter Durchlauf
        self.refresh = 1

    def _sigexit(self, signum, frame):
        """Signal handler to exit."""
        self._evt_exit.set()

    def run(self):
        """Laeuft bis exit gesetzt wird."""
        while not self._evt_exit.is_set():

            # Inputs lesen, aus Sicht von logiCAD sind dies Outputs!
            # (auch wenn wir diese momentan nicht verwenden)
            self.rpi.devices.readprocimg()

            # Timestamp - RevPi-Zeit schreiben
            self.rpi.devices[64]["timestamp"].value = int(time.time())

            # Speicherbelegung des root mounts
            hd = statvfs("/")
            usedspace = int(
                (hd.f_blocks - hd.f_bavail) / hd.f_blocks * 100
            )
            self.rpi.devices[64]["roothdused"].value = usedspace

            # CPU-Temperatur
            with open("/sys/devices/virtual/thermal/thermal_zone0/temp") as fh:
                tmp = fh.read()
            self.rpi.devices[64]["cputemp"].value = int(tmp.strip()[:-1])

            # Outputs schreiben, aus Sicht von logiCAD sind dies Inputs!
            self.rpi.devices.writeprocimg()

            # Warten
            self._evt_exit.wait(self.refresh)


if __name__ == "__main__":
    root = MyDriver()
    root.run()

 

Diese Daten stehen dann Programme wie logiCAD zur Verfügung.