Skip to content

Instantly share code, notes, and snippets.

@morgulbrut
Last active June 28, 2018 17:17
Show Gist options
  • Save morgulbrut/7d3c16657becea91f00de4913f3b5523 to your computer and use it in GitHub Desktop.
Save morgulbrut/7d3c16657becea91f00de4913f3b5523 to your computer and use it in GitHub Desktop.
We can make this file beautiful and searchable if this error is corrected: It looks like row 2 should actually have 8 columns, instead of 1. in line 1.
DADDR,NWKSKEY,APPSKEY,DEUI,APPEUI,APPKEY,CYCLE,NJM
# miromico AG @Zurich
#
# Please fill in the data you have got from your service provider.
# e.g. for DADDR 1234ABCD or for DEUI 1234567890ABCDEF
#
# You can use a hash (#) at the beginning of a line for comments,
# for example if you want to name your devices.
#
# DADDR,NWKSKEY,APPSKEY,DEUI,APPEUI,APPKEY,CYCLE,NJM
#
# For ABP please fill in at least
# - DADDR: device address (8 characters)
# - NWKSKEY: network session key (32 characters)
# - APPSKEY: application session key (32 characters)
# - NJM: 0
# e.g.
# DADDR,NWKSKEY,APPSKEY,,,,30000,0
#
# For OTAA please fill in at least
# - DEUI: device EUI (16 characters)
# - APPEUI: application EUI (16 characters)
# - APPKEY: application key (32 characters)
# - NJM: 1
# e.g.
# ,,,DEUI,APPEUI,APPKEY,30000,1
#
# CYCLE is the interval between two measurements in milliseconds
#
00000000,00000000000000000000000000000000,00000000000000000000000000000000,0000000000000000,0000000000000000,00000000000000000000000000000000,300000,0
# !/usr/bin/env python
# encoding: utf-8
from enum import Enum
import serial
import argparse
import logging
import Colorer
import csv
class AgumentType(Enum):
    """Kinds of input source that the command line can select."""

    # Commissioning data is read from a CSV file.
    FILE = 1
# Logging defaults; the --debug command-line switch raises the level to DEBUG.
log_level = logging.INFO
FORMAT = '%(levelname)-8s %(message)s'

# First row of the CSV file (the AT command names); filled in by parse_csv().
header = []

# Columns whose values are rewritten as colon-separated two-character groups
# when --reformat is given.
vals_to_reformat = ['DEUI', 'DADDR', 'NWKSKEY', 'APPSKEY', 'APPEUI', 'APPKEY']

# Read timeout handed to pySerial, so that readline() does not block forever.
serial_timeout = 0.1

# The port object is only configured here; connect_serial() opens it.
ser = serial.Serial()
# Use the public property rather than the private ``_baudrate`` attribute so
# that pySerial validates the value and applies it consistently.
ser.baudrate = 115200
ser.port = 'COM11'
ser.timeout = serial_timeout

# Set from the --reformat command-line switch by parse_arguments().
reformat = False
def addparser_init():
    """Create the command-line parser with the options this script accepts.

    Returns:
        An ``argparse.ArgumentParser`` that knows ``-i``, ``-p``, ``--debug``
        and ``--reformat``.
    """
    cli = argparse.ArgumentParser()

    # Options that take a value.
    cli.add_argument('-i', help='input csv file')
    cli.add_argument('-p', help='Serial port')

    # Plain on/off switches.
    switches = (
        ('--debug', 'set logging level to DEBUG'),
        ('--reformat', 'reformat the EUIs, keys and adress'),
    )
    for flag, description in switches:
        cli.add_argument(flag, action='store_true', help=description)

    return cli
def parse_arguments(parser):
    """Parse the command line and apply its settings to the module state.

    Configures logging, updates the global ``reformat`` flag and, when ``-p``
    is given, changes the port of the shared serial object.

    Args:
        parser: the argument parser to run against the command line.

    Returns:
        ``[AgumentType.FILE, <path>]`` when ``-i`` was given, otherwise
        ``None``.
    """
    global reformat
    options = parser.parse_args()

    level = logging.DEBUG if options.debug else log_level
    logging.basicConfig(level=level, format=FORMAT)

    reformat = True if options.reformat else False

    if options.p:
        ser.port = options.p
        print(options.p)

    if not options.i:
        return None
    return [AgumentType.FILE, options.i]
# Response lines that end a wait without being the expected answer.
_AT_ERROR_RESPONSES = frozenset((
    'AT_ERROR',
    'AT_PARAM_ERROR',
    'AT_BUSY_ERROR',
    'AT_TEST_PARAM_OVERFLOW',
    'AT_NO_NETWORK_JOINED',
    'AT_RX_ERROR',
))


def wait_for(char):
    """Read lines from the serial port until an expected or error line arrives.

    Every received line is decoded, stripped and logged at DEBUG level.
    Lines that are neither *char* nor one of the known AT error responses
    are ignored. There is no overall deadline: the call keeps polling until
    a matching line is read.

    Args:
        char: the exact (stripped) line to wait for, e.g. ``'OK'`` or ``'$'``.

    Returns:
        *char* when the expected line was received, otherwise the AT error
        string that was received instead.
    """
    while True:
        data_raw = ser.readline()
        if not data_raw:
            # Nothing was read this time; poll again.
            continue
        # Undecodable bytes are replaced rather than raising, so that line
        # noise cannot abort the wait.
        data = data_raw.decode('utf-8', errors='replace').strip()
        logging.debug(data)
        if data == char:
            logging.debug(char)
            return data
        # All error strings are compared as text; comparing against encoded
        # bytes can never match a decoded line.
        if data in _AT_ERROR_RESPONSES:
            return data
def read_csv_file(file):
    """Read a CSV file and return an iterator over its rows.

    The file is read completely and closed before returning, so no file
    handle is left open. It is opened with ``newline=''`` as the ``csv``
    module requires for correct handling of embedded line breaks.

    Args:
        file: path of the CSV file.

    Returns:
        An iterator yielding each row as a list of strings, or ``None`` when
        the file does not exist or cannot be parsed as CSV (a critical
        message is logged in both cases).
    """
    try:
        with open(file, 'rt', newline='') as f:
            rows = list(csv.reader(f))
    except FileNotFoundError:
        logging.critical(file + ': File not found')
        return None
    except csv.Error:
        logging.critical(file + ': File not formated well')
        return None
    return iter(rows)
def parse_row(row):
    """Send one CSV data row to the module as a series of AT commands.

    Empty rows and rows whose first field starts with ``#`` are skipped.
    For every other row the function waits for the ``$`` console prompt and
    then issues ``AT+<column name>=<value>`` for each non-empty field,
    expecting ``OK`` after each command. When the global ``reformat`` flag
    is set, values of the columns listed in ``vals_to_reformat`` are first
    rewritten as colon-separated two-character groups.

    Args:
        row: list of field strings for one CSV line.
    """
    # Skip empty rows and comment rows.
    if not row or row[0].strip().startswith('#'):
        return

    if len(row) > len(header):
        # Surplus fields have no command name; report them instead of
        # silently aborting part-way through the row.
        logging.warning('Row has %d fields but the header only has %d; '
                        'extra fields are ignored', len(row), len(header))

    logging.info('Waiting for console....')
    wait_for('$')

    for name, raw in zip(header, row):
        name = name.strip()
        value = raw.strip()
        if not value:
            continue
        if reformat and name in vals_to_reformat:
            value = list_hex(split_to_list(value))
        write_cmd('AT+' + name + '=' + value, 'OK')

    logging.info('Comissioning done, please reset module')
def reset_module():
    """Write the ``ATZ`` command, terminated by CR LF, to the serial port."""
    command = 'ATZ\r\n'
    ser.write(command.encode('utf-8'))
def split_to_list(input, no_of_signs=2):
    """Cut a sequence into consecutive pieces of ``no_of_signs`` items.

    The last piece is shorter when the length is not an exact multiple.

    Args:
        input: the string (or other sliceable sequence) to cut up.
        no_of_signs: length of each piece.

    Returns:
        A list of the pieces in their original order.
    """
    pieces = []
    for start in range(0, len(input), no_of_signs):
        stop = start + no_of_signs
        pieces.append(input[start:stop])
    return pieces
def list_hex(input):
    """Join the items of *input* into one colon-separated string.

    Each item is converted with ``str()``. A separator is placed between
    every pair of neighbours, including when items repeat (for example an
    all-zero key).

    Args:
        input: iterable of chunks, e.g. ``['12', '34', 'AB']``.

    Returns:
        The chunks joined by ``':'``, e.g. ``'12:34:AB'``; an empty string
        for empty input.
    """
    # Joining by position, not by value lookup, keeps repeated chunks apart.
    return ':'.join(str(chunk) for chunk in input)
def write_cmd(cmd, ans):
    """Send one command to the module and check the reply.

    The command is upper-cased, terminated with CR LF and written to the
    serial port; the upper-cased command is also what gets logged. The reply
    is read once: if it is not *ans*, the line that was received instead is
    logged as critical.

    Args:
        cmd: command text without line terminator, e.g. ``'AT+NJM=1'``.
        ans: the reply line that signals success, e.g. ``'OK'``.
    """
    command = cmd.upper()
    logging.info(command)
    ser.write((command + '\r\n').encode('utf-8'))

    # Read the reply exactly once; waiting again after a failure would block
    # for a further line and report the wrong one.
    reply = wait_for(ans)
    if reply != ans:
        logging.critical(reply)
def connect_serial():
    """Open the shared serial port and flush its input and output buffers.

    Returns:
        ``True`` when the port was opened and flushed, ``False`` when a
        ``SerialException`` occurred (the exception is logged as critical).
    """
    try:
        ser.open()
        ser.flushInput()
        ser.flushOutput()
    except serial.serialutil.SerialException as err:
        logging.critical(err)
        return False
    else:
        return True
def parse_csv(reader):
    """Process all rows delivered by *reader*.

    The first row is stored in the global ``header`` and supplies the
    command names; every following row is passed to ``parse_row``.

    Args:
        reader: iterator yielding rows as lists of strings.
    """
    global header
    first_row = next(reader, None)
    if first_row is None:
        # Empty input: report it instead of leaking a bare StopIteration.
        logging.critical('CSV input is empty: no header row found')
        return
    header = first_row
    for row in reader:
        parse_row(row)
def main():
    """Run the commissioning: parse options, open the port, send the CSV.

    The command line is parsed before the serial port is opened so that
    ``-p``, ``--debug`` and ``--help`` take effect even when the default
    port is not available. The port is closed again when processing ends.
    """
    args = parse_arguments(addparser_init())
    if not args or args[0] != AgumentType.FILE:
        # parse_arguments() returns None when no input file was given.
        logging.critical('No input file given, use -i <file>')
        return

    if not connect_serial():
        return

    try:
        logging.info('Comissioning started')
        reader = read_csv_file(args[1])
        # read_csv_file() has already logged the reason when it returns None.
        if reader is not None:
            parse_csv(reader)
    finally:
        ser.close()


if __name__ == "__main__":
    main()
@morgulbrut
Copy link
Author

morgulbrut commented Jun 26, 2018

Quick and dirty hacks

CSV first row are the AT commands without "AT+"

you probably need to set the UART yourself...

@yene
Copy link

yene commented Jun 28, 2018

What is the UART pin layout from left to right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment