Created
September 7, 2016 14:09
-
-
Save lhchavez/171098f17c56b148a9e6e010d03ec704 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from Crypto.Signature import PKCS1_v1_5 | |
from Crypto.PublicKey import RSA | |
import argparse | |
import socket | |
import struct | |
import sys | |
# Values carried in arg0 of an AUTH message during the connection handshake:
#   ADB_AUTH_TOKEN        - expected on the AUTH message received after CNXN
#   ADB_AUTH_SIGNATURE    - sent together with the signed token
#   ADB_AUTH_RSAPUBLICKEY - sent together with the public key
ADB_AUTH_TOKEN, ADB_AUTH_SIGNATURE, ADB_AUTH_RSAPUBLICKEY = 1, 2, 3
class NoOpHash:
    """Hash-object stand-in whose digest is the wrapped data, unchanged.

    It is handed to the PKCS#1 v1.5 signer so that the given bytes are signed
    as if they already were a digest, instead of being hashed again.
    """

    # DER encoding of the object identifier 1.3.14.3.2.26 (SHA-1).
    _OID = b'\x06\x05+\x0e\x03\x02\x1a'

    def __init__(self, data):
        """Remember *data*, the bytes to be returned verbatim by digest()."""
        self.data = data

    def digest(self):
        """Return the wrapped bytes as-is; no hashing is performed."""
        return self.data

    @property
    def oid(self):
        """DER-encoded algorithm identifier reported for this "hash"."""
        return self._OID
class Adb:
    """Minimal ADB-over-TCP client that authenticates with an RSA key pair.

    Intended to be used as a context manager: entering connects to
    ``host:port`` and performs the CNXN/AUTH handshake, leaving closes the
    socket.  Every message sent or received is hex-dumped to stdout, prefixed
    with '>' (outgoing) or '<' (incoming).
    """

    class ProtocolError(RuntimeError, AssertionError):
        """The peer sent a message that does not fit the expected exchange.

        Derives from AssertionError as well because these checks used to be
        plain ``assert`` statements; existing handlers keep working.
        """

    # 24-byte message header: command, arg0, arg1, payload length,
    # payload checksum, command XOR 0xff.
    _HEADER = struct.Struct('<4sIIII4s')

    # arg0 / arg1 announced in our CNXN message and required in the reply.
    _CNXN_ARG0 = 0x1000000
    _CNXN_ARG1 = 0x40000

    def __init__(self, host, port, keyfile):
        """Load the key pair; no network traffic happens until ``__enter__``.

        Args:
            host: Address of the device's ADB TCP endpoint.
            port: TCP port of that endpoint.
            keyfile: Path to the RSA private key.  The public key is read
                from the same path with '.pub' appended.
        """
        with open(keyfile, 'rb') as f:
            self.key = RSA.importKey(f.read())
        self.signer = PKCS1_v1_5.new(self.key)
        with open(keyfile + '.pub', 'rb') as f:
            self.pubkey = f.read()
        self.host = host
        self.port = port
        self.local_id = 1
        self.sock = None

    def __enter__(self):
        """Connect and authenticate; return ``self`` once CNXN is received."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((self.host, self.port))
            self._handshake()
        except BaseException:
            # Do not leak the socket when connecting or authenticating fails;
            # __exit__ is not invoked if __enter__ raises.
            self.sock.close()
            self.sock = None
            raise
        return self

    def __exit__(self, *args):
        """Close the connection (exceptions from the body are not swallowed)."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None

    @classmethod
    def _check(cls, ok, description):
        """Raise ProtocolError(description) unless *ok* is true."""
        if not ok:
            raise cls.ProtocolError(description)

    def _handshake(self):
        """Run the CNXN -> AUTH(token) -> AUTH(signature) [-> AUTH(key)] -> CNXN exchange."""
        self._send_message(b'CNXN', self._CNXN_ARG0, self._CNXN_ARG1,
                           b'host::features=cmd,shell_v2')
        command, arg0, arg1, data = self._recv_message()
        self._check(command == b'AUTH', 'expected AUTH, got %r' % (command,))
        self._check(arg0 == ADB_AUTH_TOKEN, 'unexpected AUTH type %r' % (arg0,))
        self._check(arg1 == 0, 'unexpected AUTH arg1 %r' % (arg1,))
        self._check(len(data) == 20, 'unexpected token length %d' % len(data))

        # The 20-byte token is signed as-is (NoOpHash performs no hashing).
        signature = self.signer.sign(NoOpHash(data))
        self._send_message(b'AUTH', ADB_AUTH_SIGNATURE, 0, signature)
        command, arg0, arg1, data = self._recv_message()
        if command == b'AUTH':
            # Signature was not accepted: offer the public key instead.
            self._send_message(b'AUTH', ADB_AUTH_RSAPUBLICKEY, 0,
                               self.pubkey + b'\x00')
            command, arg0, arg1, data = self._recv_message()
        self._check(command == b'CNXN', 'expected CNXN, got %r' % (command,))
        self._check(arg0 == self._CNXN_ARG0, 'unexpected CNXN arg0 %r' % (arg0,))
        self._check(arg1 == self._CNXN_ARG1, 'unexpected CNXN arg1 %r' % (arg1,))

    def _hexdump_line(self, line):
        """Format up to 16 bytes as two hex columns plus a printable-ASCII column."""
        # Bytes outside the printable ASCII range are shown as '.'.
        printable = bytes(c if 0x20 <= c <= 0x7E else 0x2E for c in line)
        return '%-23s %-23s %s' % (
            ' '.join('%02x' % b for b in line[:8]),
            ' '.join('%02x' % b for b in line[8:16]),
            printable.decode('ascii'),
        )

    def _hexdump(self, message, header):
        """Print *message* 16 bytes per line, each line prefixed by *header*."""
        for i in range(0, len(message), 16):
            print('%s %s' % (header, self._hexdump_line(message[i:i+16])))

    def _send_message(self, command, arg0, arg1, data):
        """Frame and send one message.

        Args:
            command: 4-byte command tag such as b'CNXN' or b'WRTE'.
            arg0, arg1: Unsigned 32-bit message arguments.
            data: Payload bytes (may be empty).
        """
        message = self._HEADER.pack(
            command, arg0, arg1, len(data),
            sum(data) & 0xFFFFFFFF,  # checksum must fit the 32-bit field
            bytes(0xff ^ x for x in command),
        ) + data
        self._hexdump(message, '>')
        # sendall() loops until everything is written or raises OSError.
        self.sock.sendall(message)

    def _recv_exact(self, size):
        """Read exactly *size* bytes from the socket.

        Raises:
            RuntimeError: The peer closed the connection before *size* bytes
                arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            # recv() may legitimately return fewer bytes than requested.
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise RuntimeError('socket connection broken')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _recv_message(self):
        """Receive one message and return ``(command, arg0, arg1, data)``.

        The checksum and magic fields of the header are read but not verified.

        Raises:
            RuntimeError: The connection ended in the middle of a message.
        """
        header = self._recv_exact(self._HEADER.size)
        command, arg0, arg1, data_len, _data_sum, _magic = self._HEADER.unpack(header)
        data = self._recv_exact(data_len)
        self._hexdump(header + data, '<')
        return (command, arg0, arg1, data)

    def shell(self, args):
        """Run a shell command on the device and return its output.

        Opens a 'shell,raw:' stream, acknowledges every WRTE packet and
        collects the payloads until the device sends CLSE.

        Args:
            args: The command line to run, as a str.

        Returns:
            Everything the device wrote to the stream, decoded as UTF-8.

        Raises:
            Adb.ProtocolError: The device answered with an unexpected message.
            RuntimeError: The connection broke while waiting for a message.
            UnicodeDecodeError: The collected output is not valid UTF-8.
        """
        local_id = self.local_id
        self.local_id += 1
        self._send_message(b'OPEN', local_id, 0,
                           b'shell,raw:%s\x00' % bytes(args, encoding='utf-8'))
        command, arg0, arg1, data = self._recv_message()
        self._check(command == b'OKAY', 'expected OKAY, got %r' % (command,))
        self._check(arg1 == local_id, 'OKAY for unknown stream %r' % (arg1,))
        self._check(data == b'', 'unexpected payload in OKAY')
        remote_id = arg0

        chunks = []
        while True:
            command, arg0, arg1, data = self._recv_message()
            if command == b'CLSE':
                break
            self._check(command == b'WRTE', 'expected WRTE, got %r' % (command,))
            self._check(arg0 == remote_id, 'WRTE from unknown stream %r' % (arg0,))
            self._check(arg1 == local_id, 'WRTE for unknown stream %r' % (arg1,))
            chunks.append(data)
            self._send_message(b'OKAY', local_id, remote_id, b'')
        # Decode once at the end: a multi-byte character may be split across
        # two WRTE packets.
        return b''.join(chunks).decode('utf-8')
def main(argv=None):
    """Connect to a device over ADB/TCP, run shell commands, print the results.

    Args:
        argv: Command-line arguments without the program name.  ``None``
            (the default) makes argparse read ``sys.argv``.

    With no arguments the behaviour matches the previous hard-coded version:
    host 192.168.1.231, port 5555, the author's adbkey, commands 'ls' and
    'getprop'.
    """
    parser = argparse.ArgumentParser(
        description='Run shell commands on a device over ADB/TCP.')
    parser.add_argument('--host', default='192.168.1.231',
                        help='device address (default: %(default)s)')
    parser.add_argument('--port', type=int, default=5555,
                        help='device TCP port (default: %(default)s)')
    parser.add_argument('--keyfile', default='/home/lhchavez/.android/adbkey',
                        help='RSA private key; the public key is read from '
                             'KEYFILE.pub (default: %(default)s)')
    parser.add_argument('commands', nargs='*', default=['ls', 'getprop'],
                        help='shell commands to run (default: ls getprop)')
    args = parser.parse_args(argv)

    with Adb(args.host, args.port, args.keyfile) as adb:
        for command in args.commands:
            print(adb.shell(command))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment