Wir entwickeln mit RevPiModIODriver() einen Treiber für ein virtuelles Device in piCtory.
In unserem Beispiel sammeln wir Daten vom RevolutionPi und schreiben diese in ein virtuelles Device. Dabei handelt es sich um die Systemzeit in UTC, die prozentuale Festplattenbelegung und die CPU-Temperatur.
Diese Daten stehen dann Programme wie logiCAD zur Verfügung.# -*- coding: utf-8 -*-
import revpimodio2
import time
from os import statvfs
class MyDriver():
def __init__(self):
"""Bereitet alles fuer den Treiber vor."""
# RevPiModIODriver für virtuelles Device an Positon 64 instanziieren
self.rpi = revpimodio2.RevPiModIODriver(64, autorefresh=True)
# Signale vom Betriebssystem für "Programm beenden" verwalten
# oder Strg+C Signal - Der Treiber wird dann sauber beendet
self.rpi.handlesignalend()
# RevPiModIODriver Inputs für Daten anlegen
# Wir sind ein Treiber und können in INPUTS schreiben!!!
# Input-Byte 0 bis 3 als int verwenden für timestamp
self.rpi.io.Input_1.replace_io("timestamp", "L")
# Input-Byte 4 als int verwenden für n% Belegung root mount
self.rpi.io.Input_5.replace_io("roothdused", "B")
# Input-Byte 5 - 6 als int für CPU Temperatur * 100 (Centigrad)
self.rpi.io.Input_6.replace_io("cputemp", "H")
def cyclefunction(self, cycletools):
"""Diese Funktion wird zyklisch von RevPiModIODriver aufgerufen."""
# Timestamp - RevPi-Zeit schreiben
self.rpi.io.timestamp.value = int(time.time())
# Speicherbelegung des root mounts
hd = statvfs("/")
usedspace = int(
(hd.f_blocks - hd.f_bavail) / hd.f_blocks * 100
)
self.rpi.io.roothdused.value = usedspace
# CPU-Temperatur
with open("/sys/devices/virtual/thermal/thermal_zone0/temp") as fh:
tmp = fh.read()
self.rpi.io.cputemp.value = int(tmp.strip()[:-1])
def start(self):
"""Startet den Cycleloop, der die cyclefunction zyklisch aufruft."""
# Programm blockiert hier und vararbeitet zyklisch die Daten.
# Die Zykluszeit stellen wir auf 1000 Millisekunden ein, das reicht uns
self.rpi.cycleloop(self.cyclefunction, cycletime=1000)
if __name__ == "__main__":
root = MyDriver()
root.start()
import revpimodio
import signal
import time
from os import statvfs
from threading import Event
class MyDriver():
def __init__(self):
"""Instanziiert den Treiber."""
self._evt_exit = Event()
# Einstellungen laden
self._loadconfig()
# Signal events
signal.signal(signal.SIGINT, self._sigexit)
signal.signal(signal.SIGTERM, self._sigexit)
signal.signal(signal.SIGHUP, self._loadconfig)
def _loadconfig(self, signum=None, frame=None):
"""Load/Reload configuration."""
# RevPiModIODriver instanziieren
self.rpi = revpimodio.RevPiModIODriver([64])
# RevPiModIODriver konfigurieren
# Input-Byte 0 bis 3 als int verwenden für timestamp
self.rpi.devices[64].reg_out("timestamp", "Input_1_i09", "L")
# Input-Byte 4 als int verwenden für n% Belegung root mount
self.rpi.devices[64].reg_out("roothdused", "Input_5_i09", "B")
# Input-Byte 5 - 6 als int für CPU Temperatur * 100 (Centigrad)
self.rpi.devices[64].reg_out("cputemp", "Input_6_i09", "H")
# Nach run() Durchlauf diese Sekunden warten bis erneuter Durchlauf
self.refresh = 1
def _sigexit(self, signum, frame):
"""Signal handler to exit."""
self._evt_exit.set()
def run(self):
"""Laeuft bis exit gesetzt wird."""
while not self._evt_exit.is_set():
# Inputs lesen, aus Sicht von logiCAD sind dies Outputs!
# (auch wenn wir diese momentan nicht verwenden)
self.rpi.devices.readprocimg()
# Timestamp - RevPi-Zeit schreiben
self.rpi.devices[64]["timestamp"].value = int(time.time())
# Speicherbelegung des root mounts
hd = statvfs("/")
usedspace = int(
(hd.f_blocks - hd.f_bavail) / hd.f_blocks * 100
)
self.rpi.devices[64]["roothdused"].value = usedspace
# CPU-Temperatur
with open("/sys/devices/virtual/thermal/thermal_zone0/temp") as fh:
tmp = fh.read()
self.rpi.devices[64]["cputemp"].value = int(tmp.strip()[:-1])
# Outputs schreiben, aus Sicht von logiCAD sind dies Inputs!
self.rpi.devices.writeprocimg()
# Warten
self._evt_exit.wait(self.refresh)
if __name__ == "__main__":
root = MyDriver()
root.run()