Created
July 18, 2017 17:57
-
-
Save eni23/54395c442ddd207500ada9acc80881d0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from pyA20.gpio import gpio | |
from pyA20.gpio import port | |
#import RPi | |
class DHT22Result: | |
'DHT22 sensor result returned by DHT22.read() method' | |
ERR_NO_ERROR = 0 | |
ERR_MISSING_DATA = 1 | |
ERR_CRC = 2 | |
error_code = ERR_NO_ERROR | |
temperature = -1 | |
humidity = -1 | |
def __init__(self, error_code, temperature, humidity): | |
self.error_code = error_code | |
self.temperature = temperature | |
self.humidity = humidity | |
def is_valid(self): | |
return self.error_code == DHT22Result.ERR_NO_ERROR | |
class DHT22: | |
'DHT22 sensor reader class for Raspberry' | |
__pin = 0 | |
def __init__(self, pin): | |
self.__pin = pin | |
def read(self): | |
gpio.setcfg(self.__pin, gpio.OUTPUT) | |
# send initial high | |
self.__send_and_sleep(gpio.HIGH, 0.05) | |
# pull down to low | |
self.__send_and_sleep(gpio.LOW, 0.02) | |
# change to input using pull up | |
#gpio.setcfg(self.__pin, gpio.INPUT, gpio.PULLUP) | |
gpio.setcfg(self.__pin, gpio.INPUT) | |
gpio.pullup(self.__pin, gpio.PULLUP) | |
# collect data into an array | |
data = self.__collect_input() | |
# parse lengths of all data pull up periods | |
pull_up_lengths = self.__parse_data_pull_up_lengths(data) | |
# if bit count mismatch, return error (4 byte data + 1 byte checksum) | |
if len(pull_up_lengths) != 40: | |
return DHT22Result(DHT22Result.ERR_MISSING_DATA, 0, 0) | |
# calculate bits from lengths of the pull up periods | |
bits = self.__calculate_bits(pull_up_lengths) | |
# we have the bits, calculate bytes | |
the_bytes = self.__bits_to_bytes(bits) | |
# calculate checksum and check | |
checksum = self.__calculate_checksum(the_bytes) | |
if the_bytes[4] != checksum: | |
return DHT22Result(DHT22Result.ERR_CRC, 0, 0) | |
# ok, we have valid data, return it | |
return DHT22Result(DHT22Result.ERR_NO_ERROR, | |
(((the_bytes[2] & 0x7F)<<8)+the_bytes[3])/10.00, | |
((the_bytes[0]<<8)+the_bytes[1])/10.00) | |
def __send_and_sleep(self, output, sleep): | |
gpio.output(self.__pin, output) | |
time.sleep(sleep) | |
def __collect_input(self): | |
# collect the data while unchanged found | |
unchanged_count = 0 | |
# this is used to determine where is the end of the data | |
max_unchanged_count = 100 | |
last = -1 | |
data = [] | |
while True: | |
current = gpio.input(self.__pin) | |
data.append(current) | |
if last != current: | |
unchanged_count = 0 | |
last = current | |
else: | |
unchanged_count += 1 | |
if unchanged_count > max_unchanged_count: | |
break | |
return data | |
def __parse_data_pull_up_lengths(self, data): | |
STATE_INIT_PULL_DOWN = 1 | |
STATE_INIT_PULL_UP = 2 | |
STATE_DATA_FIRST_PULL_DOWN = 3 | |
STATE_DATA_PULL_UP = 4 | |
STATE_DATA_PULL_DOWN = 5 | |
state = STATE_INIT_PULL_DOWN | |
lengths = [] # will contain the lengths of data pull up periods | |
current_length = 0 # will contain the length of the previous period | |
for i in range(len(data)): | |
current = data[i] | |
current_length += 1 | |
if state == STATE_INIT_PULL_DOWN: | |
if current == gpio.LOW: | |
# ok, we got the initial pull down | |
state = STATE_INIT_PULL_UP | |
continue | |
else: | |
continue | |
if state == STATE_INIT_PULL_UP: | |
if current == gpio.HIGH: | |
# ok, we got the initial pull up | |
state = STATE_DATA_FIRST_PULL_DOWN | |
continue | |
else: | |
continue | |
if state == STATE_DATA_FIRST_PULL_DOWN: | |
if current == gpio.LOW: | |
# we have the initial pull down, the next will be the data pull up | |
state = STATE_DATA_PULL_UP | |
continue | |
else: | |
continue | |
if state == STATE_DATA_PULL_UP: | |
if current == gpio.HIGH: | |
# data pulled up, the length of this pull up will determine whether it is 0 or 1 | |
current_length = 0 | |
state = STATE_DATA_PULL_DOWN | |
continue | |
else: | |
continue | |
if state == STATE_DATA_PULL_DOWN: | |
if current == gpio.LOW: | |
# pulled down, we store the length of the previous pull up period | |
lengths.append(current_length) | |
state = STATE_DATA_PULL_UP | |
continue | |
else: | |
continue | |
return lengths | |
def __calculate_bits(self, pull_up_lengths): | |
# find shortest and longest period | |
shortest_pull_up = 1000 | |
longest_pull_up = 0 | |
for i in range(0, len(pull_up_lengths)): | |
length = pull_up_lengths[i] | |
if length < shortest_pull_up: | |
shortest_pull_up = length | |
if length > longest_pull_up: | |
longest_pull_up = length | |
# use the halfway to determine whether the period it is long or short | |
halfway = shortest_pull_up + (longest_pull_up - shortest_pull_up) / 2 | |
bits = [] | |
for i in range(0, len(pull_up_lengths)): | |
bit = False | |
if pull_up_lengths[i] > halfway: | |
bit = True | |
bits.append(bit) | |
return bits | |
def __bits_to_bytes(self, bits): | |
the_bytes = [] | |
byte = 0 | |
for i in range(0, len(bits)): | |
byte = byte << 1 | |
if (bits[i]): | |
byte = byte | 1 | |
else: | |
byte = byte | 0 | |
if ((i + 1) % 8 == 0): | |
the_bytes.append(byte) | |
byte = 0 | |
return the_bytes | |
def __calculate_checksum(self, the_bytes): | |
return the_bytes[0] + the_bytes[1] + the_bytes[2] + the_bytes[3] & 255 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment