Skip to content

Instantly share code, notes, and snippets.

@Cees-Meijer
Last active November 2, 2019 15:11
Show Gist options
  • Save Cees-Meijer/6370f560d2659763e17a0c03f0674dcd to your computer and use it in GitHub Desktop.
Save Cees-Meijer/6370f560d2659763e17a0c03f0674dcd to your computer and use it in GitHub Desktop.
""""
Raspberry Pi Single Channel Gateway
Learn Guide: https://learn.adafruit.com/raspberry-pi-single-channel-lorawan-gateway
Author: Brent Rubell for Adafruit Industries
Modified for Console output only - 2019 Cees Meijer
"""
# Import Python System Libraries
import sys
import json
import time
import subprocess
import uuid
# Import Adafruit Blinka Libraries
import busio
from digitalio import DigitalInOut, Direction, Pull
import board
# Create the I2C interface on the board's SCL and SDA pins.
# NOTE(review): `i2c` is not referenced anywhere else in the visible source --
# confirm whether it is still needed now that output goes to the console.
i2c = busio.I2C(board.SCL, board.SDA)
# Gateway ID calculation (based on the MAC address).
# uuid.getnode() returns the hardware address as a 48-bit integer. Format it
# as exactly 12 lowercase hex digits: plain hex() drops leading zeros, which
# would shift every two-character slice below and produce a wrong gateway ID.
mac_addr = '{0:012x}'.format(uuid.getnode())
print('Gateway ID: {0}:{1}:{2}:ff:ff:{3}:{4}:{5}'.format(
    mac_addr[0:2], mac_addr[2:4], mac_addr[4:6],
    mac_addr[6:8], mac_addr[8:10], mac_addr[10:12]))
# Load the gateway settings from `global_conf.json` in the working directory.
with open('global_conf.json') as config:
    gateway_config = json.loads(config.read())

# Radio section (`SX127x_conf`): the frequency entry is divided by one
# million before use, the spreading factor is taken as-is.
SX127x_conf = gateway_config['SX127x_conf']
gateway_freq = SX127x_conf['freq'] / 1000000
gateway_sf = SX127x_conf['spread_factor']

# Gateway section (`gateway_conf`): display name and the server list.
gateway_conf = gateway_config['gateway_conf']
gateway_name = gateway_conf['name']

# Only the first configured server is used.
server_list = gateway_conf['servers']
ttn_server = server_list[0]
ttn_server_addr = ttn_server['address']
def stats():
    """Print the Pi's IP address, CPU load and memory usage to the console.

    Each figure is produced by a shell pipeline whose standard output is
    decoded as UTF-8 and printed unmodified.
    """
    print('Pi Stats')

    def _shell(command):
        # Run a shell pipeline and hand back whatever it wrote to stdout.
        return subprocess.check_output(command, shell=True).decode("utf-8")

    # Shell pipelines for system monitoring, taken from:
    # https://unix.stackexchange.com/questions/119126/command-to-display-memory-usage-disk-usage-and-cpu-load
    ip_addr = _shell("hostname -I | cut -d\' \' -f1")
    cpu_load = _shell("top -bn1 | grep load | awk '{printf \"CPU Load: %.2f\", $(NF-2)}'")
    mem_usage = _shell("free -m | awk 'NR==2{printf \"Mem: %s/%s MB %.2f%%\", $3,$2,$3*100/$2 }'")

    # Emit all three readings with a single print call.
    print("IP: {0}CPU:{1}MEM:{2}".format(ip_addr, cpu_load, mem_usage))
def _report_packet(pkt_json):
    """Parse one packet report line from the forwarder and print its metadata.

    pkt_json is the text line read right after the "incoming packet..."
    announcement. Only the first entry of its 'rxpk' list is reported.
    A line that is not valid JSON, or lacks the expected fields, is reported
    and skipped instead of aborting the gateway loop.
    """
    try:
        pkt_data = json.loads(pkt_json)['rxpk'][0]
        pkt_freq = pkt_data['freq']
        pkt_size = pkt_data['size']
        pkt_rssi = pkt_data['rssi']
        pkt_tmst = pkt_data['tmst']
    except (ValueError, KeyError, IndexError, TypeError) as err:
        print('could not parse packet info: {0!r}'.format(err))
        return
    print('* PKT RX on {0}MHz '.format(pkt_freq))
    print('RSSI: {0}dBm, Size: {1} bytes '.format(pkt_rssi, pkt_size))
    print('timestamp: {0} '.format(pkt_tmst))


def gateway(forwarder_cmd="./single_chan_pkt_fwd"):
    """Run the single channel packet forwarder and echo its output.

    Every line the forwarder writes to stdout is printed to the console.
    Status updates and incoming packets get extra formatted output.

    Args:
        forwarder_cmd: Program (or argument list) passed to subprocess.Popen.
            Defaults to the forwarder binary in the current directory.

    Returns:
        None. Returns immediately if the forwarder binary is missing, and
        otherwise once the forwarder closes its stdout (i.e. exits).
    """
    print('MODE: Pi Gateway')
    print('starting gateway...')
    try:
        # stderr is discarded: it is never read here, and an unread PIPE can
        # fill up and block the forwarder.
        proc = subprocess.Popen(forwarder_cmd, bufsize=-1,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.DEVNULL)
    except FileNotFoundError:
        print("To run the single packet forwarder, you'll need to run `sudo make all` first.")
        return
    try:
        while True:
            raw_line = proc.stdout.readline()
            if not raw_line:
                # Empty read means EOF: the forwarder exited. Without this
                # check the loop would spin forever printing blank lines.
                break
            new_line = raw_line.decode('utf-8')
            print(new_line)
            # grab new data on gateway status update
            if new_line == "gateway status update\n":
                gtwy_timestamp = proc.stdout.readline().decode('utf-8')
                print('time:', gtwy_timestamp)
                gtwy_status = proc.stdout.readline().decode('utf-8')
                print(gtwy_status)
                sys.stdout.write(u"\u001b[1000D")  # Move left
                sys.stdout.write(u"\u001b[8A")  # Move up
            elif new_line == "incoming packet...\n":
                print('incoming pkt... ')
                # read incoming packet info
                pkt_json = proc.stdout.readline().decode('utf-8')
                print(pkt_json)
                _report_packet(pkt_json)
            # NOTE(review): indentation was lost in the reviewed copy; these
            # two blank lines are assumed to be printed once per loop
            # iteration -- confirm against the original file.
            print()
            print()
    finally:
        # Do not leave a running or zombie forwarder behind, whatever made
        # the loop stop.
        if proc.poll() is None:
            proc.terminate()
        proc.stdout.close()
        proc.wait()
def gateway_info():
    """Print the LoRaWAN gateway's configuration summary to the console.

    Shows the first nine characters of the server address, the gateway
    frequency, the spreading factor and the gateway name, all taken from the
    module-level configuration values.
    """
    print('MODE: Gateway Info')
    summary_rows = (
        ('Server: ', ttn_server_addr[0:9]),
        ('Freq: ', gateway_freq),
        ('SF: ', gateway_sf),
        ('Gateway Name:', gateway_name),
    )
    for label, value in summary_rows:
        print(label, value)
# Main loop: show the gateway EUI and Pi stats, print the gateway settings,
# run the forwarder, then reposition the cursor and pause before repeating.
while True:
    # The EUI is the MAC address with 'ff:ff' inserted in the middle.
    eui_fields = (mac_addr[0:2], mac_addr[2:4], mac_addr[4:6], 'ff', 'ff',
                  mac_addr[6:8], mac_addr[8:10], mac_addr[10:12])
    print('LoRaWAN Gateway EUI:')
    print(':'.join(eui_fields))
    stats()
    gateway_info()
    gateway()
    # ANSI escapes: move the cursor left, then up.
    for escape in (u"\u001b[1000D", u"\u001b[10A"):
        sys.stdout.write(escape)
    time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment