Skip to content

Instantly share code, notes, and snippets.

@rahulsom
Created February 3, 2012 20:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rahulsom/1732115 to your computer and use it in GitHub Desktop.
Save rahulsom/1732115 to your computer and use it in GitHub Desktop.
Async Http Client Infinite length connections
@Grapes([
@Grab('com.ning:async-http-client:1.7.0'),
@Grab(group = 'log4j', module = 'log4j', version = '1.2.16')
])
import org.apache.log4j.*
import com.ning.http.client.AsyncHttpClientConfig.Builder
import com.ning.http.client.*
import groovy.json.JsonSlurper
import java.util.concurrent.ExecutorService
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import groovy.util.logging.Log4j
import java.text.NumberFormat
import java.text.MessageFormat
/**
 * Settings for a single polling connection handled by ConnectionManager.
 */
class ConnectionSettings {
    /** Address the POST request is sent to. */
    String url
    /** Value placed in the "sender" field of the request body. */
    String sender
    /** Value placed in the "receiver" field of the request body. */
    String receiver
    /** While true the connection is relaunched in a loop; defaults to false. */
    Boolean status = false
    /** Short label used as a prefix in log messages. */
    String identifier

    /**
     * @return a one-line summary of the endpoint and both parties
     */
    @Override
    String toString() {
        return "${url} from ${sender} to ${receiver}"
    }
}
/**
 * AsyncHandler that logs the progress of one polling request.
 *
 * Every callback returns CONTINUE, so the response is consumed until the
 * server closes it or the request fails. The handler keeps no per-request
 * state of its own.
 */
@Log4j
class MyAsyncHandler implements AsyncHandler<String> {
    /** Connection this handler reports on; its identifier prefixes log lines. */
    ConnectionSettings connectionSettings
    /** Owning manager; supplies the pause length after a failed connect. */
    ConnectionManager connectionManager

    /**
     * Logs a failed request. Timeouts are reported at INFO, connect failures
     * at DEBUG followed by a pause, anything else at ERROR with the stack trace.
     *
     * @param t the failure reported by the HTTP client
     */
    public void onThrowable(Throwable t) {
        if (t instanceof TimeoutException) {
            if (connectionSettings.status) {
                log.info "${connectionSettings.identifier}: Timeout. "
            } else {
                log.info "${connectionSettings.identifier}: Got marked disabled. That's all folks!"
            }
        } else if (t instanceof ConnectException) {
            log.debug "${connectionSettings.identifier}: Connect failed. Relaunching after pause...", t
            // NOTE(review): this sleeps on whichever thread delivers the callback.
            Thread.sleep(connectionManager.defaultTimeout)
        } else {
            // Route unexpected failures through the logger (with stack trace)
            // instead of stdout so they carry the connection identifier.
            log.error "${connectionSettings.identifier}: Throwable received", t
        }
    }

    /**
     * Tries to parse one body part as a JSON message and logs its contents.
     * A part that cannot be parsed is logged at DEBUG and otherwise ignored.
     *
     * @param bodyPart the chunk of response body just received
     * @return always CONTINUE
     */
    public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
        // Decode explicitly as UTF-8 rather than the platform default charset.
        def str = new String(bodyPart.getBodyPartBytes(), 'UTF-8')
        def slurper = new JsonSlurper()
        try {
            def result = slurper.parseText(str)
            if (result) {
                log.debug "Sender: ${result.sender}"
                log.debug "Unique Id: ${result.uniqueId}"
                String mc = result.messageContent
                byte[] mBytes = mc.decodeBase64()
                // NOTE(review): assumes the decoded payload is UTF-8 text - confirm with the sender.
                def mString = new String(mBytes, 'UTF-8')
                log.debug "Message:\n${mString.replaceAll('\r', '\n')}\n\n"
                log.info "${connectionSettings.identifier}: Message received"
            }
        } catch (Exception parseFailure) {
            // Best effort: a chunk that is not a complete JSON message is skipped,
            // but the cause is kept in the debug log.
            log.debug "Unparsable body received", parseFailure
        }
        AsyncHandler.STATE.CONTINUE
    }

    /**
     * Logs the status text of the response.
     *
     * @param responseStatus status line of the response
     * @return always CONTINUE
     */
    public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
        log.info connectionSettings.identifier + ': ' + responseStatus.statusText
        AsyncHandler.STATE.CONTINUE
    }

    /**
     * Notes that headers arrived; their content is not inspected.
     *
     * @param headers response headers
     * @return always CONTINUE
     */
    public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
        log.debug "headers received"
        AsyncHandler.STATE.CONTINUE
    }

    /**
     * Logs completion of the request.
     *
     * @return the constant string "DONE"
     */
    public String onCompleted() throws Exception {
        log.info connectionSettings.identifier + ': ' + 'Complete. Relaunching'
        return "DONE"
    }
}
/**
 * Launches polling connections on a cached thread pool and keeps each one
 * re-posting for as long as its ConnectionSettings.status is true.
 */
@Log4j
class ConnectionManager {
    /** Every ConnectionSettings handed to addConnection, in call order. */
    private myConnections = []
    /** Milliseconds used for connect/idle/request timeouts and for retry pauses. */
    def defaultTimeout = 12000
    /** Pool that runs one polling loop per connection. */
    ExecutorService e = Executors.newCachedThreadPool()

    /**
     * Records the connection and starts polling it.
     *
     * @param cs settings of the connection to add
     */
    def addConnection(ConnectionSettings cs) {
        // add() appends regardless of Groovy version (push() changed ends in 2.5).
        myConnections.add cs
        startConnection(cs)
    }

    /**
     * Submits the polling loop for the given connection to the executor.
     *
     * @param myConn settings of the connection to start
     */
    def startConnection(ConnectionSettings myConn) {
        e.execute({ launchConnection(myConn) })
    }

    /**
     * Runs the polling loop: posts the sender/receiver JSON body, waits for the
     * request to finish, and repeats while myConn.status is true. The HTTP
     * client created here is closed when the loop ends for any reason.
     *
     * @param myConn settings of the connection to poll
     */
    private launchConnection(ConnectionSettings myConn) {
        log.debug "Starting conn on $myConn"
        // Build the configuration once and hand that instance to the client.
        def config = new Builder() \
            .setCompressionEnabled(true) \
            .setAllowPoolingConnection(false) \
            .setConnectionTimeoutInMs(defaultTimeout) \
            .setIdleConnectionInPoolTimeoutInMs(defaultTimeout) \
            .setRequestTimeoutInMs(defaultTimeout) \
            .build()
        final AsyncHttpClient client = new AsyncHttpClient(config)
        log.debug "client ready"
        def handler = new MyAsyncHandler(connectionSettings: myConn, connectionManager: this)
        // Serialize through JsonOutput so quotes or apostrophes in the values
        // are escaped instead of corrupting the request body.
        String body = groovy.json.JsonOutput.toJson([sender: myConn.sender, receiver: myConn.receiver])
        try {
            while (myConn.status) {
                try {
                    def future = client.preparePost(myConn.url) \
                        .setFollowRedirects(true) \
                        .setBody(body) \
                        .execute(handler)
                    log.debug "${myConn.identifier}: Launched new connection to ${myConn.url} from ${myConn.sender} to ${myConn.receiver}"
                    future.get()
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt and stop polling rather than retrying.
                    Thread.currentThread().interrupt()
                    break
                } catch (Exception failure) {
                    log.debug "Connection failed", failure
                    Thread.sleep defaultTimeout
                }
            }
        } finally {
            // Release the client's resources once this connection stops polling.
            client.close()
        }
    }
}
// Send log output to the console at INFO level using a compact layout.
BasicConfigurator.resetConfiguration()
def consoleLayout = new PatternLayout("%d{ABSOLUTE} %-5p [%c{1}] %m%n")
BasicConfigurator.configure(new ConsoleAppender(consoleLayout))
LogManager.rootLogger.level = Level.INFO

// Start connLimit connections to the same endpoint, labelled 0000, 0001, ...
def manager = new ConnectionManager()
def launched = []
def connLimit = 200
connLimit.times { index ->
    def settings = new ConnectionSettings(
        identifier: MessageFormat.format("{0,number,0000}", index),
        status: Boolean.TRUE,
        url: 'http://localhost:8080/healthdock/atmosphere/poll',
        sender: '18ffd4e2-40a4-4c2a-802f-c00f8987762e',
        receiver: '302e48f7-7285-4bd3-87ca-5803e80608bc'
    )
    manager.addConnection settings
    launched << settings
}
println "After start"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment