Skip to content

Instantly share code, notes, and snippets.

@qzed
Last active April 23, 2021 16:10
Show Gist options
  • Save qzed/01a93568efb863f1b7887f0f375c03fc to your computer and use it in GitHub Desktop.
Save qzed/01a93568efb863f1b7887f0f375c03fc to your computer and use it in GitHub Desktop.
Surface Book 2 / Surface Pro (2017) / Surface Laptop UART protocol proof-of-concept script.
#!/usr/bin/env python3
import json
import crcmod
from argparse import ArgumentParser
from pathlib import Path
import serial
from serial import Serial
DEFAULT_DEVICE = '/dev/ttyS4'
DEFAULT_BAUD_RATE = 3000000
CRC_FN = crcmod.predefined.mkCrcFun('crc-ccitt-false')
def setup_device(port, baudrate):
# definition from DSDT
return Serial(
port=port,
baudrate=baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
rtscts=False,
dsrdtr=False,
timeout=0,
)
def crc(pld):
x = CRC_FN(bytes(pld))
return [x & 0xff, (x >> 0x08) & 0xff]
def to_int(bytes):
return int.from_bytes(bytes, byteorder='little')
class Counters:
PATH = Path(__file__).parent / '.counters.json'
@staticmethod
def load():
if Counters.PATH.is_file():
with open(Counters.PATH) as fd:
data = json.load(fd)
seq = data['seq']
cnt = data['cnt']
else:
seq = 0x00
cnt = 0x0000
return Counters(seq, cnt)
def __init__(self, seq, cnt):
self.seq = seq
self.cnt = cnt
def store(self):
with open(Counters.PATH, 'w') as fd:
data = {'seq': self.seq, 'cnt': self.cnt}
json.dump(data, fd)
def inc_seq(self):
self.seq = (self.seq + 1) & 0xFF
def inc_cnt(self):
self.cnt = (self.cnt + 1) & 0xFFFF
def inc(self):
self.inc_seq()
self.inc_cnt()
class Command:
def __init__(self, rtc, riid, rcid, rsnc=0x01, quiet=False):
self.rtc = rtc
self.riid = riid
self.rcid = rcid
self.rsnc = rsnc
self.quiet = quiet
def _write_msg(self, dev, seq, cnt):
cnt_lo = cnt & 0xff
cnt_hi = (cnt >> 0x08) & 0xff
hdr = [0x80, 0x08, 0x00, seq]
pld = [0x80, self.rtc, 0x01, 0x00, self.riid, cnt_lo, cnt_hi, self.rcid]
msg = [0xaa, 0x55] + hdr + crc(hdr) + pld + crc(pld)
return dev.write(bytes(msg))
def _write_ack(self, dev, seq):
hdr = [0x40, 0x00, 0x00, seq]
msg = [0xaa, 0x55] + hdr + crc(hdr) + [0xff, 0xff]
return dev.write(bytes(msg))
def _read_ack(self, dev, exp_seq):
msg = bytes()
while len(msg) < 0x0A:
msg += dev.read(0x0A - len(msg))
if not self.quiet:
print("received: {}".format(msg.hex()))
assert msg[0:2] == bytes([0xaa, 0x55])
assert msg[3:5] == bytes([0x00, 0x00])
assert msg[6:8] == bytes(crc(msg[2:-4]))
assert msg[8:] == bytes([0xff, 0xff])
mty = msg[2]
seq = msg[5]
if mty == 0x40:
assert seq == exp_seq
return mty == 0x04
def _read_msg(self, dev, cnt):
cnt_lo = cnt & 0xff
cnt_hi = (cnt >> 0x08) & 0xff
buf = bytes()
rem = 0x08 # begin with header length
while len(buf) < rem:
buf += dev.read(0x0400)
# if we got a header, validate it
if rem == 0x08 and len(buf) >= 0x08:
hdr = buf[0:8]
assert hdr[0:3] == bytes([0xaa, 0x55, 0x80])
assert hdr[-2:] == bytes(crc(hdr[2:-2]))
rem += hdr[3] + 10 # len(payload) + frame + crc
hdr = buf[0:8]
msg = buf[8:hdr[3]+10]
rem = buf[hdr[3]+10:]
if not self.quiet:
print("received: {}".format(hdr.hex()))
print("received: {}".format(msg.hex()))
assert msg[0:8] == bytes([0x80, self.rtc, 0x00, 0x01, self.riid, cnt_lo, cnt_hi, self.rcid])
assert msg[-2:] == bytes(crc(msg[:-2]))
seq = hdr[5]
pld = msg[8:-2]
return seq, pld, rem
def _read_clean(self, dev, buf=bytes()):
buf += dev.read(0x0400) # make sure we're not missing some bytes
while buf:
# get header / detect message type
if len(buf) >= 0x08:
if buf[0:3] == bytes([0xaa, 0x55, 0x40]): # ACK
while len(buf) < 0x0A:
buf += dev.read(0x0400)
if not self.quiet:
print("ignored ACK: {}".format(buf[:0x0a].hex()))
buf = bytes(buf[0x0a:])
elif buf[0:3] == bytes([0xaa, 0x55, 0x80]): # response
buflen = 0x0a + buf[3]
while len(buf) < buflen:
buf += dev.read(0x0400)
if not self.quiet:
print("ignored MSG: {}".format(buf[:buflen].hex()))
buf = bytes(buf[buflen:])
elif buf[0:3] == bytes([0x4e, 0x00, 0x53]): # control message?
while len(buf) < 0x19:
buf += dev.read(0x0400)
if not self.quiet:
print("ignored CTRL: {}".format(buf[:0x19].hex()))
buf = bytes(buf[0x19:])
else: # unknown
if not self.quiet:
print("ignored unknown: {}".format(buf.hex()))
assert False
buf += dev.read(0x0400)
def run(self, dev, cnt):
self._read_clean(dev)
self._write_msg(dev, cnt.seq, cnt.cnt)
retry = self._read_ack(dev, cnt.seq)
# retry one time on com failure
if retry:
self._write_msg(dev, cnt.seq, cnt.cnt)
retry = self._read_ack(dev, cnt.seq)
if retry:
if not self.quiet:
print('Communication failure: invalid ACK, try again')
return
try:
if self.rsnc:
seq, pld, rem = self._read_msg(dev, cnt.cnt)
self._write_ack(dev, seq)
else:
seq, pld, rem = 0, bytes(), bytes()
self._read_clean(dev, rem)
finally:
cnt.inc()
return self._handle_payload(pld)
def _handle_payload(self, pld):
return None
class Gbos(Command):
def __init__(self, **kwargs):
super().__init__(0x11, 0x00, 0x0d, **kwargs)
def _handle_payload(self, pld):
return {
'Base Status': hex(pld[0]),
}
class Psr(Command):
def __init__(self, bat, **kwargs):
super().__init__(0x02, bat, 0x0d, **kwargs)
def _handle_payload(self, pld):
return {
'Power Source': hex(to_int(pld[0:4])),
}
class Sta(Command):
def __init__(self, bat, **kwargs):
super().__init__(0x02, bat, 0x01, **kwargs)
def _handle_payload(self, pld):
return {
'Battery Status': hex(to_int(pld[0:4])),
}
class Bst(Command):
def __init__(self, bat, **kwargs):
super().__init__(0x02, bat, 0x03, **kwargs)
def _handle_payload(self, pld):
return {
'State': hex(to_int(pld[0:4])),
'Present Rate': hex(to_int(pld[4:8])),
'Remaining Capacity': hex(to_int(pld[8:12])),
'Present Voltage': hex(to_int(pld[12:16])),
}
class Bix(Command):
def __init__(self, bat, **kwargs):
super().__init__(0x02, bat, 0x02, **kwargs)
def _handle_payload(self, pld):
return {
'Revision': hex(pld[0]),
'Power Unit': hex(to_int(pld[1:5])),
'Design Capacity': hex(to_int(pld[5:9])),
'Last Full Charge Capacity': hex(to_int(pld[9:13])),
'Technology': hex(to_int(pld[13:17])),
'Design Voltage': hex(to_int(pld[17:21])),
'Design Capacity of Warning': hex(to_int(pld[21:25])),
'Design Capacity of Low': hex(to_int(pld[25:29])),
'Cycle Count': hex(to_int(pld[29:33])),
'Measurement Accuracy': hex(to_int(pld[33:37])),
'Max Sampling Time': hex(to_int(pld[37:41])),
'Min Sampling Time': hex(to_int(pld[41:45])),
'Max Averaging Interval': hex(to_int(pld[45:49])),
'Min Averaging Interval': hex(to_int(pld[49:53])),
'Capacity Granularity 1': hex(to_int(pld[53:57])),
'Capacity Granularity 2': hex(to_int(pld[57:61])),
'Model Number': pld[61:82].decode().rstrip('\0'),
'Serial Number': pld[82:93].decode().rstrip('\0'),
'Type': pld[93:98].decode().rstrip('\0'),
'OEM Information': pld[98:119].decode().rstrip('\0'),
}
class PrettyBat:
def __init__(self, bat, **kwargs):
self.bix = Bix(bat, **kwargs)
self.bst = Bst(bat, **kwargs)
def run(self, dev, cnt):
bix = self.bix.run(dev, cnt)
bst = self.bst.run(dev, cnt)
state = int(bst['State'], 0)
vol = int(bst['Present Voltage'], 0)
rem_cap = int(bst['Remaining Capacity'], 0)
full_cap = int(bix['Last Full Charge Capacity'], 0)
rate = int(bst['Present Rate'], 0)
bat_states = {
0: 'None',
1: 'Discharging',
2: 'Charging',
4: 'Critical',
5: 'Critical (Discharging)',
6: 'Critical (Charging)',
}
bat_state = bat_states[state]
bat_vol = vol / 1000
if full_cap <= 0:
bat_rem_perc = '<unavailable>'
else:
bat_rem_perc = "{}%".format(int(rem_cap / full_cap * 100))
if state == 0x00 or rate == 0:
bat_rem_life = '<unavailable>'
else:
bat_rem_life = "{:.2f}h".format(rem_cap / rate)
return {
'State': bat_state,
'Voltage': "{}V".format(bat_vol),
'Percentage': bat_rem_perc,
'Remaing ': bat_rem_life,
}
class BaseLock(Command):
def __init__(self, lock, **kwargs):
super().__init__(0x11, 0x00, 0x06 if lock else 0x07, 0x00, **kwargs)
COMMANDS = {
'lid0.gbos': (Gbos, ()),
'adp1._psr': (Psr, (0x01,)),
'bat1._sta': (Sta, (0x01,)),
'bat1._bst': (Bst, (0x01,)),
'bat1._bix': (Bix, (0x01,)),
'bat2._sta': (Sta, (0x02,)),
'bat2._bst': (Bst, (0x02,)),
'bat2._bix': (Bix, (0x02,)),
'bat1.pretty': (PrettyBat, (0x01,)),
'bat2.pretty': (PrettyBat, (0x02,)),
'base.lock': (BaseLock, (True,)),
'base.unlock': (BaseLock, (False,)),
}
def main():
cli = ArgumentParser(description='Surface Book 2 / Surface Pro (2017) embedded controller requests.')
cli.add_argument('-d', '--device', default=DEFAULT_DEVICE, metavar='DEV', help='the UART device')
cli.add_argument('-b', '--baud', default=DEFAULT_BAUD_RATE, type=lambda x: int(x, 0), metavar='BAUD', help='the baud rate')
cli.add_argument('-c', '--cnt', type=lambda x: int(x, 0), help='overwrite CNT')
cli.add_argument('-s', '--seq', type=lambda x: int(x, 0), help='overwrite SEQ')
cli.add_argument('-q', '--quiet', action='store_true', help='do not print debug messages, just results')
commands = cli.add_subparsers()
for cmd in COMMANDS.keys():
parser = commands.add_parser(cmd, help="run request '{}'".format(cmd.upper()))
parser.set_defaults(command=cmd)
args = cli.parse_args()
dev = setup_device(args.device, args.baud)
cmd = COMMANDS.get(args.command)
cmd = cmd[0](*cmd[1], quiet=args.quiet)
cnt = Counters.load()
if args.seq is not None:
cnt.seq = args.seq
if args.cnt is not None:
cnt.cnt = args.cnt
try:
res = cmd.run(dev, cnt)
if args.quiet:
for k, v in sorted(res.items()):
print("{}: {}".format(k, v))
else:
import pprint
print()
pprint.pprint(res)
finally:
cnt.store()
if __name__ == '__main__':
main()
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
crcmod = "*"
pyserial = "*"
[dev-packages]
pylint = "*"
@qzed
Copy link
Author

qzed commented Oct 9, 2018

@JackMorganNZ Oops, thanks for catching that!

@sphh
Copy link

sphh commented Oct 12, 2018

Thanks @qzed for that great script!!

Since I want to use it in one of my own scripts, I took the liberty to add the command line switch -q|--quiet to remove the debugging output. I also changed the output in quiet mode to make it easier to parse (by not using pprint, thus removing the curly brackets). If you want to include it into your script, please feel free to do:

--- /home/sph/Software/sb2-batteryORIG.py
+++ /home/sph/Software/sb2-batteryNEW.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python3
 
 import json
-import pprint
 import crcmod
 
 from argparse import ArgumentParser
@@ -77,11 +76,12 @@
 
 
 class Command:
-    def __init__(self, rtc, riid, rcid, rsnc=0x01):
+    def __init__(self, rtc, riid, rcid, rsnc=0x01, quiet=False):
         self.rtc = rtc
         self.riid = riid
         self.rcid = rcid
         self.rsnc = rsnc
+        self.quiet = quiet
 
     def _write_msg(self, dev, seq, cnt):
         cnt_lo = cnt & 0xff
@@ -104,7 +104,8 @@
         while len(msg) < 0x0A:
             msg += dev.read(0x0A - len(msg))
 
-        print("received: {}".format(msg.hex()))
+        if not self.quiet:
+            print("received: {}".format(msg.hex()))
 
         assert msg[0:2] == bytes([0xaa, 0x55])
         assert msg[3:5] == bytes([0x00, 0x00])
@@ -141,8 +142,9 @@
         msg = buf[8:hdr[3]+10]
         rem = buf[hdr[3]+10:]
 
-        print("received: {}".format(hdr.hex()))
-        print("received: {}".format(msg.hex()))
+        if not self.quiet:
+            print("received: {}".format(hdr.hex()))
+            print("received: {}".format(msg.hex()))
 
         assert msg[0:8] == bytes([0x80, self.rtc, 0x00, 0x01, self.riid, cnt_lo, cnt_hi, self.rcid])
         assert msg[-2:] == bytes(crc(msg[:-2]))
@@ -162,7 +164,8 @@
                     while len(buf) < 0x0A:
                         buf += dev.read(0x0400)
 
-                    print("ignored ACK: {}".format(buf[:0x0a].hex()))
+                    if not self.quiet:
+                        print("ignored ACK: {}".format(buf[:0x0a].hex()))
                     buf = bytes(buf[0x0a:])
 
                 elif buf[0:3] == bytes([0xaa, 0x55, 0x80]):             # response
@@ -170,18 +173,21 @@
                     while len(buf) < buflen:
                         buf += dev.read(0x0400)
 
-                    print("ignored MSG: {}".format(buf[:buflen].hex()))
+                    if not self.quiet:
+                        print("ignored MSG: {}".format(buf[:buflen].hex()))
                     buf = bytes(buf[buflen:])
 
                 elif buf[0:3] == bytes([0x4e, 0x00, 0x53]):             # control message?
                     while len(buf) < 0x19:
                         buf += dev.read(0x0400)
 
-                    print("ignored CTRL: {}".format(buf[:0x19].hex()))
+                    if not self.quiet:
+                        print("ignored CTRL: {}".format(buf[:0x19].hex()))
                     buf = bytes(buf[0x19:])
 
                 else:                                                   # unknown
-                    print("ignored unknown: {}".format(buf.hex()))
+                    if not self.quiet:
+                        print("ignored unknown: {}".format(buf.hex()))
                     assert False
 
             buf += dev.read(0x0400)
@@ -197,7 +203,8 @@
             retry = self._read_ack(dev, cnt.seq)
 
             if retry:
-                print('Communication failure: invalid ACK, try again')
+                if not self.quiet:
+                    print('Communication failure: invalid ACK, try again')
                 return
 
         try:
@@ -218,8 +225,8 @@
 
 
 class Gbos(Command):
-    def __init__(self):
-        super().__init__(0x11, 0x00, 0x0d)
+    def __init__(self, **kwargs):
+        super().__init__(0x11, 0x00, 0x0d, **kwargs)
 
     def _handle_payload(self, pld):
         return {
@@ -228,8 +235,8 @@
 
 
 class Psr(Command):
-    def __init__(self, bat):
-        super().__init__(0x02, bat, 0x0d)
+    def __init__(self, bat, **kwargs):
+        super().__init__(0x02, bat, 0x0d, **kwargs)
 
     def _handle_payload(self, pld):
         return {
@@ -238,8 +245,8 @@
 
 
 class Sta(Command):
-    def __init__(self, bat):
-        super().__init__(0x02, bat, 0x01)
+    def __init__(self, bat, **kwargs):
+        super().__init__(0x02, bat, 0x01, **kwargs)
 
     def _handle_payload(self, pld):
         return {
@@ -248,8 +255,8 @@
 
 
 class Bst(Command):
-    def __init__(self, bat):
-        super().__init__(0x02, bat, 0x03)
+    def __init__(self, bat, **kwargs):
+        super().__init__(0x02, bat, 0x03, **kwargs)
 
     def _handle_payload(self, pld):
         return {
@@ -261,8 +268,8 @@
 
 
 class Bix(Command):
-    def __init__(self, bat):
-        super().__init__(0x02, bat, 0x02)
+    def __init__(self, bat, **kwargs):
+        super().__init__(0x02, bat, 0x02, **kwargs)
 
     def _handle_payload(self, pld):
         return {
@@ -290,9 +297,9 @@
 
 
 class PrettyBat:
-    def __init__(self, bat):
-        self.bix = Bix(bat)
-        self.bst = Bst(bat)
+    def __init__(self, bat, **kwargs):
+        self.bix = Bix(bat, **kwargs)
+        self.bst = Bst(bat, **kwargs)
 
     def run(self, dev, cnt):
         bix = self.bix.run(dev, cnt)
@@ -335,25 +342,25 @@
 
 
 class BaseLock(Command):
-    def __init__(self, lock):
-        super().__init__(0x11, 0x00, 0x06 if lock else 0x07, 0x00)
+    def __init__(self, lock, **kwargs):
+        super().__init__(0x11, 0x00, 0x06 if lock else 0x07, 0x00, **kwargs)
 
 
 COMMANDS = {
-    'lid0.gbos': Gbos(),
-    'adp1._psr': Psr(0x01),
-    'bat1._sta': Sta(0x01),
-    'bat1._bst': Bst(0x01),
-    'bat1._bix': Bix(0x01),
-    'bat2._sta': Sta(0x02),
-    'bat2._bst': Bst(0x02),
-    'bat2._bix': Bix(0x02),
-
-    'bat1.pretty': PrettyBat(0x01),
-    'bat2.pretty': PrettyBat(0x02),
-
-    'base.lock': BaseLock(True),
-    'base.unlock': BaseLock(False),
+    'lid0.gbos': (Gbos, ()),
+    'adp1._psr': (Psr, (0x01,)),
+    'bat1._sta': (Sta, (0x01,)),
+    'bat1._bst': (Bst, (0x01,)),
+    'bat1._bix': (Bix, (0x01,)),
+    'bat2._sta': (Sta, (0x02,)),
+    'bat2._bst': (Bst, (0x02,)),
+    'bat2._bix': (Bix, (0x02,)),
+
+    'bat1.pretty': (PrettyBat, (0x01,)),
+    'bat2.pretty': (PrettyBat, (0x02,)),
+
+    'base.lock': (BaseLock, (True,)),
+    'base.unlock': (BaseLock, (False,)),
 }
 
 
@@ -363,6 +370,7 @@
     cli.add_argument('-b', '--baud', default=DEFAULT_BAUD_RATE, type=lambda x: int(x, 0), metavar='BAUD', help='the baud rate')
     cli.add_argument('-c', '--cnt', type=lambda x: int(x, 0), help='overwrite CNT')
     cli.add_argument('-s', '--seq', type=lambda x: int(x, 0), help='overwrite SEQ')
+    cli.add_argument('-q', '--quiet', action='store_true', help='do not print debug messages, just results')
     commands = cli.add_subparsers()
 
     for cmd in COMMANDS.keys():
@@ -373,6 +381,7 @@
 
     dev = setup_device(args.device, args.baud)
     cmd = COMMANDS.get(args.command)
+    cmd = cmd[0](*cmd[1], quiet=args.quiet)
 
     cnt = Counters.load()
     if args.seq is not None:
@@ -383,8 +392,14 @@
     try:
         res = cmd.run(dev, cnt)
 
-        print()
-        pprint.pprint(res)
+        if args.quiet:
+            for k, v in sorted(res.items()):
+                print("{}: {}".format(k, v))
+        else:
+            import pprint
+            print()
+            pprint.pprint(res)
+
     finally:
         cnt.store()
 

@qzed
Copy link
Author

qzed commented Oct 15, 2018

@sphh seems like a good idea, patch applied, thanks!

@calvous
Copy link

calvous commented Oct 17, 2018

@qzed
just a little side note:
I kind of think that the the script is overreading its boundaries for the fields:

            'Revision': hex(pld[0]),
            'Power Unit': hex(to_int(pld[1:5])),
            'Design Capacity': hex(to_int(pld[5:9])),
            'Last Full Charge Capacity': hex(to_int(pld[9:13])),
            'Technology': hex(to_int(pld[13:17])),
            'Design Voltage': hex(to_int(pld[17:21])),
            'Design Capacity of Warning': hex(to_int(pld[21:25])),
'Design Capacity of Low': hex(to_int(pld[25:29])),

You start from 0, then from1 to 5, then 5 to 9, 9 to 13, and so on. Aren't you overlapping bytes here or is that done on purpose?
I also see lots of progress. I will work it in to the fakedev stuff (later tomorrow).

@qzed
Copy link
Author

qzed commented Oct 17, 2018

@calvous The last index is exclusive (while the first one is inclusive), so it actually gets bytes 1 to (inclusively) 4 for eg. the "Power Unit", if that is what you mean.

@calvous
Copy link

calvous commented Oct 17, 2018

@qzed
yes, that's it. Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment