Last active
March 20, 2022 00:32
-
-
Save ktnr74/6755712 to your computer and use it in GitHub Desktop.
Simple ADB client in python with some linux only specifics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from __future__ import print_function | |
import os | |
import sys | |
import errno | |
import socket | |
import struct | |
import time | |
class ADBC(object): | |
class Socket(socket.socket): | |
def _send(self, buff): | |
try: | |
self.send('{:04X}{}'.format(len(buff), buff)) | |
except: | |
pass | |
def _recv(self, size): | |
buff = bytearray() | |
if size > 0: | |
try: | |
self.settimeout(1) | |
buff = self.recv(size) | |
except: | |
return bytearray() | |
return bytearray(buff) | |
def _recvint(self): | |
try: | |
res = int(str(self._recv(4)), 16) | |
except: | |
return -1 | |
return res | |
def _recvall(self, timeout=None, size=4096): | |
if timeout is not None: | |
self.settimeout(timeout) | |
buff = bytearray() | |
while True: | |
try: | |
chunk = self.recv(size) | |
except socket.timeout as e: | |
break | |
except socket.error as e: | |
if e.args[0] == errno.EINTR: | |
continue | |
break | |
else: | |
if len(chunk) > 0: | |
buff.extend(chunk) | |
else: | |
break | |
return buff | |
def _isok(self): | |
return self._recv(4) == bytearray('OKAY') | |
class Sock(object): | |
def __init__(self, address): | |
self.address = address | |
self.socket = ADBC.Socket(socket.AF_INET, socket.SOCK_STREAM) | |
def __enter__(self): | |
self.socket.settimeout(1) | |
self.socket.connect_ex(self.address) | |
return self.socket | |
def __exit__(self, *args): | |
try: | |
if self.socket is not None: | |
self.socket.close() | |
self.socket = None | |
except: | |
self.socket = None | |
def __init__(self, serial=None, host='127.0.0.1', port=5037, timeout=10, version=32): | |
self.version = version | |
self.serial = serial | |
self.timeout = timeout | |
self.address = (host, port) | |
def __nonzero__(self): | |
try: | |
return int(str(self.__call__('host:version')), 16) >= self.version | |
except: | |
return False | |
def __getitem__(self, serial): | |
dev = self.getdevices().get(serial, {}) | |
if dev.get('sysfs', '') != '': | |
dev['ctime'] = os.path.getctime(dev['sysfs']) | |
dev['mtime'] = os.path.getmtime(dev['sysfs']) | |
dev['enumerated'] = time.time() - dev['ctime'] | |
return dev | |
def __call__(self, cmd, serial=None, timeout=None): | |
if timeout is None: | |
timeout = self.timeout | |
if serial is None: | |
serial = self.serial | |
res = bytearray() | |
with ADBC.Sock(self.address) as sock: | |
if not cmd.startswith('host'): | |
sock._send('host:transport:{}'.format(serial)) | |
if not sock._isok(): | |
return bytearray() | |
sock._send(cmd) | |
if not sock._isok(): | |
return bytearray() | |
if cmd.startswith('host'): | |
res = sock._recv(sock._recvint()) | |
else: | |
res = sock._recvall(timeout) | |
return bytearray(res) | |
def shell(self, cmd, serial=None, timeout=None, fixlinesep=True, ascii=False): | |
res = self.__call__(cmd='shell:{}'.format(cmd), serial=serial, timeout=timeout).decode('utf-8') | |
if fixlinesep: | |
res = res.replace('\r\n', os.linesep) | |
if ascii: | |
res = res.encode('ascii', 'backslashreplace') | |
return res | |
def getstate(self, serial): | |
return self.__call__('host-serial:{}:get-state'.format(serial)) | |
def getdevices(self): | |
devices = {} | |
for d in str(self.__call__('host:devices-l')).splitlines(): | |
info = d.split(None, 2) | |
dev = {'state': info[1]} | |
dev.update(dict([(p.split(':', 1)) for p in info[2].split()])) | |
dev['sysfs'] = os.path.join('/sys/bus/usb/devices', dev['usb']) | |
devices[info[0]] = dev | |
return devices | |
def getframebuffer(self, serial=None): | |
if serial is None: | |
serial = self.serial | |
buff = self.__call__('framebuffer:', serial=serial, timeout=2) | |
if len(buff) < 52: | |
return bytearray() | |
ver, bpp, size = struct.unpack('<LLL', str(buff)[0:12]) | |
if size != len(buff) - 52: | |
return bytearray() | |
return buff | |
if __name__ == '__main__': | |
adbc = ADBC() | |
if adbc: | |
devices = adbc.getdevices() | |
for serial in devices.keys(): | |
print(serial, devices[serial], adbc.shell('getprop ro.build.fingerprint', serial=serial).rstrip()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
Great work!
I have a few questions ,expect your answers for the same.
Is is not possible to run commands like adb root from this script ?
Also, the outputs when the adb commands are actually executed in shell vs commands executed via sockets varies ,is there a way to have consistent outputs with either approach ?
Sanjay