Created
April 30, 2017 17:31
-
-
Save mkurian/c00f8a2ea359f84782c5730c0b2253d1 to your computer and use it in GitHub Desktop.
Based on org.apache.catalina.valves.StuckThreadDetectionValve. Changes: splunk friendly logging, configurable threshold value per jvm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.catalina.valves; | |
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.catalina.LifecycleException;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

import javax.servlet.ServletException;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
/** | |
* Based on org.apache.catalina.valves.StuckThreadDetectionValve | |
* Changes: splunk friendly logging, configurable threshold value per jvm | |
* | |
*/ | |
public class StuckThreadDetectionCustomValve extends ValveBase { | |
private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); | |
/** | |
* Logger | |
*/ | |
private static final Log log = LogFactory.getLog(StuckThreadDetectionCustomValve.class); | |
/** | |
* The string manager for this package. | |
*/ | |
private static final StringManager sm = | |
StringManager.getManager(Constants.Package); | |
/** | |
* Keeps count of the number of stuck threads detected | |
*/ | |
private final AtomicInteger stuckCount = new AtomicInteger(0); | |
/** | |
* In seconds. Default 600 (10 minutes). | |
*/ | |
private int threshold = 600; | |
private static final String STUCK_THREAD_THRESHOLD_IN_SEC = "stuck_thread_threshold_in_sec"; | |
/** | |
* The only references we keep to actual running Thread objects are in | |
* this Map (which is automatically cleaned in invoke()s finally clause). | |
* That way, Threads can be GC'ed, eventhough the Valve still thinks they | |
* are stuck (caused by a long monitor interval) | |
*/ | |
private final ConcurrentHashMap<Long, MonitoredThread> activeThreads = | |
new ConcurrentHashMap<>(); | |
/** | |
* | |
*/ | |
private final Queue<CompletedStuckThread> completedStuckThreadsQueue = | |
new ConcurrentLinkedQueue<>(); | |
/** | |
* Specify the threshold (in seconds) used when checking for stuck threads. | |
* If <=0, the detection is disabled. The default is 600 seconds. | |
* | |
* @param threshold | |
* The new threshold in seconds | |
*/ | |
public void setThreshold(int threshold) { | |
this.threshold = threshold; | |
} | |
/** | |
* @see #setThreshold(int) | |
* @return The current threshold in seconds | |
*/ | |
public int getThreshold() { | |
String thresholdValue = System.getProperty(STUCK_THREAD_THRESHOLD_IN_SEC); | |
if(thresholdValue != null && !thresholdValue.isEmpty()){ | |
setThreshold(Integer.parseInt(thresholdValue)); | |
} | |
return threshold; | |
} | |
/** | |
* Required to enable async support. | |
*/ | |
public StuckThreadDetectionCustomValve() { | |
super(true); | |
} | |
@Override | |
protected void initInternal() throws LifecycleException { | |
super.initInternal(); | |
if (log.isDebugEnabled()) { | |
log.debug("Monitoring stuck threads with threshold = " | |
+ getThreshold() | |
+ " sec"); | |
} | |
} | |
private void notifyStuckThreadDetected(MonitoredThread monitoredThread, | |
long activeTime, int numStuckThreads) { | |
if (log.isWarnEnabled()) { | |
String msg = generateMessage("stuckThreadDetectionValve.notifyStuckThreadDetected", monitoredThread , Long.valueOf(activeTime), | |
Integer.valueOf(numStuckThreads), Integer.valueOf(getThreshold())); | |
Throwable th = new Throwable(); | |
th.setStackTrace(monitoredThread.getThread().getStackTrace()); | |
log.warn(msg, th); | |
} | |
} | |
private String generateMessage(String op, MonitoredThread monitoredThread, Long activeTime, Integer stuckThreads, Integer threshold){ | |
return String.format("op=%s reqUri=\"%s\" thName=\"%s\" thId=%d started=\"%s\" activeTimeMs=%d stuckThreads=%d thresholdSec=%d" , op, | |
monitoredThread.getRequestUri(), monitoredThread.getThread().getName(), monitoredThread.getThread().getId(), format.format(monitoredThread.getStartTime()), activeTime, stuckThreads, threshold); | |
} | |
private void notifyStuckThreadCompleted(CompletedStuckThread thread, | |
int numStuckThreads) { | |
if (log.isWarnEnabled()) { | |
String msg = String.format("op=%s thName=\"%s\" thId=%d activeTimeMs=%d stuckThreads=%d thresholdSec=%d" , "stuckThreadDetectionValve.notifyStuckThreadCompleted", | |
thread.getName(), thread.getId(), Long.valueOf(thread.getTotalActiveTime()), Integer.valueOf(numStuckThreads), Integer.valueOf(getThreshold())); | |
log.warn(msg); | |
} | |
} | |
/** | |
* {@inheritDoc} | |
*/ | |
@Override | |
public void invoke(Request request, Response response) | |
throws IOException, ServletException { | |
if (getThreshold() <= 0) { | |
// short-circuit if not monitoring stuck threads | |
getNext().invoke(request, response); | |
return; | |
} | |
// Save the thread/runnable | |
// Keeping a reference to the thread object here does not prevent | |
// GC'ing, as the reference is removed from the Map in the finally clause | |
Long key = Long.valueOf(Thread.currentThread().getId()); | |
StringBuffer requestUrl = request.getRequestURL(); | |
if(request.getQueryString()!=null) { | |
requestUrl.append("?"); | |
requestUrl.append(request.getQueryString()); | |
} | |
MonitoredThread monitoredThread = new MonitoredThread(Thread.currentThread(), | |
requestUrl.toString()); | |
activeThreads.put(key, monitoredThread); | |
try { | |
getNext().invoke(request, response); | |
} finally { | |
activeThreads.remove(key); | |
if (monitoredThread.markAsDone() == MonitoredThreadState.STUCK) { | |
completedStuckThreadsQueue.add( | |
new CompletedStuckThread(monitoredThread.getThread(), | |
monitoredThread.getActiveTimeInMillis())); | |
} | |
} | |
} | |
@Override | |
public void backgroundProcess() { | |
super.backgroundProcess(); | |
long thresholdInMillis = threshold * 1000; | |
// Check monitored threads, being careful that the request might have | |
// completed by the time we examine it | |
for (MonitoredThread monitoredThread : activeThreads.values()) { | |
long activeTime = monitoredThread.getActiveTimeInMillis(); | |
if (activeTime >= thresholdInMillis && monitoredThread.markAsStuckIfStillRunning()) { | |
int numStuckThreads = stuckCount.incrementAndGet(); | |
notifyStuckThreadDetected(monitoredThread, activeTime, numStuckThreads); | |
} | |
} | |
// Check if any threads previously reported as stuck, have finished. | |
for (CompletedStuckThread completedStuckThread = completedStuckThreadsQueue.poll(); | |
completedStuckThread != null; completedStuckThread = completedStuckThreadsQueue.poll()) { | |
int numStuckThreads = stuckCount.decrementAndGet(); | |
notifyStuckThreadCompleted(completedStuckThread, numStuckThreads); | |
} | |
} | |
public long[] getStuckThreadIds() { | |
List<Long> idList = new ArrayList<>(); | |
for (MonitoredThread monitoredThread : activeThreads.values()) { | |
if (monitoredThread.isMarkedAsStuck()) { | |
idList.add(Long.valueOf(monitoredThread.getThread().getId())); | |
} | |
} | |
long[] result = new long[idList.size()]; | |
for (int i = 0; i < result.length; i++) { | |
result[i] = idList.get(i).longValue(); | |
} | |
return result; | |
} | |
public String[] getStuckThreadNames() { | |
List<String> nameList = new ArrayList<>(); | |
for (MonitoredThread monitoredThread : activeThreads.values()) { | |
if (monitoredThread.isMarkedAsStuck()) { | |
nameList.add(monitoredThread.getThread().getName()); | |
} | |
} | |
return nameList.toArray(new String[nameList.size()]); | |
} | |
private static class MonitoredThread { | |
/** | |
* Reference to the thread to get a stack trace from background task | |
*/ | |
private final Thread thread; | |
private final String requestUri; | |
private final long start; | |
private final AtomicInteger state = new AtomicInteger( | |
MonitoredThreadState.RUNNING.ordinal()); | |
public MonitoredThread(Thread thread, String requestUri) { | |
this.thread = thread; | |
this.requestUri = requestUri; | |
this.start = System.currentTimeMillis(); | |
} | |
public Thread getThread() { | |
return this.thread; | |
} | |
public String getRequestUri() { | |
return requestUri; | |
} | |
public long getActiveTimeInMillis() { | |
return System.currentTimeMillis() - start; | |
} | |
public Date getStartTime() { | |
return new Date(start); | |
} | |
public boolean markAsStuckIfStillRunning() { | |
return this.state.compareAndSet(MonitoredThreadState.RUNNING.ordinal(), | |
MonitoredThreadState.STUCK.ordinal()); | |
} | |
public MonitoredThreadState markAsDone() { | |
int val = this.state.getAndSet(MonitoredThreadState.DONE.ordinal()); | |
return MonitoredThreadState.values()[val]; | |
} | |
boolean isMarkedAsStuck() { | |
return this.state.get() == MonitoredThreadState.STUCK.ordinal(); | |
} | |
} | |
private static class CompletedStuckThread { | |
private final String threadName; | |
private final long threadId; | |
private final long totalActiveTime; | |
public CompletedStuckThread(Thread thread, long totalActiveTime) { | |
this.threadName = thread.getName(); | |
this.threadId = thread.getId(); | |
this.totalActiveTime = totalActiveTime; | |
} | |
public String getName() { | |
return this.threadName; | |
} | |
public long getId() { | |
return this.threadId; | |
} | |
public long getTotalActiveTime() { | |
return this.totalActiveTime; | |
} | |
} | |
private enum MonitoredThreadState { | |
RUNNING, STUCK, DONE; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment