Create a gist now

Instantly share code, notes, and snippets.

@orviwan /README.md Secret
Last active Sep 18, 2016

PowerPoint Remote Control using Pebble
# Package metadata for libpebble.
__title__ = 'libpebble'
__version__ = '0.0.1'
__build__ = 0x01
__license__ = 'MIT'
from .pebble import Pebble, PebbleError, PutBytesClient

PowerPoint Remote Control using Pebble

All of the good stuff comes from libpebble

Thanks to Hexxeh, hotdogs, cchschmch and all the folks in #pebble

Prerequisites

  1. A Pebble watch and a Windows PC with Bluetooth

  2. Python 2.7

  3. pyserial>=2.6

    Command Prompt (as administrator): easy_install pyserial

  4. A Pebble watch, paired with Windows

    On Pebble, go to Settings > Bluetooth

    Initiate pairing from Windows

Usage

  1. Make sure bluetooth is enabled and Pebble is paired

  2. Download and extract this gist into a folder, e.g. C:\Pebble

  3. Open a command prompt (as administrator)

  4. Run the following commands:

    cd c:\Pebble

    python p.py powerpoint

  5. Open PowerPoint and launch a slideshow

  6. Open the Music app on your Pebble and use the buttons to control the presentation: UP goes to the next slide, DOWN goes to the previous slide, and SELECT jumps back to the first slide.

Customising

All my code changes are commented with #DEMO

If you want to make this work with a different application, such as Media Player, change the window title passed to shell.AppActivate('PowerPoint') and the keypresses sent by calls such as shell.SendKeys('{RIGHT}').

You might be able to make this work on a Mac by replacing the WScript shell and the SendKeys calls with a Mac equivalent. See: Mac keystrokes

#!/usr/bin/env python
import argparse
import pebble as libpebble
import sys
import time
from time import sleep
# Limit on connection retries in main() before it gives up on the watch.
MAX_ATTEMPTS = 5
def cmd_ping(pebble, args):
    """Handle the ``ping`` sub-command by pinging the watch with a fixed cookie."""
    ping_cookie = 0xDEADBEEF
    pebble.ping(cookie=ping_cookie)
def cmd_load(pebble, args):
    """Handle the ``load`` sub-command: install the bundle named by ``args.app_bundle``."""
    bundle_path = args.app_bundle
    pebble.install_app(bundle_path)
def cmd_load_fw(pebble, args):
    """Handle the ``load_fw`` sub-command.

    Installs the firmware bundle named by ``args.fw_bundle``, waits five
    seconds, then resets the watch so the update is applied.
    """
    firmware_path = args.fw_bundle
    pebble.install_firmware(firmware_path)

    pause_before_reset = 5  # seconds
    time.sleep(pause_before_reset)

    print('resetting to apply firmware update...')
    pebble.reset()
def cmd_logcat(pebble, args):
    """Handle the ``logcat`` sub-command.

    Keeps the process idle so the connection's reader thread can go on
    printing log messages; returns when the user presses Ctrl+C.
    """
    print('listening for logs...')
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
def cmd_list_apps(pebble, args):
    """Handle the ``list`` sub-command: print one ``[index] name`` line per installed app."""
    installed = pebble.get_appbank_status()['apps']
    for entry in installed:
        line = '[{}] {}'.format(entry['index'], entry['name'])
        print(line)
def cmd_rm_app(pebble, args):
    """Handle the ``rm`` sub-command.

    Removes the first installed app whose bank index equals
    ``args.app_index`` and prints a confirmation. Does nothing when no
    installed app has that index.
    """
    installed = pebble.get_appbank_status()['apps']
    target = next((entry for entry in installed
                   if entry['index'] == args.app_index), None)
    if target is None:
        return
    pebble.remove_app(target["id"], target["index"])
    print('removed app')
#DEMO
def cmd_powerpoint(pebble, args):
    """Handle the ``powerpoint`` sub-command.

    Puts the watch into PowerPoint-remote mode and then keeps the process
    alive so the connection's reader thread can go on receiving button
    presses. Returns cleanly when the user presses Ctrl+C, matching the
    behaviour of cmd_logcat, instead of ending with a traceback.
    """
    try:
        pebble.powerpoint()
        #DEMO - keep open for PowerPoint
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        return
def main():
    """Parse the command line, connect to the watch and run the sub-command.

    The connection is attempted at most MAX_ATTEMPTS times, pausing five
    seconds between failed attempts.

    Raises:
        libpebble.PebbleError: if every connection attempt failed.
    """
    parser = argparse.ArgumentParser(description='a utility belt for pebble development')
    parser.add_argument('--pebble_id', type=str, help='the last 4 digits of the target Pebble\'s MAC address')
    subparsers = parser.add_subparsers(help='commands', dest='which')

    ping_parser = subparsers.add_parser('ping', help='send a ping message')
    ping_parser.set_defaults(func=cmd_ping)

    powerpoint_parser = subparsers.add_parser('powerpoint', help='control powerpoint')
    powerpoint_parser.set_defaults(func=cmd_powerpoint)

    load_parser = subparsers.add_parser('load', help='load an app onto a connected watch')
    load_parser.add_argument('app_bundle', metavar='FILE', type=str, help='a compiled app bundle')
    load_parser.set_defaults(func=cmd_load)

    load_fw_parser = subparsers.add_parser('load_fw', help='load new firmware onto a connected watch')
    # The help text used to be a copy of the one for `load`.
    load_fw_parser.add_argument('fw_bundle', metavar='FILE', type=str, help='a firmware bundle')
    load_fw_parser.set_defaults(func=cmd_load_fw)

    logcat_parser = subparsers.add_parser('logcat', help='view logs sent from the connected watch')
    logcat_parser.set_defaults(func=cmd_logcat)

    list_apps_parser = subparsers.add_parser('list', help='list installed apps')
    list_apps_parser.set_defaults(func=cmd_list_apps)

    rm_app_parser = subparsers.add_parser('rm', help='remove installed apps')
    rm_app_parser.add_argument('app_index', metavar='IDX', type=int, help='the app index to delete')
    rm_app_parser.set_defaults(func=cmd_rm_app)

    args = parser.parse_args()

    pebble = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            pebble = libpebble.Pebble(args.pebble_id)
            break
        except libpebble.PebbleError:
            # No point in waiting after the final attempt.
            if attempt < MAX_ATTEMPTS:
                time.sleep(5)

    if pebble is None:
        # Raising a plain string is itself a TypeError; raise a real exception.
        raise libpebble.PebbleError(args.pebble_id, 'Could not connect to Pebble')

    args.func(pebble, args)


if __name__ == '__main__':
    main()
#!/usr/bin/env python
import binascii
import serial
import stm32_crc
import sys
import threading
import time
import traceback
import zipfile
from struct import pack, unpack
import os
import glob
import logging
import json
from serial.tools import list_ports
#DEMO - Used for SHELL
import win32com.client
# Choose the serial-port enumeration implementation for this OS; any
# platform other than Windows ('nt') or POSIX is rejected at import time.
if os.name not in ('nt', 'posix'):
    raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
if os.name == 'nt':
    from serial.tools.list_ports_windows import *
else:
    from serial.tools.list_ports_posix import *
def grep(regexp):
    """Yield the serial ports whose port name, description or hardware id match.

    Args:
        regexp: regular expression searched for in each field. The port
            name is matched case-insensitively; the description and the
            hardware id are matched case-sensitively.

    Yields:
        ``(port, desc, hwid)`` tuples as produced by ``comports()``.
    """
    # This module never imports `re` itself, so it was only reachable if a
    # platform star import happened to leak it; import it explicitly.
    import re

    for port, desc, hwid in comports():
        if re.search(regexp, port, re.I) or re.search(regexp, desc) or re.search(regexp, hwid):
            yield port, desc, hwid
#DEMO - Instance of SHELL to sendkeys
# Windows Script Host shell object; the powerpoint_* methods call its
# AppActivate and SendKeys.
shell = win32com.client.Dispatch("WScript.Shell")
# Root logger, configured to print "[LEVEL   ] message" at DEBUG level and above.
log = logging.getLogger()
logging.basicConfig(format='[%(levelname)-8s] %(message)s')
log.setLevel(logging.DEBUG)
DEFAULT_PEBBLE_ID = None #Triggers autodetection
# When True, every message sent or received is also logged as hex.
DEBUG_PROTOCOL = False
class PebbleBundle(object):
    """Read-only view of a Pebble bundle: a zip archive with a JSON manifest.

    The manifest is parsed lazily on first use and cached. Its top-level
    ``firmware``, ``application`` and ``resources`` keys say what the
    bundle contains.
    """

    MANIFEST_FILENAME = 'manifest.json'

    def __init__(self, bundle_path):
        """Open the bundle at `bundle_path`.

        Raises:
            IOError: if no file exists at `bundle_path`.
        """
        bundle_abs_path = os.path.abspath(bundle_path)
        if not os.path.exists(bundle_abs_path):
            # Raising a bare string is a TypeError; use a real exception.
            raise IOError("Bundle does not exist: " + bundle_path)

        self.zip = zipfile.ZipFile(bundle_abs_path)
        self.path = bundle_abs_path
        self.manifest = None

    def get_manifest(self):
        """Return the parsed manifest, reading it from the archive on first use.

        Raises:
            ValueError: if the archive has no manifest file.
        """
        if self.manifest is not None:
            return self.manifest

        if self.MANIFEST_FILENAME not in self.zip.namelist():
            raise ValueError(
                "Could not find {}; are you sure this is a PebbleBundle?".format(self.MANIFEST_FILENAME))

        # zip.read() returns raw bytes; decode explicitly before parsing.
        raw = self.zip.read(self.MANIFEST_FILENAME)
        self.manifest = json.loads(raw.decode('utf-8'))
        return self.manifest

    def close(self):
        """Close the underlying zip archive."""
        self.zip.close()

    def is_firmware_bundle(self):
        """Return True if the manifest has a ``firmware`` entry."""
        return 'firmware' in self.get_manifest()

    def is_app_bundle(self):
        """Return True if the manifest has an ``application`` entry."""
        return 'application' in self.get_manifest()

    def has_resources(self):
        """Return True if the manifest has a ``resources`` entry."""
        return 'resources' in self.get_manifest()

    def get_firmware_info(self):
        """Return the manifest's ``firmware`` entry, or None if absent."""
        return self.get_manifest().get('firmware')

    def get_application_info(self):
        """Return the manifest's ``application`` entry, or None if absent."""
        return self.get_manifest().get('application')

    def get_resources_info(self):
        """Return the manifest's ``resources`` entry, or None if absent."""
        return self.get_manifest().get('resources')
class EndpointSync(object):
    """Turn one asynchronous endpoint response into a blocking call.

    Creating an instance registers its callback as the handler for
    `endpoint`; get_data() then waits for that callback to fire.
    """

    # Seconds get_data() waits for a response before giving up.
    timeout = 10

    def __init__(self, pebble, endpoint):
        # Set up all state before registering, so a response that arrives
        # immediately finds the object fully initialised.
        self.data = None
        self.marker = threading.Event()
        pebble.register_endpoint(endpoint, self.callback)

    def callback(self, *args):
        """Store the handler arguments and wake up get_data()."""
        self.data = args
        self.marker.set()

    def get_data(self):
        """Wait for the response and return its payload.

        Returns:
            The second argument the callback was given (the response
            payload), or False if nothing arrived within `timeout`
            seconds or the callback received fewer than two arguments.
        """
        self.marker.wait(timeout=self.timeout)
        if not self.marker.is_set():
            return False
        try:
            return self.data[1]
        except IndexError:
            return False
class PebbleError(Exception):
    """Error raised for failures while talking to a Pebble.

    Args:
        id: identifier of the Pebble involved; may be None when no
            device is known yet.
        message: human-readable description of the failure.
    """

    def __init__(self, id, message):
        # Initialise the base class so `args`, repr() and pickling work.
        super(PebbleError, self).__init__(id, message)
        self._id = id
        self._message = message

    def __str__(self):
        return "%s (ID:%s)" % (self._message, self._id)
class Pebble(object):
"""
A connection to a Pebble watch; data and commands may be sent
to the watch through an instance of this class.
"""
endpoints = {
"TIME": 11,
"VERSION": 16,
"PHONE_VERSION": 17,
"SYSTEM_MESSAGE": 18,
"MUSIC_CONTROL": 32,
"PHONE_CONTROL": 33,
"LOGS": 2000,
"PING": 2001,
"LOG_DUMP": 2002,
"RESET": 2003,
"APP": 2004,
"NOTIFICATION": 3000,
"RESOURCE": 4000,
"APP_MANAGER": 6000,
"PUTBYTES": 48879
}
@staticmethod
def AutodetectDevice():
if os.name == "posix":
pebbles = glob.glob("/dev/tty.Pebble????-SerialPortSe")
if len(pebbles) == 0:
raise PebbleError(None, "Autodetection could not find any Pebble devices")
elif len(pebbles) > 1:
log.warn("Autodetect found %d Pebbles; using most recent" % len(pebbles))
#NOTE: Not entirely sure if this is the correct approach
pebbles.sort(key=lambda x: os.stat(x).st_mtime, reverse=True)
id = pebbles[0][15:19]
elif os.name == "nt":
iterator = sorted(grep("0001005e")) #VID&0001005e_PID&0001
id = iterator[0][0]
log.info("Autodetect found a Pebble at port %s" % id)
return id
def __init__(self, id):
if os.name == "posix":
if id is None:
id = Pebble.AutodetectDevice()
devicefile = "/dev/tty.Pebble"+id+"-SerialPortSe"
elif os.name == "nt":
if id is None:
id = "AUTO"
devicefile = Pebble.AutodetectDevice() #returns COM port number
self.id = id
self._alive = True
self._endpoint_handlers = {}
self._internal_endpoint_handlers = {
self.endpoints["TIME"]: self._get_time_response,
self.endpoints["VERSION"]: self._version_response,
self.endpoints["SYSTEM_MESSAGE"]: self._system_message_response,
self.endpoints["LOGS"]: self._log_response,
self.endpoints["PING"]: self._ping_response,
self.endpoints["APP_MANAGER"]: self._appbank_status_response,
self.endpoints["MUSIC_CONTROL"]: self._powerpoint_response
} #DEMO response
try:
log.debug("Attempting to open %s as Pebble device %s" % (devicefile, id))
self._ser = serial.Serial(devicefile, 115200, timeout=1)
log.debug("Connected, discarding null response")
# we get a null response when we connect, discard it
self._ser.read(5)
#DEMO - This was the missing part from libpebble. Thanks hotdogs!
self._ser.write("\x00\x0d\x00\x11\x01\xff\xff\xff\xff\x80\x00\x00\x00\x00\x00\x00\x32")
# Eat any cruft that might be sitting in the serial buffer...
while self._ser.read():
pass
log.debug("Initializing reader thread")
self._read_thread = threading.Thread(target=self._reader)
self._read_thread.setDaemon(True)
self._read_thread.start()
log.debug("Reader thread loaded")
except:
raise PebbleError(id, "Failed to connect to Pebble")
def __del__(self):
try:
self._ser.close()
except:
pass
def _reader(self):
try:
while self._alive:
endpoint, resp = self._recv_message()
if resp == None:
continue
if DEBUG_PROTOCOL:
log.debug("Got message for endpoint %s of length %d" % (endpoint, len(resp)))
log.debug('<<< ' + resp.encode('hex'))
if endpoint in self._internal_endpoint_handlers:
resp = self._internal_endpoint_handlers[endpoint](endpoint, resp)
if endpoint in self._endpoint_handlers:
self._endpoint_handlers[endpoint](endpoint, resp)
except:
traceback.print_exc()
raise PebbleError(self.id, "Lost connection to Pebble")
self._alive = False
def _build_message(self, endpoint, data):
return pack("!HH", len(data), endpoint)+data
def _send_message(self, endpoint, data, callback = None):
if endpoint not in self.endpoints:
raise PebbleError(self.id, "Invalid endpoint specified")
msg = self._build_message(self.endpoints[endpoint], data)
if DEBUG_PROTOCOL:
log.debug('>>> ' + msg.encode('hex'))
self._ser.write(msg)
def _recv_message(self):
data = self._ser.read(4)
if len(data) == 0:
return (None, None)
elif len(data) < 4:
raise PebbleError(self.id, "Malformed response with length "+str(len(data)))
size, endpoint = unpack("!HH", data)
resp = self._ser.read(size)
return (endpoint, resp)
def register_endpoint(self, endpoint_name, func):
if endpoint_name not in self.endpoints:
raise PebbleError(self.id, "Invalid endpoint specified")
endpoint = self.endpoints[endpoint_name]
self._endpoint_handlers[endpoint] = func
def notification_sms(self, sender, body):
"""Send a 'SMS Notification' to the displayed on the watch."""
ts = str(int(time.time())*1000)
parts = [sender, body, ts]
data = "\x01"
for part in parts:
data += pack("!b", len(part))+part
self._send_message("NOTIFICATION", data)
def notification_email(self, sender, subject, body):
"""Send an 'Email Notification' to the displayed on the watch."""
ts = str(int(time.time())*1000)
parts = [sender, subject, ts, body]
data = "\x00"
for part in parts:
data += pack("!b", len(part))+part
self._send_message("NOTIFICATION", data)
def set_nowplaying_metadata(self, track, album, artist):
"""Update the song metadata displayed in Pebble's music app."""
parts = [artist, album, track]
data = pack("!b", 16)
for part in parts:
data += pack("!b", len(part))+part
self._send_message("MUSIC_CONTROL", data)
def get_versions(self, async = False):
"""
Retrieve a summary of version information for various software
(firmware, bootloader, etc) running on the watch.
"""
self._send_message("VERSION", "\x00")
if not async:
return EndpointSync(self, "VERSION").get_data()
def get_appbank_status(self, async = False):
"""
Retrieve a list of all installed watch-apps.
This is particularly useful when trying to locate a
free app-bank to use when installing a new watch-app.
"""
self._send_message("APP_MANAGER", "\x01")
if not async:
return EndpointSync(self, "APP_MANAGER").get_data()
def remove_app(self, appid, index):
"""Remove an installed application from the target app-bank."""
data = pack("!bII", 2, appid, index)
self._send_message("APP_MANAGER", data)
def get_time(self, async = False):
"""Retrieve the time from the Pebble's RTC."""
self._send_message("TIME", "\x00")
if not async:
return EndpointSync(self, "TIME").get_data()
def set_time(self, timestamp):
"""Set the time stored in the target Pebble's RTC."""
data = pack("!bL", 2, timestamp)
self._send_message("TIME", data)
def reinstall_app(self, name, pbz_path):
"""
A convenience method to uninstall and install an app.
This will only work if the app hasn't changed names between the new and old versions.
"""
apps = self.get_appbank_status()
for app in apps["apps"]:
if app["name"] == name:
self.remove_app(app["id"], app["index"])
self.install_app(pbz_path)
def install_app(self, pbz_path):
"""
Install an app bundle (*.pbw) to the target Pebble.
This will pick the first free app-bank available.
"""
bundle = PebbleBundle(pbz_path)
if not bundle.is_app_bundle():
raise PebbleError(self.id, "This is not an app bundle")
binary = bundle.zip.read(
bundle.get_application_info()['name'])
if bundle.has_resources():
resources = bundle.zip.read(
bundle.get_resources_info()['name'])
else:
resources = None
apps = self.get_appbank_status()
if not apps:
raise PebbleError(self.id, "could not obtain app list; try again")
first_free = 1
for app in apps["apps"]:
if app["index"] == first_free:
first_free += 1
if first_free == apps["banks"]:
raise PebbleError(self.id, "All %d app banks are full" % apps["banks"])
log.debug("Attempting to add app to bank %d of %d" % (first_free, apps["banks"]))
client = PutBytesClient(self, first_free, "BINARY", binary)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send application binary %s/pebble-app.bin" % pbz_path)
if resources:
client = PutBytesClient(self, first_free, "RESOURCES", resources)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send application resources %s/app_resources.pbpack" % pbz_path)
time.sleep(2)
self._add_app(first_free)
time.sleep(2)
def install_firmware(self, pbz_path, recovery=False):
"""Install a firmware bundle to the target watch."""
resources = None
with zipfile.ZipFile(pbz_path) as pbz:
binary = pbz.read("tintin_fw.bin")
if not recovery:
resources = pbz.read("system_resources.pbpack")
self.system_message("FIRMWARE_START")
time.sleep(2)
if resources:
client = PutBytesClient(self, 0, "SYS_RESOURCES", resources)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send firmware resources %s/system_resources.pbpack" % pbz_path)
client = PutBytesClient(self, 0, "RECOVERY" if recovery else "FIRMWARE", binary)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send firmware binary %s/tintin_fw.bin" % pbz_path)
self.system_message("FIRMWARE_COMPLETE")
def system_message(self, command):
"""
Send a 'system message' to the watch.
These messages are used to signal important events/state-changes to the watch firmware.
"""
commands = {
"FIRMWARE_AVAILABLE": 0,
"FIRMWARE_START": 1,
"FIRMWARE_COMPLETE": 2,
"FIRMWARE_FAIL": 3,
"FIRMWARE_UP_TO_DATE": 4,
"FIRMWARE_OUT_OF_DATE": 5,
"BLUETOOTH_START_DISCOVERABLE": 6,
"BLUETOOTH_END_DISCOVERABLE": 7
}
if command not in commands:
raise PebbleError(self.id, "Invalid command \"%s\"" % command)
data = pack("!bb", 0, commands[command])
log.debug("Sending command %s (code %d)" % (command, commands[command]))
self._send_message("SYSTEM_MESSAGE", data)
def ping(self, cookie = 0, async = False):
"""Send a 'ping' to the watch to test connectivity."""
data = pack("!bL", 0, cookie)
self._send_message("PING", data)
if not async:
return EndpointSync(self, "PING").get_data()
def reset(self):
"""Reset the watch remotely."""
self._send_message("RESET", "\x00")
def disconnect(self):
"""Disconnect from the target Pebble."""
self._alive = False
self._ser.close()
#DEMO - Set NowPlaying & focus PowerPoint
def powerpoint(self):
"""Control Microsoft PowerPoint using Pebble Music App"""
log.info("PowerPoint: Make sure PowerPoint is loaded and Slideshow is already started")
self.set_nowplaying_metadata("PowerPoint 2013", "libpebble", "Microsoft")
shell.AppActivate('PowerPoint')
#DEMO - UP button was pressed
def powerpoint_next(self):
log.info("PowerPoint: Next")
shell.AppActivate('PowerPoint')
shell.SendKeys('{RIGHT}')
#DEMO - DOWN button was pressed
def powerpoint_prev(self):
log.info("PowerPoint: Prev")
shell.AppActivate('PowerPoint')
shell.SendKeys('{LEFT}')
#DEMO - SELECT button was pressed.
def powerpoint_play(self):
log.info("PowerPoint: Home")
shell.AppActivate('PowerPoint')
shell.SendKeys('{HOME}')
#DEMO - Receives button presses from Pebble
def _powerpoint_response(self, endpoint, data):
res, = unpack("!b", data)
#log.info("PowerPoint: %s" % res)
if res == 1:
self.powerpoint_play()
elif res == 4:
self.powerpoint_next()
elif res == 5:
self.powerpoint_prev()
def _add_app(self, index):
data = pack("!bI", 3, index)
self._send_message("APP_MANAGER", data)
def _ping_response(self, endpoint, data):
restype, retcookie = unpack("!bL", data)
return retcookie
def _get_time_response(self, endpoint, data):
restype, timestamp = unpack("!bL", data)
return timestamp
def _system_message_response(self, endpoint, data):
if len(data) == 2:
log.info("Got system message %s" % repr(unpack('!bb', data)))
else:
log.info("Got 'unknown' system message...")
def _log_response(self, endpoint, data):
if (len(data) < 8):
log.warn("Unable to decode log message (length %d is less than 8)" % len(data))
return;
timestamp, level, msgsize, linenumber = unpack("!Ibbh", data[:8])
filename = data[8:24].decode('utf-8')
message = data[24:24+msgsize].decode('utf-8')
log_levels = {
0: "*",
1: "E",
50: "W",
100: "I",
200: "D",
250: "V"
}
level = log_levels[level] if level in log_levels else "?"
print timestamp, level, filename, linenumber, message
def _appbank_status_response(self, endpoint, data):
apps = {}
restype, = unpack("!b", data[0])
if restype == 1:
apps["banks"], apps_installed = unpack("!II", data[1:9])
apps["apps"] = []
appinfo_size = 78
offset = 9
for i in xrange(apps_installed):
app = {}
app["id"], app["index"], app["name"], app["company"], app["flags"], app["version"] = \
unpack("!II32s32sIH", data[offset:offset+appinfo_size])
app["name"] = app["name"].replace("\x00", "")
app["company"] = app["company"].replace("\x00", "")
apps["apps"] += [app]
offset += appinfo_size
return apps
def _version_response(self, endpoint, data):
fw_names = {
0: "normal_fw",
1: "recovery_fw"
}
resp = {}
for i in xrange(2):
fwver_size = 47
offset = i*fwver_size+1
fw = {}
fw["timestamp"],fw["version"],fw["commit"],fw["is_recovery"], \
fw["hardware_platform"],fw["metadata_ver"] = \
unpack("!i32s8s?bb", data[offset:offset+fwver_size])
fw["version"] = fw["version"].replace("\x00", "")
fw["commit"] = fw["commit"].replace("\x00", "")
fw_name = fw_names[i]
resp[fw_name] = fw
resp["bootloader_timestamp"],resp["hw_version"],resp["serial"] = \
unpack("!L9s12s", data[95:120])
resp["hw_version"] = resp["hw_version"].replace("\x00","")
btmac_hex = binascii.hexlify(data[120:126])
resp["btmac"] = ":".join([btmac_hex[i:i+2].upper() for i in reversed(xrange(0, 12, 2))])
return resp
class PutBytesClient(object):
    """State machine that uploads one buffer to the watch over PUTBYTES.

    Usage: register handle_message() as the PUTBYTES handler, call
    init(), then watch `_done` and `_error`. Each response from the watch
    advances the transfer by one step: token, data chunks, commit,
    complete.
    """

    states = {
        "NOT_STARTED": 0,
        "WAIT_FOR_TOKEN": 1,
        "IN_PROGRESS": 2,
        "COMMIT": 3,
        "COMPLETE": 4,
        "FAILED": 5
    }

    transfer_types = {
        "FIRMWARE": 1,
        "RECOVERY": 2,
        "SYS_RESOURCES": 3,
        "RESOURCES": 4,
        "BINARY": 5
    }

    def __init__(self, pebble, index, transfer_type, buffer):
        """
        Args:
            pebble: object whose `_send_message(endpoint, data)` is used
                to send every request.
            index: bank index sent with the initial request.
            transfer_type: one of the keys of `transfer_types`.
            buffer: the bytes to upload.
        """
        self._pebble = pebble
        self._state = self.states["NOT_STARTED"]
        self._transfer_type = self.transfer_types[transfer_type]
        self._buffer = buffer
        self._index = index
        self._done = False
        self._error = False

    @staticmethod
    def _result_code(resp):
        """Return the result code held in the first byte of `resp`."""
        # Slice rather than index so a 1-byte string is passed to unpack.
        res, = unpack("!b", resp[0:1])
        return res

    def init(self):
        """Announce the transfer (size, type, bank) and wait for a token."""
        data = pack("!bIbb", 1, len(self._buffer), self._transfer_type, self._index)
        self._pebble._send_message("PUTBYTES", data)
        self._state = self.states["WAIT_FOR_TOKEN"]

    def wait_for_token(self, resp):
        """Handle the reply to init(): store the token, send the first chunk."""
        res = self._result_code(resp)
        if res != 1:
            log.error("init failed with code %d" % res)
            self._state = self.states["FAILED"]
            self._error = True
            return
        self._token, = unpack("!I", resp[1:])
        self._left = len(self._buffer)
        self._state = self.states["IN_PROGRESS"]
        self.send()

    def in_progress(self, resp):
        """Handle the reply to a chunk: send the next one, or commit."""
        if self._result_code(resp) != 1:
            self.abort()
            return
        if self._left > 0:
            self.send()
            log.debug("Sent %d of %d bytes" % (len(self._buffer)-self._left, len(self._buffer)))
        else:
            self._state = self.states["COMMIT"]
            self.commit()

    def commit(self):
        """Send the commit request, carrying the CRC of the whole buffer."""
        data = pack("!bII", 3, self._token & 0xFFFFFFFF, stm32_crc.crc32(self._buffer))
        self._pebble._send_message("PUTBYTES", data)

    def handle_commit(self, resp):
        """Handle the reply to commit(): send the complete request."""
        if self._result_code(resp) != 1:
            self.abort()
            return
        self._state = self.states["COMPLETE"]
        self.complete()

    def complete(self):
        """Send the complete request."""
        data = pack("!bI", 5, self._token & 0xFFFFFFFF)
        self._pebble._send_message("PUTBYTES", data)

    def handle_complete(self, resp):
        """Handle the reply to complete(): mark the transfer as done."""
        if self._result_code(resp) != 1:
            self.abort()
            return
        self._done = True

    def abort(self):
        """Give up on the transfer and tell the watch."""
        # Flag the failure first so a waiting caller is released even if
        # sending the abort request itself raises.
        self._state = self.states["FAILED"]
        self._error = True
        msgdata = pack("!bI", 4, self._token & 0xFFFFFFFF)
        # The method is `_send_message`; calling `send_message` raised
        # AttributeError and so hid the real failure.
        self._pebble._send_message("PUTBYTES", msgdata)

    def send(self):
        """Send the next chunk of at most 2000 bytes."""
        datalen = min(self._left, 2000)
        rg = len(self._buffer)-self._left
        msgdata = pack("!bII", 2, self._token & 0xFFFFFFFF, datalen)
        msgdata += self._buffer[rg:rg+datalen]
        self._pebble._send_message("PUTBYTES", msgdata)
        self._left -= datalen

    def handle_message(self, endpoint, resp):
        """Route a PUTBYTES response to the handler for the current state.

        Responses are ignored before init() and after a failure.
        """
        if self._state == self.states["WAIT_FOR_TOKEN"]:
            self.wait_for_token(resp)
        elif self._state == self.states["IN_PROGRESS"]:
            self.in_progress(resp)
        elif self._state == self.states["COMMIT"]:
            self.handle_commit(resp)
        elif self._state == self.states["COMPLETE"]:
            self.handle_complete(resp)
#!/usr/bin/env python
import argparse
import pebble as libpebble
import code
import readline
import rlcompleter
def start_repl(pebble_id):
    """Connect to the Pebble `pebble_id` and open an interactive console.

    The console namespace holds `pebble` (the connection) and
    `pebble_id`, with tab completion over that same namespace.
    """
    pebble = libpebble.Pebble(pebble_id)
    # One dict shared by the completer and the console, so names defined
    # at the prompt are completable too.
    namespace = {'pebble_id': pebble_id, 'pebble': pebble}
    completer = rlcompleter.Completer(namespace)
    readline.set_completer(completer.complete)
    readline.parse_and_bind('tab:complete')
    code.interact(local=namespace)
# Script entry: parse the mandatory PEBBLE_ID argument and start the console.
parser = argparse.ArgumentParser(description='An interactive environment for libpebble.')
parser.add_argument('pebble_id', metavar='PEBBLE_ID', type=str, help='the last 4 digits of the target Pebble\'s MAC address')
args = parser.parse_args()
start_repl(args.pebble_id)
import array
import sys
# Generator polynomial used by the CRC below.
CRC_POLY = 0x04C11DB7


def process_word(data, crc=0xffffffff):
    """Fold one word of `data` into `crc` and return the new 32-bit CRC.

    Args:
        data: byte string; its first four bytes are read as a
            little-endian 32-bit word. A shorter string is left-padded
            with zero bytes and then byte-reversed before being read.
        crc: CRC value to continue from.

    The word is decoded by hand so the result does not depend on the
    size or byte order of the platform's C ``unsigned int``.
    """
    octets = bytearray(data)
    if len(octets) < 4:
        # Short tail: pad on the left, then reverse, e.g. "ab" is read
        # as the bytes b, a, 0, 0.
        octets = bytearray(4 - len(octets)) + octets
        octets.reverse()

    word = octets[0] | (octets[1] << 8) | (octets[2] << 16) | (octets[3] << 24)
    crc = (crc ^ word) & 0xffffffff

    # Shift out one bit at a time, most significant bit first.
    for _ in range(32):
        if crc & 0x80000000:
            crc = ((crc << 1) ^ CRC_POLY) & 0xffffffff
        else:
            crc = (crc << 1) & 0xffffffff
    return crc


def process_buffer(buf, c = 0xffffffff):
    """Return the CRC of `buf`, taken four bytes at a time, starting from `c`.

    A final group of fewer than four bytes is still processed (see
    process_word for how it is padded). An empty buffer returns `c`.
    """
    crc = c
    # Stepping by 4 visits ceil(len(buf) / 4) words without relying on
    # integer division semantics.
    for start in range(0, len(buf), 4):
        crc = process_word(buf[start:start + 4], crc)
    return crc


def crc32(data):
    """Return the CRC of `data` using the default initial value 0xffffffff."""
    return process_buffer(data)
@JVimes
JVimes commented Feb 7, 2014

When I run p.py, I get "list index out of range" on line 163 of pebble.py. It appears that sorted(grep("0001005e")) is returning an empty list. I'm brand new to Python, but I have the code running in a debugger. Let me know if I can help. I'm on a 64-bit Windows 8 machine. Both the watch and my computer says they're paired/connected.

Traceback (most recent call last):
  File "C:\pebble\p.py", line 98, in <module>
    main()
  File "C:\pebble\p.py", line 88, in main
    pebble = libpebble.Pebble(args.pebble_id)
  File "C:\pebble\pebble.py", line 175, in __init__
    devicefile = Pebble.AutodetectDevice() #returns COM port number
  File "C:\pebble\pebble.py", line 163, in AutodetectDevice
    id = iterator[0][0]
IndexError: list index out of range

screen shot

@pitipucorrado

Hey Orviwan!
Is there any way i could contact you for help? I can't install it... :-(
Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment