Skip to content

Instantly share code, notes, and snippets.

@ktnr74
Last active March 20, 2022 00:32
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ktnr74/6755712 to your computer and use it in GitHub Desktop.
Save ktnr74/6755712 to your computer and use it in GitHub Desktop.
Simple ADB client in python with some linux only specifics
#!/usr/bin/python
from __future__ import print_function
import os
import sys
import errno
import socket
import struct
import time
class ADBC(object):
class Socket(socket.socket):
def _send(self, buff):
try:
self.send('{:04X}{}'.format(len(buff), buff))
except:
pass
def _recv(self, size):
buff = bytearray()
if size > 0:
try:
self.settimeout(1)
buff = self.recv(size)
except:
return bytearray()
return bytearray(buff)
def _recvint(self):
try:
res = int(str(self._recv(4)), 16)
except:
return -1
return res
def _recvall(self, timeout=None, size=4096):
if timeout is not None:
self.settimeout(timeout)
buff = bytearray()
while True:
try:
chunk = self.recv(size)
except socket.timeout as e:
break
except socket.error as e:
if e.args[0] == errno.EINTR:
continue
break
else:
if len(chunk) > 0:
buff.extend(chunk)
else:
break
return buff
def _isok(self):
return self._recv(4) == bytearray('OKAY')
class Sock(object):
def __init__(self, address):
self.address = address
self.socket = ADBC.Socket(socket.AF_INET, socket.SOCK_STREAM)
def __enter__(self):
self.socket.settimeout(1)
self.socket.connect_ex(self.address)
return self.socket
def __exit__(self, *args):
try:
if self.socket is not None:
self.socket.close()
self.socket = None
except:
self.socket = None
def __init__(self, serial=None, host='127.0.0.1', port=5037, timeout=10, version=32):
self.version = version
self.serial = serial
self.timeout = timeout
self.address = (host, port)
def __nonzero__(self):
try:
return int(str(self.__call__('host:version')), 16) >= self.version
except:
return False
def __getitem__(self, serial):
dev = self.getdevices().get(serial, {})
if dev.get('sysfs', '') != '':
dev['ctime'] = os.path.getctime(dev['sysfs'])
dev['mtime'] = os.path.getmtime(dev['sysfs'])
dev['enumerated'] = time.time() - dev['ctime']
return dev
def __call__(self, cmd, serial=None, timeout=None):
if timeout is None:
timeout = self.timeout
if serial is None:
serial = self.serial
res = bytearray()
with ADBC.Sock(self.address) as sock:
if not cmd.startswith('host'):
sock._send('host:transport:{}'.format(serial))
if not sock._isok():
return bytearray()
sock._send(cmd)
if not sock._isok():
return bytearray()
if cmd.startswith('host'):
res = sock._recv(sock._recvint())
else:
res = sock._recvall(timeout)
return bytearray(res)
def shell(self, cmd, serial=None, timeout=None, fixlinesep=True, ascii=False):
res = self.__call__(cmd='shell:{}'.format(cmd), serial=serial, timeout=timeout).decode('utf-8')
if fixlinesep:
res = res.replace('\r\n', os.linesep)
if ascii:
res = res.encode('ascii', 'backslashreplace')
return res
def getstate(self, serial):
return self.__call__('host-serial:{}:get-state'.format(serial))
def getdevices(self):
devices = {}
for d in str(self.__call__('host:devices-l')).splitlines():
info = d.split(None, 2)
dev = {'state': info[1]}
dev.update(dict([(p.split(':', 1)) for p in info[2].split()]))
dev['sysfs'] = os.path.join('/sys/bus/usb/devices', dev['usb'])
devices[info[0]] = dev
return devices
def getframebuffer(self, serial=None):
if serial is None:
serial = self.serial
buff = self.__call__('framebuffer:', serial=serial, timeout=2)
if len(buff) < 52:
return bytearray()
ver, bpp, size = struct.unpack('<LLL', str(buff)[0:12])
if size != len(buff) - 52:
return bytearray()
return buff
if __name__ == '__main__':
adbc = ADBC()
if adbc:
devices = adbc.getdevices()
for serial in devices.keys():
print(serial, devices[serial], adbc.shell('getprop ro.build.fingerprint', serial=serial).rstrip())
@dhoomakethu
Copy link

Hi,
Great work!
I have a few questions ,expect your answers for the same.

Is is not possible to run commands like adb root from this script ?
Also, the outputs when the adb commands are actually executed in shell vs commands executed via sockets varies ,is there a way to have consistent outputs with either approach ?

Sanjay

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment