Ich habe eine bouncetime von 100 ms eingefügt, und es klappt nun.
Der Widerstand beträgt 10 kΩ, die Kabel sind bis zu einem Meter lang.
Kompletter Quelltext:
from signal import signal, SIGINT
from sys import exit
import tracemalloc
import asyncio
import _thread
import RPi.GPIO as GPIO
import os
from flask.helpers import make_response
import SmartGardenSensors
import EmailSender
import SmartGardenConfigLoader
from SmartGardenConfigLoader import config
from SmartGardenLogger import SmartGardenLogger
from AirWatcher import AirWatcher
from LightWatcher import LightWatcher
from PlantController import PlantController
from SmartGardenRestAPI import SmartGardenRestAPI
import datetime
from interfaces.IDumpable import IDumpable
# Event loop used by the __main__ block to run SmartGarden.start().
_LOOP = asyncio.get_event_loop()
# Placeholder; the __main__ block replaces it with the SmartGarden instance
# and handler() reads it.
_garden = 0
# Start tracing Python memory allocations.
tracemalloc.start()
# Address GPIO pins by their Broadcom (BCM) channel numbers.
GPIO.setmode(GPIO.BCM)
# Configuration as returned by SmartGardenConfigLoader.config().
smartGardenConfig = config()
""" Email Settings """
# Sender address, sender password and receiver address that are passed to
# EmailSender.sendMail() for notifications.
SENDER_MAIL = smartGardenConfig["email"]["sender"]
SENDER_PW = smartGardenConfig["email"]["sender_password"]
RECEIVER_MAIL = smartGardenConfig["email"]["receiver"]
class SmartGarden(IDumpable):
#Region
@property
def currentTemperature(self):
    """Most recently stored temperature value."""
    return self._currentTemperature

@currentTemperature.setter
def currentTemperature(self, value):
    """Store a new temperature and refresh ``temperatureIsOkay``.

    ``None`` is ignored: the previous value and flag stay untouched.
    """
    if value is None:
        return
    self._currentTemperature = value
    # "Okay" means neither above the maximum nor below the minimum.
    outOfRange = value > self.maxTemperature or value < self.minTemperature
    self.temperatureIsOkay = not outOfRange

@currentTemperature.deleter
def currentTemperature(self):
    """Remove the stored temperature value."""
    del self._currentTemperature
@property
def currentHumidity(self):
    """Most recently stored humidity value."""
    return self._currentHumidity

@currentHumidity.setter
def currentHumidity(self, value):
    """Store a new humidity and refresh ``humidityIsOkay``.

    ``None`` is ignored: the previous value and flag stay untouched.
    """
    if value is None:
        return
    self._currentHumidity = value
    # "Okay" means neither above the maximum nor below the minimum.
    outOfRange = value > self.maxHumidity or value < self.minHumidity
    self.humidityIsOkay = not outOfRange

@currentHumidity.deleter
def currentHumidity(self):
    """Remove the stored humidity value."""
    del self._currentHumidity
def __init__(self):
    """Set up state, start the air watcher and hook up the GPIO sensors.

    Limits and pin numbers are read from the configuration, the AirWatcher
    is started in a background thread, and edge callbacks are registered for
    the water-shortage float sensor and the garden-lid sensor.
    """
    self.isActive = False
    self.logger = SmartGardenLogger(True)
    self.log("Initializing Smart Garden")

    # Controllers keyed by their id; filled by loadPlants/loadLights/loadFans.
    self.plants = {}
    self.lights = {}
    self.fans = {}

    self.config = config()
    self.minHumidity = self.config["humidity"]["minimum"]
    self.maxHumidity = self.config["humidity"]["maximum"]
    self.humidityIsOkay = True
    self.minTemperature = self.config["temperature"]["minimum"]
    self.maxTemperature = self.config["temperature"]["maximum"]
    self.temperatureIsOkay = True

    self.activeSinceTimestamp = 0
    self.lightIntensity = 0
    self.lastHumidity = 0
    # Placeholder readings go straight into the backing fields. Routing the
    # dummy 0 through the currentTemperature setter would compare it with the
    # configured limits and could mark the temperature as "not okay" before
    # any real measurement exists.
    self._currentTemperature = 0
    self._currentHumidity = 0
    self.currentLightIntensity = 0

    self.waterShortageGPIO = self.config["gpios"]["water_shortage_float_sensor"]
    self.gardenLidGPIO = self.config["gpios"]["garden_lid_sensor"]

    self.airWatcher = AirWatcher(
        self.config["airQualityCheckIntervall"],
        self.onHumidityDidChange,
        self.onTemperatureDidChange,
        self.logger,
    )
    _thread.start_new_thread(self.airWatcher.start, ())

    GPIO.setup(self.waterShortageGPIO, GPIO.IN)
    # Read the initial state BEFORE registering the callback: the callback
    # compares against self.waterShortageDetected, so an edge arriving first
    # would otherwise hit a missing attribute.
    self.waterShortageDetected = GPIO.input(self.waterShortageGPIO)
    # NOTE(review): unlike the lid sensor below, no bouncetime is set here;
    # consider adding one if the float switch bounces.
    GPIO.add_event_detect(
        self.waterShortageGPIO,
        GPIO.BOTH,
        callback=self.onWaterSupplyStateDidChange,
    )

    self.gardenLidIsOpen = 0
    GPIO.setup(self.gardenLidGPIO, GPIO.IN)
    GPIO.add_event_detect(
        self.gardenLidGPIO,
        GPIO.BOTH,
        callback=self.onLidStateDidChange,
        bouncetime=100,
    )
    # Synchronise gardenLidIsOpen with the actual pin level once at start-up.
    self.onLidStateDidChange(self.gardenLidGPIO)
""" Called when the GPIO for the floating sensor (water sensor) detects a change """
def handleWaterCapacityStatus(self):
    """React to the current value of ``waterShortageDetected``.

    On a shortage a notification mail is attempted, all plant controllers
    are stopped and the event is logged. Otherwise the refill is logged and
    the plant controllers are resumed.
    """
    if self.waterShortageDetected != True:
        self.log("Water has been refilled. Resuming PlantDoctors.")
        self.resumeAllPlantControllers()
        return

    mailSent = EmailSender.sendMail(
        SENDER_MAIL,
        SENDER_PW,
        "Smart Garden",
        "Did stop due to water shortage. Refill the water tank in order to resume.",
        RECEIVER_MAIL,
    )
    if mailSent == False:
        self.logger.log("SmartGarden - Unable to send mail bout water shortage")
    self.stopAllPlantControllers()
    self.log("Water is running low. Stopping PlantDoctors. Refill to resume.")
""" Stops all plant controllers """
def stopAllPlantControllers(self):
    """Stop every registered plant controller (used on water shortage).

    ``self.plants`` maps plant id -> controller, so the controllers are the
    dict's values; iterating the dict itself would only yield the ids.
    """
    self.log("Stopping all plant controllers due to water shortage")
    for plant in self.plants.values():
        plant.stop()
""" Resumes all plant controllers """
def resumeAllPlantControllers(self):
    """Start every registered plant controller again (used after a refill).

    ``self.plants`` maps plant id -> controller, so the controllers are the
    dict's values; iterating the dict itself would only yield the ids.
    """
    self.log("Resuming all plant controllers as water was refilled")
    # NOTE(review): startPlants() launches plant.start in a new thread and
    # honours plant.isActive; here start() is called directly. Confirm that
    # start() does not block before relying on this for several plants.
    for plant in self.plants.values():
        plant.start()
""" Logs a message """
def log(self, textToLog: str):
    """Forward ``textToLog`` to the garden's logger."""
    sink = self.logger
    sink.log(textToLog)
""" Called when water status changes from empty to filled """
def refillCallback(self):
    """Clear the water-shortage flag and log the refill if one was pending.

    The previous state is captured before the flag is reset; testing the
    flag after resetting it could never be true.
    """
    hadShortage = bool(self.waterShortageDetected)
    self.waterShortageDetected = False
    if hadShortage:
        self.log("Water was refilled - Continuing")
""" Called whenever the temperature did change """
def onTemperatureDidChange(self, newTemperature: float, oldTemperature: float):
    """Store a new temperature reading, start fans if it is too high, and log it.

    Args:
        newTemperature: The reading that was just taken.
        oldTemperature: The previous reading, used for the rise/fall wording.
    """
    # TODO: check all plants for a temperature threshold that may have been reached
    direction = 'fall' if newTemperature < oldTemperature else 'rise'
    self.currentTemperature = newTemperature
    if newTemperature > self.maxTemperature:
        self.startFansDueToHighTemperature()
    # %d truncates the readings to whole degrees in the log line.
    self.log(
        "Temperature did %s from %d to %d degree"
        % (direction, oldTemperature, newTemperature)
    )
""" Called whenever humidity did change """
def onHumidityDidChange(self, newHumidity: float, oldHumidity: float):
    """Record a humidity reading and react when it leaves the configured range.

    The change is logged and stored. A notification mail is attempted only
    when the humidity *leaves* the safe range, i.e. while ``humidityIsOkay``
    is still True. The flag then stays False for as long as readings remain
    out of range and is set back to True by the first reading between the
    limits. Fans are requested on every reading above the maximum, like
    ``onTemperatureDidChange`` does for temperature.

    Args:
        newHumidity: The reading that was just taken.
        oldHumidity: The previous reading, used for the rise/fall wording.
    """
    direction = 'fall' if newHumidity < oldHumidity else 'rise'
    # %d truncates the readings to whole percent in the log line.
    self.log(
        "Humidity did %s from %d to %d%%" % (direction, oldHumidity, newHumidity)
    )
    self._currentHumidity = newHumidity

    tooLow = newHumidity < self.minHumidity
    tooHigh = newHumidity > self.maxHumidity
    if not (tooLow or tooHigh):
        self.humidityIsOkay = True
        return

    # Remember whether this reading is the one that left the safe range, then
    # keep the flag False. Resetting it on a repeated out-of-range reading
    # would report a false "okay" and re-send the mail every other reading.
    firstAlert = bool(self.humidityIsOkay)
    self.humidityIsOkay = False

    if tooLow:
        if firstAlert:
            subject = "SmartGarden - Humidity is lower than defined minimum humidity"
            content = (
                "Current humidity is at %d percent. Minimum humidity is set to %d percent"
                % (newHumidity, self.minHumidity)
            )
            if EmailSender.sendMail(SENDER_MAIL, SENDER_PW, subject, content, RECEIVER_MAIL) == False:
                self.logger.log("SmartGarden - Unable to send mail bout min. humidity")
            self.log(subject)
        return

    if firstAlert:
        subject = f"SmartGarden - Humidity is higher than defined maximum humidity ({newHumidity}% / {self.maxHumidity}%)"
        content = (
            "Current humidity is at %d percent. Maximum humidity is set to %d percent "
            % (newHumidity, self.maxHumidity)
        )
        if EmailSender.sendMail(SENDER_MAIL, SENDER_PW, subject, content, RECEIVER_MAIL) == False:
            self.logger.log("SmartGarden - Unable to send mail bout max. humidity")
        self.log(subject)
    self.startFansDueToHighHumidity()
def startFansDueToHighHumidity(self):
    """Ask every registered fan to run because of high humidity."""
    for fan in self.fans.values():
        fan.runDueToHighHumidity()
def startFansDueToHighTemperature(self):
    """Ask every registered fan to run because of high temperature."""
    for fan in self.fans.values():
        fan.runDueToHighTemperature()
""" Called whenever the water supply state did change """
def onWaterSupplyStateDidChange(self, channel):
    """GPIO edge callback for the water-shortage float sensor.

    Events for other channels are ignored. The pin is re-read, and only a
    value different from the stored one is recorded and handed to
    ``handleWaterCapacityStatus``.
    """
    if channel != self.waterShortageGPIO:
        return
    sensorState = GPIO.input(self.waterShortageGPIO)
    if sensorState == self.waterShortageDetected:
        return
    self.waterShortageDetected = sensorState
    self.handleWaterCapacityStatus()
def onLidStateDidChange(self, channel):
    """GPIO edge callback for the garden-lid sensor.

    Events for other channels are ignored. The pin is re-read, and a value
    different from the stored one is passed to ``handleGardenLidStatus``.
    """
    if channel != self.gardenLidGPIO:
        return
    lidState = GPIO.input(self.gardenLidGPIO)
    if lidState != self.gardenLidIsOpen:
        self.handleGardenLidStatus(lidState)
def handleGardenLidStatus(self, _newLidStatus: int):
    """Store the new lid pin level and log whether the lid was opened or closed.

    Args:
        _newLidStatus: Pin level read from the lid sensor.
    """
    self.gardenLidIsOpen = _newLidStatus
    # NOTE(review): level 0 is logged as "opened" although the attribute is
    # named gardenLidIsOpen - confirm the sensor polarity.
    message = "Lid was opened" if self.gardenLidIsOpen == 0 else "Lid was closed"
    self.logger.log(message)
def performInitialCheck(self):
    """Log the current date/time and take an initial light-intensity reading."""
    now = SmartGardenSensors.currentDateTime()
    self.log("Reading datetime (preferably from RTC): " + str(now))
    # TODO: Light is only checked in here... needs to be done regularly
    self.currentLightIntensity = SmartGardenSensors.currentLight()
    self.log(f'Initial light intensity: {self.currentLightIntensity}%')
""" Creates a PlantController-instance for each plant in the config """
def loadPlants(self):
    """Register the plant controllers created from ``config["plants_path"]``.

    Each controller returned by the config loader is stored in
    ``self.plants`` under its ``id``.
    """
    controllers = SmartGardenConfigLoader.createPlantControllersFromDirectoryPath(
        self.config["plants_path"], self.logger
    )
    self.plants.update((controller.id, controller) for controller in controllers)
def startPlants(self):
    """Launch ``start`` of every active plant controller in its own thread."""
    for controller in self.plants.values():
        if controller.isActive != True:
            continue
        _thread.start_new_thread(controller.start, ())
def loadFans(self):
    """Register the fan controllers created from ``config["fans_path"]``.

    Each controller returned by the config loader is stored in ``self.fans``
    under its ``id``.
    """
    controllers = SmartGardenConfigLoader.createFanControllersFromDirectoryPath(
        self.config["fans_path"], self.logger
    )
    self.fans.update((controller.id, controller) for controller in controllers)
def startEmergencyWatcher(self):
    """Start the SmartGardenEmergencyWatcher systemd service via ``sudo``."""
    command = 'sudo systemctl start SmartGardenEmergencyWatcher.service'
    os.system(command)
def stopEmergencyWatcher(self):
    """Stop the SmartGardenEmergencyWatcher systemd service via ``sudo``."""
    command = 'sudo systemctl stop SmartGardenEmergencyWatcher.service'
    os.system(command)
def loadLights(self):
    """Register the light controllers created from ``config["lights_path"]``.

    Each controller returned by the config loader is stored in
    ``self.lights`` under its ``id``.
    """
    self.logger.log('Creating lights')
    controllers = SmartGardenConfigLoader.createLightsFromDirectoryPath(
        self.config["lights_path"], self.logger
    )
    self.lights.update((controller.id, controller) for controller in controllers)
def startLights(self):
    """Launch ``start`` of every registered light in its own thread."""
    for light in self.lights.values():
        _thread.start_new_thread(light.start, ())
def stopLights(self):
    """Stop and then kill every registered light."""
    for light in self.lights.values():
        light.stop()
        light.kill()
def stopPlantControllers(self):
    """Stop every running plant controller and kill all of them.

    NOTE(review): the source lost its indentation; ``kill()`` is assumed to
    run for every controller (as in stopLights), not only for running ones.
    """
    for controller in self.plants.values():
        if controller.isRunning == True:
            controller.stop()
        controller.kill()
async def start(self):
    """Power up the garden and start lights, plants and fans.

    Switches the main-power pin on, creates the status directories and loads
    the lights. If a water shortage is flagged, the garden is stopped and the
    process exits with status 0. Otherwise the emergency watcher is started,
    the initial sensor check runs, and lights, plants and fans are brought up.
    """
    GPIO.setmode(GPIO.BCM)
    self.isActive = True
    self.activeSinceTimestamp = datetime.datetime.now().timestamp()
    self.log("Starting Smart Garden")
    self.log("Turning on main power")
    mainPowerPin = self.config["gpios"]["main_power"]
    GPIO.setup(mainPowerPin, GPIO.OUT, initial=GPIO.HIGH)
    createDirectories()
    self.loadLights()

    if self.waterShortageDetected == True:
        self.log("Water level is low. Exiting")
        self.stop()
        exit(0)

    self.log("Water capacity is sufficient")
    self.log("Starting Emergency Watcher")
    self.startEmergencyWatcher()
    self.log("Performing initial check for sensors")
    self.performInitialCheck()
    self.startLights()
    self.loadPlants()
    self.startPlants()
    self.loadFans()
def stop(self):
    """Shut the garden down.

    Stops the emergency watcher, lights, air watcher and plant controllers,
    empties the controller registries, switches the main-power pin off and
    releases the GPIO channels.
    """
    self.isActive = False
    self.log("Stopping Smart Garden")
    self.stopEmergencyWatcher()
    self.stopLights()
    self.lights, self.fans = {}, {}
    self.airWatcher.stop()
    self.stopPlantControllers()
    self.plants = {}
    self.log("Turning off main power")
    mainPowerPin = self.config["gpios"]["main_power"]
    GPIO.output(mainPowerPin, GPIO.LOW)
    GPIO.cleanup()
#region Rest-Functions
def dump(self) -> dict:
    """Return a status snapshot of the garden as a plain dict.

    Light intensity and temperature are re-read from SmartGardenSensors
    first; the temperature goes through the ``currentTemperature`` setter.

    Returns:
        Activity state, water-shortage flag, humidity/temperature values with
        their configured limits and range flags, and the light intensity.
    """
    self.currentLightIntensity = SmartGardenSensors.currentLight()
    self.currentTemperature = SmartGardenSensors.currentTemperature()
    return {
        "isActive": self.isActive,
        "activeSince": self.activeSinceTimestamp,
        # The GPIO level is stored as read from the pin; expose it as a bool.
        "didDetectWaterShortage": bool(self.waterShortageDetected),
        "humidity": self.currentHumidity,
        "minimumHumidity": self.minHumidity,
        "maximumHumidity": self.maxHumidity,
        "humidityIsWithinSafeRange": self.humidityIsOkay,
        "temperature": self.currentTemperature,
        "lightIntensity": self.currentLightIntensity,
        "minimumTemperature": self.minTemperature,
        "maximumTemperature": self.maxTemperature,
        "temperatureIsWithinSafeRange": self.temperatureIsOkay,
    }
def getHumidity(self):
    """Return the stored humidity together with its safe-range flag."""
    return {
        "humidityIsWithinSafeRange": self.humidityIsOkay,
        "humidity": self.currentHumidity,
    }
def getTemperature(self):
    """Return the stored temperature together with its safe-range flag."""
    return {
        "temperatureIsWithinSafeRange": self.temperatureIsOkay,
        "temperature": self.currentTemperature,
    }
def getLightIntensity(self):
    """Re-read the light sensor, store the value and return it in a dict."""
    reading = SmartGardenSensors.currentLight()
    self.currentLightIntensity = reading
    return {"lightIntensity": self.currentLightIntensity}
def listPlants(self) -> list:
    """Return the ``dump()`` result of every registered plant controller.

    Returns:
        One entry per controller in ``self.plants``, in the dict's order.
    """
    return [plant.dump() for plant in self.plants.values()]
def listLights(self) -> list:
    """Return the ``dump()`` result of every registered light.

    Returns:
        One entry per light in ``self.lights``, in the dict's order.
    """
    return [light.dump() for light in self.lights.values()]
def listFans(self) -> list:
    """Return the ``dump()`` result of every registered fan.

    Returns:
        One entry per fan in ``self.fans``, in the dict's order.
    """
    return [fan.dump() for fan in self.fans.values()]
#endregion
def createDirectories():
    """Create the status directory and its ``plants/`` and ``garden/`` children.

    The paths are built from ``smartGardenConfig["status_path"]`` by string
    concatenation, as in the rest of this file. ``os.makedirs(...,
    exist_ok=True)`` is used so that missing parent directories are created
    and an already existing directory is not an error, without a separate
    exists-check that could race with another process.
    """
    rootPath = smartGardenConfig["status_path"]
    for path in (rootPath, rootPath + 'plants/', rootPath + 'garden/'):
        os.makedirs(path, exist_ok=True)
def handler(signal_received, frame):
    """SIGINT handler: stop the garden (if it exists yet) and exit with status 0.

    Args:
        signal_received: Signal number passed in by the signal module (unused).
        frame: Current stack frame passed in by the signal module (unused).
    """
    # The handler is installed before the SmartGarden instance is created, so
    # _garden may still be the module-level placeholder (0) at this point.
    stopGarden = getattr(_garden, "stop", None)
    if callable(stopGarden):
        stopGarden()
    exit(0)
if __name__ == '__main__':
    # Imported here because the module-level name `signal` is the signal()
    # function (from signal import signal), not the module.
    from signal import pause

    signal(SIGINT, handler)
    _garden = SmartGarden()
    try:
        _LOOP.run_until_complete(_garden.start())
    finally:
        _LOOP.close()

    # To make the REST API visible in the local network, run it on 0.0.0.0:
    #   asyncio.run(_restApi.run("0.0.0.0"))
    _restApi = SmartGardenRestAPI(_garden)
    asyncio.run(_restApi.run())

    # Keep the main thread alive for the GPIO callbacks and worker threads.
    # pause() sleeps until a signal arrives instead of spinning a CPU core;
    # SIGINT ends the process through handler().
    while True:
        pause()
Display More
Kleiner Zusatz:
Das ist mein erstes echtes Hobby-Projekt mit Python, daher sieht der Quelltext auch „historisch gewachsen“ aus.