Skip to content

Instantly share code, notes, and snippets.

@pierre
Created December 6, 2010 22:35
Show Gist options
  • Save pierre/731112 to your computer and use it in GitHub Desktop.
Save pierre/731112 to your computer and use it in GitHub Desktop.
Handle gracefully idle connections with Scribe in the eventtracker library
diff --git a/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java b/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java
index e8ffdda..f9b707b 100644
--- a/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java
+++ b/src/main/java/com/ning/metrics/eventtracker/ScribeSender.java
@@ -24,42 +24,65 @@ import org.apache.commons.logging.LogFactory;
import org.apache.thrift.transport.TTransportException;
import scribe.thrift.LogEntry;
import scribe.thrift.ResultCode;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* ScribeSender
* <p/>
* The class needs to be public for JMX.
*/
public class ScribeSender implements EventSender
{
private static final Log log = LogFactory.getLog(ScribeSender.class);
private final AtomicInteger connectionRetries = new AtomicInteger(0);
private ScribeClient scribeClient;
private final AtomicInteger messagesSuccessfullySent = new AtomicInteger(0);
private final AtomicInteger messagesSuccessfullySentSinceLastReconnection = new AtomicInteger(0);
private int messagesToSendBeforeReconnecting = 0;
+ private final AtomicBoolean sleeping = new AtomicBoolean(false);
+
public ScribeSender(ScribeClient scribeClient, int messagesToSendBeforeReconnecting)
{
this.scribeClient = scribeClient;
this.messagesToSendBeforeReconnecting = messagesToSendBeforeReconnecting;
+
+ // Setup a watchdog for the Scribe connection. We don't want it to keep it open forever. For instance, SLB VIP
+ // may trigger a RST if idle more than a few minutes.
+ final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
+ executor.scheduleAtFixedRate(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ if (sleeping.get()) {
+ log.info("Idle connection to Scribe, re-opening it");
+ createConnection();
+ }
+ sleeping.set(true);
+ }
+ }, 0, 3, TimeUnit.MINUTES);
}
/**
* Re-initialize the connection with the Scribe endpoint.
*/
public synchronized void createConnection()
{
if (scribeClient != null) {
try {
connectionRetries.incrementAndGet();
@@ -89,20 +112,23 @@ public class ScribeSender implements EventSender
}
@Override
public boolean send(Event event) throws IOException
{
if (scribeClient == null) {
log.warn("Scribe client has not been set up correctly.");
return false;
}
+ // Tell the watchdog that we are doing something
+ sleeping.set(false);
+
ResultCode res;
// TODO: offer batch API?, see drainEvents
List<LogEntry> list = new ArrayList<LogEntry>(1);
// TODO: update Scribe to pass a Thrift directly instead of serializing it
// Has the sender specified how to send the data?
byte[] payload = event.getSerializedEvent();
// Nope, default to ObjectOutputStream
if (payload == null) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment