Last active
August 29, 2015 14:05
-
-
Save oleksiyp/af1feae371f3c17f10be to your computer and use it in GitHub Desktop.
Tracks each thread's execution point (the top frame of its stack trace) by polling, and reports every change to the provided listener.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dataxu.learn.etl.repacking_tool; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.IdentityHashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Set; | |
/**
 * Polls thread stack traces from a shared background daemon thread and tells a
 * {@link Notifier} whenever the top stack frame of a watched thread changes.
 *
 * <p>A tracker watches either a single thread or every live thread. Constructing a tracker
 * registers its watcher with the shared {@link #CHECKER} and starts the polling thread when
 * necessary; {@link #pause()} and {@link #unpause()} remove and re-add that watcher.
 */
public class WhatsAmIDoingTracker {

    /** Name given to the background polling thread. */
    public final static String CHECKER_THREAD_NAME = "WhatsAmIDoingThread";

    /** Polling loop shared by all trackers; each tracker registers its watcher here. */
    public final static LoppedChecker CHECKER = new LoppedChecker();

    /** Owner of the thread that executes {@link #CHECKER}. */
    public final static ThreadHolder THREAD_HOLDER = new ThreadHolder();

    private final Notifier notifier;
    private final Watcher watcher;

    /**
     * Creates a tracker and immediately starts watching.
     *
     * @param currentThreadOnly {@code true} to watch only the thread calling this constructor,
     *                          {@code false} to watch every live thread
     * @param notifier          receiver of the tracking events
     */
    public WhatsAmIDoingTracker(boolean currentThreadOnly, Notifier notifier) {
        this.notifier = notifier;
        watcher = currentThreadOnly ? new ThreadStackWatcher(Thread.currentThread()) : new AllThreadStackWatcher();
        unpause();
    }

    /**
     * Creates a tracker that watches only the thread calling this constructor.
     *
     * @param notifier receiver of the tracking events
     */
    public WhatsAmIDoingTracker(Notifier notifier) {
        this(true, notifier);
    }

    /** Creates a tracker that watches the calling thread and reports through {@link Notifier#SYS_OUT}. */
    public WhatsAmIDoingTracker() {
        this(Notifier.SYS_OUT);
    }

    /**
     * Creates a tracker that watches the given thread and immediately starts watching.
     *
     * @param thread   thread whose execution point is tracked
     * @param notifier receiver of the tracking events
     */
    public WhatsAmIDoingTracker(Thread thread, Notifier notifier) {
        this.notifier = notifier;
        watcher = new ThreadStackWatcher(thread);
        unpause();
    }

    /** Unregisters this tracker's watcher from the shared checker; the polling thread is left alone. */
    public void pause() {
        CHECKER.removeWatcher(watcher);
    }

    /** Registers this tracker's watcher with the shared checker and starts the polling thread if needed. */
    public void unpause() {
        CHECKER.addWatcher(watcher);
        THREAD_HOLDER.startOnDemand();
    }

    /** Something the checker invokes once per polling cycle. */
    public interface Watcher {
        /**
         * Inspects the stack traces of the current polling cycle.
         *
         * @param stackTraces per-cycle stack trace source shared by all watchers
         */
        void watch(CachedStackTraces stackTraces);
    }

    /** Per-cycle stack trace source that captures the all-threads snapshot at most once. */
    private static class CachedStackTraces {
        private Map<Thread, StackTraceElement[]> allStackTraces = null;

        /**
         * Returns the stack traces of all live threads, calling
         * {@link Thread#getAllStackTraces()} only on the first invocation.
         */
        public Map<Thread, StackTraceElement[]> getAll() {
            if (allStackTraces == null) {
                allStackTraces = Thread.getAllStackTraces();
            }
            return allStackTraces;
        }

        /**
         * Returns the stack trace of one thread: taken from the snapshot when one has already
         * been captured and contains the thread, otherwise fetched from the thread directly.
         */
        public StackTraceElement[] forThread(Thread thread) {
            if (allStackTraces != null) {
                StackTraceElement[] cached = allStackTraces.get(thread);
                if (cached != null) {
                    return cached;
                }
            }
            return thread.getStackTrace();
        }
    }

    /**
     * Watches every live thread: reports threads that appeared or disappeared since the
     * previous cycle and delegates the remaining ones to a per-thread watcher.
     */
    public class AllThreadStackWatcher implements Watcher {
        private final Map<Thread, ThreadStackWatcher> threadWatcherMap = new HashMap<Thread, ThreadStackWatcher>(128);
        // Scratch sets, cleared and refilled on every cycle.
        private final Set<Thread> startedThreads = new HashSet<Thread>(128);
        private final Set<Thread> stoppedThreads = new HashSet<Thread>(128);
        private final Set<Thread> stableThreads = new HashSet<Thread>(128);

        public AllThreadStackWatcher() {
        }

        @Override
        public void watch(CachedStackTraces stackTraces) {
            fillDifference(stackTraces.getAll());
        }

        /**
         * Compares the snapshot with the threads known from the previous cycle, updates the
         * per-thread watchers and emits the started / stopped / execution-point notifications.
         */
        private void fillDifference(Map<Thread, StackTraceElement[]> allStackTraces) {
            Set<Thread> liveThreads = allStackTraces.keySet();
            Set<Thread> knownThreads = threadWatcherMap.keySet();

            // All three sets are computed before threadWatcherMap is modified below.
            startedThreads.clear();
            startedThreads.addAll(liveThreads);
            startedThreads.removeAll(knownThreads);

            stoppedThreads.clear();
            stoppedThreads.addAll(knownThreads);
            stoppedThreads.removeAll(liveThreads);

            stableThreads.clear();
            stableThreads.addAll(knownThreads);
            stableThreads.retainAll(liveThreads);

            // A new thread only gets its baseline here; changes are reported from the next cycle on.
            for (Thread thread : startedThreads) {
                threadWatcherMap.put(thread, createThreadWatcher(thread, allStackTraces.get(thread)));
            }
            if (!startedThreads.isEmpty()) {
                notifier.threadsStarted(getThreadNames(startedThreads));
            }

            for (Thread thread : stoppedThreads) {
                threadWatcherMap.remove(thread);
            }
            if (!stoppedThreads.isEmpty()) {
                notifier.threadsStopped(getThreadNames(stoppedThreads));
            }

            for (Thread thread : stableThreads) {
                threadWatcherMap.get(thread).watch(allStackTraces.get(thread));
            }
        }

        /** Collects the names of the given threads. */
        private Collection<String> getThreadNames(Set<Thread> threads) {
            List<String> names = new ArrayList<String>(threads.size());
            for (Thread thread : threads) {
                names.add(thread.getName());
            }
            return names;
        }
    }

    /** Watches one thread and reports when the top frame of its stack trace changes. */
    private class ThreadStackWatcher implements Watcher {
        private final Thread thread;
        // Stack trace seen on the previous cycle; its first element is the last known execution point.
        private StackTraceElement[] elements;

        public ThreadStackWatcher(Thread thread, StackTraceElement[] elements) {
            this.thread = thread;
            this.elements = elements;
        }

        /** Uses the thread's current stack trace as the baseline. */
        public ThreadStackWatcher(Thread thread) {
            this(thread, thread.getStackTrace());
        }

        /**
         * Compares the top frame of the given trace with the previously seen one, logs it
         * through the notifier when it differs, and stores the trace as the new baseline.
         * An empty trace is never logged but still replaces the baseline.
         */
        public void watch(StackTraceElement[] currentStackElements) {
            if (currentStackElements.length != 0) {
                StackTraceElement currentExecutionPoint = currentStackElements[0];
                boolean changed = elements.length == 0 || !elements[0].equals(currentExecutionPoint);
                if (changed) {
                    notifier.log(thread.getName(),
                            currentExecutionPoint.getClassName(),
                            currentExecutionPoint.getMethodName());
                }
            }
            this.elements = currentStackElements;
        }

        @Override
        public void watch(CachedStackTraces stackTraces) {
            watch(stackTraces.forThread(thread));
        }
    }

    private ThreadStackWatcher createThreadWatcher(Thread thread, StackTraceElement[] elements) {
        return new ThreadStackWatcher(thread, elements);
    }

    /**
     * The polling loop. Each cycle takes a private copy of the registered watchers, lets every
     * watcher inspect the current stack traces and then sleeps for {@link #CYCLE_WAIT_PERIOD}
     * milliseconds. With no watchers registered the loop keeps idling for
     * {@link #WAIT_SPARE_CYCLES} more cycles before it returns.
     */
    private static class LoppedChecker implements Runnable {
        public static final int WAIT_SPARE_CYCLES = 33;
        public static final long CYCLE_WAIT_PERIOD = 350L;

        // Registered watchers, compared by identity; modified from tracker threads.
        private final Set<Watcher> watchers = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Watcher, Boolean>(32)));
        // Per-cycle copy touched only by the polling thread; identity-based like the source set.
        private final Set<Watcher> watchersSetCopy = Collections.newSetFromMap(new IdentityHashMap<Watcher, Boolean>(32));
        private int spareCycles;
        private boolean optimizeGetAll;

        @Override
        public void run() {
            resetSpareCyclesCounter();
            try {
                runOrInterrupt();
            } catch (InterruptedException e) {
                // Interruption is how ThreadHolder.halt() stops the loop. Restore the flag
                // instead of swallowing it so whoever runs this Runnable can still see it.
                Thread.currentThread().interrupt();
            }
        }

        private void runOrInterrupt() throws InterruptedException {
            while (useSpareCycles(copyWatchers())) {
                updateOptimizeGetAll();
                CachedStackTraces cachedStackTraces = new CachedStackTraces();
                if (optimizeGetAll) {
                    // Capture the all-threads snapshot up front so single-thread watchers reuse it.
                    cachedStackTraces.getAll();
                }
                for (Watcher watcher : watchersSetCopy) {
                    watcher.watch(cachedStackTraces);
                }
                Thread.sleep(CYCLE_WAIT_PERIOD);
            }
        }

        /** Sets {@code optimizeGetAll} when at least one copied watcher needs the all-threads snapshot. */
        private void updateOptimizeGetAll() {
            optimizeGetAll = false;
            for (Watcher watcher : watchersSetCopy) {
                if (watcher instanceof AllThreadStackWatcher) {
                    optimizeGetAll = true;
                    break;
                }
            }
        }

        /**
         * Refreshes the per-cycle copy of the registered watchers.
         *
         * @return {@code true} when there is at least one watcher to run
         */
        private boolean copyWatchers() {
            watchersSetCopy.clear();
            // Iterating a Collections.synchronizedSet requires holding its monitor.
            synchronized (watchers) {
                watchersSetCopy.addAll(watchers);
            }
            return !watchersSetCopy.isEmpty();
        }

        /**
         * Decides whether the loop continues: always when there is work (which also refills the
         * idle budget), otherwise only while idle cycles remain.
         */
        private boolean useSpareCycles(boolean hasWork) {
            if (hasWork) {
                resetSpareCyclesCounter();
                return true;
            }
            return decrementSpareCyclesCounter();
        }

        private void resetSpareCyclesCounter() {
            spareCycles = WAIT_SPARE_CYCLES;
        }

        private boolean decrementSpareCyclesCounter() {
            return spareCycles-- > 0;
        }

        /** Returns {@code true} when no watcher is registered. */
        public boolean isUseless() {
            return watchers.isEmpty();
        }

        public void addWatcher(Watcher watcher) {
            watchers.add(watcher);
        }

        public void removeWatcher(Watcher watcher) {
            watchers.remove(watcher);
        }

        /** Unregisters every watcher. */
        public void makeNoVictims() {
            watchers.clear();
        }
    }

    /** Receiver of tracking events; callbacks are invoked from the thread that runs the checker. */
    public interface Notifier {
        /** Called with the names of threads first seen in the current cycle of an all-threads watcher. */
        void threadsStarted(Collection<String> threadNames);

        /** Called with the names of previously seen threads that are missing from the current snapshot. */
        void threadsStopped(Collection<String> threadNames);

        /** Called when the top stack frame of a watched thread differs from the previous cycle. */
        void log(String threadName, String classCalled, String methodCalled);

        /** Notifier that prints every event to {@code System.out}. */
        Notifier SYS_OUT = new VeryBasicNotifier() {
            @Override
            public void out(String output) {
                System.out.println(output);
            }
        };

        /** Notifier that prints every event to {@code System.err}. */
        Notifier SYS_ERR = new VeryBasicNotifier() {
            @Override
            public void out(String output) {
                System.err.println(output);
            }
        };
    }

    /** Formats each event as a single line of text and hands it to {@link #out(String)}. */
    private static abstract class VeryBasicNotifier implements Notifier {
        /** Emits one formatted line. */
        public abstract void out(String output);

        @Override
        public void threadsStarted(Collection<String> threadNames) {
            out("Started: " + threadNames);
        }

        @Override
        public void threadsStopped(Collection<String> threadNames) {
            out("Stopped: " + threadNames);
        }

        @Override
        public void log(String thread, String classCalled, String methodCalled) {
            out(thread + ": " + classCalled + "." + methodCalled);
        }
    }

    /** Creates, starts and stops the daemon thread that runs {@link #CHECKER}. */
    public static class ThreadHolder {
        private Thread checkerThread;

        /**
         * Starts the checker thread unless one is already alive or no watcher is registered.
         *
         * <p>NOTE(review): a checker thread that has already left its loop but has not yet
         * terminated still counts as alive here, so a watcher added in that window appears
         * to stay unpolled until the next call - confirm whether that matters to callers.
         */
        public synchronized void startOnDemand() {
            if (isCheckerAlive() || CHECKER.isUseless()) {
                return;
            }
            start();
        }

        /**
         * Starts a new daemon checker thread at maximum priority. Does nothing when a checker
         * thread is already alive, because the checker's per-cycle state is not safe to share
         * between two polling threads.
         */
        public synchronized void start() {
            if (isCheckerAlive()) {
                return;
            }
            checkerThread = new Thread(CHECKER, CHECKER_THREAD_NAME);
            checkerThread.setDaemon(true);
            checkerThread.setPriority(Thread.MAX_PRIORITY);
            checkerThread.start();
        }

        /**
         * Unregisters all watchers and, if a checker thread was started, interrupts it and
         * waits for it to finish. Safe to call when no thread was ever started.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public synchronized void halt() throws InterruptedException {
            CHECKER.makeNoVictims();
            if (checkerThread == null) {
                return;
            }
            checkerThread.interrupt();
            checkerThread.join();
            checkerThread = null;
        }

        private boolean isCheckerAlive() {
            return checkerThread != null && checkerThread.isAlive();
        }
    }
}
// USAGE
// Watch one specific thread:
//   private static final WhatsAmIDoingTracker tracker = new WhatsAmIDoingTracker(Thread.currentThread(), Notifier.SYS_OUT);
// OR watch every live thread:        new WhatsAmIDoingTracker(false, Notifier.SYS_OUT);
// OR watch the constructing thread:  new WhatsAmIDoingTracker(true, Notifier.SYS_OUT);
// OR report to your own listener:    new WhatsAmIDoingTracker(..., new CustomNotifier());
// tracker.pause();    // stop watching
// tracker.unpause();  // resume watching
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment