Created
February 3, 2012 20:04
-
-
Save rahulsom/1732115 to your computer and use it in GitHub Desktop.
Async Http Client Infinite length connections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grapes([ | |
@Grab('com.ning:async-http-client:1.7.0'), | |
@Grab(group = 'log4j', module = 'log4j', version = '1.2.16') | |
]) | |
import org.apache.log4j.* | |
import com.ning.http.client.AsyncHttpClientConfig.Builder | |
import com.ning.http.client.* | |
import groovy.json.JsonSlurper | |
import java.util.concurrent.ExecutorService | |
import java.util.concurrent.Callable | |
import java.util.concurrent.Executors | |
import java.util.concurrent.TimeoutException | |
import groovy.util.logging.Log4j | |
import java.text.NumberFormat | |
import java.text.MessageFormat | |
class ConnectionSettings { | |
String url | |
String sender | |
String receiver | |
Boolean status = Boolean.FALSE | |
String identifier | |
@Override | |
String toString() { | |
"$url from $sender to $receiver" | |
} | |
} | |
@Log4j | |
class MyAsyncHandler implements AsyncHandler<String> { | |
ConnectionSettings connectionSettings | |
ConnectionManager connectionManager | |
public void onThrowable(Throwable t) { | |
if (t instanceof TimeoutException) { | |
if (connectionSettings.status) { | |
log.info "${connectionSettings.identifier}: Timeout. " | |
} else { | |
log.info "${connectionSettings.identifier}: Got marked disabled. That's all folks!" | |
} | |
} else if (t instanceof ConnectException) { | |
log.debug "${connectionSettings.identifier}: Connect failed. Relaunching after pause...", t | |
Thread.sleep(connectionManager.defaultTimeout) | |
} else { | |
println("Throwable received") | |
println("=" * "Throwable received".length()) | |
t.printStackTrace() | |
} | |
} | |
public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { | |
def str = new String(bodyPart.getBodyPartBytes()) | |
def slurper = new JsonSlurper() | |
try { | |
def result = slurper.parseText(str) | |
if (result) { | |
log.debug "Sender: ${result.sender}" | |
log.debug "Unique Id: ${result.uniqueId}" | |
String mc = result.messageContent | |
byte[] mBytes = mc.decodeBase64() | |
def mString = new String(mBytes) | |
log.debug "Message:\n${mString.replaceAll('\r', '\n')}\n\n" | |
log.info "${connectionSettings.identifier}: Message received" | |
} | |
} catch (Exception ignore) { | |
log.debug "Unparsable body received" | |
} | |
AsyncHandler.STATE.CONTINUE | |
} | |
public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception { | |
log.info connectionSettings.identifier + ': ' + responseStatus.statusText | |
AsyncHandler.STATE.CONTINUE | |
} | |
public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception { | |
log.debug "headers received" | |
AsyncHandler.STATE.CONTINUE | |
} | |
public String onCompleted() throws Exception { | |
log.info connectionSettings.identifier + ': ' + 'Complete. Relaunching' | |
return "DONE" | |
} | |
} | |
@Log4j | |
class ConnectionManager { | |
private myConnections = [] | |
def defaultTimeout = 12000 | |
ExecutorService e = Executors.newCachedThreadPool() | |
def addConnection(ConnectionSettings cs) { | |
myConnections.push cs | |
startConnection(cs) | |
} | |
def startConnection(ConnectionSettings myConn) { | |
e.execute({launchConnection(myConn)}) | |
} | |
private launchConnection(ConnectionSettings myConn) { | |
log.debug "Starting conn on $myConn" | |
Builder builder = new Builder() | |
builder.setCompressionEnabled(true) \ | |
.setAllowPoolingConnection(false) \ | |
.setConnectionTimeoutInMs(defaultTimeout) \ | |
.setIdleConnectionInPoolTimeoutInMs(defaultTimeout) \ | |
.setRequestTimeoutInMs(defaultTimeout) \ | |
.build() | |
final AsyncHttpClient client = new AsyncHttpClient(builder.build()) | |
log.debug "client ready" | |
def handler = new MyAsyncHandler(connectionSettings: myConn, connectionManager: this) | |
while(myConn.status) { | |
try { | |
def future = client.preparePost(myConn.url) \ | |
.setFollowRedirects(true) \ | |
.setBody("{'sender':'${myConn.sender}', 'receiver': '${myConn.receiver}'}".replaceAll(/\'/, '"')) \ | |
.execute(handler) | |
log.debug "${myConn.identifier}: Launched new connection to ${myConn.url} from ${myConn.sender} to ${myConn.receiver}" | |
future.get() | |
} catch (Exception e) { | |
log.debug "Connection failed" | |
Thread.sleep defaultTimeout | |
} | |
} | |
} | |
} | |
def simple = new PatternLayout("%d{ABSOLUTE} %-5p [%c{1}] %m%n") | |
BasicConfigurator.resetConfiguration() | |
BasicConfigurator.configure(new ConsoleAppender(simple)) | |
LogManager.rootLogger.level = Level.INFO | |
def cm = new ConnectionManager() | |
def allConns = [] | |
def connLimit = 200 | |
for (int i = 0; i < connLimit; i ++) { | |
def conn = new ConnectionSettings( | |
url: 'http://localhost:8080/healthdock/atmosphere/poll', | |
sender: '18ffd4e2-40a4-4c2a-802f-c00f8987762e', | |
receiver: '302e48f7-7285-4bd3-87ca-5803e80608bc', | |
identifier: MessageFormat.format("{0,number,0000}",i), | |
status: Boolean.TRUE | |
) | |
cm.addConnection conn | |
allConns << conn | |
} | |
println "After start" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment