Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import select
import socket
import sys
import objc
from PyObjCTools import AppHelper
objc.loadBundle("CoreBluetooth", globals(),
bundle_path=objc.pathForFramework(u'/System/Library/Frameworks/IOBluetooth.framework/Versions/A/Frameworks/CoreBluetooth.framework'))
mooshim_service = CBUUID.UUIDWithString_(u'1BC5FFA0-0200-62AB-E411-F254E005DBD4')
mooshim_name = CBUUID.UUIDWithString_(u'1BC5FFA2-0200-62AB-E411-F254E005DBD4')
mooshim_info = CBUUID.UUIDWithString_(u'1BC5FFA1-0200-62AB-E411-F254E005DBD4')
mooshim_settings = CBUUID.UUIDWithString_(u'1BC5FFA3-0200-62AB-E411-F254E005DBD4')
class MooshimDelegate(object):
def __init__(self):
self.manager = None
self.peripheral = None
self.service = None
self.name = None
self.info = None
self.settings = None
self.comms = None
def centralManagerDidUpdateState_(self, manager):
#print "centralManagerDidUpdateState_:", repr(manager)
self.manager = manager
manager.scanForPeripheralsWithServices_options_([mooshim_service], None)
def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rssi):
self.peripheral = peripheral
print "Found %s (RSSI: %.0f dBm)" % (peripheral.name(), rssi)
manager.connectPeripheral_options_(peripheral, None)
def centralManager_didConnectPeripheral_(self, manager, peripheral):
print "didConnectPeripheral: Connected to", peripheral.name()
self.peripheral.setDelegate_(self)
self.peripheral.discoverServices_([mooshim_service])
def centralManager_didFailToConnectPeripheral_error_(self, manager, peripheral, error):
print "Failed to connect", repr(error)
def centralManager_didDisconnectPeripheral_error_(self, manager, peripheral, error):
print "Disconnected", repr(error)
AppHelper.stopEventLoop()
def peripheral_didDiscoverServices_(self, peripheral, services):
self.service = self.peripheral.services()[0]
self.peripheral.discoverCharacteristics_forService_([mooshim_name, mooshim_info, mooshim_settings], self.service)
def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error):
print "Discover characteristics", repr(service)
if error != None:
print " error:", repr(error)
return
for characteristic in self.service.characteristics():
if characteristic.UUID() == mooshim_name:
self.name = characteristic
self.peripheral.setNotifyValue_forCharacteristic_(True, self.name)
self.peripheral.readValueForCharacteristic_(self.name)
elif characteristic.UUID() == mooshim_info:
self.info = characteristic
self.peripheral.setNotifyValue_forCharacteristic_(True, self.info)
self.peripheral.readValueForCharacteristic_(self.info)
elif characteristic.UUID() == mooshim_settings:
self.settings = characteristic
self.peripheral.setNotifyValue_forCharacteristic_(True, self.settings)
self.peripheral.readValueForCharacteristic_(self.settings)
def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error):
print "didWriteValueForCharacteristic:", repr(error)
def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error):
print "didUpdateNotificationStateForCharcteristic:", repr(characteristic)
print " UUID:", repr(characteristic.UUID())
def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):
print "didUpdateValueForCharacteristic:", characteristic.UUID()
if True or error != None:
if characteristic == self.name:
dataname = "Name"
elif characteristic == self.info:
dataname = "Info"
elif characteristic == self.settings:
dataname = "Settings"
if characteristic.value() == None:
print dataname, "Data: <None>"
else:
print dataname, "Data:", repr(characteristic.value().bytes().tobytes())
peripheral.readValueForCharacteristic_(characteristic)
else:
print "Error:", repr(error)
def shutdown(self):
if self.peripheral is not None:
self.manager.cancelPeripheralConnection_(self.peripheral)
else:
AppHelper.stopEventLoop()
class CommsManager(object):
def __init__(self, mooshim):
self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listener.bind(("127.0.0.1", 9999))
self.listener.listen(1)
self.connection = None
self.mooshim = mooshim
self.mooshim.comms = self
def loop(self):
endpoints = [sys.stdin, self.listener]
if self.connection is not None:
endpoints.append(self.connection)
r, w, e = select.select(endpoints, [], [], 0)
if sys.stdin in r:
delegate.shutdown()
return
if self.listener in r:
self.connection, _ = self.listener.accept()
if self.connection in r:
c = self.connection.recv(1)
if len(c) == 0:
print "closed"
self.connection.close()
self.connection = None
elif c not in ('\r', '\n'):
print repr(c)
self.mooshim.send(c)
AppHelper.callLater(0.1, self.loop)
def send(self, data):
while len(data):
sent = self.connection.send(data)
data = data[sent:]
def main():
delegate = MooshimDelegate()
manager = CBCentralManager.alloc()
manager.initWithDelegate_queue_options_(delegate, None, None)
comms = CommsManager(delegate)
print repr(manager)
AppHelper.callLater(0.1, comms.loop)
AppHelper.runConsoleEventLoop()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.