Skip to content

Instantly share code, notes, and snippets.

@DanielO
Created April 1, 2015 22:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DanielO/c136c2d86c287f686220 to your computer and use it in GitHub Desktop.
Save DanielO/c136c2d86c287f686220 to your computer and use it in GitHub Desktop.
import select
import socket
import sys
import objc
from PyObjCTools import AppHelper
# Load the CoreBluetooth framework (nested inside IOBluetooth) so that its
# Objective-C classes, e.g. CBUUID and CBCentralManager, are injected into
# this module's globals.
_COREBLUETOOTH_PATH = objc.pathForFramework(
    u'/System/Library/Frameworks/IOBluetooth.framework/Versions/A/Frameworks/CoreBluetooth.framework')
objc.loadBundle("CoreBluetooth", globals(), bundle_path=_COREBLUETOOTH_PATH)

# UUID of the GATT service that is scanned for, followed by the UUIDs of the
# three characteristics that are discovered, subscribed to and read on it.
mooshim_service = CBUUID.UUIDWithString_(u'1BC5FFA0-0200-62AB-E411-F254E005DBD4')
mooshim_info = CBUUID.UUIDWithString_(u'1BC5FFA1-0200-62AB-E411-F254E005DBD4')
mooshim_name = CBUUID.UUIDWithString_(u'1BC5FFA2-0200-62AB-E411-F254E005DBD4')
mooshim_settings = CBUUID.UUIDWithString_(u'1BC5FFA3-0200-62AB-E411-F254E005DBD4')
class MooshimDelegate(object):
    """Combined CBCentralManager / CBPeripheral delegate for one device.

    Drives the whole BLE session: scan for ``mooshim_service``, connect to
    the first peripheral found, discover the name/info/settings
    characteristics, subscribe to them and print every value received.

    Attributes:
        manager: the CBCentralManager, stored on its first state update.
        peripheral: the peripheral being connected to (None until found).
        service: the first service reported by the peripheral.
        name, info, settings: the matching discovered characteristics.
        comms: back-reference set by CommsManager; not used by this class.
    """

    # Value of CBCentralManagerStatePoweredOn; the enum constant itself is
    # not injected into globals by objc.loadBundle, so it is spelled out.
    _STATE_POWERED_ON = 5

    def __init__(self):
        self.manager = None
        self.peripheral = None
        self.service = None
        self.name = None
        self.info = None
        self.settings = None
        self.comms = None

    # ------------------------------------------------------------------
    # CBCentralManagerDelegate callbacks
    # ------------------------------------------------------------------

    def centralManagerDidUpdateState_(self, manager):
        """Remember the manager and start scanning once Bluetooth is on."""
        self.manager = manager
        # Scanning is only valid in the powered-on state; this callback
        # fires again on every state change, so just wait for the next one.
        if manager.state() != self._STATE_POWERED_ON:
            print("Bluetooth not powered on (state %r)" % (manager.state(),))
            return
        manager.scanForPeripheralsWithServices_options_([mooshim_service], None)

    def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rssi):
        """Connect to the first advertised peripheral and stop scanning."""
        if self.peripheral is not None:
            # Already committed to a device; a second one would overwrite
            # the reference the rest of the session relies on.
            return
        # Keep a reference: CoreBluetooth does not retain the peripheral
        # while the connection is pending.
        self.peripheral = peripheral
        print("Found %s (RSSI: %.0f dBm)" % (peripheral.name(), rssi))
        manager.stopScan()
        manager.connectPeripheral_options_(peripheral, None)

    def centralManager_didConnectPeripheral_(self, manager, peripheral):
        """Become the peripheral's delegate and discover the service."""
        print("didConnectPeripheral: Connected to %s" % (peripheral.name(),))
        peripheral.setDelegate_(self)
        peripheral.discoverServices_([mooshim_service])

    def centralManager_didFailToConnectPeripheral_error_(self, manager, peripheral, error):
        """Report a failed connection attempt."""
        print("Failed to connect %s" % (repr(error),))

    def centralManager_didDisconnectPeripheral_error_(self, manager, peripheral, error):
        """Report the disconnect and stop the event loop."""
        print("Disconnected %s" % (repr(error),))
        AppHelper.stopEventLoop()

    # ------------------------------------------------------------------
    # CBPeripheralDelegate callbacks
    # ------------------------------------------------------------------

    def peripheral_didDiscoverServices_(self, peripheral, services):
        """Pick the first discovered service and discover its characteristics.

        Despite its name, ``services`` receives the NSError argument of
        ``peripheral:didDiscoverServices:`` (None on success); the parameter
        name is kept only for compatibility. The discovered services are
        read from ``peripheral.services()``.
        """
        if services is not None:
            print("Discover services error: %s" % (repr(services),))
            return
        discovered = peripheral.services()
        if not discovered:
            print("Discover services: no matching service found")
            return
        self.service = discovered[0]
        peripheral.discoverCharacteristics_forService_(
            [mooshim_name, mooshim_info, mooshim_settings], self.service)

    def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error):
        """Store, subscribe to and read each known characteristic."""
        print("Discover characteristics %s" % (repr(service),))
        if error is not None:
            print(" error: %s" % (repr(error),))
            return
        # Maps each known characteristic UUID to the attribute that holds it.
        known = (
            (mooshim_name, 'name'),
            (mooshim_info, 'info'),
            (mooshim_settings, 'settings'),
        )
        for characteristic in (self.service.characteristics() or ()):
            for uuid, attr in known:
                if characteristic.UUID() == uuid:
                    setattr(self, attr, characteristic)
                    self.peripheral.setNotifyValue_forCharacteristic_(True, characteristic)
                    self.peripheral.readValueForCharacteristic_(characteristic)
                    break

    def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error):
        """Report the outcome of a characteristic write."""
        print("didWriteValueForCharacteristic: %s" % (repr(error),))

    def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error):
        """Report a change in a characteristic's notification state."""
        print("didUpdateNotificationStateForCharcteristic: %s" % (repr(characteristic),))
        print(" UUID: %s" % (repr(characteristic.UUID()),))

    def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):
        """Print a received value and immediately request the next read.

        On error the error is printed and no further read is issued for
        that characteristic, so a persistent failure cannot spin.
        """
        print("didUpdateValueForCharacteristic: %s" % (characteristic.UUID(),))
        if error is not None:
            print("Error: %s" % (repr(error),))
            return

        if characteristic == self.name:
            dataname = "Name"
        elif characteristic == self.info:
            dataname = "Info"
        elif characteristic == self.settings:
            dataname = "Settings"
        else:
            # Updates for a characteristic we never stored must not crash.
            dataname = "Unknown"

        value = characteristic.value()
        if value is None:
            print("%s Data: <None>" % (dataname,))
        else:
            print("%s Data: %s" % (dataname, repr(value.bytes().tobytes())))
        # Poll: each completed read triggers the next one.
        peripheral.readValueForCharacteristic_(characteristic)

    # ------------------------------------------------------------------

    def shutdown(self):
        """Disconnect if a peripheral is known, otherwise stop the loop.

        When a peripheral is known the loop is stopped later, from the
        disconnect callback.
        """
        if self.peripheral is not None and self.manager is not None:
            self.manager.cancelPeripheralConnection_(self.peripheral)
        else:
            AppHelper.stopEventLoop()
class CommsManager(object):
    """Polls stdin and a local TCP socket from the PyObjC event loop.

    Listens on 127.0.0.1:9999 and serves one client at a time. Any input
    on stdin shuts the session down.

    Attributes:
        listener: the listening TCP socket.
        connection: the currently connected client socket, or None.
        mooshim: the delegate object; this class sets its ``comms``
            attribute to point back at itself.
    """

    def __init__(self, mooshim):
        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listener.bind(("127.0.0.1", 9999))
        self.listener.listen(1)
        self.connection = None
        self.mooshim = mooshim
        self.mooshim.comms = self

    def loop(self):
        """Run one non-blocking poll, then reschedule itself in 0.1 s.

        The loop is not rescheduled after stdin becomes readable; the
        delegate's shutdown() is called instead.
        """
        watched = [sys.stdin, self.listener]
        if self.connection is not None:
            watched.append(self.connection)
        # Timeout 0: never block the Cocoa run loop.
        readable, _, _ = select.select(watched, [], [], 0)

        if sys.stdin in readable:
            # Use the delegate we were given; there is no module-level
            # `delegate` name to call.
            self.mooshim.shutdown()
            return

        if self.listener in readable:
            client, _ = self.listener.accept()
            if self.connection is not None:
                # Only one client is served; do not leak the previous one.
                self.connection.close()
            self.connection = client

        if self.connection is not None and self.connection in readable:
            self._service_client()

        AppHelper.callLater(0.1, self.loop)

    def _service_client(self):
        """Read one byte from the client and forward it to the delegate."""
        try:
            received = self.connection.recv(1)
        except socket.error:
            # A reset connection is treated the same as an orderly close.
            received = b''
        if not received:
            print("closed")
            self.connection.close()
            self.connection = None
        elif received not in (b'\r', b'\n'):
            print(repr(received))
            # NOTE(review): the delegate class in this file defines no
            # send(); guard the call so a missing method does not kill the
            # polling loop. Confirm what the device-side write should be.
            forward = getattr(self.mooshim, 'send', None)
            if forward is None:
                print("no send() on delegate; dropping %r" % (received,))
            else:
                forward(received)

    def send(self, data):
        """Send all of ``data`` to the connected client.

        The data is dropped when no client is connected.
        """
        if self.connection is None or not data:
            return
        self.connection.sendall(data)
def main():
    """Create the delegate, central manager and comms poller, then run.

    Blocks in the console event loop until the loop is stopped.
    """
    delegate = MooshimDelegate()
    # alloc() and init...() must be chained: the object returned by the
    # initializer is the one to use, not the bare alloc() result.
    manager = CBCentralManager.alloc().initWithDelegate_queue_options_(
        delegate, None, None)
    comms = CommsManager(delegate)
    print(repr(manager))
    # Start the stdin/socket polling; comms.loop reschedules itself.
    AppHelper.callLater(0.1, comms.loop)
    AppHelper.runConsoleEventLoop()
# Start the bridge only when this file is executed as a script.
if "__main__" == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment