Skip to content

Instantly share code, notes, and snippets.

@benjaminchodroff
Created February 18, 2017 05:57
Show Gist options
  • Save benjaminchodroff/4264f1d76ea17c2193f9b29dde948071 to your computer and use it in GitHub Desktop.
Save benjaminchodroff/4264f1d76ea17c2193f9b29dde948071 to your computer and use it in GitHub Desktop.
LoPy LoRa range test - transmitter
import socket
import time
import struct
from network import LoRa
import pycom
# LoRa range-test transmitter: repeatedly sends each colour code as a raw LoRa
# package, then polls for an acknowledgement and shows the outcome on the RGB LED:
#   acknowledged (ack code 200) -> the colour that was just sent
#   any other ack code          -> white
#   no answer within the window -> blue
pycom.heartbeat(False)
pycom.rgbled(0x000fff)  # blue

# A basic package header, B: 1 byte for the deviceId, B: 1 byte for the pkg size,
# followed by the payload (%ds is filled in with the payload length when packing).
_LORA_PKG_FORMAT = "BB%ds"
# Acknowledgement: three single bytes, unpacked below as device id, package
# length and ack code.
_LORA_PKG_ACK_FORMAT = "BBB"
DEVICE_ID = 0x01

# Open a LoRa socket, use tx_iq to avoid listening to our own messages
lora = LoRa(
    mode=LoRa.LORA,
    tx_iq=True,
    tx_power=14,
    power_mode=LoRa.ALWAYS_ON,
    public=True,
    tx_retries=1,
)
lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
# Non-blocking: the ack loop below polls recv() and sleeps between attempts.
lora_sock.setblocking(False)

# Colour name -> RGB value written as a hexadecimal string; the string itself
# is the payload that gets transmitted.
colors = {
    "red": "0x7f0000",
    "orange": "0xff8000",
    "gold": "0xffbf00",
    "yellow": "0x7f7f00",
    "green": "0x007f00",
}
max_time_waiting = 4  # periods (number of recv polls before giving up)
pause_time = 300  # ms between two polls

while True:
    try:
        # Package send containing a simple string
        for color in colors.values():
            msg = color
            # Pack bytes rather than str so the "s" field is portable across
            # Python implementations; the bytes on the wire are unchanged.
            payload = msg.encode()
            pkg = struct.pack(
                _LORA_PKG_FORMAT % len(payload), DEVICE_ID, len(payload), payload
            )
            lora_sock.send(pkg)

            waiting_ack = True
            time_waiting = max_time_waiting
            while waiting_ack:
                time_waiting -= 1
                recv_ack = lora_sock.recv(256)
                if len(recv_ack) > 0:
                    device_id, pkg_len, ack = struct.unpack(_LORA_PKG_ACK_FORMAT, recv_ack)
                    # Acks addressed to another device are ignored; keep polling.
                    if device_id == DEVICE_ID:
                        waiting_ack = False
                        if ack == 200:
                            # If the uart = machine.UART(0, 115200) and os.dupterm(uart) are set
                            # in the boot.py this print should appear in the serial port
                            # print("ACK")
                            # The colour codes carry a "0x" prefix, so they must be
                            # parsed as base 16; a plain int(msg) rejects them.
                            pycom.rgbled(int(msg, 16))
                        else:
                            # If the uart = machine.UART(0, 115200) and os.dupterm(uart) are set
                            # in the boot.py this print should appear in the serial port
                            print("Message Failed")
                            pycom.rgbled(0xffffff)  # white
                elif time_waiting <= 0:
                    # Nothing received and the polling budget is used up.
                    waiting_ack = False
                    print("Message Lost")
                    pycom.rgbled(0x000fff)  # blue
                else:
                    time.sleep(pause_time / 1000)
    except Exception as exc:
        # Catch Exception only (not a bare except) so Ctrl-C / KeyboardInterrupt
        # can still stop the script, and report what actually went wrong.
        print("Exception encountered", exc)
        time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment