Skip to content

Instantly share code, notes, and snippets.

@dblevins
Created August 21, 2012 18:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dblevins/3418341 to your computer and use it in GitHub Desktop.
Save dblevins/3418341 to your computer and use it in GitHub Desktop.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openejb.util;
import org.apache.openejb.loader.Options;
import org.apache.openejb.util.executor.OfferRejectedExecutionHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @version $Rev$ $Date$
*/
public class ExecutorBuilder {
private int size = 10;
private String prefix = "Pool";
private ThreadFactory threadFactory;
private RejectedExecutionHandler rejectedExecutionHandler;
public ExecutorBuilder() {
}
public ExecutorBuilder size(int size) {
this.size = size;
return this;
}
public ExecutorBuilder prefix(String prefix) {
this.prefix = prefix;
return this;
}
public ExecutorBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
public ExecutorBuilder rejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler = rejectedExecutionHandler;
return this;
}
public ThreadPoolExecutor build(Options options) {
final int corePoolSize = options.get(prefix + ".CorePoolSize", size);
// Default setting is for a fixed pool size, MaximumPoolSize==CorePoolSize
final int maximumPoolSize = Math.max(options.get(prefix + ".MaximumPoolSize", corePoolSize), corePoolSize);
// Default QueueSize is bounded using the MaximumPoolSize
final int size = options.get(prefix + ".QueueSize", maximumPoolSize);
// Keep Threads inactive threads alive for 60 seconds by default
final Duration keepAliveTime = options.get(prefix + ".KeepAliveTime", new Duration(60, TimeUnit.SECONDS));
// All threads can be timed out by default
final boolean allowCoreThreadTimeout = options.get(prefix + ".AllowCoreThreadTimeOut", true);
// If the user explicitly set the QueueSize to 0, we default QueueType to SYNCHRONOUS
final QueueType defaultQueueType = (size == 0) ? QueueType.SYNCHRONOUS : QueueType.LINKED;
final BlockingQueue queue = options.get(prefix + ".QueueType", defaultQueueType).create(options, prefix, size);
ThreadFactory factory = this.threadFactory;
if (factory == null) {
factory = new DaemonThreadFactory(prefix);
}
RejectedExecutionHandler handler = this.rejectedExecutionHandler;
if (handler == null) {
final Duration duration = options.get(prefix + ".OfferTimeout", new Duration(30, TimeUnit.SECONDS));
handler = new OfferRejectedExecutionHandler(duration);
}
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize
, maximumPoolSize
, keepAliveTime.getTime()
, keepAliveTime.getUnit() != null ? keepAliveTime.getUnit() : TimeUnit.SECONDS
, queue
, factory
, handler
);
threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
return threadPoolExecutor;
}
/**
* @version $Rev$ $Date$
*/
public static enum QueueType {
ARRAY,
LINKED,
PRIORITY,
SYNCHRONOUS;
public BlockingQueue create(Options options, final String prefix, final int queueSize) {
switch (this) {
case ARRAY: {
return new ArrayBlockingQueue(queueSize);
}
case LINKED: {
return new LinkedBlockingQueue(queueSize);
}
case PRIORITY: {
return new PriorityBlockingQueue();
}
case SYNCHRONOUS: {
return new SynchronousQueue(options.get(prefix + ".QueueFair", false));
}
default: {
// The Options class will throw an error if the user supplies an unknown enum string
// The only way we can reach this is if we add a new QueueType element and forget to
// implement it in the above switch statement.
throw new IllegalArgumentException("Unknown QueueType type: " + this);
}
}
}
}
}
package org.apache.openejb.util.executor;
import org.apache.openejb.util.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @version $Rev$ $Date$
*/
public class OfferRejectedExecutionHandler implements RejectedExecutionHandler {
private long timeout = 30;
private TimeUnit seconds = TimeUnit.SECONDS;
public OfferRejectedExecutionHandler(Duration duration) {
this(duration.getTime(), duration.getUnit() == null ? TimeUnit.SECONDS : duration.getUnit());
}
public OfferRejectedExecutionHandler(long timeout, TimeUnit timeUnit) {
if (timeout <= 0) throw new IllegalArgumentException("timeout must be greater than zero");
if (timeUnit == null) throw new IllegalArgumentException("TimeUnit must not be null");
this.timeout = timeout;
this.seconds = timeUnit;
}
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
return;
}
try {
if (!tpe.getQueue().offer(r, timeout, seconds)) {
throw new RejectedExecutionException("Timeout waiting for executor slot: waited " + timeout + " " + seconds.toString().toLowerCase());
}
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for executor slot");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment