Skip to content

Instantly share code, notes, and snippets.

@TerryE
Created May 17, 2020 21:36
Show Gist options
  • Save TerryE/6a1c4f8c5a3c0637a1e826f05d01c369 to your computer and use it in GitHub Desktop.
Save TerryE/6a1c4f8c5a3c0637a1e826f05d01c369 to your computer and use it in GitHub Desktop.
Minimal Python Uploader for NodeMCU ESP modules
#!/usr/bin/env python
# Minimal file downloader
import os
import serial
import sys
import argparse
import time
import logging
import re
log = logging.getLogger(__name__)
def fatal(msg): log.error(msg); sys.exit(1)
CHUNKSIZE = 250
STX,EOT,ACK,NAK = '\2','\4','\6','\25'
class Uploader:
PORT = '/dev/ttyUSB0'
TIMEOUT = 2
BAUD = 115200
def expect(self, exp, timeout=TIMEOUT):
recd, save_to, end = '', self._port.timeout, time.time()+timeout
self._port.timeout = 0.0001 # Checking for new data every 100us is fast enough
while not recd.endswith(exp) and time.time() <= end:
recd += self._port.read()
if time.time() > end:
fatal('expect timeout waiting for: %s, received: [%s]' % (exp, recd))
self._port.timeout = save_to
return recd
def write(self, output):
self._port.write(output)
self._port.flush()
def exchange(self, output, exp='\n> '):
self.write(output + '\n')
log.debug('writing: %s', output)
return self.expect(exp)
def restart(self):
self._port.setDTR(False) # IO0=HIGH
self._port.setRTS(True) # EN=LOW, chip in reset
time.sleep(0.1)
self._port.setRTS(False) # EN=LOW, chip in reset
self._port.setDTR(False) # IO0=HIGH
self.exchange('')
log.debug('ESP restarted')
def __init__(self, port = 0, baud = 115200):
self._port = serial.Serial(port, Uploader.BAUD, timeout=Uploader.TIMEOUT)
self.restart()
self.exchange('_PROMPT2="> "')
lua_writer = r"""
do local u,STX,EOT,ACK,NAK,NAK2,s,f=uart,2,4,'\6','\21','\22',%s
local o,w=u.on,u.write;local function rb(d) if d:byte(1) == STX then
local l,x = d:byte(2),d:sub(1,6) if l > 0 then f:write(d:sub(3,l+2)) end
if l<s then f:close();f=nil;o('data') end w(0,l<s and EOT or ACK)
else w(0,NAK2);o('data') end collectgarbage() end
local function prb(d) xpcall(function(d) rb(b) end, function(e) ERR=debug.traceback(e) end) end
function recv(n) f = file.open(n, 'w');o('data', s+2, rb, 0);w(0, 'C') end end"""
lines = (lua_writer % CHUNKSIZE).replace('\r', '').split('\n')
for line in lines: self.exchange(line.strip())
time.sleep(0.1)
self._port.flush()
def write_file(self, filename, destination):
tot,start = 0, time.time()
self.exchange("recv[[%s]]" % destination, '\nC> ')
with open(filename, 'r') as f:
chunk = f.read(CHUNKSIZE)
l, resp = len(chunk), ACK
# The last block is signalled by len (possibly 0) < CHUNKSIZE
while l > 0 and resp[0:1] == ACK:
l = len(chunk)
tot += l
packet = STX + chr(l) + chunk + (' ' * (CHUNKSIZE - l))
log.debug('packet (%3u): |%s|', l, packet[2:].replace('\n', '|'))
self.write(packet)
resp = self._port.read(1)
log.debug('writing %d bytes written, resp: %s %s', l, resp.encode('hex'), resp)
chunk = f.read(CHUNKSIZE)
l = len(chunk)
if resp[0:1] != EOT:
time.sleep(2)
resp += self._port.read()
fatal('Write to %s aborted after %u byte with response %s %s' % (filename, l, resp.encode('hex'), resp))
t = time.time()-start
print "File %s (%u bytes) transfered in %.3f sec (%.1f Kb/sec)" % (destination, tot, t, tot/t/1024)
def close(self):
self._port.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description = 'ESP minimal file uploader', prog = 'nodemcu-uploader')
parser.add_argument( '--verbose', help = 'verbose output', action = 'store_true', default = False)
parser.add_argument( '--port', '-p', help = 'Serial port device', default = Uploader.PORT)
subparsers = parser.add_subparsers(dest='operation', help = 'Run nodemcu-uploader {command} -h for additional help')
upload_parser = subparsers.add_parser('upload', help = 'Path to one or more files to be uploaded.')
upload_parser.add_argument('filename', nargs='+', help = 'Lua file to upload. Use colon to give alternate destination.')
args = parser.parse_args()
logging.basicConfig(level=(logging.DEBUG if args.verbose else logging.INFO))
uploader = Uploader(args.port, 115200)
if args.operation == 'upload':
source, dest = args.filename, []
for i in range(0, len(source)):
sd = source[i].split(':')
if len(sd) == 1:
dest.append(os.path.basename(sd[0]))
else:
source[i]=sd[0]
dest.append(sd[1])
for f, d in zip(source, dest):
uploader.write_file(f, d)
uploader.restart()
uploader.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment