Skip to content

Instantly share code, notes, and snippets.

@zubairov
Created February 3, 2011 14:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zubairov/809545 to your computer and use it in GitHub Desktop.
Save zubairov/809545 to your computer and use it in GitHub Desktop.
/*******************************************************************************
* Copyright (c) 2010 SOPERA GmbH
* All rights reserved.
* This program and the accompanying materials are made available
* under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*******************************************************************************/
package org.sopera.talend.provider;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.ws.Provider;
/**
 * Implementation of {@link Provider} that queues incoming requests so that they
 * can be processed within one thread, which pulls them via {@link #next()}.
 * <p>
 * The thread calling {@link #invoke(Object)} puts its request on a bounded queue
 * and then waits for the request to be released before returning the response.
 * <p>
 * NOTE(review): the queue is {@code static}, so it is shared by every instance
 * of this class regardless of its type argument -- confirm this is intended.
 *
 * @param <T> type of the request and response messages
 * @author zubairov
 */
public class QueuedProvider<T> implements Provider<T> {

    private static final Logger LOG = Logger.getLogger(QueuedProvider.class.getName());

    /** Capacity of the request queue; further requests are rejected once it is full. */
    private static final int MAX_QUEUE_SIZE = 1000;

    /** Timeout, in seconds, handed to {@code waitForRelease} for each queued request. */
    private static final int WAIT_TIMEOUT_SECONDS = 120;

    @SuppressWarnings("rawtypes")
    private static final BlockingQueue<QueuedRequestImpl> queue =
            new LinkedBlockingQueue<QueuedRequestImpl>(MAX_QUEUE_SIZE);

    /**
     * Queues the given message and waits for it to be released, then returns the
     * response stored on the request.
     *
     * @param message the incoming request message
     * @return the response stored on the queued request
     * @throws RuntimeException if the queue is full, if the calling thread is
     *         interrupted while waiting (the interrupt status is restored first),
     *         or if the request carries an error after the wait
     */
    public T invoke(T message) {
        QueuedRequestImpl<T> context = new QueuedRequestImpl<T>(message);

        // offer() never blocks: a full queue is reported to the caller immediately.
        if (!queue.offer(context)) {
            try {
                context.release();
            } catch (Exception e) {
                // Best effort only: the request is rejected below regardless.
                LOG.log(Level.WARNING, "Failed to release request rejected by the full queue", e);
            }
            throw new RuntimeException("Can't queue request, queue size of " + MAX_QUEUE_SIZE + " is exceeded");
        }

        try {
            // NOTE(review): an elapsed timeout is not distinguished from a normal
            // release here; presumably the response is then still unset -- confirm
            // against QueuedRequestImpl.
            context.waitForRelease(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still
            // observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        if (context.getError() != null) {
            throw new RuntimeException("Exception when processing request", context.getError());
        }
        return context.getResponse();
    }

    /**
     * Blocking method to obtain the next request from the queue; waits until one
     * becomes available.
     *
     * @return the next queued request
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public QueuedRequest<T> next() throws InterruptedException {
        return queue.take();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment