Last active
June 28, 2018 17:17
-
-
Save morgulbrut/7d3c16657becea91f00de4913f3b5523 to your computer and use it in GitHub Desktop.
We can make this file beautiful and searchable if this error is corrected: It looks like row 2 should actually have 8 columns, instead of 1. in line 1.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DADDR,NWKSKEY,APPSKEY,DEUI,APPEUI,APPKEY,CYCLE,NJM | |
# miromico AG @Zurich | |
# | |
# Please fill in the data you have got from your service provider. | |
# e.g. for DADDR 1234ABCD or for DEUI 1234567890ABCDEF | |
# | |
# You can use a hash (#) at the beginning of a line for comments, | |
# for example if you want to name your devices. | |
# | |
# DADDR,NWKSKEY,APPSKEY,DEUI,APPEUI,APPKEY,CYCLE,NJM | |
# | |
# For ABP please fill in at least | |
# - DADDR: device address (8 characters) | |
# - NWKSKEY: network session key (32 characters) | |
# - APPSKEY: application session key (32 characters) | |
# - NJM: 0 | |
# e.g. | |
# DADDR,NWKSKEY,APPSKEY,,,,30000,0 | |
# | |
# For OTAA please fill in at least | |
# - DEUI: device EUI (16 characters) | |
# - APPEUI: application EUI (16 characters) | |
# - APPKEY: application key (32 characters) | |
# - NJM: 1 | |
# e.g. | |
# ,,,DEUI,APPEUI,APPKEY,30000,1 | |
# | |
# CYCLE is the interval between two measurements in milliseconds | |
# | |
00000000,00000000000000000000000000000000,00000000000000000000000000000000,0000000000000000,0000000000000000,00000000000000000000000000000000,300000,0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# encoding: utf-8 | |
from enum import Enum | |
import serial | |
import argparse | |
import logging | |
import Colorer | |
import csv | |
class AgumentType(Enum):
    """Kinds of input source that the command line can select."""

    # Commissioning data is read from a CSV file (chosen with -i).
    FILE = 1
# Default logging verbosity; the --debug switch raises it to DEBUG.
log_level = logging.INFO
# Layout of every log line: padded level name followed by the message.
FORMAT = '%(levelname)-8s %(message)s'

# First row of the CSV file (the AT command names); filled in by parse_csv().
header = []
# Columns whose values are rewritten as colon-separated character pairs
# when --reformat is given.
vals_to_reformat = ['DEUI', 'DADDR', 'NWKSKEY', 'APPSKEY', 'APPEUI', 'APPKEY']

# Read timeout for the serial port, in seconds.
serial_timeout = 0.1

# Shared serial connection; created closed and opened later by
# connect_serial(). The port name is only a default, -p overrides it.
ser = serial.Serial()
# Use the public property instead of poking the private ``_baudrate`` field.
ser.baudrate = 115200
ser.port = 'COM11'
ser.timeout = serial_timeout

# Set from the --reformat switch by parse_arguments().
reformat = False
def addparser_init():
    """Create the command line parser for the commissioning tool.

    Returns:
        An ``argparse.ArgumentParser`` that understands ``-i`` (input CSV
        file), ``-p`` (serial port) and the ``--debug`` / ``--reformat``
        switches.
    """
    cli = argparse.ArgumentParser()

    # Options that take a value.
    valued_options = (
        ('-i', 'input csv file'),
        ('-p', 'Serial port'),
    )
    for flag, description in valued_options:
        cli.add_argument(flag, help=description)

    # Plain on/off switches.
    switches = (
        ('--debug', 'set logging level to DEBUG'),
        ('--reformat', 'reformat the EUIs, keys and adress'),
    )
    for flag, description in switches:
        cli.add_argument(flag, action='store_true', help=description)

    return cli
def parse_arguments(parser):
    """Evaluate the command line and apply its settings.

    Configures logging, stores the ``--reformat`` switch in the module-level
    ``reformat`` flag and, if ``-p`` was given, sets (and prints) the serial
    port name.

    Args:
        parser: The argument parser to run.

    Returns:
        ``[AgumentType.FILE, <path>]`` when ``-i`` was given, otherwise
        ``None``.
    """
    global reformat
    options = parser.parse_args()

    chosen_level = logging.DEBUG if options.debug else log_level
    logging.basicConfig(level=chosen_level, format=FORMAT)

    reformat = True if options.reformat else False

    if options.p:
        ser.port = options.p
        print(options.p)

    if not options.i:
        return None
    return [AgumentType.FILE, options.i]
def wait_for(char):
    """Block until the module sends the expected line or an error reply.

    Lines are read from the shared serial port, decoded and stripped. Empty
    reads (read timeouts) are ignored, so this loops until something
    conclusive arrives.

    Args:
        char: The exact line (after stripping) that signals success.

    Returns:
        The received line: either ``char`` or one of the known ``AT_*``
        error replies.
    """
    error_replies = (
        'AT_ERROR',
        'AT_PARAM_ERROR',
        'AT_BUSY_ERROR',
        'AT_TEST_PARAM_OVERFLOW',
        'AT_NO_NETWORK_JOINED',
        'AT_RX_ERROR',
    )
    while True:
        data_raw = ser.readline()
        if not data_raw:
            continue
        # Line noise must not abort the wait, so undecodable bytes are
        # replaced instead of raising UnicodeDecodeError.
        data = data_raw.decode('utf-8', errors='replace').strip()
        logging.debug(data)
        if data == char:
            logging.debug(char)
            return data
        # All error replies are compared as text; the previous comparison of
        # AT_PARAM_ERROR against bytes could never match.
        if data in error_replies:
            return data
def read_csv_file(file):
    """Read a CSV file and return an iterator over its rows.

    The file is read completely and closed before returning, so no file
    handle is left open behind the returned iterator.

    Args:
        file: Path of the CSV file.

    Returns:
        An iterator yielding each row as a list of strings, or ``None`` if
        the file does not exist or cannot be parsed (a critical message is
        logged in that case).
    """
    try:
        # newline='' is what the csv module expects so that it can handle
        # line endings (including newlines inside quoted fields) itself.
        with open(file, 'rt', newline='') as f:
            rows = list(csv.reader(f))
    except FileNotFoundError:
        logging.critical(file + ': File not found')
        return None
    except csv.Error:
        logging.critical(file + ': File not formated well')
        return None
    return iter(rows)
def parse_row(row):
    """Send one CSV data row to the module as a series of AT commands.

    Empty rows and rows whose first cell starts with ``#`` are skipped.
    Otherwise the function waits for the ``$`` console prompt and issues
    ``AT+<column name>=<value>`` for every non-empty cell, expecting ``OK``
    after each command. When the module-level ``reformat`` flag is set,
    values of the columns listed in ``vals_to_reformat`` are first rewritten
    as colon-separated character pairs.

    Args:
        row: List of cell strings, in the same column order as ``header``.
    """
    # Guard clause instead of swallowing IndexError around the whole body.
    if not row or row[0].strip().startswith('#'):
        return

    logging.info('Waiting for console....')
    wait_for('$')

    # zip() stops at the shorter of the two, so surplus cells without a
    # column name are ignored rather than raising.
    for column, cell in zip(header, row):
        name = column.strip()
        value = cell.strip()
        if not value:
            continue
        # Compare the stripped name so padded header cells are recognised too.
        if reformat and name in vals_to_reformat:
            value = list_hex(split_to_list(value))
        write_cmd('AT+' + name + '=' + value, 'OK')

    logging.info('Comissioning done, please reset module')
def reset_module():
    """Send the ``ATZ`` command over the shared serial port."""
    reset_command = 'ATZ\r\n'
    ser.write(reset_command.encode('utf-8'))
def split_to_list(input, no_of_signs=2):
    """Cut a sequence into consecutive pieces of ``no_of_signs`` items.

    Args:
        input: The string (or other sliceable sequence) to cut up.
        no_of_signs: Length of each piece; the last piece may be shorter.

    Returns:
        A list with the pieces in their original order.
    """
    pieces = []
    for start in range(0, len(input), no_of_signs):
        stop = start + no_of_signs
        pieces.append(input[start:stop])
    return pieces
def list_hex(input):
    """Join the items of a sequence into one colon-separated string.

    The separator is placed by position. The earlier implementation looked
    each item up with ``list.index`` and therefore dropped the colon in
    front of any item equal to the first one (``['00', '00']`` became
    ``'0000'``).

    Args:
        input: Iterable of items; each one is converted with ``str``.

    Returns:
        The items joined by ``':'``; an empty string for empty input.
    """
    return ':'.join(str(item) for item in input)
def write_cmd(cmd, ans):
    """Send one command to the module and check its reply.

    The command is upper-cased, logged, terminated with CR LF and written to
    the shared serial port. If the module answers with anything other than
    ``ans``, that reply is logged as critical.

    Args:
        cmd: Command text without line terminator.
        ans: Reply line that signals success.
    """
    # Upper-case the command itself so that what is logged is what is sent;
    # previously ``.upper()`` only applied to the '\r\n' terminator.
    command = cmd.upper()
    logging.info(command)
    ser.write(bytes(command + '\r\n', 'utf-8'))

    # Read the reply once and reuse it. Calling wait_for() again in the
    # error branch would block waiting for a further line.
    reply = wait_for(ans)
    if reply != ans:
        logging.critical(reply)
def connect_serial():
    """Open the shared serial port and discard anything still buffered.

    Returns:
        ``True`` if the port was opened, ``False`` if opening or flushing
        raised a ``SerialException`` (which is logged as critical).
    """
    try:
        ser.open()
        ser.flushInput()
        ser.flushOutput()
    except serial.serialutil.SerialException as error:
        logging.critical(error)
        return False
    return True
def parse_csv(reader):
    """Process all rows delivered by a CSV row iterator.

    The first row is stored in the module-level ``header`` (the AT command
    names); every following row is handed to ``parse_row``.

    Args:
        reader: Iterator over rows (lists of strings), or ``None`` when the
            file could not be read.
    """
    global header

    # The file could not be opened; there is nothing to process.
    if reader is None:
        return

    first_row = next(reader, None)
    if first_row is None:
        # An empty file used to end in an uncaught StopIteration.
        logging.critical('CSV file is empty: no header row found')
        return

    header = first_row
    for row in reader:
        parse_row(row)
def main():
    """Run the commissioning tool.

    The command line is evaluated first so that ``--help``, ``--debug`` and
    the ``-p`` port selection take effect before the serial port is opened.
    The rows of the given CSV file are then sent to the module, and the port
    is closed again afterwards.
    """
    args = parse_arguments(addparser_init())
    if not args:
        # parse_arguments() returns None when no input file was given.
        logging.critical('No input csv file given (use -i)')
        return

    if not connect_serial():
        return

    try:
        if args[0] == AgumentType.FILE:
            logging.info('Comissioning started')
            reader = read_csv_file(args[1])
            # read_csv_file() has already logged the reason when it yields None.
            if reader is not None:
                parse_csv(reader)
    finally:
        ser.close()


if __name__ == "__main__":
    main()
What is the UART pin layout from left to right?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Quick and dirty hacks
CSV first row are the AT commands without "AT+"
you probably need to set the UART yourself...