Skip to content

Instantly share code, notes, and snippets.

@hilt86
Created August 26, 2020 10:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hilt86/b21d8d6d6491416e71043bfd0db8759a to your computer and use it in GitHub Desktop.
Save hilt86/b21d8d6d6491416e71043bfd0db8759a to your computer and use it in GitHub Desktop.
Lopy4 code
from network import LoRa
import socket
import binascii
import struct
import time
import pycom
from machine import Pin
import machine
from machine import Timer
# defining the pin handler early on so it can catch events while waking up
def pin_handler(arg):
'''
counts how many interrupts...
'''
# blue
pycom.rgbled(0x0032e6)
if (pycom.nvs_get('rain') == None):
# set value to zero if we haven't recorded any rain
rainInMm = 0
else: # increment value if we have recorded rainfall
rainInMm = pycom.nvs_get('rain')
rainInMm += 1
pycom.nvs_set('rain', rainInMm)
# CONSTANTS
LORA_FREQUENCY = 915200000 # start of the 1st subband
LORA_NODE_DR = 4
# create an OTA authentication params
dev_eui = binascii.unhexlify('blah')
app_key = binascii.unhexlify('blah') # Application EUI
nwk_key = binascii.unhexlify('blah')
# SETUP + INIT STUFF
pycom.heartbeat(False)
# setup pin to use internal PULL_UP resistor
p_in = Pin('P4', mode=Pin.IN, pull=Pin.PULL_UP)
print("Failed to send : ", pycom.nvs_get('rainfail'))
print("Queued to send : ", pycom.nvs_get('rain'))
# how long to wait / sleep for
sleepSeconds = 11
# how many times to re-try OTAA (~2.5s per try)
loraRetries = 15
# configure sleep
machine.pin_sleep_wakeup(['P4'], not machine.WAKEUP_ANY_HIGH, True)
# instantiate pin callback
p_in.callback(Pin.IRQ_RISING, pin_handler)
'''
utility function to setup the lora channels
'''
def prepare_channels(lora, channel, data_rate):
AU915_FREQUENCIES = [
{ "chan": 64, "fq": "915200000" }
]
if not channel in range(64,65):
raise RuntimeError("only channel 64 is implemented in this example)")
upstream = (item for item in AU915_FREQUENCIES if item["chan"] == channel).__next__()
lora.add_channel(int(upstream.get('chan')), frequency=int(upstream.get('fq')), dr_min=0, dr_max=int(data_rate))
print("*** Adding channel up %s %s" % (upstream.get('chan'), upstream.get('fq')))
for index in range(0, 71):
if index != upstream.get('chan'):
lora.remove_channel(index)
return lora
'''
call back for handling RX packets
'''
def lora_cb(lora):
events = lora.events()
if events & LoRa.RX_PACKET_EVENT:
if lora_socket is not None:
frame, port = lora_socket.recvfrom(512) # longuest frame is +-220
print(port, frame)
if events & LoRa.TX_PACKET_EVENT:
print("tx_time_on_air: {} ms @dr {}", lora.stats().tx_time_on_air, lora.stats().sftx)
'''
Main operations: this is sample code for LoRaWAN on AU915
'''
lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.AU915, device_class=LoRa.CLASS_C)
'''
TO GET DEVICE EUI
from network import LoRa
import ubinascii
lora = LoRa(mode=LoRa.LORAWAN)
print(ubinascii.hexlify(lora.mac()).upper().decode('utf-8'))
'''
prepare_channels(lora, 64, 5)
# join a network using OTAA
lora.join(activation=LoRa.OTAA, auth=(dev_eui, app_key, nwk_key), timeout=0, dr=0) # DR is 2 in v1.1rb but 0 worked for ne
# try join the lora network
print('Over the air network activation ', end='')
while (not lora.has_joined()) & (loraRetries > 0) :
# red
pycom.rgbled(0xff1500)
time.sleep(2.5)
print('.', end='')
loraRetries -= 1
print('')
# Go to sleep if we failed to join the lora network to save battery
if (not lora.has_joined()):
print("Can't complete OTTA, going to deepsleep")
machine.deepsleep(6000000)
else:
# green
pycom.rgbled(0x006102)
for i in range(0, 8):
fq = LORA_FREQUENCY + (i * 200000)
lora.add_channel(i, frequency=fq, dr_min=0, dr_max=LORA_NODE_DR)
print("AU915 Adding channel up %s %s" % (i, fq))
# create a LoRa socket
lora_socket = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
# set the LoRaWAN data rate
lora_socket.setsockopt(socket.SOL_LORA, socket.SO_DR, LORA_NODE_DR)
# msg are confirmed at the FMS level
lora_socket.setsockopt(socket.SOL_LORA, socket.SO_CONFIRMED, 0)
# make the socket non blocking y default
lora_socket.setblocking(False)
lora.callback(trigger=( LoRa.RX_PACKET_EVENT |
LoRa.TX_PACKET_EVENT |
LoRa.TX_FAILED_EVENT ), handler=lora_cb)
time.sleep(4) # this timer is important and caused me some trouble ...
def transmitRain(rainInt):
# rate limit transmissions to 1 per sec max
time.sleep(1)
pkt = struct.pack('h', rainInt)
#print('Rain bucket 5mm, Sending:', pkt)
# orange
pycom.rgbled(0xFF4500)
try:
lora_socket.send(pkt)
return True
except OSError as e:
if e.args[0] == 11:
print("error transmitting")
return False
def beacon():
pkt = struct.pack('h', 0)
try:
lora_socket.send(pkt)
return True
except OSError as e:
if e.args[0] == 11:
print("error transmitting")
return False
beacon()
print("Booted Successfully!")
# MAIN ROUTINE
# transmit if we have an item on list to transmit
while (pycom.nvs_get('rain') > 0) | (sleepSeconds > 0) :
print("Queued rain data (Nvram contents) : ", pycom.nvs_get('rain'))
# remove the item if transmit was successful
if (pycom.nvs_get('rain') > 0):
if transmitRain(pycom.nvs_get('rain')):
pycom.nvs_set('rain', 0)
sleepSeconds += 1
else:
# clear stored data if we've transmit failed 15x
if (sleepSeconds == 1):
# backup to rainfail, just in case
pycom.nvs_set('rainfail', pycom.nvs_get('rain') )
pycom.nvs_set('rain', 0)
print("Sleeping for ", sleepSeconds, " seconds" )
time.sleep(sleepSeconds)
sleepSeconds -= 1
print("Going to deepsleep")
machine.deepsleep(6000000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment