"""MicroPython script: read a DHT22 sensor and the on-chip temperature sensor,
then publish both readings over MQTT in an endless loop."""

import network
import rp2
import time
import json
import machine
from machine import Pin
from umqtt.simple import MQTTClient
import dht
import urequests
import wifi_config
import mqtt_config

# Change this GPIO PIN where your DHT22 sensor is connected
DHT_22_GPIO_PIN = 3

# Debug Mode
DEBUG = False

# wlan.status() value this script treats as "connected" (the original code
# compares against the literal 3 everywhere).
_WLAN_STATUS_CONNECTED = 3


def _celsius_to_fahrenheit(celsius):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    return celsius * 9/5. + 32.0


def read_cpu_temp():
    """
    Read the on-chip temperature sensor and return the value in Fahrenheit.

    The raw ADC reading is an integer between 0 and 65535, so it has to be
    converted to a temperature. The sensor delivers a voltage to ADC channel 4
    that is proportional to the temperature. From the datasheet, a temperature
    of 27 degrees Celsius delivers a voltage of 0.706 V, and each additional
    degree reduces the voltage by 1.721 mV (0.001721 V).

    The first step is to convert the 16-bit reading back to volts, based on
    the 3.3 V maximum voltage used by the Pico board.

    ref: https://how2electronics.com/read-temperature-sensor-value-from-raspberry-pi-pico/
    """
    cpu_temp_conversion_factor = 3.3 / 65535
    cpu_temp_sensor = machine.ADC(4)
    reading = cpu_temp_sensor.read_u16() * cpu_temp_conversion_factor
    temperature_c = 27 - (reading - 0.706) / 0.001721
    return _celsius_to_fahrenheit(temperature_c)


def read_dht_22(sensor):
    """
    Read the temperature and humidity from a dht.DHT22 sensor.

    returns tuple(temperature, humidity) if no errors
    returns None if there was an error (after pausing for 2 seconds)
    """
    try:
        sensor.measure()
        temperature = sensor.temperature()
        humidity = sensor.humidity()
        return temperature, humidity
    except Exception:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt can still stop the script.
        time.sleep(2)
        return None


def wlan_up(wlan):
    """
    Activate the WLAN interface and connect using the wifi_config credentials.

    Polls wlan.status() once per second, for up to 10 polls, until it reports
    either a failure (negative) or a value >= 3.

    returns wlan.ifconfig() if the final status is 3 (connected)
    returns None if the connection failed or timed out
    """
    wlan.active(True)
    wlan.connect(wifi_config.HOME_WIFI_SSID, wifi_config.HOME_WIFI_PWD)

    # Wait for connect or fail
    max_wait = 10
    while max_wait > 0:
        status = wlan.status()
        if status < 0 or status >= 3:
            break
        max_wait -= 1
        if DEBUG:
            print('Waiting for WiFi connection...')
        time.sleep(1)

    # Decide on the final status rather than on the wait counter: a negative
    # status is a failure even though the loop exited early, and a connection
    # that completed during the last sleep still counts as success.
    if wlan.status() != _WLAN_STATUS_CONNECTED:
        return None

    ifconfig = wlan.ifconfig()
    if DEBUG:
        print(ifconfig)
    return ifconfig


def led_error_code(led, error_code: int):
    """Blink LED for a given error code (int). error code == number of times to blink"""
    if DEBUG:
        print("LED Error Status code: {}".format(error_code))

    # Run a quick 'start error code sequence'
    # So we know when LED error sequence starts
    for _ in range(3):
        led.value(True)
        time.sleep(0.1)
        led.value(False)
        time.sleep(0.1)

    # Run real error code sequence
    for _ in range(error_code):
        time.sleep(1)
        led.value(True)
        time.sleep(1)
        led.value(False)

    # Make sure to turn off LED when this subroutine finished
    led.value(False)
    if DEBUG:
        print("LED Error Status code finished for: {}".format(error_code))


def main():
    """
    Run the endless measure-and-publish loop.

    LED error codes blinked by this function:
      1 - start up
      2 - MQTT connect or publish failed
      3 - WiFi connection failed
    """
    # Start Up Activities
    if DEBUG:
        print("Start up")
    count = 0
    led = machine.Pin('LED', machine.Pin.OUT)
    led.value(False)
    led_error_code(led, 1)

    # Set Wi-Fi Country
    rp2.country('US')
    wlan = network.WLAN(network.STA_IF)

    # Create MQTT Client
    mqtt_client = MQTTClient(mqtt_config.MQTT_CLIENT_ID, mqtt_config.MQTT_HOST_NAME)

    # Create DHT22
    sensor = dht.DHT22(Pin(DHT_22_GPIO_PIN))

    # Let's Go!
    if DEBUG:
        print("Enter main loop")
    while True:
        # Loop Clean-Up and Prep
        led.value(False)
        count += 1
        if DEBUG:
            print(f'\nStarting loop #{count}... \nWiFI Status is {wlan.status()}.')

        # WiFi Connection.
        if wlan.status() != _WLAN_STATUS_CONNECTED:
            ifconfig = wlan_up(wlan)
            if ifconfig is None:
                if DEBUG:
                    # No exception object exists on this path, so report the
                    # interface status instead.
                    print("Trouble to connecting WiFi: status {}".format(wlan.status()))
                led_error_code(led, 3)
                continue
            if DEBUG:
                print("Connected to WiFi: {}".format(ifconfig))
        else:
            ifconfig = wlan.ifconfig()
            if DEBUG:
                print(f"Skipping WiFi Activation since WiFi Status is {wlan.status()} \n{ifconfig}.")

        # MQTT Connection.
        try:
            mqtt_client.connect()
            if DEBUG:
                print(f'MQTT Connected.')
        except Exception as e:
            if DEBUG:
                print("Trouble to connecting to MQTT: {}".format(e))
            led_error_code(led, 2)
            continue  # Start back at the top of the While Loop

        try:
            # DHT22 Reading. Build the zone payload only when the read
            # succeeded, so missing or stale values are never published.
            dht_payload = None
            dht22_reading = read_dht_22(sensor)
            if dht22_reading is not None:
                temp, hum = dht22_reading
                dht_data = {
                    'Location': mqtt_config.MQTT_ZONE_ID,
                    'Temperature': _celsius_to_fahrenheit(temp),
                    'Relative Humidity': hum,
                }
                dht_payload = json.dumps(dht_data)
            elif DEBUG:
                print("DHT22 read failed; skipping sensor publish.")

            # CPU Reading.
            cpu_temp = read_cpu_temp()

            # Timestamp
            time_now = time.localtime()
            timestamp = "{}/{}/{} {}:{}:{}".format(
                time_now[1], time_now[2], time_now[0],
                time_now[3], time_now[4], time_now[5])

            hw_data = {
                'Timestamp': timestamp,
                'CPU Temperature': cpu_temp,
                'Device': mqtt_config.MQTT_HW_ID,
                'WiFi Information': ifconfig,
            }
            hw_payload = json.dumps(hw_data)

            # Publish
            if dht_payload is not None:
                mqtt_client.publish("home/{}".format(mqtt_config.MQTT_ZONE_ID), dht_payload, retain=True)
            mqtt_client.publish("hw/{}".format(mqtt_config.MQTT_HW_ID), hw_payload, retain=True)
        except Exception as e:
            # A failed publish must not end the endless loop.
            if DEBUG:
                print("Trouble publishing to MQTT: {}".format(e))
            led_error_code(led, 2)
        finally:
            try:
                mqtt_client.disconnect()
                if DEBUG:
                    print(f'MQTT Disconnected.')
            except Exception as e:
                # Best effort: the connection may already be broken.
                if DEBUG:
                    print("Trouble disconnecting from MQTT: {}".format(e))

        # Sleep for a bit.
        if DEBUG:
            print(f'Finished loop #{count}.')
        time.sleep(5)


main()