Skip to content

Instantly share code, notes, and snippets.

@TylerTemp
Last active January 5, 2020 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TylerTemp/670c7f6baf566e4745166bfd79062486 to your computer and use it in GitHub Desktop.
Save TylerTemp/670c7f6baf566e4745166bfd79062486 to your computer and use it in GitHub Desktop.
Watching Android adb logcat output for a specific package name
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Usage:
adblogcatwatcher <package_name> [options] [--filter-message-contains <string>]...
adblogcatwatcher pid <package_name>
adblogcatwatcher start <package_name>
Options:
--adb=<path> specific adb execuable path. [default: adb]
-c clean logcat before start
-r, --restart restart application
--filter-message-contains <string>, --fmc <string>
filter message that contains <string>
-w<mode>, --width=<mode>
max width for log trigger name, avaliable: number, `max`, `min`. [default: max]
--no-wrap disable wrap for message
--no-date
--no-time
--no-pid
--no-sub-pid
Dependences:
pip install docpie
[optional] pip install colorlog
"""
import subprocess
import threading
import logging
import os
import sys
import time
import shutil
import textwrap
LOGGER = logging.getLogger('adblogcatwatcher')
def ps_line_cutter(line):
    """Split one line of ``ps`` output into a tuple of column values.

    The line is split on runs of whitespace at most 8 times, so at most 9
    values are returned and the last one keeps any inner whitespace.

    :param line: one text line of ``ps`` output (header or process row).
    :return: tuple of the whitespace-stripped column values; empty tuple
        for a blank line.
    """
    # With ``maxsplit`` the final field keeps its trailing whitespace and
    # newline, hence the explicit strip on every field.
    return tuple(field.strip() for field in line.split(maxsplit=8))
def get_pid(package_name, adb='adb'):
    """Return the PID of the process called *package_name*, or ``None``.

    Runs ``<adb> shell ps``, treats the first output line as the column
    header and returns the ``PID`` column of the first row whose ``NAME``
    column equals *package_name*.

    :param package_name: exact value expected in the ``NAME`` column.
    :param adb: adb executable to invoke.
    :return: the PID as ``int``, or ``None`` when ``ps`` printed nothing or
        no usable row matched.
    """
    # The context manager plus communicate() waits for the child and closes
    # its pipe, so repeated polling does not accumulate processes or
    # file descriptors.
    with subprocess.Popen([adb, 'shell', 'ps'], stdout=subprocess.PIPE) as p:
        output, _ = p.communicate()

    raw_lines = output.splitlines()
    if not raw_lines:
        return None

    # errors='replace' keeps one undecodable process name from aborting
    # the whole lookup.
    titles = ps_line_cutter(raw_lines[0].decode('utf-8', errors='replace'))
    for raw_line in raw_lines[1:]:
        line = raw_line.decode('utf-8', errors='replace')
        line_info = dict(zip(titles, ps_line_cutter(line)))
        if 'NAME' not in line_info:
            continue
        if line_info['NAME'] != package_name:
            continue
        try:
            return int(line_info['PID'])
        except (KeyError, ValueError):
            # Row matched by name but carries no usable PID; keep looking.
            continue
    return None
class AdbLogcatFilter(threading.Thread):
    """Thread that prints the ``adb logcat`` lines belonging to one PID.

    The constructor spawns ``<adb_executable> logcat``; ``run()`` reads its
    standard output line by line, keeps the lines whose third
    whitespace-separated field equals the watched PID, and writes a
    colourised, optionally wrapped rendering of each one to ``sys.stdout``.

    Each line is split into six fields that are used as: date, time, pid,
    sub pid, log level and message. The message is split at its first ``:``
    into a sender and a payload.
    """

    # ANSI escape sequences, keyed by the short names used inside the
    # format templates below.
    BASH_FORMAT_CONTROL = dict(
        RS="\033[0m",     # reset
        HC="\033[1m",     # hicolor
        UL="\033[4m",     # underline
        INV="\033[7m",    # inverse background and foreground
        FBLK="\033[30m",  # foreground black
        FRED="\033[31m",  # foreground red
        FGRN="\033[32m",  # foreground green
        FYEL="\033[33m",  # foreground yellow
        FBLE="\033[34m",  # foreground blue
        FMAG="\033[35m",  # foreground magenta
        FCYN="\033[36m",  # foreground cyan
        FWHT="\033[37m",  # foreground white
        BBLK="\033[40m",  # background black
        BRED="\033[41m",  # background red
        BGRN="\033[42m",  # background green
        BYEL="\033[43m",  # background yellow
        BBLE="\033[44m",  # background blue
        BMAG="\033[45m",  # background magenta
        BCYN="\033[46m",  # background cyan
        BWHT="\033[47m",  # background white
    )

    # Log level letter -> key in BASH_FORMAT_CONTROL used to colour the head.
    _LEVEL_COLOR_KEYS = {
        'V': 'FWHT',
        'D': 'FGRN',
        'I': 'FBLE',
        'W': 'FMAG',
        'E': 'FRED',
    }

    # (name checked against EXCLUDES, format fragment) for the optional
    # leading columns, in output order.
    _HEAD_FIELDS = (
        ('date', '{date}'),
        ('time', '{time}'),
        ('pid', '{pid:5}'),
        ('sub_pid', '{sub_pid:5}'),
    )

    def __init__(self, adb_executable, pid, width_mode='max', excludes=frozenset(), wrap_mode=None, filter_message_contains=frozenset(), *a, **k):
        """Spawn ``adb logcat`` and remember how its output is to be shown.

        :param adb_executable: adb executable used to run ``logcat``.
        :param pid: PID whose lines are printed.
        :param width_mode: ``'max'`` pads the sender to the longest sender
            seen so far, ``'min'`` applies no padding, anything else is
            used as a fixed integer width.
        :param excludes: names among ``date``, ``time``, ``pid`` and
            ``sub_pid`` to leave out of the line head.
        :param wrap_mode: ``'no_wrap'`` disables wrapping of long payloads.
        :param filter_message_contains: a line is dropped when its payload
            contains any of these strings.
        :param a: extra positional arguments for ``threading.Thread``.
        :param k: extra keyword arguments for ``threading.Thread``.
        """
        # Zero-argument super(): ``super(self.__class__, self)`` recurses
        # forever once this class is subclassed.
        super().__init__(*a, **k)
        self.PID = pid
        self._stop_event = threading.Event()
        self.WIDTH_MODE = width_mode
        self.EXCLUDES = excludes
        self.WRAP_MODE = wrap_mode
        self.FILTER_MESSAGE_CONTAINS = filter_message_contains
        self.process = subprocess.Popen([adb_executable, 'logcat'], stdout=subprocess.PIPE)

    def run(self):
        """Read logcat output and print the lines of the watched PID.

        Lines that cannot be split into the six expected fields, or whose
        PID field is not an integer, are skipped instead of terminating the
        thread.
        """
        logger = logging.getLogger('adblogcatwatcher.AdbLogcatFilter.w_{}'.format(threading.get_ident()))
        logger.debug('checking pid %s', self.PID)
        # Respawn logcat if it has already exited before reading started.
        while self.process.poll() is not None:
            if self.stopped():
                logger.debug('exit')
                return
            logger.debug('process not running, restart')
            self.process = subprocess.Popen(self.process.args, stdout=subprocess.PIPE)

        max_sender_length = 5
        break_group_count = 0
        for line_b in self.process.stdout:
            parsed = self._parse_line(line_b)
            if parsed is None:
                continue
            pid, fields, message_sender, message_payload = parsed

            if pid == self.PID:
                if any(fragment in message_payload for fragment in self.FILTER_MESSAGE_CONTAINS):
                    continue

                if self.WIDTH_MODE == 'max':
                    max_sender_length = max(len(message_sender), max_sender_length)
                    width_controller = ':{}'.format(max_sender_length)
                elif self.WIDTH_MODE == 'min':
                    width_controller = ''
                else:
                    width_controller = ':%d' % (self.WIDTH_MODE,)

                line_head, line_head_length = self._format_head(fields, message_sender, width_controller)
                line_rest, break_group_count = self._format_payload(message_payload, line_head_length, break_group_count)

                sys.stdout.write(line_head)
                sys.stdout.write(line_rest)
                sys.stdout.flush()

            if self.stopped():
                logger.debug('exit')
                return

    @staticmethod
    def _parse_line(line_b):
        """Split one raw logcat line; return ``None`` when it is unusable.

        :param line_b: one line of logcat output as ``bytes``.
        :return: ``(pid, fields, sender, payload)`` where *fields* is the
            list of six whitespace-separated fields, or ``None`` for
            ``----`` separator lines, lines with fewer than six fields and
            lines whose third field is not an integer.
        """
        try:
            line = line_b.decode('utf-8')
        except UnicodeDecodeError:
            # Fall back to the escaped text of the bytes repr (b'...').
            line = repr(line_b.strip())[2:-1]
        if line.startswith('----'):
            return None

        fields = line.split(maxsplit=5)
        if len(fields) < 6:
            return None
        try:
            pid = int(fields[2])
        except ValueError:
            return None

        sender, separator, payload = fields[5].partition(':')
        if not separator:
            # No "sender:" prefix: show the whole text as payload.
            sender, payload = '', fields[5]
        return pid, fields, sender.strip(), payload.strip()

    def _format_head(self, fields, message_sender, width_controller):
        """Build the coloured line head and measure its visible width.

        :param fields: the six fields produced by ``_parse_line``.
        :param message_sender: sender part of the message.
        :param width_controller: format spec suffix (``''`` or ``':N'``)
            applied to the sender.
        :return: ``(line_head, plain_length)``; *plain_length* is the
            length of the same head rendered without escape sequences.
        """
        shown = [fragment for name, fragment in self._HEAD_FIELDS if name not in self.EXCLUDES]
        pretty_parts = shown + [
            '{log_level}{RS}',
            '\x1B[1;38;48m\x1B[4;38;48m{message_sender%s}\x1B[0m' % (width_controller,),
        ]
        plain_parts = shown + [
            '{log_level}',
            '{message_sender%s}' % (width_controller,),
        ]
        line_head_pretty = '{log_level_control}' + ' '.join(pretty_parts)
        line_head_plain = ' '.join(plain_parts)

        color_key = self._LEVEL_COLOR_KEYS.get(fields[4])
        line_info = {
            'date': fields[0],
            'time': fields[1],
            'pid': fields[2],
            'sub_pid': fields[3],
            'log_level': fields[4],
            'message_sender': message_sender,
            # Unknown levels get no colour prefix.
            'log_level_control': self.BASH_FORMAT_CONTROL[color_key] if color_key else '',
        }
        line_info.update(self.BASH_FORMAT_CONTROL)
        return line_head_pretty.format(**line_info), len(line_head_plain.format(**line_info))

    def _format_payload(self, message_payload, line_head_length, break_group_count):
        """Render the payload that follows the line head.

        :param message_payload: payload text of the message.
        :param line_head_length: visible width of the line head.
        :param break_group_count: number of wrapped messages printed so
            far; its parity selects the arrow background.
        :return: ``(line_rest, break_group_count)`` with the counter
            incremented when the payload had to be wrapped.
        """
        columns, _rows = shutil.get_terminal_size()
        room_after_head = columns - line_head_length
        room_left = room_after_head - (len(message_payload) + 1)

        # The arrows use the counter value from before this message.
        break_group = '' if break_group_count % 2 else '{BGRN}'
        cont_arrow = ('%s{INV}↪{RS}' % (break_group,)).format(**self.BASH_FORMAT_CONTROL)
        start_arrow = ('%s{INV}↦{RS}' % (break_group,)).format(**self.BASH_FORMAT_CONTROL)

        if self.WRAP_MODE == 'no_wrap' or room_left >= 0:
            # Fits on the line (or wrapping is disabled).
            return ' {}\n'.format(message_payload), break_group_count

        if room_after_head < 10:
            # Hardly any room next to the head: wrap in a block below it.
            init_indent = '\n {INV}↦{RS}'.format(**self.BASH_FORMAT_CONTROL)
            sub_indent = ' ' + cont_arrow
            wrapped = textwrap.wrap(message_payload, width=columns, initial_indent=init_indent, subsequent_indent=sub_indent)
            return '\n'.join(wrapped) + '\n', break_group_count + 1

        # Enough room: wrap next to the head, continuation lines indented
        # to the head's width.
        head_padding = ' ' * line_head_length
        wrapped = textwrap.wrap(
            head_padding + message_payload,
            width=columns - 1,
            initial_indent='',
            subsequent_indent=head_padding + cont_arrow,
            drop_whitespace=False,
        )
        line_ready = '\n'.join(wrapped).lstrip() + '\n'
        if line_ready.startswith(cont_arrow):
            line_ready = line_ready[len(cont_arrow):]
        return start_arrow + line_ready, break_group_count + 1

    def stop(self):
        """Flag the thread to exit and kill the logcat child process."""
        LOGGER.debug('set to exit')
        self._stop_event.set()
        try:
            self.process.kill()
        except OSError:
            # Best effort: the child may already be gone.
            pass

    def stopped(self):
        """Return ``True`` once ``stop()`` has been called."""
        return self._stop_event.is_set()

    def __del__(self):
        # ``process`` is missing when __init__ failed before spawning adb.
        process = getattr(self, 'process', None)
        if process is None:
            return
        try:
            process.kill()
        except Exception as e:
            sys.stderr.write('{}\n'.format(e))
def _run_adb(adb, *args):
    """Run ``adb`` with *args*; return ``(returncode, stdout, stderr)`` as text."""
    p = subprocess.Popen([adb] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    pout, perr = p.communicate()
    return p.returncode, pout.decode('utf-8', errors='replace'), perr.decode('utf-8', errors='replace')


def _launch_package(adb, package_name):
    """Start *package_name* through ``adb shell monkey``; return the ``_run_adb`` result."""
    return _run_adb(adb, 'shell', 'monkey', '-p', package_name, '-v', '500')


def main():
    """Command-line entry point.

    Parses the module docstring with docpie and then either

    * ``pid``: prints the PID found for the package and exits,
    * ``start``: launches the package, prints its PID and exits, or
    * default: optionally kills/relaunches the package and clears logcat,
      then polls the package PID once per second and keeps one
      ``AdbLogcatFilter`` thread attached to the current PID.
    """
    import docpie
    try:
        import colorlog
    except ImportError:
        formatter = logging.Formatter(
            '[%(levelname)1.1s %(lineno)3d %(asctime)s %(funcName)s]'
            ' %(message)s'
        )
    else:
        formatter = colorlog.ColoredFormatter(
            '%(log_color)s'
            '[%(levelname)1.1s %(lineno)3d %(asctime)s %(funcName)s]'
            '%(reset)s'
            ' %(message)s'
        )

    cliargs = docpie.docpie(__doc__)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    LOGGER.addHandler(handler)
    LOGGER.setLevel(logging.DEBUG)

    adb = cliargs['--adb']
    package_name = cliargs['<package_name>']

    if cliargs['pid']:
        # Honour --adb here too; it was previously ignored by get_pid calls.
        print(get_pid(package_name, adb=adb))
        sys.exit()

    if cliargs['start']:
        returncode, out, err = _launch_package(adb, package_name)
        if returncode != 0:
            sys.stderr.write('failed to start process: %s %s\n' % (out, err))
            sys.exit(1)
        print(get_pid(package_name, adb=adb))
        sys.exit()

    # Stop the running application first when a restart was requested.
    if cliargs['--restart']:
        current_pid = get_pid(package_name, adb=adb)
        if current_pid is not None:
            LOGGER.debug('killing process %s', current_pid)
            returncode, out, err = _run_adb(adb, 'shell', 'su', '-c', 'kill {}'.format(current_pid))
            if returncode != 0:
                LOGGER.error('failed to kill process: %s %s', out, err)
            else:
                LOGGER.info('process killed %s', out)

    # Clear the log buffer before (re)starting so only fresh lines show up.
    if cliargs['-c']:
        LOGGER.debug('clearing current logs')
        returncode, out, err = _run_adb(adb, 'logcat', '-c')
        if returncode != 0:
            LOGGER.error('failed to clean adb log: %s %s', out, err)
        else:
            LOGGER.info('adb logcat cleaned %s', out)

    if cliargs['--restart']:
        LOGGER.debug('start process %s', package_name)
        returncode, out, err = _launch_package(adb, package_name)
        if returncode != 0:
            LOGGER.error('failed to start process: %s %s', out, err)
        else:
            LOGGER.info('process started')

    width_mode = cliargs['--width']
    if width_mode not in ('max', 'min'):
        width_mode = int(width_mode)

    excludes = frozenset(
        name.replace('-', '_')
        for name in ('date', 'time', 'pid', 'sub-pid')
        if cliargs['--no-' + name]
    )
    LOGGER.debug('exclude %s', excludes)

    wrap_mode = 'no_wrap' if cliargs['--no-wrap'] else None
    pid = None
    filter_thread = None
    try:
        while True:
            new_pid = get_pid(package_name, adb=adb)
            if new_pid is None and filter_thread is not None:
                filter_thread.stop()
                filter_thread = None
                # Forget the old PID so a relaunch that reuses the same
                # number is still picked up.
                pid = None
            if new_pid is not None and pid != new_pid:
                pid = new_pid
                LOGGER.info('restart thread')
                if filter_thread is not None:
                    filter_thread.stop()
                filter_thread = AdbLogcatFilter(
                    pid=pid,
                    adb_executable=adb,
                    width_mode=width_mode,
                    wrap_mode=wrap_mode,
                    excludes=excludes,
                    filter_message_contains=set(cliargs['--fmc']),
                )
                filter_thread.start()
            else:
                time.sleep(1)
    finally:
        # Do not leave the logcat child and its reader thread behind when
        # the loop is interrupted (e.g. Ctrl-C).
        if filter_thread is not None:
            filter_thread.stop()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment