Skip to content

Instantly share code, notes, and snippets.

@ep1cman
Forked from orviwan/README.md
Last active December 22, 2015 12:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ep1cman/7f1fd961b62d2f96d44e to your computer and use it in GitHub Desktop.
Save ep1cman/7f1fd961b62d2f96d44e to your computer and use it in GitHub Desktop.
Spotify Remote Control using Pebble
# Package metadata for the bundled libpebble package.
__title__ = 'libpebble'
__version__ = '0.0.1'
__build__ = 0x01
__license__ = 'MIT'
# Re-export the main classes so they are reachable directly from the package.
from .pebble import Pebble, PebbleError, PutBytesClient

Spotify Remote Control using Pebble

All of the good stuff comes from libpebble

Thanks to Hexxeh and orviwan, who made the code this is based on, and to all the folks in #pebble

Prerequisites

  1. A Pebble, Windows & Bluetooth

  2. Python 2.7 32-Bit/64-Bit

  3. Pywin32 32-Bit/64-Bit

  4. Pyserial 32-Bit/64-Bit

  5. A Pebble watch, paired with Windows

    On Pebble, go to Settings > Bluetooth

    Initiate pairing from Windows

Usage

  1. Make sure Bluetooth is enabled and the Pebble is paired (you may have to disable Bluetooth on your phone)

  2. Download/extract this gist into a folder, e.g. C:\Pebble

  3. Open a command prompt (as administrator)

  4. Run the following commands:

    cd c:\Pebble

    python p.py spotify

  5. Open Spotify and log in

  6. Open the Music app on your Pebble and use the UP/DOWN/SELECT buttons to control Spotify playback.

#!/usr/bin/env python
import argparse
import pebble as libpebble
import sys
import time
#SPOTIFY
import ctypes
#SPOTIFY
# Window captions gathered by the latest EnumWindows pass (appended to by
# foreach_window, reset by getSongDetails).
titles = []
# user32 entry point that invokes a callback for each window it enumerates.
EnumWindows = ctypes.windll.user32.EnumWindows
# Prototype of that callback: returns a bool and receives two pointer
# arguments (named hwnd and lParam in foreach_window).
EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int))
# Wide-character (W) variants, so captions come back as unicode text.
GetWindowText = ctypes.windll.user32.GetWindowTextW
GetWindowTextLength = ctypes.windll.user32.GetWindowTextLengthW
IsWindowVisible = ctypes.windll.user32.IsWindowVisible
#SPOTIFY
def foreach_window(hwnd, lParam):
    """EnumWindows callback that records the caption of each visible window.

    The caption is appended to the module-level ``titles`` list.  The
    function always returns True so that enumeration carries on.
    """
    if not IsWindowVisible(hwnd):
        return True
    # One extra character of room beyond the reported caption length.
    capacity = GetWindowTextLength(hwnd) + 1
    caption = ctypes.create_unicode_buffer(capacity)
    GetWindowText(hwnd, caption, capacity)
    titles.append(caption.value)
    return True
#SPOTIFY
def getSongDetails():
    """Return ``[song, artist]`` for the track shown in Spotify's window title.

    All visible window captions are re-read on every call.  A caption is
    used when it starts with ``"Spotify "`` and has the shape
    ``Spotify - <artist> <en dash> <song>``.  Captions that start with
    ``"Spotify "`` but do not have that shape are skipped instead of
    raising IndexError.

    Returns:
        A two-item list of ASCII byte strings (non-ASCII characters are
        dropped), or None when no suitable caption is found.
    """
    global titles
    titles = []
    EnumWindows(EnumWindowsProc(foreach_window), 0)
    for caption in titles:
        if caption[0:8] != "Spotify ":
            continue
        pieces = caption.split(u"\u2013 ")
        prefix = pieces[0].split(" - ")
        if len(pieces) < 2 or len(prefix) < 2:
            # No "artist <en dash> song" part in this caption; keep looking.
            continue
        song = pieces[1].encode('ascii', 'ignore')
        # [0:-1] drops the space that precedes the en dash.
        artist = prefix[1][0:-1].encode('ascii', 'ignore')
        return [song, artist]
    return None
from time import sleep
# main() gives up connecting once its count of failed attempts exceeds this.
MAX_ATTEMPTS = 5
def cmd_ping(pebble, args):
    """Send a ping carrying a fixed marker cookie to the watch."""
    marker_cookie = 0xDEADBEEF
    pebble.ping(cookie=marker_cookie)
def cmd_load(pebble, args):
    """Install the app bundle named by ``args.app_bundle`` on the watch."""
    bundle_path = args.app_bundle
    pebble.install_app(bundle_path)
def cmd_load_fw(pebble, args):
    """Install the firmware bundle ``args.fw_bundle``, then reset the watch."""
    firmware_path = args.fw_bundle
    pebble.install_firmware(firmware_path)
    # Wait five seconds between finishing the install and resetting.
    time.sleep(5)
    print('resetting to apply firmware update...')
    pebble.reset()
def cmd_logcat(pebble, args):
    """Keep the process alive until Ctrl-C is pressed.

    This function itself only announces that it is listening and then idles
    in one-second sleeps; it returns quietly on KeyboardInterrupt.
    """
    print('listening for logs...')
    try:
        while 1:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    return
def cmd_list_apps(pebble, args):
    """Print one ``[index] name`` line for every app the watch reports."""
    installed = pebble.get_appbank_status()['apps']
    for entry in installed:
        line = '[{}] {}'.format(entry['index'], entry['name'])
        print(line)
def cmd_rm_app(pebble, args):
    """Remove the installed app whose bank index equals ``args.app_index``.

    Only the first matching entry is removed; nothing is done (and nothing
    is printed) when no installed app has that index.
    """
    wanted = args.app_index
    installed = pebble.get_appbank_status()['apps']
    match = next((entry for entry in installed if entry['index'] == wanted), None)
    if match is None:
        return
    pebble.remove_app(match["id"], match["index"])
    print('removed app')
#DEMO
def cmd_powerpoint(pebble, args):
    """Start PowerPoint control on the watch and then idle forever."""
    pebble.powerpoint()
    #DEMO - keep the process open for PowerPoint; this loop never exits
    while 1:
        time.sleep(1)
#SPOTIFY
def cmd_spotify(pebble, args):
    """Mirror the current Spotify track to the watch until the process ends.

    The track is pushed once at start-up and then re-read once per second.
    Each tick takes a single snapshot, so the value compared against the
    previous track is the same value that gets sent to the watch.  The loop
    never returns.
    """
    currentsong = getSongDetails()
    pebble.spotify(currentsong)
    while True:
        time.sleep(1)
        latest = getSongDetails()
        if latest != currentsong:
            currentsong = latest
            pebble.spotify(currentsong)
def main():
    """Parse the command line, connect to the watch and run the sub-command.

    The connection is retried, with a five-second pause after each failure,
    until the number of failed attempts exceeds MAX_ATTEMPTS.

    Raises:
        libpebble.PebbleError: when the watch could not be reached within
            the allowed number of attempts.
    """
    parser = argparse.ArgumentParser(description='a utility belt for pebble development')
    parser.add_argument('--pebble_id', type=str, help='the last 4 digits of the target Pebble\'s MAC address')

    subparsers = parser.add_subparsers(help='commands', dest='which')

    ping_parser = subparsers.add_parser('ping', help='send a ping message')
    ping_parser.set_defaults(func=cmd_ping)

    #DEMO
    powerpoint_parser = subparsers.add_parser('powerpoint', help='control powerpoint')
    powerpoint_parser.set_defaults(func=cmd_powerpoint)

    #SPOTIFY
    spotify_parser = subparsers.add_parser('spotify', help='control spotify')
    spotify_parser.set_defaults(func=cmd_spotify)

    load_parser = subparsers.add_parser('load', help='load an app onto a connected watch')
    load_parser.add_argument('app_bundle', metavar='FILE', type=str, help='a compiled app bundle')
    load_parser.set_defaults(func=cmd_load)

    load_fw_parser = subparsers.add_parser('load_fw', help='load new firmware onto a connected watch')
    load_fw_parser.add_argument('fw_bundle', metavar='FILE', type=str, help='a firmware bundle')
    load_fw_parser.set_defaults(func=cmd_load_fw)

    logcat_parser = subparsers.add_parser('logcat', help='view logs sent from the connected watch')
    logcat_parser.set_defaults(func=cmd_logcat)

    list_apps_parser = subparsers.add_parser('list', help='list installed apps')
    list_apps_parser.set_defaults(func=cmd_list_apps)

    rm_app_parser = subparsers.add_parser('rm', help='remove installed apps')
    rm_app_parser.add_argument('app_index', metavar='IDX', type=int, help='the app index to delete')
    rm_app_parser.set_defaults(func=cmd_rm_app)

    args = parser.parse_args()

    attempts = 0
    while True:
        if attempts > MAX_ATTEMPTS:
            # A real exception object: raising a bare string is itself a
            # TypeError on Python 2.6 and later.
            raise libpebble.PebbleError(args.pebble_id, 'Could not connect to Pebble')
        try:
            pebble = libpebble.Pebble(args.pebble_id)
            break
        except libpebble.PebbleError:
            time.sleep(5)
            attempts += 1

    args.func(pebble, args)


if __name__ == '__main__':
    main()
#!/usr/bin/env python
import binascii
import serial
import stm32_crc
import sys
import threading
import time
import traceback
import zipfile
from struct import pack, unpack
import os
import glob
import logging
import json
from serial.tools import list_ports
#DEMO - Used for SHELL
import win32com.client
#SPOTIFY
import win32api
import win32con
# chose an implementation, depending on os
# Pull in the serial-port enumeration helpers for the running platform;
# anything other than Windows ('nt') or POSIX is rejected at import time.
if os.name not in ('nt', 'posix'):
    raise ImportError("Sorry: no implementation for your platform ('%s') available" % (os.name,))
if os.name == 'nt':
    from serial.tools.list_ports_windows import *
else:
    from serial.tools.list_ports_posix import *
def grep(regexp):
    """Yield ``(port, desc, hwid)`` for each serial port matching *regexp*.

    A port matches when the pattern is found in its port name, description
    or hardware id.  The search ignores case for all three fields, so
    hexadecimal hardware ids match whichever case the driver reports them
    in.
    """
    # Imported here because this module has no `import re` of its own and
    # would otherwise depend on the name leaking in through the star import
    # of the platform list_ports module.
    import re

    pattern = re.compile(regexp, re.I)
    for port, desc, hwid in comports():
        if pattern.search(port) or pattern.search(desc) or pattern.search(hwid):
            yield port, desc, hwid
#DEMO - Instance of SHELL to sendkeys
# (used by the Pebble.powerpoint* methods to activate PowerPoint and send it key presses)
shell = win32com.client.Dispatch("WScript.Shell")
# Root logger: messages are printed as "[LEVEL   ] text" and everything from
# DEBUG upwards is emitted.
log = logging.getLogger()
logging.basicConfig(format='[%(levelname)-8s] %(message)s')
log.setLevel(logging.DEBUG)
DEFAULT_PEBBLE_ID = None #Triggers autodetection
# When True, each frame sent or received is also logged as a hex dump.
DEBUG_PROTOCOL = False
class PebbleBundle(object):
    """Read-only view of a Pebble bundle: a zip archive with a JSON manifest.

    The manifest is parsed on first use and cached.  Its top-level keys
    ``firmware``, ``application`` and ``resources`` tell what the bundle
    contains.
    """

    MANIFEST_FILENAME = 'manifest.json'

    def __init__(self, bundle_path):
        """Open the archive at *bundle_path*.

        Raises:
            IOError: if no file exists at that path.
        """
        bundle_abs_path = os.path.abspath(bundle_path)
        if not os.path.exists(bundle_abs_path):
            # Raise a real exception; raising a bare string is a TypeError.
            raise IOError("Bundle does not exist: " + bundle_path)

        self.zip = zipfile.ZipFile(bundle_abs_path)
        self.path = bundle_abs_path
        self.manifest = None

    def get_manifest(self):
        """Return the parsed manifest, reading it from the archive once.

        Raises:
            ValueError: if the archive has no manifest file.
        """
        # `is not None` so that an empty manifest is cached as well.
        if self.manifest is not None:
            return self.manifest

        if self.MANIFEST_FILENAME not in self.zip.namelist():
            raise ValueError("Could not find {}; are you sure this is a PebbleBundle?".format(self.MANIFEST_FILENAME))

        self.manifest = json.loads(self.zip.read(self.MANIFEST_FILENAME))
        return self.manifest

    def close(self):
        """Close the underlying zip archive."""
        self.zip.close()

    def is_firmware_bundle(self):
        """Return True if the manifest has a ``firmware`` entry."""
        return 'firmware' in self.get_manifest()

    def is_app_bundle(self):
        """Return True if the manifest has an ``application`` entry."""
        return 'application' in self.get_manifest()

    def has_resources(self):
        """Return True if the manifest has a ``resources`` entry."""
        return 'resources' in self.get_manifest()

    def get_firmware_info(self):
        """Return the manifest's ``firmware`` entry, or None if absent."""
        if not self.is_firmware_bundle():
            return None
        return self.get_manifest()['firmware']

    def get_application_info(self):
        """Return the manifest's ``application`` entry, or None if absent."""
        if not self.is_app_bundle():
            return None
        return self.get_manifest()['application']

    def get_resources_info(self):
        """Return the manifest's ``resources`` entry, or None if absent."""
        if not self.has_resources():
            return None
        return self.get_manifest()['resources']
class EndpointSync(object):
    """Wait synchronously for the next message delivered to one endpoint.

    Creating an instance registers its callback as the handler for
    *endpoint* on the given pebble object; get_data() then blocks until that
    callback has run or ``timeout`` seconds have passed.
    """

    timeout = 10

    def __init__(self, pebble, endpoint):
        self.data = None
        # The event must exist before the handler is registered: the
        # callback may be invoked from another thread straight away.
        self.marker = threading.Event()
        pebble.register_endpoint(endpoint, self.callback)

    def callback(self, *args):
        """Store the handler arguments and wake up any waiter."""
        self.data = args
        self.marker.set()

    def get_data(self):
        """Return the second callback argument, or False if none arrived.

        False is returned when the wait times out and also when the callback
        was invoked with fewer than two arguments.
        """
        if not self.marker.wait(timeout=self.timeout):
            return False
        try:
            return self.data[1]
        except IndexError:
            return False
class PebbleError(Exception):
    """Error raised by this module, tagged with the id of the Pebble involved."""

    def __init__(self, id, message):
        # Initialise the base class too, so that `args`, repr() and pickling
        # carry the id and message instead of being empty.
        Exception.__init__(self, id, message)
        self._id = id
        self._message = message

    def __str__(self):
        return "%s (ID:%s)" % (self._message, self._id)
class Pebble(object):
"""
A connection to a Pebble watch; data and commands may be sent
to the watch through an instance of this class.
"""
endpoints = {
"TIME": 11,
"VERSION": 16,
"PHONE_VERSION": 17,
"SYSTEM_MESSAGE": 18,
"MUSIC_CONTROL": 32,
"PHONE_CONTROL": 33,
"LOGS": 2000,
"PING": 2001,
"LOG_DUMP": 2002,
"RESET": 2003,
"APP": 2004,
"NOTIFICATION": 3000,
"RESOURCE": 4000,
"APP_MANAGER": 6000,
"PUTBYTES": 48879
}
@staticmethod
def AutodetectDevice():
if os.name == "posix":
pebbles = glob.glob("/dev/tty.Pebble????-SerialPortSe")
if len(pebbles) == 0:
raise PebbleError(None, "Autodetection could not find any Pebble devices")
elif len(pebbles) > 1:
log.warn("Autodetect found %d Pebbles; using most recent" % len(pebbles))
#NOTE: Not entirely sure if this is the correct approach
pebbles.sort(key=lambda x: os.stat(x).st_mtime, reverse=True)
id = pebbles[0][15:19]
elif os.name == "nt":
iterator = sorted(grep("0001005e")) #VID&0001005e_PID&0001
id = iterator[0][0]
log.info("Autodetect found a Pebble at port %s" % id)
return id
def __init__(self, id):
if os.name == "posix":
if id is None:
id = Pebble.AutodetectDevice()
devicefile = "/dev/tty.Pebble"+id+"-SerialPortSe"
elif os.name == "nt":
if id is None:
id = "AUTO"
devicefile = Pebble.AutodetectDevice() #returns COM port number
self.id = id
self._alive = True
self._endpoint_handlers = {}
self._internal_endpoint_handlers = {
self.endpoints["TIME"]: self._get_time_response,
self.endpoints["VERSION"]: self._version_response,
self.endpoints["SYSTEM_MESSAGE"]: self._system_message_response,
self.endpoints["LOGS"]: self._log_response,
self.endpoints["PING"]: self._ping_response,
self.endpoints["APP_MANAGER"]: self._appbank_status_response,
#self.endpoints["MUSIC_CONTROL"]: self._powerpoint_response
self.endpoints["MUSIC_CONTROL"]: self._spotify_response
} #DEMO response
try:
log.debug("Attempting to open %s as Pebble device %s" % (devicefile, id))
self._ser = serial.Serial(devicefile, 115200, timeout=1)
log.debug("Connected, discarding null response")
# we get a null response when we connect, discard it
self._ser.read(5)
#DEMO - This was the missing part from libpebble. Thanks hotdogs!
self._ser.write("\x00\x0d\x00\x11\x01\xff\xff\xff\xff\x80\x00\x00\x00\x00\x00\x00\x32")
# Eat any cruft that might be sitting in the serial buffer...
while self._ser.read():
pass
log.debug("Initializing reader thread")
self._read_thread = threading.Thread(target=self._reader)
self._read_thread.setDaemon(True)
self._read_thread.start()
log.debug("Reader thread loaded")
except:
raise PebbleError(id, "Failed to connect to Pebble")
def __del__(self):
try:
self._ser.close()
except:
pass
def _reader(self):
try:
while self._alive:
endpoint, resp = self._recv_message()
if resp == None:
continue
if DEBUG_PROTOCOL:
log.debug("Got message for endpoint %s of length %d" % (endpoint, len(resp)))
log.debug('<<< ' + resp.encode('hex'))
if endpoint in self._internal_endpoint_handlers:
resp = self._internal_endpoint_handlers[endpoint](endpoint, resp)
if endpoint in self._endpoint_handlers:
self._endpoint_handlers[endpoint](endpoint, resp)
except:
traceback.print_exc()
raise PebbleError(self.id, "Lost connection to Pebble")
self._alive = False
def _build_message(self, endpoint, data):
return pack("!HH", len(data), endpoint)+data
def _send_message(self, endpoint, data, callback = None):
if endpoint not in self.endpoints:
raise PebbleError(self.id, "Invalid endpoint specified")
msg = self._build_message(self.endpoints[endpoint], data)
if DEBUG_PROTOCOL:
log.debug('>>> ' + msg.encode('hex'))
self._ser.write(msg)
def _recv_message(self):
data = self._ser.read(4)
if len(data) == 0:
return (None, None)
elif len(data) < 4:
raise PebbleError(self.id, "Malformed response with length "+str(len(data)))
size, endpoint = unpack("!HH", data)
resp = self._ser.read(size)
return (endpoint, resp)
def register_endpoint(self, endpoint_name, func):
if endpoint_name not in self.endpoints:
raise PebbleError(self.id, "Invalid endpoint specified")
endpoint = self.endpoints[endpoint_name]
self._endpoint_handlers[endpoint] = func
def notification_sms(self, sender, body):
"""Send a 'SMS Notification' to the displayed on the watch."""
ts = str(int(time.time())*1000)
parts = [sender, body, ts]
data = "\x01"
for part in parts:
data += pack("!b", len(part))+part
self._send_message("NOTIFICATION", data)
def notification_email(self, sender, subject, body):
"""Send an 'Email Notification' to the displayed on the watch."""
ts = str(int(time.time())*1000)
parts = [sender, subject, ts, body]
data = "\x00"
for part in parts:
data += pack("!b", len(part))+part
self._send_message("NOTIFICATION", data)
def set_nowplaying_metadata(self, track, album, artist):
"""Update the song metadata displayed in Pebble's music app."""
parts = [artist, album, track]
data = pack("!b", 16)
for part in parts:
data += pack("!b", len(part))+part
self._send_message("MUSIC_CONTROL", data)
def get_versions(self, async = False):
"""
Retrieve a summary of version information for various software
(firmware, bootloader, etc) running on the watch.
"""
self._send_message("VERSION", "\x00")
if not async:
return EndpointSync(self, "VERSION").get_data()
def get_appbank_status(self, async = False):
"""
Retrieve a list of all installed watch-apps.
This is particularly useful when trying to locate a
free app-bank to use when installing a new watch-app.
"""
self._send_message("APP_MANAGER", "\x01")
if not async:
return EndpointSync(self, "APP_MANAGER").get_data()
def remove_app(self, appid, index):
"""Remove an installed application from the target app-bank."""
data = pack("!bII", 2, appid, index)
self._send_message("APP_MANAGER", data)
def get_time(self, async = False):
"""Retrieve the time from the Pebble's RTC."""
self._send_message("TIME", "\x00")
if not async:
return EndpointSync(self, "TIME").get_data()
def set_time(self, timestamp):
"""Set the time stored in the target Pebble's RTC."""
data = pack("!bL", 2, timestamp)
self._send_message("TIME", data)
def reinstall_app(self, name, pbz_path):
"""
A convenience method to uninstall and install an app.
This will only work if the app hasn't changed names between the new and old versions.
"""
apps = self.get_appbank_status()
for app in apps["apps"]:
if app["name"] == name:
self.remove_app(app["id"], app["index"])
self.install_app(pbz_path)
def install_app(self, pbz_path):
"""
Install an app bundle (*.pbw) to the target Pebble.
This will pick the first free app-bank available.
"""
bundle = PebbleBundle(pbz_path)
if not bundle.is_app_bundle():
raise PebbleError(self.id, "This is not an app bundle")
binary = bundle.zip.read(
bundle.get_application_info()['name'])
if bundle.has_resources():
resources = bundle.zip.read(
bundle.get_resources_info()['name'])
else:
resources = None
apps = self.get_appbank_status()
if not apps:
raise PebbleError(self.id, "could not obtain app list; try again")
first_free = 1
for app in apps["apps"]:
if app["index"] == first_free:
first_free += 1
if first_free == apps["banks"]:
raise PebbleError(self.id, "All %d app banks are full" % apps["banks"])
log.debug("Attempting to add app to bank %d of %d" % (first_free, apps["banks"]))
client = PutBytesClient(self, first_free, "BINARY", binary)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send application binary %s/pebble-app.bin" % pbz_path)
if resources:
client = PutBytesClient(self, first_free, "RESOURCES", resources)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send application resources %s/app_resources.pbpack" % pbz_path)
time.sleep(2)
self._add_app(first_free)
time.sleep(2)
def install_firmware(self, pbz_path, recovery=False):
"""Install a firmware bundle to the target watch."""
resources = None
with zipfile.ZipFile(pbz_path) as pbz:
binary = pbz.read("tintin_fw.bin")
if not recovery:
resources = pbz.read("system_resources.pbpack")
self.system_message("FIRMWARE_START")
time.sleep(2)
if resources:
client = PutBytesClient(self, 0, "SYS_RESOURCES", resources)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send firmware resources %s/system_resources.pbpack" % pbz_path)
client = PutBytesClient(self, 0, "RECOVERY" if recovery else "FIRMWARE", binary)
self.register_endpoint("PUTBYTES", client.handle_message)
client.init()
while not client._done and not client._error:
pass
if client._error:
raise PebbleError(self.id, "Failed to send firmware binary %s/tintin_fw.bin" % pbz_path)
self.system_message("FIRMWARE_COMPLETE")
def system_message(self, command):
"""
Send a 'system message' to the watch.
These messages are used to signal important events/state-changes to the watch firmware.
"""
commands = {
"FIRMWARE_AVAILABLE": 0,
"FIRMWARE_START": 1,
"FIRMWARE_COMPLETE": 2,
"FIRMWARE_FAIL": 3,
"FIRMWARE_UP_TO_DATE": 4,
"FIRMWARE_OUT_OF_DATE": 5,
"BLUETOOTH_START_DISCOVERABLE": 6,
"BLUETOOTH_END_DISCOVERABLE": 7
}
if command not in commands:
raise PebbleError(self.id, "Invalid command \"%s\"" % command)
data = pack("!bb", 0, commands[command])
log.debug("Sending command %s (code %d)" % (command, commands[command]))
self._send_message("SYSTEM_MESSAGE", data)
def ping(self, cookie = 0, async = False):
"""Send a 'ping' to the watch to test connectivity."""
data = pack("!bL", 0, cookie)
self._send_message("PING", data)
if not async:
return EndpointSync(self, "PING").get_data()
def reset(self):
"""Reset the watch remotely."""
self._send_message("RESET", "\x00")
def disconnect(self):
"""Disconnect from the target Pebble."""
self._alive = False
self._ser.close()
#SPOTIFY CODE BELOW!!!
def spotify(self,info):
if info != None:
log.info("Spotify: Now playing " + info[0] + " by " + info[1])
self.set_nowplaying_metadata(info[0], "Spotify", info[1])
def spotify_next(self):
log.info("Spotify: Next")
win32api.keybd_event(0xB0, 0,0,0)
time.sleep(.05)
win32api.keybd_event(0xB0,0 ,win32con.KEYEVENTF_KEYUP ,0)
def spotify_prev(self):
log.info("Spotify: Prev")
win32api.keybd_event(0xB1, 0,0,0)
time.sleep(.05)
win32api.keybd_event(0xB1,0 ,win32con.KEYEVENTF_KEYUP ,0)
def spotify_play(self):
log.info("Spotify: Play/Pause")
win32api.keybd_event(0xB3, 0,0,0)
time.sleep(.05)
win32api.keybd_event(0xB3,0 ,win32con.KEYEVENTF_KEYUP ,0)
def _spotify_response(self, endpoint, data):
res, = unpack("!b", data)
if res == 1:
self.spotify_play()
elif res == 4:
self.spotify_next()
elif res == 5:
self.spotify_prev()
#DEMO - Set NowPlaying & focus PowerPoint
def powerpoint(self):
"""Control Microsoft PowerPoint using Pebble Music App"""
log.info("PowerPoint: Make sure PowerPoint is loaded and Slideshow is already started")
self.set_nowplaying_metadata("PowerPoint 2013", "libpebble", "Microsoft")
shell.AppActivate('PowerPoint')
#DEMO - UP button was pressed
def powerpoint_next(self):
log.info("PowerPoint: Next")
shell.AppActivate('PowerPoint')
shell.SendKeys('{RIGHT}')
#DEMO - DOWN button was pressed
def powerpoint_prev(self):
log.info("PowerPoint: Prev")
shell.AppActivate('PowerPoint')
shell.SendKeys('{LEFT}')
#DEMO - SELECT button was pressed.
def powerpoint_play(self):
log.info("PowerPoint: Home")
shell.AppActivate('PowerPoint')
shell.SendKeys('{HOME}')
#DEMO - Receives button presses from Pebble
def _powerpoint_response(self, endpoint, data):
res, = unpack("!b", data)
#log.info("PowerPoint: %s" % res)
if res == 1:
self.powerpoint_play()
elif res == 4:
self.powerpoint_next()
elif res == 5:
self.powerpoint_prev()
def _add_app(self, index):
data = pack("!bI", 3, index)
self._send_message("APP_MANAGER", data)
def _ping_response(self, endpoint, data):
restype, retcookie = unpack("!bL", data)
return retcookie
def _get_time_response(self, endpoint, data):
restype, timestamp = unpack("!bL", data)
return timestamp
def _system_message_response(self, endpoint, data):
if len(data) == 2:
log.info("Got system message %s" % repr(unpack('!bb', data)))
else:
log.info("Got 'unknown' system message...")
def _log_response(self, endpoint, data):
if (len(data) < 8):
log.warn("Unable to decode log message (length %d is less than 8)" % len(data))
return;
timestamp, level, msgsize, linenumber = unpack("!Ibbh", data[:8])
filename = data[8:24].decode('utf-8')
message = data[24:24+msgsize].decode('utf-8')
log_levels = {
0: "*",
1: "E",
50: "W",
100: "I",
200: "D",
250: "V"
}
level = log_levels[level] if level in log_levels else "?"
print timestamp, level, filename, linenumber, message
def _appbank_status_response(self, endpoint, data):
apps = {}
restype, = unpack("!b", data[0])
if restype == 1:
apps["banks"], apps_installed = unpack("!II", data[1:9])
apps["apps"] = []
appinfo_size = 78
offset = 9
for i in xrange(apps_installed):
app = {}
app["id"], app["index"], app["name"], app["company"], app["flags"], app["version"] = \
unpack("!II32s32sIH", data[offset:offset+appinfo_size])
app["name"] = app["name"].replace("\x00", "")
app["company"] = app["company"].replace("\x00", "")
apps["apps"] += [app]
offset += appinfo_size
return apps
def _version_response(self, endpoint, data):
fw_names = {
0: "normal_fw",
1: "recovery_fw"
}
resp = {}
for i in xrange(2):
fwver_size = 47
offset = i*fwver_size+1
fw = {}
fw["timestamp"],fw["version"],fw["commit"],fw["is_recovery"], \
fw["hardware_platform"],fw["metadata_ver"] = \
unpack("!i32s8s?bb", data[offset:offset+fwver_size])
fw["version"] = fw["version"].replace("\x00", "")
fw["commit"] = fw["commit"].replace("\x00", "")
fw_name = fw_names[i]
resp[fw_name] = fw
resp["bootloader_timestamp"],resp["hw_version"],resp["serial"] = \
unpack("!L9s12s", data[95:120])
resp["hw_version"] = resp["hw_version"].replace("\x00","")
btmac_hex = binascii.hexlify(data[120:126])
resp["btmac"] = ":".join([btmac_hex[i:i+2].upper() for i in reversed(xrange(0, 12, 2))])
return resp
class PutBytesClient(object):
    """State machine that uploads one buffer to the watch over PUTBYTES.

    Usage: register ``handle_message`` as the PUTBYTES endpoint handler,
    call ``init()``, then watch ``_done`` / ``_error``.  Each reply from the
    watch advances the transfer: token -> data chunks -> commit -> complete.
    """

    states = {
        "NOT_STARTED": 0,
        "WAIT_FOR_TOKEN": 1,
        "IN_PROGRESS": 2,
        "COMMIT": 3,
        "COMPLETE": 4,
        "FAILED": 5
    }

    transfer_types = {
        "FIRMWARE": 1,
        "RECOVERY": 2,
        "SYS_RESOURCES": 3,
        "RESOURCES": 4,
        "BINARY": 5
    }

    def __init__(self, pebble, index, transfer_type, buffer):
        self._pebble = pebble
        self._state = self.states["NOT_STARTED"]
        self._transfer_type = self.transfer_types[transfer_type]
        self._buffer = buffer
        self._index = index
        self._done = False
        self._error = False

    def init(self):
        """Announce the transfer (total size, type, index) and await a token."""
        data = pack("!bIbb", 1, len(self._buffer), self._transfer_type, self._index)
        self._pebble._send_message("PUTBYTES", data)
        self._state = self.states["WAIT_FOR_TOKEN"]

    def wait_for_token(self, resp):
        """Handle the reply to init(): store the token and send the first chunk."""
        # Slice rather than index so the status stays a one-byte string.
        res, = unpack("!b", resp[:1])
        if res != 1:
            log.error("init failed with code %d" % res)
            self._error = True
            return
        self._token, = unpack("!I", resp[1:])
        self._left = len(self._buffer)
        self._state = self.states["IN_PROGRESS"]
        self.send()

    def in_progress(self, resp):
        """Handle a chunk acknowledgement: send the next chunk or commit."""
        res, = unpack("!b", resp[:1])
        if res != 1:
            self.abort()
            return
        if self._left > 0:
            self.send()
            log.debug("Sent %d of %d bytes" % (len(self._buffer)-self._left, len(self._buffer)))
        else:
            self._state = self.states["COMMIT"]
            self.commit()

    def commit(self):
        """Send the commit command with the CRC of the whole buffer."""
        data = pack("!bII", 3, self._token & 0xFFFFFFFF, stm32_crc.crc32(self._buffer))
        self._pebble._send_message("PUTBYTES", data)

    def handle_commit(self, resp):
        """Handle the reply to commit(): move on to complete()."""
        res, = unpack("!b", resp[:1])
        if res != 1:
            self.abort()
            return
        self._state = self.states["COMPLETE"]
        self.complete()

    def complete(self):
        """Send the final 'complete' command."""
        data = pack("!bI", 5, self._token & 0xFFFFFFFF)
        self._pebble._send_message("PUTBYTES", data)

    def handle_complete(self, resp):
        """Handle the reply to complete(): mark the transfer as done."""
        res, = unpack("!b", resp[:1])
        if res != 1:
            self.abort()
            return
        self._done = True

    def abort(self):
        """Tell the watch to abandon the transfer and flag the error."""
        msgdata = pack("!bI", 4, self._token & 0xFFFFFFFF)
        # The method is _send_message; calling `send_message` raised
        # AttributeError and left _error unset.
        self._pebble._send_message("PUTBYTES", msgdata)
        self._error = True

    def send(self):
        """Send the next chunk of at most 2000 bytes."""
        datalen = min(self._left, 2000)
        rg = len(self._buffer)-self._left
        msgdata = pack("!bII", 2, self._token & 0xFFFFFFFF, datalen)
        msgdata += self._buffer[rg:rg+datalen]
        self._pebble._send_message("PUTBYTES", msgdata)
        self._left -= datalen

    def handle_message(self, endpoint, resp):
        """Dispatch a PUTBYTES reply according to the current state."""
        if self._state == self.states["WAIT_FOR_TOKEN"]:
            self.wait_for_token(resp)
        elif self._state == self.states["IN_PROGRESS"]:
            self.in_progress(resp)
        elif self._state == self.states["COMMIT"]:
            self.handle_commit(resp)
        elif self._state == self.states["COMPLETE"]:
            self.handle_complete(resp)
#!/usr/bin/env python
import argparse
import pebble as libpebble
import code
import readline
import rlcompleter
def start_repl(pebble_id):
    """Connect to the given Pebble and open an interactive console.

    The console namespace holds ``pebble`` (the connection) and
    ``pebble_id``; tab completion is bound to that same namespace.
    """
    pebble = libpebble.Pebble(pebble_id)
    namespace = {'pebble_id': pebble_id, 'pebble': pebble}
    completer = rlcompleter.Completer(namespace)
    readline.set_completer(completer.complete)
    readline.parse_and_bind('tab:complete')
    code.interact(local=namespace)
# Script body: runs as soon as this file is executed (there is no __main__ guard).
# PEBBLE_ID is a required positional argument.
parser = argparse.ArgumentParser(description='An interactive environment for libpebble.')
parser.add_argument('pebble_id', metavar='PEBBLE_ID', type=str, help='the last 4 digits of the target Pebble\'s MAC address')
args = parser.parse_args()
# Connect and hand control to the interactive console.
start_repl(args.pebble_id)
import array
import sys
# Generator polynomial of the 32-bit CRC computed below.
CRC_POLY = 0x04C11DB7


def process_word(data, crc=0xffffffff):
    """Fold one word of up to four bytes into *crc* and return the new CRC.

    A full four-byte word is read as a little-endian integer.  A shorter
    word (the tail of a buffer) is read as a big-endian integer, which is
    what zero-padding at the front and then reversing the bytes amounts to.
    The bytes are decoded explicitly so the result does not depend on the
    byte order of the host.
    """
    octets = bytearray(data)
    if len(octets) < 4:
        word = 0
        for octet in octets:
            word = (word << 8) | octet
    else:
        word = octets[0] | (octets[1] << 8) | (octets[2] << 16) | (octets[3] << 24)

    crc = crc ^ word
    for _ in range(32):
        if crc & 0x80000000:
            crc = (crc << 1) ^ CRC_POLY
        else:
            crc = crc << 1

    # Bits shifted out past bit 31 are discarded here.
    return crc & 0xffffffff


def process_buffer(buf, c = 0xffffffff):
    """Return the CRC of *buf*, taken four bytes at a time, starting from *c*.

    A trailing group of fewer than four bytes is processed as a short word.
    An empty buffer returns *c* unchanged.
    """
    crc = c
    for offset in range(0, len(buf), 4):
        crc = process_word(buf[offset:offset + 4], crc)
    return crc


def crc32(data):
    """Return the CRC of *data* using the initial value 0xffffffff."""
    return process_buffer(data)
@Braingapps
Copy link

Hi,
A question: does your app permits to control volume too?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment