Benni’s Run – Threads in the mainloop()

We got a nice sample program from Benjamin that uses as_thread in event handling!

It uses RevPiModIO's simulation of RPi.GPIO.add_event_callback(…), a function familiar from the Raspberry Pi.

In “BennisRun”, a rising edge on the input starts a function that sets all outputs to True one after another. It uses the sleep() function and therefore takes “much” time. When the input is reset, another function starts, which sets all outputs to False one after another.

Normally, every function triggered by an event runs to completion before mainloop() can process any further events. This is intentional and is achieved by executing the functions in the same thread that mainloop() uses. (The process image update runs in another thread and is of course NOT stopped, unless the parameter freeze=True is passed when entering mainloop().)

For “BennisRun”, this means that the first function must finish setting all outputs to True before the other function, which sets all outputs to False, can be called.

UNLESS: We use the parameter “as_thread=True” for reg_event(). This is similar to the behavior of add_event_callback(…).

Old RevPiModIO syntax:
self.revpi.devices["devname"].reg_event("Inputname", self.eventfunktion, edge=revpimodio.RISING, as_thread=True)

New RevPiModIO2 syntax:
self.revpi.io.Inputname.reg_event(self.eventfunktion, edge=revpimodio2.RISING, as_thread=True)
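
The difference between the two modes can be tried out without any RevPi hardware. The following is only a minimal sketch using Python's standard threading module. It is not how RevPiModIO works internally, and dispatch(), lights_on(), lights_off() and record() are made-up names for illustration. Each handler stands in for an event function that takes “much” time.

```python
import threading
import time


def dispatch(handlers, as_thread):
    """Call each handler inline or in its own thread and return the event log."""
    log = []
    log_lock = threading.Lock()

    def record(entry):
        # Serialize access to the shared log
        with log_lock:
            log.append(entry)

    threads = []
    for handler in handlers:
        if as_thread:
            # Threaded mode: start the handler and move on immediately
            worker = threading.Thread(target=handler, args=(record,))
            worker.start()
            threads.append(worker)
        else:
            # Sequential mode: the next handler waits until this one returns
            handler(record)

    for worker in threads:
        worker.join()
    return log


def lights_on(record):
    record("on:start")
    time.sleep(0.2)  # stands in for the slow output sequence
    record("on:end")


def lights_off(record):
    record("off:start")
    time.sleep(0.2)
    record("off:end")


sequential = dispatch([lights_on, lights_off], as_thread=False)
threaded = dispatch([lights_on, lights_off], as_thread=True)
print("sequential:", sequential)
print("threaded:  ", threaded)
```

In the sequential run, the log always reads on:start, on:end, off:start, off:end. In the threaded run, both start entries appear before either end entry, because the second handler no longer waits for the first.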

With as_thread=True, the functions are started as separate threads and can run in parallel. If the code does not guard against it, even the same function can run multiple times at once. (The function must accept “thread” as a parameter.)
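
One way to “check” the code against such overlapping runs is a non-blocking lock. This is a sketch under assumptions, not part of RevPiModIO: GuardedHandler and slow_sequence are hypothetical names, and the three plain threads merely stand in for three events arriving in quick succession (None is passed where the “thread” parameter would go).

```python
import threading
import time


class GuardedHandler:
    """Wrap an event function so that overlapping calls are skipped."""

    def __init__(self, func):
        self._func = func
        self._busy = threading.Lock()
        self._count_lock = threading.Lock()
        self.skipped = 0

    def __call__(self, thread):
        # Non-blocking acquire: returns False at once if a run is in progress
        if not self._busy.acquire(blocking=False):
            with self._count_lock:
                self.skipped += 1
            return
        try:
            self._func(thread)
        finally:
            self._busy.release()


runs = []


def slow_sequence(thread):
    runs.append("start")
    time.sleep(0.2)  # stands in for the slow output sequence
    runs.append("end")


guarded = GuardedHandler(slow_sequence)

# Simulate three events firing almost simultaneously
workers = [threading.Thread(target=guarded, args=(None,)) for _ in range(3)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

print(runs, guarded.skipped)
```

Here only one of the three calls actually runs the sequence; the other two return immediately and are counted as skipped. Whether skipping, queueing, or allowing the overlap is the right choice depends on the application.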

And this is the small source:

Source code for RevPiModIO version 2

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# (c) Benjamin
import revpimodio2
import time

# Adjust these values to match your own piCtory configuration!
# Name of Input: I_1
outputformat = "O_{}"


class RevPiModIOTest():

    def __init__(self):
        self.out = [outputformat.format(x) for x in range(1, 15)]

        # Instantiate RevPiModIO2 and handle "program exit" signal
        self.revpi = revpimodio2.RevPiModIO(autorefresh=True)
        self.revpi.handlesignalend(self.exitfunktion)

        # Register events that are executed as threads
        self.revpi.io.I_1.reg_event(
            self.eventfunktion1, edge=revpimodio2.RISING, as_thread=True
        )
        self.revpi.io.I_1.reg_event(
            self.eventfunktion2, edge=revpimodio2.FALLING, as_thread=True
        )

    def exitfunktion(self):
        """This function is executed just before the program exits."""
        self.revpi.core.A1 = revpimodio2.OFF

    def eventfunktion1(self, thread):
        """Turn lights on in sequence."""
        for name in self.out:
            self.revpi.io[name].value = True
            time.sleep(0.1)

    def eventfunktion2(self, thread):
        """Turn lights off in sequence."""
        for name in self.out:
            self.revpi.io[name].value = False
            time.sleep(0.1)

    def start(self):
        """Starts the mainloop()."""
        self.revpi.core.A1 = revpimodio2.GREEN
        print("Entering mainloop()")
        self.revpi.mainloop()

        print("Leaving mainloop()")


if __name__ == "__main__":
    root = RevPiModIOTest()
    root.start()

Source code for RevPiModIO version 1

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# (c) Benjamin
import revpimodio
import signal
import time

# Adjust these values to match your own piCtory configuration!
outputformat = "O_{}"
iodevname = "dio02"
eventinput = "I_1"


class RevPiModIOTest():

    def __init__(self):
        self.out = [outputformat.format(x) for x in range(1, 15)]

        # Instantiate RevPiModIO
        self.revpi = revpimodio.RevPiModIO(auto_refresh=True)

        # Register events that are executed as threads
        self.revpi.devices[iodevname].reg_event(
            eventinput, self.eventfunktion1,
            edge=revpimodio.RISING, as_thread=True
        )
        self.revpi.devices[iodevname].reg_event(
            eventinput, self.eventfunktion2,
            edge=revpimodio.FALLING, as_thread=True
        )

        # handle "program exit" signal
        signal.signal(signal.SIGINT, self._sigexit)
        signal.signal(signal.SIGTERM, self._sigexit)

    def _sigexit(self, signum, frame):
        """This function is executed just before the program exits."""
        self.revpi.devices.exit()
        print("Quitting mainloop()")

    def eventfunktion1(self, thread):
        """Turn lights on in sequence."""
        for name in self.out:
            self.revpi.devices[iodevname][name].value = 1
            time.sleep(0.1)

    def eventfunktion2(self, thread):
        """Turn lights off in sequence."""
        for name in self.out:
            self.revpi.devices[iodevname][name].value = 0
            time.sleep(0.1)

    def start(self):
        """Starts the mainloop()."""
        self.revpi.devices.core.A1 = revpimodio.GREEN
        print("Entering mainloop()")
        self.revpi.devices.mainloop()
        self.revpi.devices.core.A1 = revpimodio.OFF
        self.revpi.devices.writeprocimg()


if __name__ == "__main__":
    root = RevPiModIOTest()
    root.start()