Created
May 17, 2020 21:36
-
-
Save TerryE/6a1c4f8c5a3c0637a1e826f05d01c369 to your computer and use it in GitHub Desktop.
Minimal Python Uploader for NodeMCU ESP modules
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Minimal file downloader | |
import os | |
import serial | |
import sys | |
import argparse | |
import time | |
import logging | |
import re | |
log = logging.getLogger(__name__) | |
def fatal(msg): log.error(msg); sys.exit(1) | |
CHUNKSIZE = 250 | |
STX,EOT,ACK,NAK = '\2','\4','\6','\25' | |
class Uploader: | |
PORT = '/dev/ttyUSB0' | |
TIMEOUT = 2 | |
BAUD = 115200 | |
def expect(self, exp, timeout=TIMEOUT): | |
recd, save_to, end = '', self._port.timeout, time.time()+timeout | |
self._port.timeout = 0.0001 # Checking for new data every 100us is fast enough | |
while not recd.endswith(exp) and time.time() <= end: | |
recd += self._port.read() | |
if time.time() > end: | |
fatal('expect timeout waiting for: %s, received: [%s]' % (exp, recd)) | |
self._port.timeout = save_to | |
return recd | |
def write(self, output): | |
self._port.write(output) | |
self._port.flush() | |
def exchange(self, output, exp='\n> '): | |
self.write(output + '\n') | |
log.debug('writing: %s', output) | |
return self.expect(exp) | |
def restart(self): | |
self._port.setDTR(False) # IO0=HIGH | |
self._port.setRTS(True) # EN=LOW, chip in reset | |
time.sleep(0.1) | |
self._port.setRTS(False) # EN=LOW, chip in reset | |
self._port.setDTR(False) # IO0=HIGH | |
self.exchange('') | |
log.debug('ESP restarted') | |
def __init__(self, port = 0, baud = 115200): | |
self._port = serial.Serial(port, Uploader.BAUD, timeout=Uploader.TIMEOUT) | |
self.restart() | |
self.exchange('_PROMPT2="> "') | |
lua_writer = r""" | |
do local u,STX,EOT,ACK,NAK,NAK2,s,f=uart,2,4,'\6','\21','\22',%s | |
local o,w=u.on,u.write;local function rb(d) if d:byte(1) == STX then | |
local l,x = d:byte(2),d:sub(1,6) if l > 0 then f:write(d:sub(3,l+2)) end | |
if l<s then f:close();f=nil;o('data') end w(0,l<s and EOT or ACK) | |
else w(0,NAK2);o('data') end collectgarbage() end | |
local function prb(d) xpcall(function(d) rb(b) end, function(e) ERR=debug.traceback(e) end) end | |
function recv(n) f = file.open(n, 'w');o('data', s+2, rb, 0);w(0, 'C') end end""" | |
lines = (lua_writer % CHUNKSIZE).replace('\r', '').split('\n') | |
for line in lines: self.exchange(line.strip()) | |
time.sleep(0.1) | |
self._port.flush() | |
def write_file(self, filename, destination): | |
tot,start = 0, time.time() | |
self.exchange("recv[[%s]]" % destination, '\nC> ') | |
with open(filename, 'r') as f: | |
chunk = f.read(CHUNKSIZE) | |
l, resp = len(chunk), ACK | |
# The last block is signalled by len (possibly 0) < CHUNKSIZE | |
while l > 0 and resp[0:1] == ACK: | |
l = len(chunk) | |
tot += l | |
packet = STX + chr(l) + chunk + (' ' * (CHUNKSIZE - l)) | |
log.debug('packet (%3u): |%s|', l, packet[2:].replace('\n', '|')) | |
self.write(packet) | |
resp = self._port.read(1) | |
log.debug('writing %d bytes written, resp: %s %s', l, resp.encode('hex'), resp) | |
chunk = f.read(CHUNKSIZE) | |
l = len(chunk) | |
if resp[0:1] != EOT: | |
time.sleep(2) | |
resp += self._port.read() | |
fatal('Write to %s aborted after %u byte with response %s %s' % (filename, l, resp.encode('hex'), resp)) | |
t = time.time()-start | |
print "File %s (%u bytes) transfered in %.3f sec (%.1f Kb/sec)" % (destination, tot, t, tot/t/1024) | |
def close(self): | |
self._port.close() | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description = 'ESP minimal file uploader', prog = 'nodemcu-uploader') | |
parser.add_argument( '--verbose', help = 'verbose output', action = 'store_true', default = False) | |
parser.add_argument( '--port', '-p', help = 'Serial port device', default = Uploader.PORT) | |
subparsers = parser.add_subparsers(dest='operation', help = 'Run nodemcu-uploader {command} -h for additional help') | |
upload_parser = subparsers.add_parser('upload', help = 'Path to one or more files to be uploaded.') | |
upload_parser.add_argument('filename', nargs='+', help = 'Lua file to upload. Use colon to give alternate destination.') | |
args = parser.parse_args() | |
logging.basicConfig(level=(logging.DEBUG if args.verbose else logging.INFO)) | |
uploader = Uploader(args.port, 115200) | |
if args.operation == 'upload': | |
source, dest = args.filename, [] | |
for i in range(0, len(source)): | |
sd = source[i].split(':') | |
if len(sd) == 1: | |
dest.append(os.path.basename(sd[0])) | |
else: | |
source[i]=sd[0] | |
dest.append(sd[1]) | |
for f, d in zip(source, dest): | |
uploader.write_file(f, d) | |
uploader.restart() | |
uploader.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment