Skip to content

Instantly share code, notes, and snippets.

@dweinstein
Last active October 4, 2017 08:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dweinstein/8449852 to your computer and use it in GitHub Desktop.
Save dweinstein/8449852 to your computer and use it in GitHub Desktop.
chunked logcat logging for Android
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import subprocess
import re
import os
import signal
from shlex import split
import time
import sys
from threading import Timer
from Queue import Queue
__author__ = 'David Weinstein'
def checked_call(command):
command = split(command)
p = subprocess.Popen(
command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
out, err = p.communicate()
if p.returncode != 0:
raise ADBError('ADB error: %s' % err)
else:
# NOTE: adb seems to write all shell output to stderr
if not out:
out = err
return out.strip()
class ADBError(Exception):
pass
class ADBSession(object):
def __init__(self, parent, target):
self.target = target
self.custodian = parent
self.outstanding_pids = Queue()
def get_target(self):
return self.target
def call(self, command):
command = 'adb -s {target} {cmd}'.format(target=self.target, cmd=command)
#print "=> calling", command
sys.stdout.flush()
return checked_call(command)
def shell(self, command):
command = 'shell "{cmd}"'.format(cmd=command)
return self.call(command)
def try_shell(self, command):
try:
return self.shell(command)
except:
pass
def remount(self):
command = 'remount'
self.wait(5)
return self.call(command)
def push_file(self, local,remote):
command = 'push {local} {remote}'.format(local=local,remote=remote)
return self.call(command)
def pull_file(self,remote,local):
command = 'push {remote} {local}'.format(local=local,remote=remote)
return self.call(command)
def install_apk(self, apkpath):
command = 'install -r {apkpath}'.format(apkpath=apkpath)
return self.call(command)
def uninstall_app(self, appname):
command = 'pm uninstall {name}'.format(name=appname)
return self.shell(command)
def start_activity(self, appname, activity):
command = 'am start -n {app}/.{activity}'.format(app=appname,activity=activity)
return self.shell(command)
def start_activity_fullpath(self, activity):
command = 'am start -n {activity}'.format(activity)
return self.shell(command)
def wait_for_device(self):
command = 'wait-for-device'
self.custodian.notify(self, 'waiting')
self.call(command)
return self
def wait_for_boot(self):
while not self.is_booted():
self.wait(2)
self.custodian.notify(self, 'booted')
return self
def logcat(self, callback, regex_filter = None):
proc = subprocess.Popen(['adb', '-s', self.target, "logcat"], stdout=subprocess.PIPE)
pid = proc.pid
self.outstanding_pids.put(pid)
for line in iter(proc.stdout.readline,''):
filtered_line = self.__apply_filter(regex_filter,line)
if not filtered_line == None:
callback(filtered_line)
def __apply_filter(self, regex_filter, line):
if regex_filter == None or regex_filter == "":
return line
regex = re.compile(regex_filter)
m = regex.match(line)
if not m == None:
return m.groups()[0]
return None
def wait(self, length):
time.sleep(length)
return self
def kill_session_in(self, length):
Timer(length, lambda: self.custodian.notify(self, 'done')).start()
return self
def disconnect(self):
command = 'disconnect {target}'.format(target=self.target)
self.custodian.notify(self, 'disconnect')
self.cleanup()
return self.call(command)
def is_booted(self):
val = self.try_shell('getprop sys.boot_completed')
return val == '1'
def getprop(self):
output = self.shell('getprop')
if output:
try:
rows = output.split('\r\n')
dic = {}
for i in rows:
pair = re.findall('\[(.*?)\]', i)
try:
dic[pair[0]] = pair[1]
except IndexError:
# TODO: log malformed getprop entry
pass
return dic
except Exception as e:
# TODO: log 'adb getprop failed: %s' % str(e)
return None
else:
return None
def cleanup(self):
while not self.outstanding_pids.empty():
os.kill(self.outstanding_pids.get_nowait(), signal.SIGTERM)
class ADB(object):
_sessions = []
def __init__(self, manager):
self.manager = manager
def attach(self, target):
session = ADBSession(self, target)
return session.wait_for_device()
def connect(self, target):
command = 'connect {target}'.format(target=target)
#print "connnect: ", self.call(command)
session = ADBSession(self, target)
return session.wait_for_device()
def notify(self, session, msg):
#print "received", msg, "notification from session for target", session.get_target()
if msg == 'disconnect':
if session in self._sessions:
self._sessions.remove(session)
return self.manager.onDisconnect(session)
elif msg == 'ready':
self._sessions.append(session)
return self.manager.onReady(session)
elif msg == 'waiting':
return self.manager.onWait(session)
elif msg == 'done':
return self.manager.onDone(session)
elif msg == 'booted':
return self.manager.onBooted(session)
else:
return None
def kill_server(self):
self.call('kill-server')
def call(self, command):
command = 'adb {cmd}'.format(cmd=command)
return checked_call(command)
def get_sessions(self):
return self._sessions
class SessionManagement(object):
def onReady(self, session):
#print "device is ready but not booted.", session.get_target()
pass
def onDisconnect(self, session):
#print "session disconnect.", session.get_target()
pass
def onWait(self, session):
#print "waiting for", session.get_target()
pass
def onDone(self, session):
#print "session done", session.get_target()
pass
def onBooted(self, session):
#print "device fully booted", session.get_target()
pass
class AnalysisSession(SessionManagement):
name = "AnalysisSession"
def onBooted(self, adb_session):
#print self.name, "onBooted()", adb_session.get_target()
adb_session.install_apk('foo.apk')
adb_session.start_activity('com.foo.bar', 'MainActivity')
adb_session.kill_session_in(30)
adb_session.disconnect()
def onDisconnect(self, adb_session):
#print self.name, "onDisconnect()", adb_session.get_target()
adb_session.uninstall_app('com.viber.voip')
def onDone(self, adb_session):
#print "our session has run out of time for", adb_session.get_target()
pass
if __name__ == "__main__":
session = ADB(AnalysisSession()).connect('04c697b1928a5379')
session.wait_for_boot()
package android.injection.logging;
import android.util.Log;
import java.util.Random;
import utils.HexDump;
public class LogUtil {
static final int DEFAULT_CHUNK_SIZE = 1024;
static final Random rand = new Random();
public static void d(String tag, String message) {
d(tag, message, DEFAULT_CHUNK_SIZE);
}
public static void d(String tag, String message, int chunk_size) {
if (message != null && message.length() == 0) {
return;
}
int chunks = (int) Math.ceil((double) message.length() / (double) chunk_size);
int start = 0;
int count = 1;
byte[] buf = new byte[8];
rand.nextBytes(buf);
String guid = HexDump.toHexString(buf);
do {
int end = Math.min(start+chunk_size, message.length());
String substring = message.substring(start, end);
start += substring.length();
Log.d(tag, String.format("{%s/%d/%d}: %s", guid, count, chunks, substring));
count++;
} while (start < message.length());
}
}
import json
import sys
import pprint
import re
from adb import SessionManagement, ADB
import subprocess
def logAnalysis(fh, packageName):
def logger(data):
matched = re.match('^{(.*)/(\d+)/(\d+)}: (.*)$', data)
if not logger.synced:
if matched and (matched.group(2) == '1'):
logger.synced = True
if matched and logger.synced:
count = int(matched.group(2))
total = int(matched.group(3))
sys.stdout.write(matched.group(4))
sys.stdout.flush()
if count == total:
sys.stdout.flush()
#
#
#
#
logger.synced = False
logger.buff = ""
return logger
if __name__ == "__main__":
if len(sys.argv) < 2:
print "Usage: {} target".format(sys.argv[0])
print " e.g., $ {} [deviceId]".format(sys.argv[0])
sys.exit(1)
player = None
vbox_session = None
try:
target = sys.argv[1]
adb_session = ADB(SessionManagement()).attach(target)
#adb_session.wait_for_boot()
fh = sys.stdout
regex = "D\/.*Logger\+JSON\([0-9 ]+\): (.*)"
adb_session.logcat(callback=logAnalysis(fh, pkgName), regex_filter=regex)
finally:
print "...cleanup..."
python LogWatcher.py 04c697b1928a5379 | jq 'if .packageName | contains("com.mx.browser") then . else empty end'
# jq = http://stedolan.github.io/jq/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment