Archived
1
0
This repository has been archived on 2025-04-27. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
pico-dht22/main.py
2025-04-26 21:43:24 -04:00

304 lines
9.0 KiB
Python

import network
import rp2
import time
import json
import os
import machine
from machine import Pin
from umqtt.robust import MQTTClient
import dht
import urequests
from dotenv import load_dotenv
# NOTE(review): presumably makes the WiFi/MQTT settings available to the
# os.getenv() calls further down -- confirm against the dotenv module in use.
load_dotenv()
# GPIO pin number the DHT22 sensor is connected to; change it to match your wiring.
DHT_22_GPIO_PIN = 3
# Debug mode: when True, the functions in this file print progress and error details.
DEBUG = False
def read_cpu_temp():
    """
    Read the on-board temperature sensor and return the value in degrees Fahrenheit.

    The sensor is sampled through ADC channel 4, whose raw reading is an integer
    between 0 and 65535. That count is first scaled back to volts against the
    3.3 V maximum voltage used by the Pico board. According to the datasheet
    figures quoted in the reference below, 27 degrees Celsius corresponds to
    0.706 V and each additional degree lowers the voltage by 1.721 mV
    (0.001721 V). The resulting Celsius value is then converted to Fahrenheit.

    ref: https://how2electronics.com/read-temperature-sensor-value-from-raspberry-pi-pico/
    """
    volts_per_count = 3.3 / 65535
    raw_count = machine.ADC(4).read_u16()
    sensor_volts = raw_count * volts_per_count
    celsius = 27 - (sensor_volts - 0.706) / 0.001721
    return celsius * 9 / 5.0 + 32.0
def read_dht_22(sensor, error_delay=2):
    """
    Take one temperature/humidity measurement from a dht.DHT22 sensor.

    Args:
        sensor: object exposing ``measure()``, ``temperature()`` and
            ``humidity()`` (a ``dht.DHT22`` instance in this file).
        error_delay: seconds to pause after a failed read before returning.
            Defaults to 2, the delay that used to be hard-coded.

    Returns:
        tuple(temperature, humidity) as reported by the sensor if no errors,
        None if the read raised an error.
    """
    try:
        sensor.measure()
        return sensor.temperature(), sensor.humidity()
    except Exception:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt (Ctrl-C from the REPL) is not silently swallowed.
        time.sleep(error_delay)
        return None
def wlan_up(wlan):
    """
    Activate the WLAN interface and try to join the home WiFi network.

    The SSID and password are read from the HOME_WIFI_SSID and HOME_WIFI_PWD
    environment values. The link status is polled up to 10 times, one second
    apart, until it reports either a failure (negative) or a value >= 3.

    Args:
        wlan: object exposing ``active()``, ``connect()``, ``status()`` and
            ``ifconfig()`` (a ``network.WLAN`` instance in this file).

    Returns:
        The value of ``wlan.ifconfig()`` when the link came up, or None when
        the connection failed or did not come up in time.
    """
    wlan.active(True)
    wlan.connect(os.getenv("HOME_WIFI_SSID"), os.getenv("HOME_WIFI_PWD"))

    # Wait for connect or fail
    for _ in range(10):
        status = wlan.status()
        if status < 0 or status >= 3:
            break
        if DEBUG:
            print("Waiting for WiFi connection...")
        time.sleep(1)

    # Decide on the final status rather than on how many polls were used:
    # a negative status ends the wait early but is a failure, and it must not
    # be reported to the caller as a working connection.
    if wlan.status() < 3:
        return None

    ifconfig = wlan.ifconfig()
    if DEBUG:
        print(ifconfig)
    return ifconfig
def led_error_code(led, error_code: int):
    """Signal an error on the LED: the LED blinks ``error_code`` times.

    A short burst of three fast flashes (0.1 s on / 0.1 s off) comes first so
    the start of the sequence is recognisable. The code itself follows as slow
    blinks (1 s off, then 1 s on). The LED is left off when the function returns.
    """
    if DEBUG:
        print("LED Error Status code: {}".format(error_code))

    # Attention burst marking the start of the error sequence.
    for _ in range(3):
        led.value(True)
        time.sleep(0.1)
        led.value(False)
        time.sleep(0.1)

    # The actual error code: one slow blink per unit.
    for _ in range(error_code):
        time.sleep(1)
        led.value(True)
        time.sleep(1)
        led.value(False)

    # Always finish with the LED switched off.
    led.value(False)
    if DEBUG:
        print("LED Error Status code finished for: {}".format(error_code))
def _env_int(name, default=0):
    """Return environment value *name* as an int, or *default* if unset or not numeric."""
    try:
        return int(os.getenv(name))
    except (TypeError, ValueError):
        return default


def _try_connect(client, ok_msg, fail_msg):
    """Open an MQTT session on *client*; return True on success, False on any error."""
    try:
        client.connect(clean_session=False)
    except Exception as exc:
        if DEBUG:
            print(fail_msg.format(exc))
        return False
    if DEBUG:
        print(ok_msg)
    return True


def _build_payloads(temp, hum, cpu_temp, ifconfig):
    """Return the (dht_payload, hw_payload) JSON strings for one publish cycle."""
    time_now = time.localtime()
    timestamp = "{}/{}/{} {}:{}:{}".format(
        time_now[1],
        time_now[2],
        time_now[0],
        time_now[3],
        time_now[4],
        time_now[5],
    )
    dht_payload = json.dumps(
        {
            "Location": os.getenv("MQTT_ZONE_ID"),
            "Temperature": temp,
            "Relative Humidity": hum,
        }
    )
    hw_payload = json.dumps(
        {
            "Timestamp": timestamp,
            "CPU Temperature": cpu_temp,
            "Device": os.getenv("MQTT_HW_ID"),
            "WiFi Information": ifconfig,
        }
    )
    return dht_payload, hw_payload


def _disconnect(client):
    """Close the MQTT session on *client*, reporting (not raising) any error."""
    try:
        client.disconnect()
        if DEBUG:
            print("MQTT Disconnected.")
    except Exception as exc:
        if DEBUG:
            print("Trouble disconnecting from MQTT: {}".format(exc))


def main():
    """
    Run the sensor loop forever: read, connect, publish, sleep.

    Start-up: switches the LED off, blinks error code 1 as a power-on signal,
    sets the Wi-Fi country to "US" and creates the WLAN interface, the two
    MQTT clients (primary and back-up) and the DHT22 sensor object.

    Each loop iteration:
      1. reads the CPU temperature and the DHT22 (temperature is converted
         to Fahrenheit);
      2. brings WiFi up if it is not connected (LED error code 3 and a 10 s
         pause on failure);
      3. connects to the primary broker, falling back to the back-up broker
         (LED error code 2 and a 5 s pause if both fail);
      4. publishes the DHT22 payload to ``home/<zone>`` and the hardware
         payload to ``hw/<device>`` when a DHT22 reading is available;
      5. always disconnects from the broker, then sleeps 5 s.

    This function never returns.
    """
    retry_delay = 5  # seconds to wait before retrying after an unexpected error

    # Start Up Activities
    if DEBUG:
        print("Start up")
    count = 0
    led = machine.Pin("LED", machine.Pin.OUT)
    led.value(False)
    led_error_code(led, 1)

    # Set Wi-Fi Country
    rp2.country("US")
    wlan = network.WLAN(network.STA_IF)

    # Create MQTT Client
    mqtt_client = MQTTClient(
        client_id=os.getenv("MQTT_CLIENT_ID"),
        server=os.getenv("MQTT_HOST_NAME"),
    )

    # Create Home Assistant MQTT Client
    # NOTE(review): the back-up client uses the same MQTT_HOST_NAME and
    # MQTT_CLIENT_ID as the primary one -- confirm this is intended.
    ha_mqtt_client = MQTTClient(
        client_id=os.getenv("MQTT_CLIENT_ID"),
        server=os.getenv("MQTT_HOST_NAME"),
        # os.getenv() yields a string; the MQTT client needs an integer
        # number of seconds (0 disables keepalive).
        keepalive=_env_int("MQTT_KEEP", 0),
        user=os.getenv("MQTT_USERNAME"),
        password=os.getenv("MQTT_PASSWORD"),
    )

    # Create DHT22
    sensor = dht.DHT22(Pin(DHT_22_GPIO_PIN))

    # Let's Go!
    if DEBUG:
        print("Enter main loop")

    while True:
        # Loop Clean-Up and Prep
        led.value(False)
        count += 1
        if DEBUG:
            print(f"\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.")

        # DHT22 and CPU Reading.
        dht22_ready = False
        try:
            cpu_temp = read_cpu_temp()
            dht22_reading = read_dht_22(sensor)
            if dht22_reading is not None:
                temp, hum = dht22_reading
                temp = temp * 9 / 5.0 + 32.0
                dht22_ready = True
        except Exception as exc:
            if DEBUG:
                print("Trouble reading sensors: {}".format(exc))
            # Pause before retrying so a persistent fault does not busy-loop.
            time.sleep(retry_delay)
            continue

        # WiFi Connection.
        try:
            if wlan.status() != 3:
                ifconfig = wlan_up(wlan)
                if ifconfig is None:
                    if DEBUG:
                        # No exception exists on this path; report the link
                        # status instead of an undefined variable.
                        print("Trouble to connecting WiFi: {}".format(wlan.status()))
                    led_error_code(led, 3)
                    time.sleep(10)
                    continue
                if DEBUG:
                    print("Connected to WiFi: {}".format(ifconfig))
            else:
                ifconfig = wlan.ifconfig()
                if DEBUG:
                    print(
                        f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}."
                    )
        except Exception as exc:
            if DEBUG:
                print("Trouble with WiFi: {}".format(exc))
            time.sleep(retry_delay)
            continue

        # MQTT Connection: primary broker first, then the back-up broker.
        if _try_connect(
            mqtt_client,
            "Connected to Mosquitto MQTT broker.",
            "Trouble to connecting to Mosquitto MQTT: {}",
        ):
            client = mqtt_client
            zone_id = os.getenv("MQTT_ZONE_ID")
            hw_id = os.getenv("MQTT_HW_ID")
        elif _try_connect(
            ha_mqtt_client,
            "Connected to the back up, Home Assistant, MQTT broker.",
            "Trouble to connecting to Home Assistant MQTT: {}",
        ):
            client = ha_mqtt_client
            zone_id = os.getenv("HA_MQTT_ZONE_ID")
            hw_id = os.getenv("HA_MQTT_HW_ID")
        else:
            led_error_code(led, 2)
            time.sleep(5)
            continue  # Start back at the top of the While Loop

        # Ready to Publish?
        try:
            if dht22_ready:
                dht_payload, hw_payload = _build_payloads(
                    temp, hum, cpu_temp, ifconfig
                )
                if DEBUG:
                    print("Trying to publish...")
                client.publish("home/{}".format(zone_id), dht_payload, retain=True)
                client.publish("hw/{}".format(hw_id), hw_payload, retain=True)
        except Exception as exc:
            if DEBUG:
                print("Trouble publishing to MQTT: {}".format(exc))
        finally:
            # Always close the session opened above, even when there was
            # nothing to publish or publishing failed; otherwise every loop
            # would open another connection on top of the previous one.
            _disconnect(client)

        # Sleep for a bit.
        if DEBUG:
            print(f"Finished loop #{count}.")
        time.sleep(5)
# Script entry point: start the read/publish loop as soon as this file is executed.
main()