Skip to content

Instantly share code, notes, and snippets.

@iSevenDays
Created August 7, 2016 21:01
Show Gist options
  • Save iSevenDays/5440c1f0b7a7dbbe380f5db0fbe56dd9 to your computer and use it in GitHub Desktop.
Save iSevenDays/5440c1f0b7a7dbbe380f5db0fbe56dd9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#from __future__ import print_function
"""
pyboard interface
This module provides the Pyboard class, used to communicate with and
control the pyboard over a serial USB connection.
Example usage:
1. first import this file with:
import pyboard
2. Then create a connection:
in case of a serial port:
pyb = pyboard.Pyboard('/dev/ttyACM0')
or in Windows:
pyb = pyboard.Pyboard('COM2')
For telnet connections:
pyb = pyboard.Pyboard('192.168.1.1')
Or even (if the name is defined in the DNS or the 'hosts' file):
pyb = pyboard.Pyboard('mypyboard')
You can also include a custom port number:
pyb = pyboard.Pyboard('mypyboard:2323')
Then:
pyb.enter_raw_repl()
pyb.exec('pyb.LED(1).on()')
pyb.exit_raw_repl()
Note: if using Python2 then pyb.exec must be written as pyb.exec_.
To run a script from the local machine on the board and print out the results:
import pyboard
pyboard.execfile('test.py', device='/dev/ttyACM0')
This script can also be run directly. To execute a local script, use:
./pyboard.py test.py
Or:
python pyboard.py test.py
"""
"""ESP8266 support"""
from collections import deque
import time
# Set to False further down when the websocket package can be neither imported
# nor installed; Websock_connection.is_websocket_uri() consults this flag.
websocket_allowed = True
def println(message):
    """Emit *message* on standard output, followed by a newline."""
    print(message)
# ``threading`` is part of the standard library, so it is imported
# unconditionally: Websock_connection subclasses threading.Thread and Periodic
# uses Thread regardless of whether the optional websocket import succeeds.
import threading
from threading import Thread

# The websocket transport needs the third-party ``websocket-client`` package,
# which provides the ``websocket`` module.
try:
    import websocket
    from websocket import WebSocketApp
except ImportError:
    import subprocess
    import sys
    try:
        # pip.main() was removed in pip 10; running pip as a subprocess of the
        # current interpreter is the supported way to install a package.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install',
                               '--no-compile', 'websocket-client'])
        import websocket
        from websocket import WebSocketApp
    except (subprocess.CalledProcessError, OSError, ImportError):
        println('cannot install packages')
        websocket_allowed = False
        # Keep the names bound so the rest of the module still imports; the
        # serial and telnet transports remain usable without websocket.
        websocket = None
        WebSocketApp = None
class Websock_connection(threading.Thread):
    """WebREPL (websocket) transport with a telnetlib-like read/write API.

    The websocket client runs on this daemon thread (see run()).  Every
    received message is split into characters and queued in ``self.fifo``;
    the read methods drain that queue and always return bytes.
    """

    def __init__(self, uri):
        """Open the websocket at *uri* and wait for the password prompt.

        ``self.connected`` becomes True only when 'Password:' is received
        within 10 seconds.
        """
        threading.Thread.__init__(self)
        self.connected = False
        self.daemon = True
        self.fifo = deque()
        # Guards the fifo, which is filled by the websocket thread and
        # drained by the caller's thread.
        self.fifo_lock = threading.Lock()
        self.uri = uri
        self.timeout = 10.0
        self.ws = websocket.WebSocketApp("%s" % self.uri,
                                         on_message=self.on_message,
                                         on_error=self.on_error,
                                         on_close=self.on_close)
        self.start()
        if b'Password:' in self.read_until("Password:", timeout=10):
            self.connected = True
        println("Connected: %s" % self.connected)
        println("uri: %s" % self.uri)

    def authenticate(self, user, password):
        """Send *password* and wait for the 'WebREPL connected' banner.

        *user* is accepted for interface parity with the other transports
        and is not used.  Returns True on success.

        Raises PyboardError when the banner is not seen within 10 seconds.
        """
        println("Authentication start")
        self.ws.send(password + "\r")
        self.connected = True
        if b'WebREPL connected' not in self.read_until("WebREPL connected", timeout=10):
            self.connected = False
            raise PyboardError('Invalid credentials')
        println("Authentication success")
        return True

    def run(self):
        """Thread body: pump websocket events until the socket closes."""
        self.ws.run_forever()

    def __del__(self):
        self.close()

    def on_message(self, ws, message):
        """Queue the characters of an incoming *message* for the readers."""
        if isinstance(message, bytes):
            # Extending the fifo with raw bytes would queue ints on
            # Python 3; the queue holds text so read() can encode uniformly.
            message = message.decode('utf-8', 'replace')
        with self.fifo_lock:
            self.fifo.extend(message)

    def on_error(self, ws, error):
        """Report a websocket error instead of dropping it silently."""
        println("websocket error: %s" % (error,))

    def on_close(self, ws, *args):
        """Websocket closed; readers simply run into their timeout.

        *args* absorbs the close status code and message that newer
        websocket-client releases pass to this callback.
        """
        pass

    def close(self):
        """Close the websocket, ignoring errors (also called from __del__)."""
        try:
            self.ws.close()
            println("ws.close")
        except Exception:
            # Best effort: the socket may never have been created or may
            # already be closed.
            pass

    def read(self, size=1, blocking=True):
        """Return up to *size* queued characters, UTF-8 encoded.

        With *blocking* true, waits up to ``self.timeout`` seconds for the
        requested amount; otherwise returns whatever is queued right now.
        """
        chars = []
        deadline = time.time() + self.timeout
        while len(chars) < size and time.time() < deadline:
            with self.fifo_lock:
                while self.fifo and len(chars) < size:
                    chars.append(self.fifo.popleft())
            if len(chars) >= size or not blocking:
                break
            # Poll so the timeout is honoured even if no message ever comes.
            time.sleep(0.01)
        return ''.join(chars).encode("utf-8")

    def write(self, data):
        """Send *data* over the websocket and return its length."""
        println("Will write data %s" % data)
        self.ws.send(data)
        return len(data)

    def inWaiting(self):
        """Return the number of characters currently queued."""
        return len(self.fifo)

    def survives_soft_reset(self):
        return False

    def keep_alive(self):
        pass

    def read_some(self):
        """Telnetlib-like read_some: return at least one character.

        Waits up to ``self.timeout`` seconds; returns b'' if nothing arrives.
        """
        return self.read(max(1, self.inWaiting()))

    def read_eager(self):
        """Telnetlib-like read_eager: return queued data without waiting."""
        waiting = self.inWaiting()
        if waiting == 0:
            return b''
        return self.read(waiting, blocking=False)

    def flush(self):
        pass

    def read_until(self, ending, timeout=10):
        """Read until the data ends with *ending* or the line goes idle.

        *ending* may be text or bytes.  Gives up after *timeout* seconds
        without new data; ``timeout=None`` waits indefinitely.  Returns the
        bytes read so far, which may not end with *ending* on timeout.
        """
        if not isinstance(ending, bytes):
            ending = ending.encode('utf-8')
        data = b''
        idle_ticks = 0
        while not data.endswith(ending):
            if self.inWaiting() > 0:
                data += self.read(1)
                idle_ticks = 0
            else:
                idle_ticks += 1
                # Each idle tick is one 10 ms sleep, hence 100 per second.
                if timeout is not None and idle_ticks >= 100 * timeout:
                    break
                time.sleep(0.01)
        return data

    @staticmethod
    def is_websocket_uri(uri):
        """Return True if websocket support is available and *uri* is ws://."""
        if not websocket_allowed:
            println("websocket_allowed == False")
        return websocket_allowed and 'ws://' in uri
"""end ESP8266 support"""
import sys
import time
import os
import stat
# Python 3 exposes the underlying binary stream as ``sys.stdout.buffer``;
# Python 2's stdout has no such attribute and already accepts byte strings.
stdout = getattr(sys.stdout, 'buffer', sys.stdout)
def stdout_write_bytes(b):
    """Write *b* to stdout with every EOF byte (0x04) removed, then flush."""
    cleaned = b.replace(b"\x04", b"")
    stdout.write(cleaned)
    stdout.flush()
class PyboardError(BaseException):
    """Error raised for connection, authentication and raw-REPL failures.

    The first argument is kept in ``self.value``; any further positional
    arguments (e.g. the captured stdout and stderr of a failed command) are
    available through ``self.args``.

    NOTE(review): this derives from BaseException, so ``except Exception``
    does not catch it.  The base class is left unchanged so existing callers
    keep their behaviour.
    """

    def __init__(self, value, *args):
        BaseException.__init__(self, value, *args)
        self.value = value
        # Wrap in a 1-tuple so a tuple *value* is formatted as a whole rather
        # than being unpacked as format arguments.
        println("Exception %s" % (value,))

    def __str__(self):
        if len(self.args) > 1:
            return repr(self.args)
        return repr(self.value)
class Periodic:
    """Call *callback* every *period* seconds on a background thread.

    The thread is a daemon, so a forgotten timer does not keep the process
    alive, and stop() wakes it immediately instead of waiting out the
    current period.
    """

    def __init__(self, period, callback):
        self.run = True
        self._stop_event = threading.Event()
        self.thread = threading.Thread(target=self.on_timer, args=(period, callback))
        self.thread.daemon = True
        self.thread.start()

    def __del__(self):
        self.stop()

    def stop(self):
        """Ask the timer thread to finish; no further callbacks are made."""
        self.run = False
        self._stop_event.set()

    def on_timer(self, period, callback):
        """Thread body: wait *period* seconds, fire *callback*, repeat."""
        # Event.wait() returns True as soon as stop() sets the event, which
        # ends the loop without firing the callback again.
        while self.run and not self._stop_event.wait(period):
            if not self.run:
                break
            callback()
class Serial_connection:
    """Serial-port transport with a telnetlib-like read API (needs pyserial)."""

    def __init__(self, device, baudrate=115200, timeout=0):
        """Open *device*, retrying once per second for up to *timeout* seconds.

        ``self.connected`` is True when the port was opened; on failure it
        stays False and no exception is raised.
        """
        import serial
        self.connected = False
        for attempt in range(timeout + 1):
            try:
                self.stream = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
            except (OSError, IOError):  # Py2 and Py3 have different errors
                if timeout == 0:
                    continue
                if attempt == 0:
                    sys.stdout.write('Waiting {} seconds for pyboard '.format(timeout))
                time.sleep(1)
                sys.stdout.write('.')
                sys.stdout.flush()
            else:
                self.connected = True
                self.__expose_stream_methods()
                if attempt != 0:
                    print('')  # to break the dotted line printed on retries
                break

    def authenticate(self, user, password):
        """Serial connections need no authentication; always returns True."""
        return True

    def keep_alive(self):
        pass

    def __expose_stream_methods(self):
        self.close = self.stream.close
        self.write = self.stream.write

    def __in_waiting(self):
        """Return the number of bytes waiting in the input buffer."""
        try:
            return self.stream.in_waiting    # pyserial >= 3.0 (property)
        except AttributeError:
            return self.stream.inWaiting()   # pyserial 2.x (method)

    def read_until(self, ending, timeout):
        """Telnetlib-like read_until returning bytes.

        Reads one byte at a time until the data ends with *ending* (text or
        bytes).  Gives up after roughly *timeout* seconds' worth of empty
        0.1 s reads; ``timeout=None`` waits indefinitely.  On timeout the
        result may not end with *ending*.
        """
        if not isinstance(ending, bytes):
            ending = ending.encode('utf-8')
        saved_timeout = self.stream.timeout
        self.stream.timeout = 0.1
        empty_reads = 0
        data = b''
        try:
            while not data.endswith(ending):
                new_data = self.stream.read(1)
                if new_data:
                    data += new_data
                    continue
                # Each empty read took 0.1 s, hence 10 per second.
                empty_reads += 1
                if timeout is not None and empty_reads >= 10 * timeout:
                    break
        finally:
            # Restore the caller's timeout even if the read raised.
            self.stream.timeout = saved_timeout
        return data

    def read_eager(self):
        """Telnetlib-like read_eager: return buffered data without waiting."""
        waiting = self.__in_waiting()
        if waiting == 0:
            return b''
        return self.stream.read(waiting)

    def read_some(self):
        """Telnetlib-like read_some: return buffered data, or read one byte."""
        waiting = self.__in_waiting()
        if waiting != 0:
            return self.stream.read(waiting)
        return self.stream.read(1)

    @staticmethod
    def is_serial_port(name):
        """Return True if *name* looks like, or is, a serial device.

        Matches names containing '/dev/tty', names starting with 'COM', and
        any existing path that is a character device.
        """
        if '/dev/tty' in name or name[:3] == 'COM':
            return True
        return os.path.exists(name) and stat.S_ISCHR(os.stat(name).st_mode)
class Telnet_connection:
    """Telnet transport built on the standard-library telnetlib module."""

    # telnetlib was removed from the standard library in Python 3.13; guard
    # the import so the serial and websocket transports stay usable there.
    try:
        import telnetlib
    except ImportError:
        telnetlib = None
    import socket

    def __init__(self, uri, timeout=15, read_timeout=10):
        """Connect to *uri* ('host' or 'host:port', default port 23).

        timeout: seconds allowed for establishing the connection.
        read_timeout: seconds to wait for each expected prompt.

        ``self.connected`` stays False when the socket cannot be opened.
        Raises PyboardError for a malformed *uri* or when telnetlib is
        unavailable.
        """
        self.__read_timeout = read_timeout
        self.connected = False
        if Telnet_connection.telnetlib is None:
            raise PyboardError('\ntelnetlib is not available in this Python version')
        address, port = self.__separateAdressPort(uri)
        try:
            self.stream = Telnet_connection.telnetlib.Telnet(address, port, timeout=timeout)
            self.__socket = self.stream.get_socket()
            self.__socket.setsockopt(Telnet_connection.socket.SOL_SOCKET,
                                     Telnet_connection.socket.SO_KEEPALIVE, 1)
            self.connected = True
            self.__expose_stream_methods()
        except Telnet_connection.socket.error:
            pass

    def _wait_for_exact_text(self, remote_text):
        """Return True if *remote_text* arrives within the read timeout."""
        remote_text = remote_text.encode('ascii')
        return remote_text in self.stream.read_until(remote_text, self.__read_timeout)

    def _wait_for_multiple_exact_text(self, remote_options):
        """Return the index of the first option seen, or -1 on timeout.

        The caller's list is left untouched.
        """
        encoded = [option.encode('ascii') for option in remote_options]
        return self.stream.expect(encoded, self.__read_timeout)[0]

    def _wait_and_respond(self, remote_text, response, delay_before=0):
        """Wait for *remote_text*, then send *response* followed by CRLF.

        Returns False, sending nothing, if the text does not arrive.
        """
        response = response.encode('ascii')
        if not self._wait_for_exact_text(remote_text):
            return False
        if delay_before != 0:
            time.sleep(delay_before)
        self.stream.write(response + b'\r\n')
        return True

    def authenticate(self, user, password):
        """Log in with *user* / *password*.

        Returns True on success and False when a prompt or the result
        banner does not arrive in time.

        Raises PyboardError when the board reports invalid credentials.
        """
        if not (self._wait_and_respond('Login as:', user) and
                self._wait_and_respond('Password:', password, 0.2)):
            return False
        status = self._wait_for_multiple_exact_text([
            'Type "help()" for more information.',
            'Invalid credentials, try again.'])
        if status == 0:
            return True
        if status == 1:
            raise PyboardError('Invalid credentials')
        # Neither banner arrived (expect() timed out): report failure
        # explicitly rather than falling through and returning None.
        return False

    def close(self):
        """Shut down the socket and close the telnet stream."""
        try:
            self.__socket.shutdown(Telnet_connection.socket.SHUT_RDWR)
        except Telnet_connection.socket.error:
            # The peer may already have dropped the connection; the stream
            # must still be closed below.
            pass
        self.stream.close()

    def write(self, data):
        """Send *data* and return its length."""
        self.stream.write(data)
        return len(data)

    def keep_alive(self):
        """Send telnet NOP commands so an idle connection stays open."""
        self.__socket.sendall(Telnet_connection.telnetlib.IAC +
                              Telnet_connection.telnetlib.NOP +
                              Telnet_connection.telnetlib.NOP)

    def __expose_stream_methods(self):
        self.read_until = self.stream.read_until
        self.read_eager = self.stream.read_eager
        self.read_some = self.stream.read_some

    def __separateAdressPort(self, string):
        """Split 'host' or 'host:port' into (address, port); port defaults to 23.

        Raises PyboardError when *string* matches neither form.
        """
        import re
        # '-' is allowed so hostnames such as 'my-board.local' are accepted.
        match = re.match(r'([\w.-]+)$|([\w.-]+):(\d+)$', string)
        if match is None:
            raise PyboardError('\nInvalid address')
        address = match.group(1)
        port = 23
        if address is None:
            address = match.group(2)
            port = int(match.group(3))
        return address, port
class Pyboard:
    """Drive a board's raw REPL over a serial, websocket or telnet link.

    The transport is chosen from *device*: a serial port name selects
    Serial_connection, a 'ws://' URI selects Websock_connection and anything
    else is treated as a telnet host (optionally 'host:port').
    """

    def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, keep_alive=0):
        """Connect to *device* and authenticate.

        wait: seconds to keep retrying a serial port that is not there yet.
        keep_alive: seconds between keep-alive pings; 0 disables them.

        Raises PyboardError if connecting or authenticating fails.
        """
        self.connected = False
        # Defined up front so close() works when no keep-alive timer or
        # disconnect callback was ever installed.
        self.keep_alive_interval = None
        self.__disconnected_callback = None
        if Serial_connection.is_serial_port(device):
            self.connection = Serial_connection(device, baudrate=baudrate, timeout=wait)
        elif Websock_connection.is_websocket_uri(device):
            self.connection = Websock_connection(device)
        else:
            self.connection = Telnet_connection(device)
        if not self.connection.connected:
            raise PyboardError('\nFailed to establish a connection with the board at: ' + device)
        # Any falsy result (False or None) counts as a failed login.
        if not self.connection.authenticate(user, password):
            raise PyboardError('\nFailed to authenticate with the board at: ' + device)
        if keep_alive != 0:
            self.keep_alive_interval = Periodic(keep_alive, self._keep_alive)
        self.connected = True

    def close(self):
        """Stop the keep-alive timer, close the link and notify the listener.

        Does nothing when already closed.  Errors raised while closing the
        connection or running the disconnect callback are ignored.
        """
        if not self.connected:
            return
        self.connected = False
        if self.keep_alive_interval is not None:
            self.keep_alive_interval.stop()
        try:
            self.connection.close()
        except Exception:
            # the connection might not exist, so ignore this one
            pass
        if self.__disconnected_callback is not None:
            try:
                self.__disconnected_callback()
            except Exception:
                pass

    def set_disconnected_callback(self, callback):
        """Register *callback* to be called (without arguments) by close()."""
        self.__disconnected_callback = callback

    def flush(self):
        """Discard pending input (without relying on serial.flushInput())."""
        while self.connection.read_eager():
            pass

    def _keep_alive(self):
        """Ping the board; close the link if the ping fails."""
        if not self.connected:
            return
        try:
            self.connection.keep_alive()
        except Exception:
            self.close()

    def check_connection(self):
        """Ping the board and return whether the link is still up."""
        self._keep_alive()
        return self.connected

    def read_until(self, ending, timeout=10, data_consumer=None):
        """Read until the data ends with *ending* or *timeout* seconds pass.

        Without *data_consumer* the read is delegated to the connection.
        With one, data is read in slices of about 0.1 s and every non-empty
        slice is handed to *data_consumer* as it arrives.  ``timeout=None``
        waits indefinitely.  Returns all bytes read, which may not end with
        *ending* on timeout.
        """
        if data_consumer is None:
            return self.connection.read_until(ending, timeout)
        data = b''
        deadline = None if timeout is None else time.time() + timeout
        while True:
            new_data = self.connection.read_until(ending, 0.1)
            if new_data:
                data += new_data
                data_consumer(new_data)
            if data.endswith(ending):
                break
            if deadline is not None and time.time() >= deadline:
                break
        return data

    def enter_raw_repl(self):
        """Interrupt the board, switch to the raw REPL and soft-reset it.

        Raises PyboardError if any of the expected banners is missing.
        """
        self.connection.write(b'\r\x03\x03')  # ctrl-C twice: interrupt any running program
        self.flush()
        self.connection.write(b'\r\x01')  # ctrl-A: enter raw REPL
        data = self.read_until(b'raw REPL; CTRL-B to exit\r\n>')
        if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
            println("Error could not enter raw repl, data1: %s" % data)
            raise PyboardError('could not enter raw repl')
        self.connection.write(b'\x04')  # ctrl-D: soft reset
        data = self.read_until(b'soft reboot\r\n')
        if not data.endswith(b'soft reboot\r\n'):
            println("Error could not enter raw repl, data2: %s" % data)
            raise PyboardError('could not enter raw repl')
        # By splitting this into 2 reads, it allows boot.py to print stuff,
        # which will show up after the soft reboot and before the raw REPL.
        data = self.read_until(b'raw REPL; CTRL-B to exit\r\n')
        if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
            println("Error could not enter raw repl, data3: %s" % data)
            raise PyboardError('could not enter raw repl')

    def exit_raw_repl(self):
        """Switch the board back to the friendly REPL."""
        self.connection.write(b'\r\x02')  # ctrl-B: enter friendly REPL

    def follow(self, timeout, data_consumer=None):
        """Collect a command's output and return ``(stdout, stderr)`` bytes.

        Each stream is terminated by an EOF byte (0x04), which is stripped.
        *data_consumer*, if given, receives the normal output as it arrives.

        Raises PyboardError when either EOF is not seen within *timeout*.
        """
        # wait for normal output
        data = self.read_until(b'\x04', timeout=timeout, data_consumer=data_consumer)
        if not data.endswith(b'\x04'):
            raise PyboardError('timeout waiting for first EOF reception')
        data = data[:-1]
        # wait for error output
        data_err = self.read_until(b'\x04', timeout=timeout)
        if not data_err.endswith(b'\x04'):
            raise PyboardError('timeout waiting for second EOF reception')
        data_err = data_err[:-1]
        # return normal and error output
        return data, data_err

    def send(self, data):
        """ASCII-encode *data* and write it; close the link on failure."""
        data = data.encode('ascii')
        try:
            self.connection.write(data)
        except Exception:
            self.close()

    def recv(self, callback):
        """Pass incoming data to *callback* until the stream ends or fails.

        The connection is closed before returning.
        """
        import socket
        while True:
            try:
                data = self.connection.read_some()
                if not data:
                    break
                callback(data)
            except socket.timeout:
                continue
            except Exception:
                break
        self.close()

    def _wait_for_exact_text(self, remote_text):
        """Return True if *remote_text* (bytes) arrives within the timeout."""
        return remote_text in self.read_until(remote_text)

    def reset(self):
        """Leave the raw REPL and soft-reset the board.

        Raises PyboardError if the expected banners are not seen.
        """
        self.exit_raw_repl()
        if not self._wait_for_exact_text(b'Type "help()" for more information.\r\n'):
            println("could not enter reset1")
            raise PyboardError('could not enter reset')
        self.connection.write(b'\x04')
        if not self._wait_for_exact_text(b'PYB: soft reboot\r\n'):
            println("could not enter reset2")
            raise PyboardError('could not enter reset')

    def exec_raw_no_follow(self, command):
        """Send *command* (text or bytes) to the raw REPL and start it.

        Raises PyboardError if there is no prompt or the board does not
        acknowledge the command with 'OK'.
        """
        if isinstance(command, bytes):
            command_bytes = command
        else:
            command_bytes = command.encode('utf8')
        # check we have a prompt
        data = self.read_until(b'>')
        if not data.endswith(b'>'):
            raise PyboardError('could not enter raw repl')
        # write command in 256-byte slices with a short pause between them
        for start in range(0, len(command_bytes), 256):
            self.connection.write(command_bytes[start:start + 256])
            time.sleep(0.01)
        self.connection.write(b'\x04')
        # check if we could exec command; going through self.read_until
        # supplies the timeout that the serial transport requires
        data = self.read_until(b'OK')
        if data != b'OK':
            raise PyboardError('could not exec command')

    def exec_raw(self, command, timeout=10, data_consumer=None):
        """Run *command* and return its ``(stdout, stderr)`` bytes."""
        self.exec_raw_no_follow(command)
        return self.follow(timeout, data_consumer)

    def eval(self, expression):
        """Evaluate *expression* on the board; return its printed value."""
        ret = self.exec_('print({})'.format(expression))
        return ret.strip()

    def exec_(self, command):
        """Run *command* and return its stdout.

        Raises PyboardError if the command wrote anything to stderr.
        """
        ret, ret_err = self.exec_raw(command)
        if ret_err:
            raise PyboardError('exception', ret, ret_err)
        return ret

    def execfile(self, filename):
        """Run the local script *filename* on the board; return its stdout."""
        with open(filename, 'rb') as f:
            pyfile = f.read()
        return self.exec_(pyfile)

    def get_time(self):
        """Return the board's RTC time of day as seconds since midnight."""
        # .decode() works on both Python 2 and 3, unlike str(b, encoding=...)
        fields = self.eval('pyb.RTC().datetime()').decode('utf8')[1:-1].split(', ')
        return int(fields[4]) * 3600 + int(fields[5]) * 60 + int(fields[6])
# In Python 2 ``exec`` is a keyword, so the method is defined as ``exec_``.
# For Python 3 we want to provide the nicer spelling ``pyb.exec(...)``; the
# alias is installed with setattr because ``exec`` cannot be written as an
# attribute name where it is a keyword.
setattr(Pyboard, "exec", Pyboard.exec_)
def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
    """Run the local script *filename* on the board and print its output.

    The connection is closed even when running the script fails.
    """
    pyb = Pyboard(device, baudrate, user, password)
    try:
        pyb.enter_raw_repl()
        output = pyb.execfile(filename)
        stdout_write_bytes(output)
        pyb.exit_raw_repl()
    finally:
        pyb.close()
def main():
    """Command-line entry point: run a command and/or script files on the board.

    With neither a command nor files (or with --follow), the board's output
    is followed instead.  Exits with status 1 on any board error, on Ctrl-C,
    or when the executed code writes to stderr.
    """
    import argparse
    cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
    # NOTE(review): the default device and password below look like one
    # specific personal setup hard-coded into the source.  They are left
    # unchanged for backward compatibility; consider neutral defaults.
    cmd_parser.add_argument('--device', default='ws://192.168.76.9:8266', help='the serial device or the IP address of the pyboard')
    # type=int so a rate given on the command line is not passed as a string
    cmd_parser.add_argument('-b', '--baudrate', default=115200, type=int, help='the baud rate of the serial device')
    cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
    cmd_parser.add_argument('-p', '--password', default='anton1009', help='the telnet login password')
    cmd_parser.add_argument('-c', '--command', help='program passed in as string')
    cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
    cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
    cmd_parser.add_argument('files', nargs='*', help='input files')
    args = cmd_parser.parse_args()

    def execbuffer(buf):
        """Run *buf* on the board, streaming its stdout as it arrives."""
        pyb = None
        try:
            pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
            pyb.enter_raw_repl()
            ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
            pyb.exit_raw_repl()
        except PyboardError as er:
            print(er)
            sys.exit(1)
        except KeyboardInterrupt:
            sys.exit(1)
        finally:
            # Release the connection on the error paths too.
            if pyb is not None:
                pyb.close()
        if ret_err:
            stdout_write_bytes(ret_err)
            sys.exit(1)

    if args.command is not None:
        execbuffer(args.command.encode('utf-8'))
    for filename in args.files:
        with open(filename, 'rb') as f:
            pyfile = f.read()
        execbuffer(pyfile)
    if args.follow or (args.command is None and not args.files):
        pyb = None
        try:
            pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
            ret, ret_err = pyb.follow(timeout=10, data_consumer=stdout_write_bytes)
        except PyboardError as er:
            print(er)
            sys.exit(1)
        except KeyboardInterrupt:
            sys.exit(1)
        finally:
            if pyb is not None:
                pyb.close()
        if ret_err:
            stdout_write_bytes(ret_err)
            sys.exit(1)
# Run the command-line interface only when executed as a script, not when
# imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment