Skip to content

Instantly share code, notes, and snippets.

@av1024
Last active August 29, 2015 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save av1024/8d058d31a6dfb74bbd0d to your computer and use it in GitHub Desktop.
Save av1024/8d058d31a6dfb74bbd0d to your computer and use it in GitHub Desktop.
Dry-run/Debug OTA script for ESP8266 Arduino
#!/usr/bin/env python
#
# this script will push an OTA update to the ESP
# use it like: python espota.py <ESP_IP_address> <ESP_IP_port> <sketch.bin>
# updated version with CRC check by Alexander Voronin, av1024@gmail.com
# Look at mDNS_SD_OTA sample for information
"""Use following (simplified) sample code for reception simulation in OTA handler:
// comments with 'OTA:' prefix are checkpoints from non dry-run code
//OTA: if (!Update.begin(sz)) return false;
int ww = 0;
WiFiClient cl;
if (!cl.connect(rmt, port)) return false;
cl.setTimeout(10000);
//OTA: while(!Update.isFinished()) {
//OTA: int z = Update.write(cl);
//OTA: if (z) cl.write(z);
int cntr = 0;
int z = 0;
uint16_t crc;
while (cntr<sz) { // simulate Update.isFinished()
z = 0;
crc = 0;
for(int i=cl.available();i>0;i--) { // simulate Update.write(cl)
int b = cl.read();
if (b == -1) break; // timeout
crc += (byte)b;
z++;
cntr++; // Thanks @hallard
}
if (z) cl.printf("%d %d\n", z, crc); // Printback code differs from original espota.py
}
//OTA: if (!Update.end()) ...
"""
from __future__ import print_function
import socket
import sys
import os
def crc(buf):
    """Return the 16-bit additive checksum of *buf*.

    The checksum is the sum of all byte values, wrapped to the range
    0..65535.

    *buf* may be a Python 2 ``str``, or a ``bytes``/``bytearray`` object.
    An empty buffer yields 0.
    """
    total = 0
    for item in buf:
        # Iterating bytes yields ints on Python 3 but 1-character strings on
        # Python 2, so accept both forms.
        total += item if isinstance(item, int) else ord(item)
    # Keep only the low 16 bits of the running sum.
    return total & 0xFFFF
def _as_text(raw):
    """Return data received from a socket as text that is safe to print."""
    if isinstance(raw, str):
        # Python 2: recv() already hands back str, pass it through untouched.
        return raw
    return raw.decode('ascii', 'replace')


def _read_reply_line(connection):
    """Read one newline-terminated reply from *connection*.

    Returns the reply without the trailing newline.  Raises socket.error if
    the peer closes the connection before a newline arrives; a receive
    timeout surfaces as socket.timeout (a socket.error subclass).
    """
    received = bytearray()
    while True:
        c = connection.recv(1)
        if not c:
            # recv() returning nothing means the peer closed the connection.
            raise socket.error('connection closed while waiting for reply')
        if c == b'\n':
            break
        received += c
    return _as_text(bytes(received))


def serve(remoteAddr, remotePort, filename):
    """Upload *filename* to a device, checking each chunk's echoed checksum.

    Steps:
      1. Listen for TCP on 0.0.0.0:48266.
      2. Send a UDP invitation "0 <port> <size>\\n" to remoteAddr:remotePort.
      3. Wait up to 10 seconds for the device to connect back.
      4. Send the file in 1460-byte chunks.  After each chunk, read one
         newline-terminated reply and compare it with "<length> <crc>"
         computed locally.  Sending stops at the first mismatch.
      5. Wait up to 60 seconds for a final result message and print it.

    Parameters:
      remoteAddr -- host name or IP address of the device.
      remotePort -- UDP port of the device (anything int() accepts).
      filename   -- path of the binary image to upload.

    Returns 0 when every chunk was acknowledged correctly and a result was
    received, 1 otherwise.  Errors from reading the file or from sending the
    invitation propagate to the caller after the sockets have been closed.
    """
    server_port = 48266
    server_address = ('0.0.0.0', server_port)
    # NOTE(review): 1460 presumably matches one TCP segment payload so the
    # device acknowledges each chunk as a whole -- confirm before changing.
    chunk_size = 1460

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        print('Starting on %s:%s' % server_address, file=sys.stderr)
        try:
            sock.bind(server_address)
            sock.listen(1)
        except socket.error:
            print('Listen Failed', file=sys.stderr)
            return 1

        content_size = os.path.getsize(filename)
        print('Upload size: %d' % content_size, file=sys.stderr)
        # sendto() needs bytes on Python 3; the message is pure ASCII.
        message = ('%d %d %d\n' % (0, server_port, content_size)).encode('ascii')

        print('Sending invitation to:', remoteAddr, file=sys.stderr)
        invite = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            invite.sendto(message, (remoteAddr, int(remotePort)))
        finally:
            invite.close()

        print('Waiting for device...\n', file=sys.stderr)
        try:
            sock.settimeout(10)
            connection, _ = sock.accept()
        except socket.error:
            print('No response from device', file=sys.stderr)
            return 1

        try:
            verified = True
            offs = 0
            connection.settimeout(10)
            with open(filename, 'rb') as f:
                sys.stderr.write('Uploading\n')
                sys.stderr.flush()
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    cc = "%d %d" % (len(chunk), crc(chunk))
                    sys.stderr.write("%d/%d: %s -> " % (offs, content_size, cc))
                    sys.stderr.flush()
                    offs += len(chunk)
                    try:
                        connection.sendall(chunk)
                        res = _read_reply_line(connection)
                    except socket.error:
                        print('\nError Uploading', file=sys.stderr)
                        return 1
                    sys.stderr.write(res)
                    if res == cc:
                        sys.stderr.write(" OK\n")
                        sys.stderr.flush()
                    else:
                        # --- ERROR. STOP SENDING ---
                        sys.stderr.write(" FAIL\n")
                        sys.stderr.flush()
                        verified = False
                        break

            print('\nWaiting for result...\n', file=sys.stderr)
            try:
                connection.settimeout(60)
                data = connection.recv(32)
            except socket.error:
                print('Result: No Answer!', file=sys.stderr)
                return 1
            print('Result: %s' % _as_text(data), file=sys.stderr)
            # A failed chunk check must not be reported as a successful upload.
            return 0 if verified else 1
        finally:
            connection.close()
    finally:
        sock.close()
def main(args):
    """Run the upload described by the command-line style list *args*.

    *args* is expected to look like sys.argv:
    [program, ESP_IP_address, ESP_IP_port, sketch.bin].  Extra trailing items
    are ignored.

    Returns the status from serve(), or 1 after printing a usage line when
    fewer than three arguments follow the program name.
    """
    if len(args) < 4:
        prog = args[0] if args else 'espota.py'
        sys.stderr.write(
            'usage: python %s <ESP_IP_address> <ESP_IP_port> <sketch.bin>\n' % prog)
        return 1
    return serve(args[1], args[2], args[3])
# Script entry point: hand the process arguments to main() and exit with the
# status it returns.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment