Created
April 1, 2015 22:48
-
-
Save DanielO/c136c2d86c287f686220 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import select | |
import socket | |
import sys | |
import objc | |
from PyObjCTools import AppHelper | |
# Load the CoreBluetooth bundle nested inside IOBluetooth.framework and inject
# its Objective-C classes into this module's globals, so that names such as
# CBUUID can be used below without a Python import.
objc.loadBundle(
    "CoreBluetooth",
    globals(),
    bundle_path=objc.pathForFramework(
        u"/System/Library/Frameworks/IOBluetooth.framework/Versions/A/Frameworks/CoreBluetooth.framework"
    )
)

# UUID of the service that is scanned for, followed by the UUIDs of the three
# characteristics that are discovered on that service.
mooshim_service = CBUUID.UUIDWithString_(u"1BC5FFA0-0200-62AB-E411-F254E005DBD4")
mooshim_name = CBUUID.UUIDWithString_(u"1BC5FFA2-0200-62AB-E411-F254E005DBD4")
mooshim_info = CBUUID.UUIDWithString_(u"1BC5FFA1-0200-62AB-E411-F254E005DBD4")
mooshim_settings = CBUUID.UUIDWithString_(u"1BC5FFA3-0200-62AB-E411-F254E005DBD4")
class MooshimDelegate(object):
    """Delegate for the CoreBluetooth central manager and the peripheral.

    The object scans for peripherals advertising ``mooshim_service``, connects
    to what it finds, discovers the name/info/settings characteristics,
    subscribes to them and prints every value it receives.
    """

    # Numeric value of CBCentralManagerStatePoweredOn. objc.loadBundle() only
    # injects classes, so the enum value is spelled out here.
    _STATE_POWERED_ON = 5

    def __init__(self):
        self.manager = None      # central manager, recorded on first state update
        self.peripheral = None   # most recently discovered peripheral
        self.service = None      # first service reported by the peripheral
        self.name = None         # characteristic matching mooshim_name
        self.info = None         # characteristic matching mooshim_info
        self.settings = None     # characteristic matching mooshim_settings
        self.comms = None        # set by CommsManager when it attaches itself

    def centralManagerDidUpdateState_(self, manager):
        """Remember the manager and start scanning once Bluetooth is on."""
        self.manager = manager
        state = manager.state()
        if state != self._STATE_POWERED_ON:
            # Scanning is only accepted in the powered-on state; this callback
            # fires again when the state changes.
            print("Bluetooth not powered on (state %s)" % (state,))
            return
        manager.scanForPeripheralsWithServices_options_([mooshim_service], None)

    def centralManager_didDiscoverPeripheral_advertisementData_RSSI_(self, manager, peripheral, data, rssi):
        """Keep a reference to the discovered peripheral and connect to it."""
        self.peripheral = peripheral
        print("Found %s (RSSI: %.0f dBm)" % (peripheral.name(), rssi))
        manager.connectPeripheral_options_(peripheral, None)

    def centralManager_didConnectPeripheral_(self, manager, peripheral):
        """Become the peripheral's delegate and look for the service."""
        print("didConnectPeripheral: Connected to %s" % (peripheral.name(),))
        self.peripheral.setDelegate_(self)
        self.peripheral.discoverServices_([mooshim_service])

    def centralManager_didFailToConnectPeripheral_error_(self, manager, peripheral, error):
        """Report a failed connection attempt."""
        print("Failed to connect %s" % repr(error))

    def centralManager_didDisconnectPeripheral_error_(self, manager, peripheral, error):
        """Report the disconnect and stop the event loop."""
        print("Disconnected %s" % repr(error))
        AppHelper.stopEventLoop()

    def peripheral_didDiscoverServices_(self, peripheral, services):
        """Pick the first discovered service and discover its characteristics.

        Despite its (kept for compatibility) name, ``services`` receives the
        second argument of ``peripheral:didDiscoverServices:``, which is the
        NSError of the discovery, or None on success.
        """
        error = services
        if error is not None:
            print("Discover services error: %s" % repr(error))
            return
        found = self.peripheral.services()
        if not found:
            print("No services discovered")
            return
        self.service = found[0]
        self.peripheral.discoverCharacteristics_forService_(
            [mooshim_name, mooshim_info, mooshim_settings], self.service)

    def peripheral_didDiscoverCharacteristicsForService_error_(self, peripheral, service, error):
        """Store each known characteristic, subscribe to it and read it."""
        print("Discover characteristics %s" % repr(service))
        if error is not None:
            print(" error: %s" % repr(error))
            return
        # UUID -> name of the attribute that holds the matching characteristic.
        wanted = (
            (mooshim_name, "name"),
            (mooshim_info, "info"),
            (mooshim_settings, "settings"),
        )
        for characteristic in self.service.characteristics() or ():
            for uuid, attribute in wanted:
                if characteristic.UUID() == uuid:
                    setattr(self, attribute, characteristic)
                    self.peripheral.setNotifyValue_forCharacteristic_(True, characteristic)
                    self.peripheral.readValueForCharacteristic_(characteristic)
                    break

    def peripheral_didWriteValueForCharacteristic_error_(self, peripheral, characteristic, error):
        """Print the outcome of a characteristic write."""
        print("didWriteValueForCharacteristic: %s" % repr(error))

    def peripheral_didUpdateNotificationStateForCharacteristic_error_(self, peripheral, characteristic, error):
        """Print which characteristic changed its notification state."""
        print("didUpdateNotificationStateForCharcteristic: %s" % repr(characteristic))
        print(" UUID: %s" % repr(characteristic.UUID()))

    def peripheral_didUpdateValueForCharacteristic_error_(self, peripheral, characteristic, error):
        """Print a freshly received value and request the next read.

        On error only the error is printed; no value is decoded and no further
        read is issued for that characteristic.
        """
        print("didUpdateValueForCharacteristic: %s" % (characteristic.UUID(),))
        if error is not None:
            print("Error: %s" % repr(error))
            return

        # Fall back to a generic label so an unexpected characteristic cannot
        # leave the label undefined.
        dataname = "Unknown"
        for known, label in ((self.name, "Name"),
                             (self.info, "Info"),
                             (self.settings, "Settings")):
            if characteristic == known:
                dataname = label
                break

        value = characteristic.value()
        if value is None:
            print("%s Data: <None>" % dataname)
        else:
            print("%s Data: %s" % (dataname, repr(value.bytes().tobytes())))
        # Immediately ask for the value again, which keeps the reads going.
        peripheral.readValueForCharacteristic_(characteristic)

    def shutdown(self):
        """Cancel the peripheral connection, or stop the loop if there is none.

        When a connection is cancelled the event loop is stopped later, from
        the disconnect callback.
        """
        if self.peripheral is not None and self.manager is not None:
            self.manager.cancelPeripheralConnection_(self.peripheral)
        else:
            AppHelper.stopEventLoop()
class CommsManager(object):
    """Poll stdin and a local TCP socket from the Cocoa event loop.

    Input on stdin shuts the delegate down. A single TCP client may connect;
    its bytes are handed to the delegate one at a time, and ``send`` writes
    data back to that client.
    """

    def __init__(self, mooshim, host="127.0.0.1", port=9999):
        """Open the listening socket and attach this object to ``mooshim``.

        ``host`` and ``port`` default to the address the script always used.
        """
        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listener.bind((host, port))
        self.listener.listen(1)
        self.connection = None
        self.mooshim = mooshim
        self.mooshim.comms = self

    def loop(self):
        """Run one non-blocking poll and reschedule itself 0.1 s later."""
        endpoints = [sys.stdin, self.listener]
        if self.connection is not None:
            endpoints.append(self.connection)
        readable, _, _ = select.select(endpoints, [], [], 0)

        if sys.stdin in readable:
            # Use the delegate handed to the constructor; no module-level
            # `delegate` exists. Not rescheduling ends the polling.
            self.mooshim.shutdown()
            return

        if self.listener in readable:
            new_connection, _ = self.listener.accept()
            if self.connection is not None:
                # Only one client is tracked; release the one being replaced.
                self.connection.close()
            self.connection = new_connection

        if self.connection is not None and self.connection in readable:
            c = self.connection.recv(1)
            if len(c) == 0:
                print("closed")
                self.connection.close()
                self.connection = None
            elif c not in ('\r', '\n'):
                print(repr(c))
                # The delegate may not implement send(); an exception here
                # would silently end the polling, so drop the byte instead.
                forward = getattr(self.mooshim, "send", None)
                if forward is None:
                    print("delegate has no send(); dropped %s" % repr(c))
                else:
                    forward(c)

        AppHelper.callLater(0.1, self.loop)

    def send(self, data):
        """Write all of ``data`` to the connected client, if there is one."""
        if self.connection is None:
            return
        self.connection.sendall(data)
def main():
    """Create the delegate, central manager and comms poller, then run the loop."""
    delegate = MooshimDelegate()
    # Chain alloc() and init...(): the initializer's return value is the
    # object to use, not the uninitialized result of alloc().
    manager = CBCentralManager.alloc().initWithDelegate_queue_options_(
        delegate, None, None)
    comms = CommsManager(delegate)
    print(repr(manager))
    AppHelper.callLater(0.1, comms.loop)
    AppHelper.runConsoleEventLoop()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment