Last active
October 4, 2017 08:59
-
-
Save dweinstein/8449852 to your computer and use it in GitHub Desktop.
chunked logcat logging for Android
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import subprocess | |
import re | |
import os | |
import signal | |
from shlex import split | |
import time | |
import sys | |
from threading import Timer | |
from Queue import Queue | |
__author__ = 'David Weinstein' | |
def checked_call(command):
    """Run *command* (a shell-style string) and return its stripped output.

    The string is tokenised with ``shlex.split`` and executed without a
    shell.  A non-zero exit status raises ``ADBError`` carrying whatever the
    process wrote to stderr.  On success stdout is returned, unless it is
    empty, in which case stderr is returned instead.
    """
    argv = split(command)
    proc = subprocess.Popen(
        argv,
        shell=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    stdout_data, stderr_data = proc.communicate()
    if proc.returncode != 0:
        raise ADBError('ADB error: %s' % stderr_data)
    # NOTE: adb seems to write all shell output to stderr
    result = stdout_data or stderr_data
    return result.strip()
class ADBError(Exception):
    """Raised by checked_call when a command exits with a non-zero status."""
class ADBSession(object):
    """A handle on a single adb target.

    Every command is issued as ``adb -s <target> ...`` through
    ``checked_call``.  Lifecycle events ('waiting', 'booted', 'done',
    'disconnect') are reported to the owning object through
    ``custodian.notify(session, msg)``.
    """

    def __init__(self, parent, target):
        """Bind the session to *target* and to its notifying *parent*."""
        self.target = target
        self.custodian = parent
        # PIDs of the `adb logcat` children started by logcat(); cleanup()
        # sends each of them SIGTERM.
        self.outstanding_pids = Queue()

    def get_target(self):
        """Return the target string this session was created with."""
        return self.target

    def call(self, command):
        """Run ``adb -s <target> <command>`` and return its stripped output.

        Raises whatever ``checked_call`` raises (ADBError on non-zero exit).
        """
        command = 'adb -s {target} {cmd}'.format(target=self.target, cmd=command)
        sys.stdout.flush()
        return checked_call(command)

    def shell(self, command):
        """Run *command* through ``adb shell`` (wrapped in double quotes)."""
        command = 'shell "{cmd}"'.format(cmd=command)
        return self.call(command)

    def try_shell(self, command):
        """Like shell(), but return None instead of raising on failure.

        Only ``Exception`` subclasses are suppressed so that Ctrl-C
        (KeyboardInterrupt) can still break out of polling loops such as
        wait_for_boot().
        """
        try:
            return self.shell(command)
        except Exception:
            return None

    def remount(self):
        """Sleep five seconds, then run ``adb remount``."""
        command = 'remount'
        self.wait(5)
        return self.call(command)

    def push_file(self, local, remote):
        """Copy *local* to *remote* on the target with ``adb push``."""
        command = 'push {local} {remote}'.format(local=local, remote=remote)
        return self.call(command)

    def pull_file(self, remote, local):
        """Copy *remote* from the target to *local* with ``adb pull``."""
        # The original issued 'push' here, which copied in the wrong direction.
        command = 'pull {remote} {local}'.format(local=local, remote=remote)
        return self.call(command)

    def install_apk(self, apkpath):
        """Install (or reinstall, ``-r``) the APK at *apkpath*."""
        command = 'install -r {apkpath}'.format(apkpath=apkpath)
        return self.call(command)

    def uninstall_app(self, appname):
        """Run ``pm uninstall <appname>`` on the target."""
        command = 'pm uninstall {name}'.format(name=appname)
        return self.shell(command)

    def start_activity(self, appname, activity):
        """Run ``am start -n <appname>/.<activity>`` on the target."""
        command = 'am start -n {app}/.{activity}'.format(app=appname, activity=activity)
        return self.shell(command)

    def start_activity_fullpath(self, activity):
        """Run ``am start -n <activity>`` with *activity* passed verbatim."""
        # The field is named, so the value must be passed by keyword; the
        # original positional call raised KeyError.
        command = 'am start -n {activity}'.format(activity=activity)
        return self.shell(command)

    def wait_for_device(self):
        """Notify 'waiting', block on ``adb wait-for-device``, return self."""
        command = 'wait-for-device'
        self.custodian.notify(self, 'waiting')
        self.call(command)
        return self

    def wait_for_boot(self):
        """Poll is_booted() every two seconds, then notify 'booted'."""
        while not self.is_booted():
            self.wait(2)
        self.custodian.notify(self, 'booted')
        return self

    def logcat(self, callback, regex_filter=None):
        """Stream ``adb logcat`` and feed each accepted line to *callback*.

        With no *regex_filter* every raw line is delivered.  Otherwise only
        lines the pattern matches (anchored at the start of the line) are
        delivered, reduced to the first capture group.  Blocks until the
        logcat process closes its stdout.
        """
        proc = subprocess.Popen(['adb', '-s', self.target, "logcat"], stdout=subprocess.PIPE)
        self.outstanding_pids.put(proc.pid)
        # Compile once instead of once per log line.
        pattern = re.compile(regex_filter) if regex_filter else None
        while True:
            line = proc.stdout.readline()
            if not line:
                # Empty read means EOF.
                break
            filtered_line = self.__apply_filter(pattern, line)
            if filtered_line is not None:
                callback(filtered_line)

    def __apply_filter(self, regex_filter, line):
        """Return the part of *line* selected by *regex_filter*, or None.

        *regex_filter* may be None/"" (no filtering), a pattern string, or a
        compiled pattern.  A pattern without capture groups yields the whole
        match rather than raising IndexError.
        """
        if regex_filter is None or regex_filter == "":
            return line
        m = re.match(regex_filter, line)
        if m is None:
            return None
        return m.group(1) if m.groups() else m.group(0)

    def wait(self, length):
        """Sleep for *length* seconds and return self for chaining."""
        time.sleep(length)
        return self

    def kill_session_in(self, length):
        """Schedule a 'done' notification after *length* seconds."""
        Timer(length, lambda: self.custodian.notify(self, 'done')).start()
        return self

    def disconnect(self):
        """Notify 'disconnect', terminate logcat children, run ``adb disconnect``."""
        command = 'disconnect {target}'.format(target=self.target)
        self.custodian.notify(self, 'disconnect')
        self.cleanup()
        return self.call(command)

    def is_booted(self):
        """Return True when ``getprop sys.boot_completed`` reports '1'."""
        val = self.try_shell('getprop sys.boot_completed')
        return val == '1'

    def getprop(self):
        """Return all device properties as a dict, or None if there is no output.

        Each output row is expected to look like ``[key]: [value]``; rows
        with fewer than two bracketed fields are skipped.
        """
        output = self.shell('getprop')
        if not output:
            return None
        props = {}
        # splitlines() copes with both '\r\n' and '\n' separated output.
        for row in output.splitlines():
            pair = re.findall(r'\[(.*?)\]', row)
            if len(pair) >= 2:
                props[pair[0]] = pair[1]
            # TODO: log malformed getprop entry
        return props

    def cleanup(self):
        """Send SIGTERM to every logcat child recorded by logcat()."""
        while not self.outstanding_pids.empty():
            pid = self.outstanding_pids.get_nowait()
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError:
                # The process is already gone; nothing left to terminate.
                pass
class ADB(object):
    """Entry point that creates ADBSession objects and relays their events.

    Sessions report lifecycle messages through notify(); each message is
    forwarded to the matching ``on*`` hook of *manager*.
    """

    def __init__(self, manager):
        """Store the *manager* whose ``on*`` hooks receive session events."""
        self.manager = manager
        # Per-instance list: a class-level list would be shared by every ADB
        # object and leak sessions between them.
        self._sessions = []

    def attach(self, target):
        """Create a session for *target* and block until the device appears."""
        session = ADBSession(self, target)
        return session.wait_for_device()

    def connect(self, target):
        """Create a session for *target* and block until the device appears.

        NOTE(review): no ``adb connect <target>`` is issued here -- the only
        call to it lived inside a commented-out debug print -- so this
        currently behaves exactly like attach().  Confirm whether network
        targets are expected to be connected by the caller beforehand.
        """
        session = ADBSession(self, target)
        return session.wait_for_device()

    def notify(self, session, msg):
        """Dispatch *msg* from *session* to the manager and return its result.

        'ready' adds the session to the tracked list and 'disconnect'
        removes it.  Unknown messages are ignored and return None.
        """
        if msg == 'disconnect':
            if session in self._sessions:
                self._sessions.remove(session)
            return self.manager.onDisconnect(session)
        if msg == 'ready':
            self._sessions.append(session)
            return self.manager.onReady(session)

        # Remaining messages need no bookkeeping, only forwarding.
        hook_names = {
            'waiting': 'onWait',
            'done': 'onDone',
            'booted': 'onBooted',
        }
        hook_name = hook_names.get(msg)
        if hook_name is None:
            return None
        return getattr(self.manager, hook_name)(session)

    def kill_server(self):
        """Run ``adb kill-server``."""
        self.call('kill-server')

    def call(self, command):
        """Run ``adb <command>`` (no target selection) via checked_call."""
        command = 'adb {cmd}'.format(cmd=command)
        return checked_call(command)

    def get_sessions(self):
        """Return the list of sessions that have reported 'ready'."""
        return self._sessions
class SessionManagement(object):
    """Base class for session event handlers; every hook is a no-op.

    Subclasses override the hooks they care about.  Each hook receives the
    session that produced the event and returns None.
    """

    def onReady(self, session):
        """Hook for the 'ready' message."""
        return None

    def onDisconnect(self, session):
        """Hook for the 'disconnect' message."""
        return None

    def onWait(self, session):
        """Hook for the 'waiting' message."""
        return None

    def onDone(self, session):
        """Hook for the 'done' message."""
        return None

    def onBooted(self, session):
        """Hook for the 'booted' message."""
        return None
class AnalysisSession(SessionManagement):
    """Sample handler: install and launch an app once the device has booted."""

    name = "AnalysisSession"

    def onBooted(self, adb_session):
        """Install the APK, start its activity, arm a timer, then disconnect."""
        apk_path = 'foo.apk'
        package = 'com.foo.bar'
        activity = 'MainActivity'
        timeout_seconds = 30

        adb_session.install_apk(apk_path)
        adb_session.start_activity(package, activity)
        adb_session.kill_session_in(timeout_seconds)
        adb_session.disconnect()

    def onDisconnect(self, adb_session):
        """Uninstall a package as the session disconnects."""
        package = 'com.viber.voip'
        adb_session.uninstall_app(package)

    def onDone(self, adb_session):
        """Nothing to do when the session timer fires."""
        return None
if __name__ == "__main__":
    # Demo run: wait for the device with this serial to appear, then poll
    # until it has finished booting (which fires AnalysisSession.onBooted).
    manager = AnalysisSession()
    adb = ADB(manager)
    session = adb.connect('04c697b1928a5379')
    session.wait_for_boot()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package android.injection.logging; | |
import android.util.Log; | |
import java.util.Random; | |
import utils.HexDump; | |
public class LogUtil { | |
static final int DEFAULT_CHUNK_SIZE = 1024; | |
static final Random rand = new Random(); | |
public static void d(String tag, String message) { | |
d(tag, message, DEFAULT_CHUNK_SIZE); | |
} | |
public static void d(String tag, String message, int chunk_size) { | |
if (message != null && message.length() == 0) { | |
return; | |
} | |
int chunks = (int) Math.ceil((double) message.length() / (double) chunk_size); | |
int start = 0; | |
int count = 1; | |
byte[] buf = new byte[8]; | |
rand.nextBytes(buf); | |
String guid = HexDump.toHexString(buf); | |
do { | |
int end = Math.min(start+chunk_size, message.length()); | |
String substring = message.substring(start, end); | |
start += substring.length(); | |
Log.d(tag, String.format("{%s/%d/%d}: %s", guid, count, chunks, substring)); | |
count++; | |
} while (start < message.length()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import sys | |
import pprint | |
import re | |
from adb import SessionManagement, ADB | |
import subprocess | |
def logAnalysis(fh, packageName):
    """Build a callback that reassembles chunked log messages onto *fh*.

    The returned ``logger(data)`` expects chunks shaped like
    ``{guid/index/total}: payload``.  It discards input until it sees a
    chunk with index 1, then writes each payload to *fh* with no separator
    and flushes.  After the chunk whose index equals the total it goes back
    to waiting for the next index-1 chunk.  Data that does not match the
    chunk format is ignored.

    *packageName* is accepted for interface compatibility but is not used.
    """
    # Compiled once here instead of on every log line.
    chunk_re = re.compile(r'^{(.*)/(\d+)/(\d+)}: (.*)$')

    def logger(data):
        matched = chunk_re.match(data)
        if not matched:
            return
        if not logger.synced:
            # Start only on the first chunk of a message so that a partial
            # message is never emitted.
            if matched.group(2) != '1':
                return
            logger.synced = True
        count = int(matched.group(2))
        total = int(matched.group(3))
        # Write to the handle the caller supplied; the original ignored
        # *fh* and always wrote to sys.stdout.
        fh.write(matched.group(4))
        fh.flush()
        if count == total:
            logger.synced = False

    logger.synced = False
    logger.buff = ""
    return logger
if __name__ == "__main__":
    # Usage: LogWatcher.py target [packageName]
    if len(sys.argv) < 2:
        print("Usage: {} target [packageName]".format(sys.argv[0]))
        print(" e.g., $ {} [deviceId]".format(sys.argv[0]))
        sys.exit(1)

    adb_session = None
    try:
        target = sys.argv[1]
        # The original passed an undefined name `pkgName` to logAnalysis and
        # died with NameError; take it from the optional second argument.
        pkgName = sys.argv[2] if len(sys.argv) > 2 else None
        adb_session = ADB(SessionManagement()).attach(target)
        #adb_session.wait_for_boot()
        fh = sys.stdout
        # Keep only debug lines from tags ending in "Logger+JSON" and hand
        # the message text (the capture group) to the callback.
        regex = r"D\/.*Logger\+JSON\([0-9 ]+\): (.*)"
        adb_session.logcat(callback=logAnalysis(fh, pkgName), regex_filter=regex)
    finally:
        # stdout carries the reassembled log data (typically piped into
        # another tool), so the status message goes to stderr instead.
        sys.stderr.write("...cleanup...\n")
        if adb_session is not None:
            try:
                # Terminate the `adb logcat` child started by logcat().
                adb_session.cleanup()
            except OSError:
                # The child has already exited.
                pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# jq = http://stedolan.github.io/jq/
# Stream the reassembled JSON records from the device and keep only those
# whose packageName contains "com.mx.browser".
python LogWatcher.py 04c697b1928a5379 \
  | jq 'if .packageName | contains("com.mx.browser") then . else empty end'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment