Last active
August 20, 2020 12:26
-
-
Save garystafford/bd5e781c8a1097dbac8a447abe18d0cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
from argparse import ArgumentParser | |
from datetime import datetime | |
import serial | |
from colr import color as colr | |
# LoRaWAN IoT Sensor Demo | |
# Using REYAX RYLR896 transceiver modules | |
# Author: Gary Stafford | |
# Requirements: python3 -m pip install --user -r requirements.txt | |
# To Run: python3 ./rasppi_lora_receiver.py --tty /dev/ttyAMA0 --baud-rate 115200 | |
# constants | |
ADDRESS = 116 | |
NETWORK_ID = 6 | |
PASSWORD = "92A0ECEC9000DA0DCF0CAAB0ABA2E0EF" | |
def main(): | |
logging.basicConfig(filename='output.log', filemode='w', level=logging.DEBUG) | |
args = get_args() # get args | |
payload = "" | |
print("Connecting to REYAX RYLR896 transceiver module...") | |
serial_conn = serial.Serial( | |
port=args.tty, | |
baudrate=int(args.baud_rate), | |
timeout=5, | |
parity=serial.PARITY_NONE, | |
stopbits=serial.STOPBITS_ONE, | |
bytesize=serial.EIGHTBITS | |
) | |
if serial_conn.isOpen(): | |
set_lora_config(serial_conn) | |
check_lora_config(serial_conn) | |
while True: | |
serial_payload = serial_conn.readline() # read data from serial port | |
if len(serial_payload) > 0: | |
try: | |
payload = serial_payload.decode(encoding="utf-8") | |
except UnicodeDecodeError: # receiving corrupt data? | |
logging.error("UnicodeDecodeError: {}".format(serial_payload)) | |
payload = payload[:-2] | |
try: | |
data = parse_payload(payload) | |
print("\n----------") | |
print("Timestamp: {}".format(datetime.now())) | |
print("Payload: {}".format(payload)) | |
print("Sensor Data: {}".format(data)) | |
display_temperature(data[0]) | |
display_humidity(data[1]) | |
display_pressure(data[2]) | |
display_color(data[3], data[4], data[5], data[6]) | |
except IndexError: | |
logging.error("IndexError: {}".format(payload)) | |
except ValueError: | |
logging.error("ValueError: {}".format(payload)) | |
# time.sleep(2) # transmission frequency set on IoT device | |
def eight_bit_color(value): | |
return int(round(value / (4097 / 255), 0)) | |
def celsius_to_fahrenheit(value): | |
return (value * 1.8) + 32 | |
def display_color(r, g, b, a): | |
print("12-bit Color values (r,g,b,a): {},{},{},{}".format(r, g, b, a)) | |
r = eight_bit_color(r) | |
g = eight_bit_color(g) | |
b = eight_bit_color(b) | |
a = eight_bit_color(a) # ambient light intensity | |
print(" 8-bit Color values (r,g,b,a): {},{},{},{}".format(r, g, b, a)) | |
print("RGB Color") | |
print(colr("\t\t", fore=(127, 127, 127), back=(r, g, b))) | |
print("Light Intensity") | |
print(colr("\t\t", fore=(127, 127, 127), back=(a, a, a))) | |
def display_pressure(value): | |
print("Barometric Pressure: {} kPa".format(round(value, 2))) | |
def display_humidity(value): | |
print("Humidity: {}%".format(round(value, 2))) | |
def display_temperature(value): | |
temperature = celsius_to_fahrenheit(value) | |
print("Temperature: {}°F".format(round(temperature, 2))) | |
def get_args(): | |
arg_parser = ArgumentParser(description="BLE IoT Sensor Demo") | |
arg_parser.add_argument("--tty", required=True, help="serial tty", default="/dev/ttyAMA0") | |
arg_parser.add_argument("--baud-rate", required=True, help="serial baud rate", default=1152000) | |
args = arg_parser.parse_args() | |
return args | |
def parse_payload(payload): | |
# input: +RCV=116,29,23.94|37.71|99.89|16|38|53|80,-61,56 | |
# output: [23.94, 37.71, 99.89, 16.0, 38.0, 53.0, 80.0] | |
payload = payload.split(",") | |
payload = payload[2].split("|") | |
payload = [float(i) for i in payload] | |
return payload | |
def set_lora_config(serial_conn): | |
# configures the REYAX RYLR896 transceiver module | |
serial_conn.write(str.encode("AT+ADDRESS=" + str(ADDRESS) + "\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("Address set?", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+NETWORKID=" + str(NETWORK_ID) + "\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("Network Id set?", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+CPIN=" + PASSWORD + "\r\n")) | |
time.sleep(1) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("AES-128 password set?", serial_payload.decode(encoding="utf-8")) | |
def check_lora_config(serial_conn): | |
serial_conn.write(str.encode("AT?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("Module responding?", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+ADDRESS?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("Address:", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+NETWORKID?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("Network id:", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+IPR?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("UART baud rate:", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+BAND?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("RF frequency", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+CRFOP?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("RF output power", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+MODE?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("Work mode", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+PARAMETER?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("RF parameters", serial_payload.decode(encoding="utf-8")) | |
serial_conn.write(str.encode("AT+CPIN?\r\n")) | |
serial_payload = (serial_conn.readline())[:-2] | |
print("AES128 password of the network", | |
serial_payload.decode(encoding="utf-8")) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment