Skip to content

Instantly share code, notes, and snippets.

@oleksiyp
Last active August 29, 2015 14:05
Show Gist options
  • Save oleksiyp/af1feae371f3c17f10be to your computer and use it in GitHub Desktop.
Save oleksiyp/af1feae371f3c17f10be to your computer and use it in GitHub Desktop.
Tracks thread execution point (stack traces) using polling and logs them to provided listener
package dataxu.learn.etl.repacking_tool;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class WhatsAmIDoingTracker {
public final static String CHECKER_THREAD_NAME = "WhatsAmIDoingThread";
public final static LoppedChecker CHECKER = new LoppedChecker();
public final static ThreadHolder THREAD_HOLDER = new ThreadHolder();
private final Notifier notifier;
private final Watcher watcher;
public WhatsAmIDoingTracker(boolean currentThreadOnly, Notifier notifier) {
this.notifier = notifier;
watcher = currentThreadOnly ? new ThreadStackWatcher(Thread.currentThread()) : new AllThreadStackWatcher();
unpause();
}
public WhatsAmIDoingTracker(Notifier notifier) {
this(true, notifier);
}
public WhatsAmIDoingTracker() {
this(Notifier.SYS_OUT);
}
public WhatsAmIDoingTracker(Thread thread, Notifier notifier) {
this.notifier = notifier;
watcher = new ThreadStackWatcher(thread);
unpause();
}
public void pause() {
CHECKER.removeWatcher(watcher);
}
public void unpause() {
CHECKER.addWatcher(watcher);
THREAD_HOLDER.startOnDemand();
}
public interface Watcher {
void watch(CachedStackTraces stackTraces);
}
private static class CachedStackTraces {
private Map<Thread, StackTraceElement[]> allStackTraces = null;
public Map<Thread, StackTraceElement[]> getAll() {
if (allStackTraces == null) {
allStackTraces = Thread.getAllStackTraces();
}
return allStackTraces;
}
public StackTraceElement[] forThread(Thread thread) {
if (allStackTraces != null) {
StackTraceElement[] elements = allStackTraces.get(thread);
if (elements != null) {
return elements;
}
}
return thread.getStackTrace();
}
}
public class AllThreadStackWatcher implements Watcher {
private Map<Thread, ThreadStackWatcher> threadWatcherMap = new HashMap<Thread, ThreadStackWatcher>();
private Set<Thread> startedThreads;
private Set<Thread> stoppedThreads;
private Set<Thread> stableThreads;
public AllThreadStackWatcher() {
threadWatcherMap = new HashMap<Thread, ThreadStackWatcher>(128);
startedThreads = new HashSet<Thread>(128);
stoppedThreads = new HashSet<Thread>(128);
stableThreads = new HashSet<Thread>(128);
}
public void watch(CachedStackTraces stackTraces) {
fillDifference(stackTraces.getAll());
}
private void fillDifference(Map<Thread, StackTraceElement[]> allStackTraces) {
startedThreads.clear();
startedThreads.addAll(allStackTraces.keySet());
startedThreads.removeAll(threadWatcherMap.keySet());
stoppedThreads.clear();
stoppedThreads.addAll(threadWatcherMap.keySet());
stoppedThreads.removeAll(allStackTraces.keySet());
stableThreads.clear();
stableThreads.addAll(threadWatcherMap.keySet());
stableThreads.retainAll(allStackTraces.keySet());
for (Thread thread : startedThreads) {
StackTraceElement[] elements = allStackTraces.get(thread);
threadWatcherMap.put(thread,
createThreadWatcher(thread, elements));
}
if (!startedThreads.isEmpty()) {
Collection<String> strings = getThreadNames(startedThreads);
notifier.threadsStarted(strings);
}
for (Thread thread : stoppedThreads) {
threadWatcherMap.remove(thread);
}
if (!stoppedThreads.isEmpty()) {
Collection<String> strings = getThreadNames(stoppedThreads);
notifier.threadsStopped(strings);
}
for (Thread thread : stableThreads) {
ThreadStackWatcher threadWatcher = threadWatcherMap.get(thread);
StackTraceElement[] stackTraceElements = allStackTraces.get(thread);
threadWatcher.watch(stackTraceElements);
}
}
private Collection<String> getThreadNames(Set<Thread> threads) {
List<String> names = new ArrayList<String>(threads.size());
for (Thread thread : threads) {
names.add(thread.getName());
}
return names;
}
}
private class ThreadStackWatcher implements Watcher {
private Thread thread;
private StackTraceElement[] elements;
public ThreadStackWatcher(Thread thread, StackTraceElement[] elements) {
this.thread = thread;
this.elements = elements;
}
public ThreadStackWatcher(Thread thread) {
this(thread, thread.getStackTrace());
}
public void watch(StackTraceElement[] currentStackElements) {
if (currentStackElements.length != 0) {
StackTraceElement currentExecutionPoint = currentStackElements[0];
boolean logChange;
if (elements.length == 0) {
logChange = true;
} else {
StackTraceElement lastExecutionPoint = elements[0];
logChange = !lastExecutionPoint.equals(currentExecutionPoint);
}
if (logChange) {
notifier.log(thread.getName(),
currentExecutionPoint.getClassName(),
currentExecutionPoint.getMethodName());
}
}
this.elements = currentStackElements;
}
@Override
public void watch(CachedStackTraces stackTraces) {
watch(stackTraces.forThread(thread));
}
}
private ThreadStackWatcher createThreadWatcher(Thread thread, StackTraceElement[] elements) {
return new ThreadStackWatcher(thread, elements);
}
private static class LoppedChecker implements Runnable {
public static final int WAIT_SPARE_CYCLES = 33;
public static final long CYCLE_WAIT_PERIOD = 350L;
private final Set<Watcher> watchers = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<Watcher, Boolean>(32)));
private final Set<Watcher> watchersSetCopy = new HashSet<Watcher>(32);
private int spareCycles;
private boolean optimizeGetAll;
@Override
public void run() {
resetSpareCyclesCounter();
try {
runOrInterrupt();
}
catch (InterruptedException e) {
}
}
private void runOrInterrupt() throws InterruptedException {
while (useSpareCycles(copyWatchers())) {
updateOptimizeGetAll();
CachedStackTraces cachedStackTraces = new CachedStackTraces();
if (optimizeGetAll) {
cachedStackTraces.getAll();
}
for (Watcher watcher : watchersSetCopy) {
watcher.watch(cachedStackTraces);
}
Thread.sleep(CYCLE_WAIT_PERIOD);
}
}
private void updateOptimizeGetAll() {
optimizeGetAll = false;
for (Watcher watcher : watchersSetCopy) {
if (watcher instanceof AllThreadStackWatcher) {
optimizeGetAll = true;
}
}
}
private boolean copyWatchers() {
watchersSetCopy.clear();
synchronized (watchers) {
watchersSetCopy.addAll(watchers);
}
return !watchersSetCopy.isEmpty();
}
private boolean useSpareCycles(boolean hasWork) {
if (hasWork) {
resetSpareCyclesCounter();
return true;
}
return decrementSpareCyclesCounter();
}
private void resetSpareCyclesCounter() {
spareCycles = WAIT_SPARE_CYCLES;
}
private boolean decrementSpareCyclesCounter() {
return spareCycles-- > 0;
}
public boolean isUseless() {
return watchers.isEmpty();
}
public void addWatcher(Watcher watcher) {
watchers.add(watcher);
}
public void removeWatcher(Watcher watcher) {
watchers.remove(watcher);
}
public void makeNoVictims() {
watchers.clear();
}
}
public interface Notifier {
void threadsStarted(Collection<String> threadNames);
void threadsStopped(Collection<String> threadNames);
void log(String threadName, String classCalled, String methodCalled);
Notifier SYS_OUT = new VeryBasicNotifier() {
@Override
public void out(String output) {
System.out.println(output);
}
};
Notifier SYS_ERR = new VeryBasicNotifier() {
@Override
public void out(String output) {
System.err.println(output);
}
};
}
private static abstract class VeryBasicNotifier implements Notifier {
public abstract void out(String output);
@Override
public void threadsStarted(Collection<String> threadNames) {
out("Started: " + threadNames);
}
@Override
public void threadsStopped(Collection<String> threadNames) {
out("Stopped: " + threadNames);
}
@Override
public void log(String thread, String classCalled, String methodCalled) {
out(thread + ": " + classCalled + "." + methodCalled);
}
}
public static class ThreadHolder {
private Thread checkerThread;
public synchronized void startOnDemand() {
if (checkerThread != null && checkerThread.isAlive()) {
return;
}
if (CHECKER.isUseless()) {
return;
}
start();
}
public synchronized void start() {
checkerThread = new Thread(CHECKER, CHECKER_THREAD_NAME);
checkerThread.setDaemon(true);
checkerThread.setPriority(Thread.MAX_PRIORITY);
checkerThread.start();
}
public synchronized void halt() throws InterruptedException {
CHECKER.makeNoVictims();
checkerThread.interrupt();
checkerThread.join();
checkerThread = null;
}
}
}
// USAGE
// private static final WhatsAmIDoingTracker tracker = new WhatsAmIDoingTracker(Thread.currentThread(), Notifier.SYS_OUT);
// OR new WhatsAmIDoingTracker(false, Notifier.SYS_OUT);
// OR new WhatsAmIDoingTracker(true, Notifier.SYS_OUT);
// OR new WhatsAmIDoingTracker(..., new CustomNotifier());
// tracker.pause();
// tracker.unpause();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment