Create a gist now

Instantly share code, notes, and snippets.

@nouyang /arduino.ino
Last active Aug 29, 2015

What would you like to do?
// Receive callback: dispatches one received packet on its first byte.
//   buffer[0] == 10 : color command, followed by three values for setColor()
//   buffer[0] == 20 : servo command, followed by left and right servo values
// Packets that are too short for their command type are ignored instead of
// reading past the bytes that were actually received.
void rxCallback(uint8_t *buffer, uint8_t len)
{
  // other code as per Adafruit

  // An empty packet has no command byte to dispatch on.
  if (len < 1) {
    return;
  }

  if (buffer[0] == 10) { // color cmd received
    if (len >= 4) {
      setColor(buffer[1], buffer[2], buffer[3]);
    }
  }
  else if (buffer[0] == 20) { // servo cmd received
    if (len >= 3) {
      Lservo.write(buffer[1]);
      // The right servo gets the mirrored value (0 <-> 180).
      Rservo.write(map(buffer[2], 0, 180, 180, 0));
    }
  }
}
'''
July 5, 2014 Nancy Ouyang
Usage:
$ sudo python blescan.py
Copyright:
public domain
Automation of hcitool lescan to return MAC addresses
Requires sudo privileges
This operates with hcitool v5.20, tested on Ubuntu 14.04
Hardware: Requires a laptop with a Bluetooth 4.0 dongle or with built-in Bluetooth Low Energy support
Note: You can check if your dongle is recognized by using hciconfig at a terminal prompt.
@params None
@returns ble_adrs - A set of MAC addresses of BLE peripherals
'''
import pexpect, time, sys, signal, os
from ctypes.util import find_library
SCAN_TIME = 1  # how long gatheradr() keeps collecting addresses, in seconds

# hcitool lescan requires root privileges, so stop early otherwise.
if os.geteuid() != 0:
    sys.exit("hcitool lescan only works as root")

# Fail fast when the bluetooth shared library cannot be located.
btlib = find_library("bluetooth")
if not btlib:
    raise Exception(
        "Can't find required bluetooth libraries"
    )
def closeall(connection):
    """Forcefully stop a spawned child process and close its handle.

    @params connection - object exposing terminate(force=...) and
                         close(force=...), e.g. the pexpect child used here
    @returns None
    """
    # terminate() returns a status, but nothing ever used it.
    connection.terminate(force=True)
    connection.close(force=True)
def gatheradr(conn):
    """Collect the addresses printed by a running `hcitool lescan`.

    Reads the child's output line by line and keeps the first
    space-separated token of every non-empty line. Collection stops when
    more than SCAN_TIME seconds have elapsed (checked after each line), when
    reading raises pexpect.TIMEOUT, or when the output ends. The child is
    closed on every exit path.

    @params conn - pexpect child whose output is iterated
    @returns ble_adrs - set of the address tokens seen
    """
    start_time = time.time()
    print('Scanning for addresses...')
    ble_adrs = set()
    try:
        for line in conn:
            address = line.strip().split(' ')[0]
            if address:
                ble_adrs.add(address)
            if time.time() - start_time > SCAN_TIME:  # in seconds
                break
    except pexpect.TIMEOUT:
        # A read timeout only means "nothing found" when nothing was
        # collected before the output went quiet.
        if not ble_adrs:
            print('No BLE addresses found! Are peripherals on and reset?')
    finally:
        # Also covers the case where the output simply ends.
        closeall(conn)
    return ble_adrs
def blescan():
    """Run `hcitool lescan` once and return the addresses it reports.

    Spawns hcitool and waits up to a second for one of four outcomes:
      * the scan banner            -> collect addresses with gatheradr()
      * "File descriptor in bad state" -> ask the user whether to retry once
      * "failed" or no output      -> report the problem and give up

    @params None
    @returns ble_adrs - set of addresses; empty when the scan did not run
    """
    ble_adrs = set()
    # pexpect.TIMEOUT from a scan that prints no addresses is raised while
    # reading the output, so it is handled inside gatheradr(), not here.
    hciout = pexpect.spawn('hcitool lescan', timeout=1)
    i = hciout.expect(['LE Scan ...', 'File descriptor in bad state',
                       'failed', pexpect.TIMEOUT], timeout=1)
    if i == 0:
        ble_adrs = gatheradr(hciout)
    elif i == 1:
        while True:  # repeat the question until the answer is understood
            inp = raw_input('Check if dongle is plugged in. Type "y" to continue, or type "n" to cancel.')
            answer = inp.lower()
            if answer.startswith('y'):
                hciout.terminate()
                hciout.close()
                print('Waiting a second for dongle to initialize')
                time.sleep(1)
                hciout = pexpect.spawn('hcitool lescan', timeout=1)
                j = hciout.expect(['LE Scan ...', pexpect.TIMEOUT], timeout=1)
                if j == 0:
                    ble_adrs = gatheradr(hciout)
                else:
                    # This message used to be a bare string and was never shown.
                    print('Dongle not plugged in.')
                    closeall(hciout)
                break
            elif answer.startswith('n'):
                closeall(hciout)
                break
            else:
                print('Did not understand command. Try again.')
    else:
        # i == 2 ("failed") and i == 3 (timeout) are reported the same way.
        print('Is dongle plugged in? Could not scan:  %s %s'
              % (hciout.before, hciout.after))
        closeall(hciout)
    return ble_adrs
if __name__ == "__main__":
    # Script entry point: run one scan and report the outcome.
    ble_adrs = blescan()
    if not ble_adrs:
        print('Empty set. No addresses found')
    else:
        print('%s %s' % ('Addresses found: ', ble_adrs))
#self.handle = 'b' #!! this is the TX service on the nRF8001 adafruit breakout with callbackEcho sketch
#= int(sys.argv[1])
#!/usr/bin/env python
# with help from https://github.com/msaunby/ble-sensor-pi/blob/master/sensortag/sensortag.py
# Michael Saunby. April 2013
import os, sys
from ctypes.util import find_library
import pexpect, traceback, threading, Queue, time, socket, select
import constants, blescan
# This script only supports being run as root.
if os.geteuid() != 0:
    sys.exit("script only works as root")

# Look up the bluetooth shared library and abort when it is absent.
btlib = find_library("bluetooth")
if not btlib:
    raise Exception("Can't find required bluetooth libraries")
class bleBot(object):
    """One BLE peripheral driven through an interactive `gatttool` session.

    Attributes:
        ble_adr: address string passed to `gatttool -b`.
        con: pexpect child running gatttool in interactive mode.
        handle: hex handle (without the 0x prefix) written by char_write_cmd.
    """

    def __init__(self, ble_adr):
        """Spawn gatttool for *ble_adr* and wait for its `[LE]` prompt."""
        self.ble_adr = ble_adr
        self.con = pexpect.spawn('gatttool -b ' + self.ble_adr + ' -I -t random')
        self.con.delaybeforesend = 0  # THIS LINE IS SUPER IMPORTANT
        self.con.expect(r'\[LE\]', timeout=1)
        # !! this is the TX service on the nRF8001 adafruit breakout with
        # callbackEcho sketch
        self.handle = 'b'

    def _say(self, text):
        """Print *text* prefixed with this peripheral's address."""
        print('%s %s' % (self.ble_adr, text))

    def _flush(self):
        """Drain pending gatttool output (SUPER IMPORTANT before expect()).

        Returns the data read, or None when nothing could be read. Only the
        "nothing to read" errors are absorbed, so KeyboardInterrupt and
        programming errors still propagate.
        """
        try:
            return self.con.read_nonblocking(2048, 0)
        except (pexpect.TIMEOUT, pexpect.EOF, OSError):
            return None

    def _retry_connect(self, timeout):
        """Send `connect` again; report the outcome and clean up on failure."""
        self.con.sendline('connect')
        self._flush()
        k = self.con.expect(['Connection successful', pexpect.TIMEOUT],
                            timeout=timeout)
        if k == 0:
            self._say(': connected!')
        else:
            self._say(': Could not connect')
            self.cleanup()

    def _prompt_and_retry(self):
        """Ask the user whether to retry a busy device, then act on the answer."""
        while True:  # repeat the question until the answer is understood
            inp = raw_input('Try hitting reset. Type "y" to continue or "n" to quit.')
            answer = inp.lower()
            if answer.startswith('y'):
                self._retry_connect(timeout=1)
                return
            if answer.startswith('n'):
                self.cleanup()
                return
            print('Did not understand command. Try again.')

    def connect(self):
        """Try to connect to the peripheral, retrying once where that can help.

        Fragile by nature: this matches the exact text printed by gatttool
        (written against BlueZ 5.20) and breaks if that output changes.

        @returns self, whether or not the connection succeeded; on failure
                 cleanup() has already been called.
        """
        print('Preparing to connect. Address: ' + self.ble_adr)
        self.con.sendline('connect')
        self._flush()
        # TIMEOUT is listed so that a silent gatttool is reported and cleaned
        # up instead of raising out of connect().
        i = self.con.expect(['Attempting', 'Error', pexpect.TIMEOUT], timeout=1)
        if i == 0:
            j = self.con.expect(['Connection successful', 'No route', 'busy',
                                 pexpect.TIMEOUT], timeout=1)
            if j == 0:
                self._say(': connected!')
            elif j == 1:
                self._say(': No route to host, is USB dongle plugged in?')
                self.cleanup()
            elif j == 2:
                self._say(': Device busy, is something else already connected to it?')
                self._prompt_and_retry()
            else:
                print('Attempting to connect, is device on and in range? ')
                self._retry_connect(timeout=3)
        elif i == 1:
            print('Is USB dongle plugged in?')
            self.cleanup()
        else:
            self._say(': no response from gatttool')
            self.cleanup()
        return self

    def char_write_cmd(self, value):
        """Send `char-write-cmd` with *value* (a hex string) to self.handle.

        Any output gatttool had pending afterwards is printed.
        """
        cmd = 'char-write-cmd 0x%s %s' % (self.handle, value)
        self.con.sendline(cmd)
        reply = self._flush()
        if reply is not None:
            print(reply)

    def cleanup(self):
        """Disconnect, quit gatttool and force-close the child process."""
        self._say(': attempting to disconnect')
        try:
            self.con.sendline('disconnect')
            self.con.sendline('exit')
            self._flush()
            isalive = self.con.terminate(force=True)
            print('%s : has been terminated?  %s' % (self.ble_adr, isalive))
            self.con.close(force=True)
        except OSError:
            self._say(': OSError')
def tupToHex(foolist):
    """Return the integers in *foolist* as one lowercase hex string.

    Each value is rendered with at least two hex digits, e.g.
    (10, 255, 0) -> '0aff00'. An empty sequence gives ''.
    """
    return ''.join(format(value, '02x') for value in foolist)
#def worker( address, commands ):
def worker(cmdQueue, connection):
    """Thread body: forward queued commands to one peripheral until told to stop.

    Blocks on cmdQueue.get(). Every item is hex-encoded with tupToHex() and
    sent through connection.char_write_cmd(); an item of None cleans up the
    connection and ends the thread.
    """
    while True:
        item = cmdQueue.get()
        print('%s %s %s' % (connection.ble_adr, 'rcvd from queue:', item))
        if item is not None:
            connection.char_write_cmd(tupToHex(item))
            continue
        # None is the stop signal.
        print('%s %s' % (connection.ble_adr, ': attempting to cleanup'))
        connection.cleanup()
        return
#def ports_init(portnum):
def sendRobotCmds(cmd, queues):
    """Route a parsed command list to the queue of each addressed robot.

    cmd is a flat list laid out as [botID, cmdType, payload...]. Packets
    tend to squish together, so cmd may also hold several back-to-back
    commands of the same length; each one is queued separately without its
    leading botID. The command length is chosen from the first cmdType.

    @params cmd    - list of ints
            queues - sequence of queues, indexed by botID
    @returns None. Commands with an unknown type or a length that is not a
             whole number of commands are reported on stdout and dropped.
    Raises IndexError when cmd has fewer than two entries or a botID has no
    queue (unchanged from before, so existing callers' error handling still
    applies).
    """
    cmd_type = cmd[1]
    if cmd_type == constants.PREFIX_COLOR:
        label, length = 'color', constants.LENGTH_CMD_C
    elif cmd_type == constants.PREFIX_SERVO:
        label, length = 'servo', constants.LENGTH_CMD_S
    else:
        print('invalid command type (servo or color) received:  %s expected:  %s'
              % (cmd, constants.CMD_FORMAT))
        return

    if len(cmd) % length != 0:
        print('invalid %s command received:  %s' % (label, cmd))
        return

    # One slice per squished command, queued in arrival order.
    for start in range(0, len(cmd), length):
        single = cmd[start:start + length]
        botID = single[0]
        queues[botID].put(single[1:])
def _choose_addresses(candidates):
    """Ask which of *candidates* to keep and return them in the order chosen.

    The question is repeated until every entry is a valid index into
    candidates. Exits the program when the answer starts with 'n'.
    """
    while True:
        print(candidates)
        inp = raw_input('Expected ' + str(constants.NUM_ROBOTS) + ', found ' + str(len(candidates)) + ' robots. Enter numbers of the robots you want, separated by commas. e.g. "0,4"')
        if inp.lower().startswith('n'):
            sys.exit('User cancelled program when told too many robots found.')
        try:
            indices = [int(field) for field in inp.split(',')]
        except ValueError:
            indices = []
        if not indices or not all(0 <= i < len(candidates) for i in indices):
            print('Did not understand command. Try again.')
            continue
        chosen = []
        for i in indices:
            if candidates[i] not in chosen:  # ignore repeated indices
                chosen.append(candidates[i])
        print('culled %s' % (chosen,))
        return chosen


def _close_socket(sock):
    """Shut down and close *sock*, tolerating sockets that are not connected."""
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error:
        # Best effort: the listening socket or an already-gone peer may
        # refuse shutdown(); it still has to be closed.
        pass
    sock.close()


def main():
    """Scan for robots, connect to them, then relay TCP commands over BTLE.

    Steps:
      1. Scan until at least constants.NUM_ROBOTS addresses are found; if
         more are found, let the user pick which ones to use.
      2. Connect to every address and start one worker thread per robot,
         each fed by its own queue.
      3. Accept TCP clients on constants.PORT_BTLE and hand every received
         comma-separated command to sendRobotCmds().
      4. On Ctrl-C or a listening-socket error, queue the kill commands and
         a None stop marker for each worker, wait for the workers, close
         all sockets and exit.
    """
    print('Server running, ready to scan for BTLE peripherals.')
    addresses = blescan.blescan()

    # Too few robots: rescan on request until enough show up.
    while len(addresses) < constants.NUM_ROBOTS:
        inp = raw_input('Expected ' + str(constants.NUM_ROBOTS) + ', found ' + str(len(addresses)) + ' robots. Try hitting reset on the robots. Type "y" to continue or "n" to quit.')
        answer = inp.lower()
        if answer.startswith('y'):
            addresses = blescan.blescan()
        elif answer.startswith('n'):
            sys.exit('User cancelled program when told not enough robots found.')
        else:
            print('Did not understand command. Try again.')

    # Too many robots: keep every address the user selects.
    if len(addresses) > constants.NUM_ROBOTS:
        addresses = _choose_addresses(list(addresses))

    # first connect them all because that takes the longest
    # NOTE(review): connect() returns the bot even when connecting failed,
    # so failed bots still get a queue and a worker -- confirm intended.
    connections = []
    for address in addresses:
        connections.append(bleBot(address).connect())

    # One queue and one daemon worker per robot; a botID is an index here.
    queues = []
    threads = []
    for connection in connections:
        q = Queue.Queue()
        t = threading.Thread(target=worker, args=(q, connection))
        t.daemon = True
        queues.append(q)
        threads.append(t)
    for t in threads:
        t.start()

    HOST = ''  # Symbolic name meaning all available interfaces
    PORT = constants.PORT_BTLE  # Arbitrary non-privileged port
    print('expecting port:  %s' % PORT)
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((HOST, PORT))
    server_socket.listen(constants.MAX_BTLE_CONNECTIONS)
    sockets = [server_socket]
    peers = {}  # client socket -> address returned by accept()
    print('Server ready to accept commands to pass over BTLE to peripherals')

    try:
        while True:
            read_sockets, _, _ = select.select(sockets, [], [])
            for sock in read_sockets:
                if sock is server_socket:
                    conn, addr = server_socket.accept()
                    sockets.append(conn)
                    peers[conn] = addr
                    print("Client (%s, %s) connect" % addr)
                    print('port:  %s || Connected by %s' % (PORT, addr))
                    continue
                try:
                    data = sock.recv(2048)
                    if not data:
                        # recv() returning nothing means the peer closed.
                        raise EOFError('connection closed by client')
                    cmd = [int(s) for s in data.rstrip().split(',') if s]
                    sendRobotCmds(cmd, queues)
                except Exception as e:
                    # Any failure on a client socket drops just that client.
                    print('Exception %s' % (e,))
                    print('Client %s is offline, removing from list of sockets'
                          % (peers.get(sock),))
                    sock.close()
                    sockets.remove(sock)
                    peers.pop(sock, None)
    except (KeyboardInterrupt, ValueError, socket.error) as inst:
        print(type(inst))
        print('closing due to error')
    finally:
        print('number of robots connected:  %s' % len(connections))
        print('number of sockets connect:  %s' % len(sockets))
        # Let each worker send the kill commands and disconnect itself, so
        # no connection is touched from two threads at once.
        for q in queues:
            q.put(constants.KILL_CMD_C)
            q.put(constants.KILL_CMD_S)
            q.put(None)
        for t in threads:
            # Bounded wait so a stuck worker cannot block the exit; the
            # workers are daemon threads.
            t.join(5)
        print('threads closed')
        for conn in sockets:
            _close_socket(conn)
            print('serial connection closed:  %s' % (conn,))
    sys.exit()


if __name__ == "__main__":
    main()
# --- Deployment -------------------------------------------------------------
NUM_ROBOTS = 2
MAX_BTLE_CONNECTIONS = 2

# --- Network endpoints ------------------------------------------------------
HOST_BTLE = '127.0.0.1'
PORT_BTLE = 5207
HOST_SWISS = 'localhost'
PORT_SWISS = 3000

# --- Command layout ---------------------------------------------------------
# color command: botID, cmdType=10, rval, gval, bval
# servo command: botID, cmdType=20, lservo, rservo
CMD_FORMAT = 'botID, cmdType=10 or 20, lservo, rservo'
PREFIX_COLOR = 10
PREFIX_SERVO = 20

# Field counts per command: botID and cmdType, then the payload values.
LENGTH_CMD_C = 2 + 3
LENGTH_CMD_S = 2 + 2

# Commands (without the botID) with an all-zero color and 90/90 servo values.
KILL_CMD_C = [PREFIX_COLOR, 0, 0, 0]
KILL_CMD_S = [PREFIX_SERVO, 90, 90]
import processing.net.*;
// Network client used to send command strings to the server.
Client myClient;
// Command-type field for color commands, kept as text because the
// messages are built by string concatenation.
String COLOR_TYPE = "10";
void setup() {
// Connect to 127.0.0.1 port 5207 -- presumably the Python server's
// PORT_BTLE; confirm against the server configuration.
myClient = new Client(this, "127.0.0.1", 5207);
}
void draw() {
// Each message is "botID,cmdType,r,g,b," (note the trailing comma).
// rvals, gvals and bvals carry the same r, g, b values addressed to
// bot 0, bot 1 and bot 2 respectively.
// NOTE(review): r, g, b, dropTime, dropMax and the three message
// variables are not declared anywhere in this excerpt.
rvals = "0," + COLOR_TYPE + "," + r + "," + g + "," + b + ",";
gvals = "1," + COLOR_TYPE + "," + r + "," + g + "," + b + ",";
bvals = "2," + COLOR_TYPE + "," + r + "," + g + "," + b + ",";
// Only send on the frames where dropTime has reached dropMax.
if (dropTime == dropMax){
myClient.write(rvals);
myClient.write(gvals);
// The message for bot 2 is built above but currently not sent.
//myClient.write(bvals);
}
// NOTE(review): the closing brace of draw() is missing from this excerpt.
## http://stackoverflow.com/questions/24564587/communicate-between-a-processing-sketch-and-a-python-program/24565160?noredirect=1#comment38049611_24565160
import time, sys, serial, socket
import constants
# NOTE(review): the statements below use `self` at module level, so they look
# like pieces of a client class whose `class`/`def` lines are missing from
# this excerpt; they cannot run as written. TODO restore the enclosing class.
self.sock.send(message)
# Server endpoint and robot count are taken from the shared constants module.
self.host = constants.HOST_BTLE
self.port = constants.PORT_BTLE
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.NUM_ROBOTS = constants.NUM_ROBOTS
# NOTE(review): 20 is the value of constants.PREFIX_SERVO, not PREFIX_COLOR
# (10), despite the name -- confirm which was meant.
COLOR_TYPE = 20
# NOTE(review): this creates and connects a second socket; self.sock above is
# never connected in the visible code -- confirm which one is intended.
sock = socket.socket()
sock.connect((self.host,self.port))
# Sends the fixed servo command "0,20,90,90" (bot 0, cmdType 20, values 90
# and 90) over self.sock.
# NOTE(review): step() takes no `self` parameter yet reads self.NUM_ROBOTS
# and self.sock -- it is presumably a method of the missing client class.
# NOTE(review): indentation was lost in this excerpt, so it is unclear
# whether send() runs once per robot or once after the loop; the message is
# the same for every i either way.
def step():
message = ""
for i in range(0, self.NUM_ROBOTS):
message = "0,20,90,90"
self.sock.send(message)
def main():
    """Call s.step() about every 20 ms until interrupted or the socket fails.

    On KeyboardInterrupt or socket.error the client socket is closed through
    s.closeSocket() and the program exits.
    NOTE(review): `s` is not defined anywhere in this excerpt.
    """
    try:
        while True:
            s.step()
            time.sleep(0.02)
    except (KeyboardInterrupt, socket.error):
        s.closeSocket()
        sys.exit()


if __name__ == '__main__':
    main()

@nouyang could this code be used to transfer data from a mobile device, i.e. iOS, Android to Pi?

Owner

nouyang commented Apr 19, 2015

@TiernanKennedy Sorry I did not see this until now -- Absolutely, I successfully used it for both android and iOS <-> arduino. I'll try to republish as a github repo shortly, but you should read http://www.orangenarwhals.com/2014/07/python-adafruit-nrf8001-bluetooth-low-energy-breakout-in-20-minutes-ubuntu-14-04/ (I never thought people would hit the gist directly).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment