Hallo zusammen.
Ich habe gerade ein Problem und brauche mal wieder Hilfe.
Ich habe einen RPi2 als Heimautomationsserver. Da fand ich die Idee toll eine 433MHz Fernbedienung zu nutzen. Das ganze habe ich jetzt verdrahtet und läuft auch soweit. Ich nutze die Lib von Milaq (https://github.com/milaq/rpi-rf). Der RXB6 ist an GPIO 27.
So, nun habe ich der Taste A gerade die Funktion zugewiesen das Licht anzuschalten. Dazu sendet der RPi mit einem NRF24L01 ein Signal zu einem Arduino. Der NRF wird jedoch auch über GPIO angesprochen. Diese Funktion macht natürlich, wie es sich gehört, einen GPIO.cleanup(). Damit scheint sie aber auch den GPIO Aufruf für den 433MHz Receiver zu 'cleanen'.
Wie löst man das jetzt am elegantesten? Der Receiver für den RXB6 muss die ganze Zeit laufen... man weiß ja nie wann mal einer die Fernbedienung drückt.
Kann man das cleanup generell weglassen? Nicht so toll, oder? Kann man nur eine Instanz cleanen?
GPIO zeitgleiche mehrfachverwendung
-
HeAdLeSs -
18. Januar 2018 um 17:59 -
Erledigt
Jeden Donnerstag 20:30 Uhr hier im Chat.
Wer Lust hat, kann sich gerne beteiligen. ;)
-
-
GPIO zeitgleiche mehrfachverwendung? Schau mal ob du hier fündig wirst!
-
Leider kennen wir/ich deine verwendeten Skripte nicht - zeig die doch bitte einmal
-
Die Scripte... ok... mal gucken
Hier das Script für den 433 Receiver:
Python
Alles anzeigen#!/usr/bin/env python3 import argparse import signal import sys import time import logging import subprocess import datetime from modules.rpi_rf import RFDevice import modules.database as database import modules.bob as bob rfdevice = None # pylint: disable=unused-argument def exithandler(signal, frame): rfdevice.cleanup() sys.exit(0) logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S', format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', ) signal.signal(signal.SIGINT, exithandler) rfdevice = RFDevice(27) rfdevice.enable_rx() timestamp = None print("Listening for codes") while True: if rfdevice.rx_code_timestamp != timestamp: timestamp = rfdevice.rx_code_timestamp if rfdevice.rx_proto == 1: logging.info(str(rfdevice.rx_code) + " [pulselength " + str(rfdevice.rx_pulselength) + ", protocol " + str(rfdevice.rx_proto) + "]") if int(rfdevice.rx_code) == 123: # COMMING HOME statusA = database.get_setting("relais_2") if int(statusA) == 1: bob.switch_small_relay(2, 0) else: bob.switch_small_relay(2, 1) print("Taste A") time.sleep(3) time.sleep(0.01) rfdevice.cleanup()
Da benutze ich, wie gesagt, die Lib von Milaq (https://github.com/milaq/rpi-rf).
Python
Alles anzeigen""" Sending and receiving 433/315Mhz signals with low-cost GPIO RF Modules on a Raspberry Pi. """ import logging import time from collections import namedtuple import RPi.GPIO as GPIO MAX_CHANGES = 67 _LOGGER = logging.getLogger(__name__) Protocol = namedtuple('Protocol', ['pulselength', 'sync_high', 'sync_low', 'zero_high', 'zero_low', 'one_high', 'one_low']) PROTOCOLS = (None, Protocol(350, 1, 31, 1, 3, 3, 1), Protocol(650, 1, 10, 1, 2, 2, 1), Protocol(100, 30, 71, 4, 11, 9, 6), Protocol(380, 1, 6, 1, 3, 3, 1), Protocol(500, 6, 14, 1, 2, 2, 1)) class RFDevice: """Representation of a GPIO RF device.""" # pylint: disable=too-many-instance-attributes,too-many-arguments def __init__(self, gpio, tx_proto=1, tx_pulselength=None, tx_repeat=10, tx_length=24, rx_tolerance=80): """Initialize the RF device.""" self.gpio = gpio self.tx_enabled = False self.tx_proto = tx_proto if tx_pulselength: self.tx_pulselength = tx_pulselength else: self.tx_pulselength = PROTOCOLS[tx_proto].pulselength self.tx_repeat = tx_repeat self.tx_length = tx_length self.rx_enabled = False self.rx_tolerance = rx_tolerance # internal values self._rx_timings = [0] * (MAX_CHANGES + 1) self._rx_last_timestamp = 0 self._rx_change_count = 0 self._rx_repeat_count = 0 # successful RX values self.rx_code = None self.rx_code_timestamp = None self.rx_proto = None self.rx_bitlength = None self.rx_pulselength = None GPIO.setmode(GPIO.BCM) _LOGGER.debug("Using GPIO " + str(gpio)) def cleanup(self): """Disable TX and RX and clean up GPIO.""" if self.tx_enabled: self.disable_tx() if self.rx_enabled: self.disable_rx() _LOGGER.debug("Cleanup") GPIO.cleanup() def enable_tx(self): """Enable TX, set up GPIO.""" if self.rx_enabled: _LOGGER.error("RX is enabled, not enabling TX") return False if not self.tx_enabled: self.tx_enabled = True GPIO.setup(self.gpio, GPIO.OUT) _LOGGER.debug("TX enabled") return True def disable_tx(self): """Disable TX, reset GPIO.""" if self.tx_enabled: # set up GPIO pin as input for safety GPIO.setup(self.gpio, GPIO.IN) self.tx_enabled = False _LOGGER.debug("TX disabled") return True def tx_code(self, code, tx_proto=None, tx_pulselength=None): """ Send a decimal code. Optionally set protocol and pulselength. When none given reset to default protocol and pulselength. """ if tx_proto: self.tx_proto = tx_proto else: self.tx_proto = 1 if tx_pulselength: self.tx_pulselength = tx_pulselength else: self.tx_pulselength = PROTOCOLS[self.tx_proto].pulselength rawcode = format(code, '#0{}b'.format(self.tx_length + 2))[2:] _LOGGER.debug("TX code: " + str(code)) return self.tx_bin(rawcode) def tx_bin(self, rawcode): """Send a binary code.""" _LOGGER.debug("TX bin: " + str(rawcode)) for _ in range(0, self.tx_repeat): for byte in range(0, self.tx_length): if rawcode[byte] == '0': if not self.tx_l0(): return False else: if not self.tx_l1(): return False if not self.tx_sync(): return False return True def tx_l0(self): """Send a '0' bit.""" if not 0 < self.tx_proto < len(PROTOCOLS): _LOGGER.error("Unknown TX protocol") return False return self.tx_waveform(PROTOCOLS[self.tx_proto].zero_high, PROTOCOLS[self.tx_proto].zero_low) def tx_l1(self): """Send a '1' bit.""" if not 0 < self.tx_proto < len(PROTOCOLS): _LOGGER.error("Unknown TX protocol") return False return self.tx_waveform(PROTOCOLS[self.tx_proto].one_high, PROTOCOLS[self.tx_proto].one_low) def tx_sync(self): """Send a sync.""" if not 0 < self.tx_proto < len(PROTOCOLS): _LOGGER.error("Unknown TX protocol") return False return self.tx_waveform(PROTOCOLS[self.tx_proto].sync_high, PROTOCOLS[self.tx_proto].sync_low) def tx_waveform(self, highpulses, lowpulses): """Send basic waveform.""" if not self.tx_enabled: _LOGGER.error("TX is not enabled, not sending data") return False GPIO.output(self.gpio, GPIO.HIGH) time.sleep((highpulses * self.tx_pulselength) / 1000000) GPIO.output(self.gpio, GPIO.LOW) time.sleep((lowpulses * self.tx_pulselength) / 1000000) return True def enable_rx(self): """Enable RX, set up GPIO and add event detection.""" if self.tx_enabled: _LOGGER.error("TX is enabled, not enabling RX") return False if not self.rx_enabled: self.rx_enabled = True GPIO.setup(self.gpio, GPIO.IN) GPIO.add_event_detect(self.gpio, GPIO.BOTH) GPIO.add_event_callback(self.gpio, self.rx_callback) _LOGGER.debug("RX enabled") return True def disable_rx(self): """Disable RX, remove GPIO event detection.""" if self.rx_enabled: GPIO.remove_event_detect(self.gpio) self.rx_enabled = False _LOGGER.debug("RX disabled") return True # pylint: disable=unused-argument def rx_callback(self, gpio): """RX callback for GPIO event detection. Handle basic signal detection.""" timestamp = int(time.perf_counter() * 1000000) duration = timestamp - self._rx_last_timestamp if duration > 5000: if duration - self._rx_timings[0] < 200: self._rx_repeat_count += 1 self._rx_change_count -= 1 if self._rx_repeat_count == 2: for pnum in range(1, len(PROTOCOLS)): if self._rx_waveform(pnum, self._rx_change_count, timestamp): _LOGGER.debug("RX code " + str(self.rx_code)) break self._rx_repeat_count = 0 self._rx_change_count = 0 if self._rx_change_count >= MAX_CHANGES: self._rx_change_count = 0 self._rx_repeat_count = 0 self._rx_timings[self._rx_change_count] = duration self._rx_change_count += 1 self._rx_last_timestamp = timestamp def _rx_waveform(self, pnum, change_count, timestamp): """Detect waveform and format code.""" code = 0 delay = int(self._rx_timings[0] / PROTOCOLS[pnum].sync_low) delay_tolerance = delay * self.rx_tolerance / 100 for i in range(1, change_count, 2): if (self._rx_timings[i] - delay * PROTOCOLS[pnum].zero_high < delay_tolerance and self._rx_timings[i+1] - delay * PROTOCOLS[pnum].zero_low < delay_tolerance): code <<= 1 elif (self._rx_timings[i] - delay * PROTOCOLS[pnum].one_high < delay_tolerance and self._rx_timings[i+1] - delay * PROTOCOLS[pnum].one_low < delay_tolerance): code <<= 1 code |= 1 else: return False if self._rx_change_count > 6 and code != 0: self.rx_code = code self.rx_code_timestamp = timestamp self.rx_bitlength = int(change_count / 2) self.rx_pulselength = delay self.rx_proto = pnum return True return False
So, dann wird die Taste A gedrückt und die Funktion:
aufgerufen. Die 2 ist einfach der Index des Relais und die 0 = Aus (1 = An).
Code
Alles anzeigendef switch_small_relay(number, status=1): pipes = [...] res = CWA.communication_via_nrf24l01(pipes, "11|" + str(number) + "|" + str(status) + "\n") if res == "11|" + str(number) + "|" + str(status) + "\n": database.set_setting("relais_" + str(number), status) bob_online.bob_online_send_relay_status(str(number), status) if session: session.clear() return 1 else: return 0
Und die Funktion CWA.communication_via_nrf24l01 sieht so aus:
Python
Alles anzeigendef communication_via_nrf24l01(pipes, send_msg): import RPi.GPIO as GPIO GPIO.setmode(GPIO.BCM) from libs.lib_nrf24 import NRF24 import time import spidev from time import gmtime, strftime try: radio = NRF24(GPIO, spidev.SpiDev()) radio.begin(0, 25) radio.enableDynamicPayloads() radio.setRetries(15,15) radio.setPayloadSize(32) radio.setChannel(...) radio.setDataRate(NRF24.BR_250KBPS) radio.setPALevel(NRF24.PA_LOW) radio.setCRCLength(NRF24.CRC_8); radio.setAutoAck(1) radio.openWritingPipe(pipes[1]) radio.openReadingPipe(1, pipes[0]) radio.enableAckPayload() radio.printDetails() radio.stopListening() buf = bytes(send_msg, "UTF-8") # send a packet to receiver radio.write(buf) # did it return with a payload? pipe = [0] radio.startListening() timeout_counter = 0 timeout = False while not radio.available(pipe): time.sleep(10000/1000000.0) if timeout_counter == 100: timout = True break else: timeout_counter += 1 if timeout == True: GPIO.cleanup() return 0 else: recv_buffer = [] radio.read(recv_buffer, radio.getDynamicPayloadSize()) recv = "" for c in recv_buffer: recv += chr(c) #print ("recv-" + recv + "-") GPIO.cleanup() if send_msg == recv: return recv else: return 0 except: GPIO.cleanup() print(("Error: %s" % sys.exc_info()[0])) exc_type, exc_value, exc_traceback = sys.exc_info() config.bob_logging(traceback.format_exception(exc_type, exc_value, exc_traceback), "CRITICAL") return 0
Diese macht, nach der Arbeit, ein GPIO.cleanup(). Dieses zerstört aber auch das GPIO Handling der rpi-rf... woraufhin diese keine Daten mehr Empfangen kann und einfach nichts mehr macht. Bei STRG+C kommt:
ZitatTraceback (most recent call last):
File "remote_control.py", line 75, in <module>
time.sleep(0.01)
File "remote_control.py", line 18, in exithandler
rfdevice.cleanup()
File "/home/pi/bob2/modules/rpi_rf.py", line 66, in cleanup
self.disable_rx()
File "/home/pi/bob2/modules/rpi_rf.py", line 176, in disable_rx
GPIO2.remove_event_detect(self.gpio)
RuntimeError: Please set pin numbering mode using GPIO.setmode(GPIO.BOARD) or GPIO.setmode(GPIO.BCM)
Edit: Ok, das letzte hat "nichts" mit dem Problem zu tun. Das ist eine Cleanup Funktion von rpi-rf, die beim Beenden (auch STRG+C) ausgeführt wird.
-
Man kann mehrere Python Scripts bzgl. GPIO gleichzeitig nutzen / laufen haben.
Durch GPIO.cleanup() werden nicht _alle_ GPIO's auf Werkseinstellungen zurückgesetzt, sondern nur die die vom jeweiligen Script tatsächlich verwendet wurden.
Man kann GPIO.cleanup() aber auch den GPIO übergeben der zurückgesetzt werden soll: GPIO.cleanup(17)
Wenn du GPIO.cleanup() in der RFDevice Klasse (Zeile 68) entsprechend anpasst sollte es eigentlich den gewünschten Effekt haben...
Ggf. müsstest du dann noch ein GPIO.setwarnings(False) nach Zeile 58 einfügen. (Optional)
Zeile 68 müsste wie folgt angepasst werden: GPIO.cleanup(self.gpio)
In der cleanup() Methode der RFDevice Klasse wird halt noch mehr gemacht, deshalb solltest du die Methode schon weiterhin aufrufen.
Ausprobiert hab ich das noch nicht, aber du ja gleich
-
Funktioniert leider nicht. Das Skript bleibt immer noch 'stecken' und es werden keine Signale der Fernbedienung mehr angezeigt.
Wobei ich auch nicht ganz verstehe warum es in der RFDevice Klasse geändert werden soll. Müsste nicht die communication_via_nrf24l01 bearbeitet werden? Denn die RFDevice Klasse ist ja die, die dauerhaft läuft und die communication_via_nrf24l01 Funktion immer nur zwischendurch.
Wie dem auch sei, ich habe es an beiden Stellen probiert... es gibt zwar keinen Fehler, aber funktionieren tut es auch nicht. Aus irgend einem Grund unterbricht die communication_via_nrf24l01 den Ablauf der RFDevice. -
-
Achso ich hab das anders herum verstanden, dass das RFDevice Script zwischendurch gestartet wird. Leider hast du nicht erwähnt welches nrf24l01 Module du da nutzt...
Dann pass mal in der communication_via_nrf24l01() Funktion Zeile 44, 54 und 60 an bzw kommentier alle GPIO.cleanup() Zeilen aus und füge nach Zeile 3 GPIO.setwarnings(False) ein.
Bei communication_via_nrf24l01() gibt es halt eine Besonderheit: NRF24() wird nur das Module zur Steuerung der GPIOs übergeben, das wiederum kümmert sich selber um die Initialisierung aber macht normalerweise kein cleanup.
Somit müsste es also eigentlich reichen communication_via_nrf24l01() anzupassen.
-
Warum macht es normalerweise kein Cleanup? Sollte nicht immer ein Cleanup der GPIOs gemacht werden, da sie sonst - auch nach Programmende - weiterhin belegt sind und nicht neu genutzt werden können?
Ich habe jetzt einen neuen Parameter in communication_via_nrf24l01() eingebaut; gpio_clean_upDamit übergebe ich der Funktion den Hinweis ob ein cleanup gemacht werden soll oder nicht. Die Funktion wird auch von anderen Programmfunktionen aufgerufen und da ist dann ein CleanUp notwendig. So ganz richtig fühlt sich das aber noch nicht an, da einen Parameter überall durchzuschleifen. Scheint wohl aber nicht anders zu gehen.
-
Wie gesagt: Ich weiß absolut nicht welches Module Du für NRF24() verwendest, da du das nirgends erwähnt hast. Daher kann ich auch nicht 100% sagen Ob oder Ob Nicht vom Module ein Cleanup gemacht wird.
Jetzt mitmachen!
Du hast noch kein Benutzerkonto auf unserer Seite? Registriere dich kostenlos und nimm an unserer Community teil!