Skip to content

Instantly share code, notes, and snippets.

@sporsh
Created November 22, 2012 00:17
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sporsh/4128667 to your computer and use it in GitHub Desktop.
Save sporsh/4128667 to your computer and use it in GitHub Desktop.
Implementation of Android Debug Bridge (ADB) protocol in Twisted
from adb import protocol
# Sample packet whose header reads: command 'OPEN', arg0=2, arg1=0,
# data_length=9, data_check=0x331, followed by the 9-byte payload
# 'shell:ls\x00'.
data = (
'\x4f\x50\x45\x4e\x02\x00\x00\x00'
'\x00\x00\x00\x00\x09\x00\x00\x00'
'\x31\x03\x00\x00\xb0\xaf\xba\xb1'
'\x73\x68\x65\x6c\x6c\x3a\x6c\x73'
'\x00'
)
# Sample packet whose header reads: command 'OKAY', arg0=2, arg1=9,
# data_length=0 (no payload).
# NOTE: this rebinding replaces the OPEN sample above, so only the OKAY
# packet is decoded below; remove or reorder one assignment to try the other.
data = (
'\x4f\x4b\x41\x59\x02\x00\x00\x00'
'\x09\x00\x00\x00\x00\x00\x00\x00'
'\x00\x00\x00\x00\xb0\xb4\xbe\xa6'
)
# decode() yields (message, remaining bytes); the remainder is ignored here.
message, _ = protocol.AdbMessage.decode(data)
print "HEADER:", message.header
print "MESSAGE:", message
"""
@author: Geir Sporsheim
@see: git repo https://android.googlesource.com/platform/system/core/
@see: source file adb/adb.h
"""
from twisted.internet import protocol
import struct
# Sent as arg1 of the connect (A_CNXN) message; the TODO in sendMessage
# intends to split outgoing messages into chunks of this size.
MAX_PAYLOAD = 4096
# Command identifiers. Each value is the four ASCII characters of the command
# name read as a little-endian 32-bit integer, e.g. 0x434e5953 is the byte
# sequence 'S', 'Y', 'N', 'C'.
A_SYNC = 0x434e5953
A_CNXN = 0x4e584e43
A_OPEN = 0x4e45504f
A_OKAY = 0x59414b4f
A_CLSE = 0x45534c43
A_WRTE = 0x45545257
# Sent as arg0 of the connect (A_CNXN) message.
A_VERSION = 0x01000000 # ADB protocol version
def getCommandString(message):
    """Return the four-character name of an ADB command code.

    Command identifiers are the ASCII bytes of the command name read as a
    little-endian 32-bit integer (``A_OKAY`` is ``'OKAY'``), so packing the
    integer back into four little-endian bytes recovers the readable name.

    @param message: command identifier constant (unsigned 32-bit integer)
    @return: the four packed bytes, e.g. ``'CNXN'``
    """
    # '<' forces the standard 4-byte size. A bare 'L' is the platform's
    # native unsigned long, which is 8 bytes on LP64 systems and would
    # append four NUL bytes to the name, breaking 'adb_<NAME>' dispatch.
    return struct.pack('<L', message)
class AdbMessageHeader(tuple):
    """Immutable header that precedes the payload of every ADB message.

    The header is a tuple of six unsigned 32-bit fields, exposed both by
    index and through the read-only properties below.
    """

    # Six unsigned 32-bit words, little-endian, no padding (24 bytes).
    # The explicit '<' matters: a bare '6L' uses the native unsigned long,
    # which is 8 bytes on LP64 platforms and would yield a 48-byte header.
    _fmt = '<6L'

    def __new__(cls, command, arg0, arg1, data_length, data_check, magic):
        """
        @param command: command identifier constant
        @param arg0: first argument
        @param arg1: second argument
        @param data_length: length of payload (0 is allowed)
        @param data_check: checksum of data payload
        @param magic: command ^ 0xffffffff
        """
        fields = (command, arg0, arg1, data_length, data_check, magic)
        return tuple.__new__(cls, fields)

    command = property(lambda self: self[0])
    arg0 = property(lambda self: self[1])
    arg1 = property(lambda self: self[2])
    data_length = property(lambda self: self[3])
    data_check = property(lambda self: self[4])
    magic = property(lambda self: self[5])

    def encode(self):
        """Pack the six fields into their wire representation.

        @return: the packed header bytes
        """
        # The tuple order is exactly the field order of the wire format.
        return struct.pack(self._fmt, *self)

    @classmethod
    def decode(cls, data):
        """Parse a header from the start of C{data}.

        @param data: received bytes, starting at a header boundary
        @return: C{(header, remaining_bytes)}, or C{None} when C{data} does
            not yet contain a complete header
        """
        length = struct.calcsize(cls._fmt)
        if len(data) < length:
            return None
        args = struct.unpack(cls._fmt, data[:length])
        return cls(*args), data[length:]

    def __str__(self, *args, **kwargs):
        """Render the fields as a tuple with the command shown by name."""
        return str((getCommandString(self.command),
                    self.arg0, self.arg1, self.data_length,
                    self.data_check, self.magic))
class AdbMessage(object):
    """A complete ADB message: command, two arguments and a byte payload.

    The wire header (payload length, checksum and magic) is derived from
    these four values on demand rather than stored.
    """

    def __init__(self, command, arg0, arg1, data=b''):
        """
        @param command: command identifier constant
        @param arg0: first argument
        @param arg1: second argument
        @param data: payload bytes (may be empty)
        """
        self.command = command
        self.arg0 = arg0
        self.arg1 = arg1
        self.data = data

    @property
    def header(self):
        """The L{AdbMessageHeader} describing this message's current state."""
        # The checksum is the plain sum of the payload's byte values;
        # bytearray yields integers for a byte string on Python 2 and 3.
        data_check = sum(bytearray(self.data))
        magic = self.command ^ 0xffffffff
        return AdbMessageHeader(self.command,
                                self.arg0,
                                self.arg1,
                                len(self.data),
                                data_check,
                                magic)

    @classmethod
    def decode(cls, data):
        """Parse one message from the start of C{data}.

        @param data: received bytes, starting at a message boundary
        @return: C{(message, remaining_bytes)}, or C{None} when C{data} does
            not yet hold a complete header and payload
        @raise AssertionError: if the received header does not match the one
            computed from the payload (see L{validate})
        """
        decoded = AdbMessageHeader.decode(data)
        if decoded is None:
            # Not even a full header yet.
            return None
        header, remainder = decoded
        if len(remainder) < header.data_length:
            # Header is complete but the payload is still arriving.
            return None
        # Take only this message's payload; anything after it belongs to the
        # next message and must not be counted in the length or checksum.
        payload = remainder[:header.data_length]
        message = cls(header.command, header.arg0, header.arg1, payload)
        message.validate(header)
        return message, remainder[header.data_length:]

    def encode(self):
        """Return the wire form: packed header followed by the payload."""
        return self.header.encode() + self.data

    def validate(self, header):
        """Check that C{header} is consistent with this message.

        @param header: the header received alongside this message's payload
        @raise AssertionError: if it differs from the computed header
        """
        # Raised explicitly rather than via the assert statement so that the
        # check is not stripped when Python runs with -O.
        expected = self.header
        if expected != header:
            raise AssertionError(
                "ADB header mismatch: received %r, computed %r"
                % (tuple(header), tuple(expected)))

    def __eq__(self, other):
        if not isinstance(other, AdbMessage):
            return NotImplemented
        return self.header == other.header and self.data == other.data

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __repr__(self):
        return "%s(%r)" % (self.header, self.data)
class AdbProtocolBase(protocol.Protocol):
    """Twisted protocol that frames a byte stream into ADB messages.

    Received bytes are buffered until a whole message can be decoded; each
    message is then passed to the method named C{adb_<COMMAND>} (for example
    C{adb_OKAY}), or to L{unhandeledMessage} when no such method exists.
    """

    # Not used by any method of this class.
    deferred = None
    # Bytes received so far that have not yet been decoded into a message.
    buff = ''

    def dataReceived(self, data):
        """Buffer C{data} and dispatch every complete message it yields."""
        self.buff += data
        message = self.getMessage()
        while message:
            self.dispatchMessage(message)
            message = self.getMessage()

    def getMessage(self):
        """Decode and remove the next message from the receive buffer.

        @return: the next L{AdbMessage}, or C{None} when the buffer does not
            hold a complete, valid message
        """
        try:
            decoded = AdbMessage.decode(self.buff)
        except Exception:
            # Only ordinary errors are swallowed, so KeyboardInterrupt and
            # SystemExit still propagate.
            #TODO: correctly handle corrupt messages; for now the buffer is
            # left untouched and the bad bytes are retried on the next call
            return None
        if decoded is None:
            # Incomplete message: wait for more data.
            return None
        message, self.buff = decoded
        return message

    def dispatchMessage(self, message):
        """Invoke the C{adb_<COMMAND>} handler for C{message}."""
        name = 'adb_' + getCommandString(message.command)
        handler = getattr(self, name, self.unhandeledMessage)
        handler(message)

    def unhandeledMessage(self, message):
        """Fallback handler: report a message that has no C{adb_*} method."""
        # Single-argument form prints the same text under the Python 2 print
        # statement and the Python 3 print function.
        print("Unhandeled message: %s" % (message,))

    def sendConnect(self):
        """Send the connect (A_CNXN) message with the C{host::} banner."""
        data = 'host::'
        self.sendCommand(A_CNXN, A_VERSION, MAX_PAYLOAD, data)

    def sendCommand(self, command, arg0, arg1, data):
        """Build and send a message.

        A NUL terminator is appended to C{data} here, so callers pass the
        payload without one.
        """
        message = AdbMessage(command, arg0, arg1, data + '\x00')
        self.sendMessage(message)

    def sendMessage(self, message):
        """Send a complete ADB message to the peer
        """
        #TODO: split message into chunks of MAX_PAYLOAD
        self.transport.write(message.encode())
# Ready-made connect message carrying the same command, arguments and
# NUL-terminated 'host::' payload that AdbProtocolBase.sendConnect() sends.
connectMessage = AdbMessage(A_CNXN, A_VERSION, MAX_PAYLOAD, 'host::\x00')
import unittest
from adb import protocol
class AdbProtocolTest(unittest.TestCase):
    """Unit tests for message framing and encoding in C{adb.protocol}."""

    def setUp(self):
        self.protocol = protocol.AdbProtocolBase()

    def test_get_message(self):
        """A message delivered in two fragments is dispatched exactly once."""
        messages = []
        # Instance attribute found by the protocol's 'adb_<COMMAND>' lookup.
        self.protocol.adb_OKAY = messages.append
        data = "hello adb\x00"
        message = protocol.AdbMessage(protocol.A_OKAY, 0, 1, data)
        # Encode the message and send it in two pieces; the split at byte 10
        # falls inside the header, so the first call must dispatch nothing.
        encoded_message = message.encode()
        self.protocol.dataReceived(encoded_message[:10])
        self.protocol.dataReceived(encoded_message[10:])
        self.assertEqual(messages, [message])

    def test_encode_decode_message(self):
        """Encoding and decoding agree with a captured connect message."""
        # This is the connect message grabbed from adb server
        data = ('\x43\x4e\x58\x4e\x00\x00\x00\x01'
                '\x00\x10\x00\x00\x07\x00\x00\x00'
                '\x32\x02\x00\x00\xbc\xb1\xa7\xb1'
                '\x68\x6f\x73\x74\x3a\x3a\x00')
        message = protocol.AdbMessage(protocol.A_CNXN,
                                      protocol.A_VERSION,
                                      protocol.MAX_PAYLOAD,
                                      'host::\x00')
        self.assertEqual(message.encode(), data,
                         "Message did not encode to the expected data")
        decoded_message, _ = protocol.AdbMessage.decode(data)
        self.assertEqual(decoded_message, message,
                         "Data did not decode to the expected message")
"""System test
"""
from twisted.internet import reactor
from twisted.internet.endpoints import clientFromString
from twisted.internet.protocol import ClientFactory
from adb import protocol
def main():
    """Connect to an ADB endpoint, run C{shell:ls} and print its output.

    Sends the connect message once the TCP connection is up, opens a
    C{shell:ls} stream when the peer's connect message arrives, collects the
    payload of every write message, and stops the reactor when the peer
    closes the stream or the connection attempt fails.
    """
    endpoint = clientFromString(reactor, "tcp:localhost:5555:timeout=5")
    # endpoint = clientFromString(reactor, "unix:/dev/bus/usb/001/012")

    # Payloads of the write (WRTE) messages received, in arrival order.
    data = []

    # The handlers live on a local subclass so that the shared library class
    # protocol.AdbProtocolBase is not modified for the rest of the process.
    # sendCommand() appends the NUL terminator itself, so payloads are passed
    # without one to avoid sending a doubled NUL.
    class ShellClient(protocol.AdbProtocolBase):

        def adb_CNXN(self, message):
            print("GOT MESSAGE %s" % (message,))
            self.sendCommand(protocol.A_OPEN, 2, message.arg0, 'shell:ls')

        def adb_WRTE(self, message):
            print("GOT MESSAGE %s" % (message,))
            data.append(message.data)
            self.sendCommand(protocol.A_OKAY, 2, message.arg0, '')

        def adb_CLSE(self, message):
            print("GOT MESSAGE %s" % (message,))
            self.sendCommand(protocol.A_CLSE, 2, message.arg0, '')
            reactor.stop()

    factory = ClientFactory()
    factory.protocol = ShellClient

    def send_connect(client):
        print("TCP CONNECTED %s" % (client,))
        # Same command and arguments as before, via the protocol's own
        # helper, which sends 'host::' with a single NUL terminator.
        client.sendConnect()

    def connection_failed(reason):
        print(reason.getErrorMessage())
        reactor.stop()

    client_d = endpoint.connect(factory)
    client_d.addCallback(send_connect)
    # Added after the callback, so it also catches a failure raised by
    # send_connect itself.
    client_d.addErrback(connection_failed)

    reactor.run()
    print("DONE %s" % (''.join(data).splitlines(),))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment