Last active
August 29, 2015 14:25
-
-
Save av1024/8d058d31a6dfb74bbd0d to your computer and use it in GitHub Desktop.
Dry-run/Debug OTA script for ESP8266 Arduino
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#
# this script will push an OTA update to the ESP
# use it like: python espota.py <ESP_IP_address> <ESP_IP_port> <sketch.bin>
# updated version with CRC check by Alexander Voronin, av1024@gmail.com
# Look at mDNS_SD_OTA sample for information
"""Use the following (simplified) sample code for reception simulation in OTA handler:
    // comments with 'OTA:' prefix are checkpoints from non dry-run code
    //OTA: if (!Update.begin(sz)) return false;
    int ww = 0;
    WiFiClient cl;
    if (!cl.connect(rmt, port)) return false;
    cl.setTimeout(10000);
    //OTA: while(!Update.isFinished()) {
    //OTA: int z = Update.write(cl);
    //OTA: if (z) cl.write(z);
    int cntr = 0;
    int z = 0;
    uint16_t crc;
    while (cntr<sz) { // simulate Update.isFinished()
        z = 0;
        crc = 0;
        for(int i=cl.available();i>0;i--) { // simulate Update.write(cl)
            int b = cl.read();
            if (b == -1) break; // timeout
            crc += (byte)b;
            z++;
            cntr++; // Thanks @hallard
        }
        if (z) cl.printf("%d %d\n", z, crc); // Printback code differs from original espota.py
    }
    //OTA: if (!Update.end()) ...
"""
from __future__ import print_function

import contextlib
import os
import socket
import sys
def crc(buf):
    """Return the 16-bit additive checksum of *buf*.

    The checksum is the sum of all byte values, wrapped to 16 bits, which
    mirrors the ``uint16_t`` accumulator used by the device-side sample code.

    Args:
        buf: A byte string (``str`` on Python 2, ``bytes`` on Python 3), a
            ``bytearray``, or any iterable of ints / 1-character strings.

    Returns:
        The checksum as an int in the range 0..65535 (0 for empty input).
    """
    total = 0
    for x in buf:
        # Iterating a byte string yields 1-character strings on Python 2 but
        # ints on Python 3 (and for bytearray); ord() only accepts the former.
        total += x if isinstance(x, int) else ord(x)
    return total & 0xFFFF
def _as_text(data):
    """Return *data* as a native ``str`` for display and comparison.

    Byte strings that are not already ``str`` (i.e. ``bytes`` on Python 3)
    are decoded as ASCII with undecodable bytes replaced; a ``str`` is
    returned unchanged.
    """
    if isinstance(data, str):
        return data
    return data.decode('ascii', 'replace')


def _read_reply_line(connection):
    """Read one newline-terminated reply from *connection*.

    Returns the reply as text, without the trailing newline.

    Raises:
        socket.error: if the peer closes the connection before a newline
            arrives, or if the socket times out / fails while reading.
    """
    received = []
    while True:
        c = connection.recv(1)
        if not c:
            # recv() returning an empty string means the peer closed the
            # connection; report it explicitly instead of looping or
            # failing on ord('').
            raise socket.error('connection closed while waiting for reply')
        if c == b'\n':
            return _as_text(b''.join(received))
        received.append(c)


def _upload(connection, f, content_size):
    """Stream *f* over *connection* in chunks and verify each acknowledgement.

    After every chunk the peer is expected to answer with a line of the form
    ``"<length> <checksum>"``; the upload stops at the first mismatch. The
    final result message from the peer is then awaited and printed.

    Args:
        connection: connected TCP socket to the device.
        f: binary file object positioned at the start of the image.
        content_size: total image size in bytes (used for progress output).

    Returns:
        0 if every chunk was acknowledged correctly and a result was
        received, 1 otherwise.
    """
    sys.stderr.write('Uploading\n')
    sys.stderr.flush()
    offs = 0
    all_acked = True
    connection.settimeout(10)
    while True:
        chunk = f.read(1460)
        if not chunk:
            break
        cc = '%d %d' % (len(chunk), crc(chunk))
        sys.stderr.write('%d/%d: %s -> ' % (offs, content_size, cc))
        sys.stderr.flush()
        offs += len(chunk)
        try:
            connection.sendall(chunk)
            res = _read_reply_line(connection)
        except socket.error:
            print('\nError Uploading', file=sys.stderr)
            return 1
        sys.stderr.write(res)
        if res != cc:
            # --- ERROR. STOP SENDING ---
            sys.stderr.write(' FAIL\n')
            sys.stderr.flush()
            all_acked = False
            break
        sys.stderr.write(' OK\n')
        sys.stderr.flush()

    print('\nWaiting for result...\n', file=sys.stderr)
    try:
        connection.settimeout(60)
        data = connection.recv(32)
    except socket.error:
        print('Result: No Answer!', file=sys.stderr)
        return 1
    print('Result: %s' % _as_text(data), file=sys.stderr)
    # A mismatched acknowledgement is a failed upload even if the device
    # still sends a result message afterwards.
    return 0 if all_acked else 1


def serve(remoteAddr, remotePort, filename):
    """Invite the device to connect back and push *filename* to it.

    Listens on TCP port 48266, sends a UDP invitation
    ``"0 <serverPort> <size>\\n"`` to ``remoteAddr:remotePort``, waits up to
    10 seconds for the device to connect, then uploads the file in
    1460-byte chunks, checking the ``"<length> <checksum>"`` line the device
    echoes back after each chunk.

    Args:
        remoteAddr: host name or IP address of the device.
        remotePort: UDP port of the device (int or numeric string).
        filename: path of the binary image to upload.

    Returns:
        0 on success, 1 on any listen / connect / upload failure.

    Errors reading the file (or resolving the address) are not caught and
    propagate to the caller; all sockets and the file are closed on every
    path.
    """
    serverPort = 48266
    server_address = ('0.0.0.0', serverPort)

    # Create a TCP/IP socket; closing() guarantees it is released on every
    # return path and on exceptions.
    with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        print('Starting on %s:%s' % server_address, file=sys.stderr)
        try:
            sock.bind(server_address)
            sock.listen(1)
        except socket.error:
            print('Listen Failed', file=sys.stderr)
            return 1

        content_size = os.path.getsize(filename)
        print('Upload size: %d' % content_size, file=sys.stderr)
        # Sockets carry bytes, so encode the invitation explicitly.
        message = ('%d %d %d\n' % (0, serverPort, content_size)).encode('ascii')

        print('Sending invitation to:', remoteAddr, file=sys.stderr)
        with contextlib.closing(
                socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock2:
            sock2.sendto(message, (remoteAddr, int(remotePort)))

        # Wait for a connection
        print('Waiting for device...\n', file=sys.stderr)
        try:
            sock.settimeout(10)
            connection, _client_address = sock.accept()
            sock.settimeout(None)
        except socket.error:
            print('No response from device', file=sys.stderr)
            return 1

        with contextlib.closing(connection), open(filename, 'rb') as f:
            return _upload(connection, f, content_size)
def main(args):
    """Command-line entry point.

    Args:
        args: argument vector in ``sys.argv`` layout:
            ``[program, ESP_IP_address, ESP_IP_port, sketch.bin]``.
            Extra trailing arguments are ignored.

    Returns:
        The exit status from ``serve`` (0 on success, 1 on failure), or 2 if
        too few arguments were supplied.
    """
    if len(args) < 4:
        # Report a usage error instead of crashing with IndexError.
        prog = args[0] if args else 'espota.py'
        sys.stderr.write(
            'Usage: %s <ESP_IP_address> <ESP_IP_port> <sketch.bin>\n' % prog)
        return 2
    return serve(args[1], args[2], args[3])
# Run the uploader when executed as a script and propagate its status
# as the process exit code.
if __name__ == '__main__':
    exit_status = main(sys.argv)
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment