Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save shrijeet/2950832 to your computer and use it in GitHub Desktop.
Save shrijeet/2950832 to your computer and use it in GitHub Desktop.
Asynchbase rpc timeout first cut
From 2fd88425c5848059fbefc7f85ce14858bbbe7775 Mon Sep 17 00:00:00 2001
From: Shrijeet Paliwal <shrijeet@rocketfuel.com>
Date: Mon, 18 Jun 2012 14:25:21 -0700
Subject: [PATCH] Support client RPC operation level timeout
---
Makefile | 1 +
src/GetRequest.java | 24 ++++++++++++++++++++++--
src/HBaseClient.java | 16 ++++++++++++++++
src/HBaseRpc.java | 24 ++++++++++++++++++++++++
src/RegionClient.java | 10 ++++++++++
src/RpcTimeoutException.java | 18 ++++++++++++++++++
6 files changed, 91 insertions(+), 2 deletions(-)
create mode 100644 src/RpcTimeoutException.java
diff --git a/Makefile b/Makefile
index 99d9cb5..f9eebe6 100644
--- a/Makefile
+++ b/Makefile
@@ -65,6 +65,7 @@ asynchbase_SOURCES := \
src/RemoteException.java \
src/RowLock.java \
src/RowLockRequest.java \
+ src/RpcTimeoutException.java \
src/Scanner.java \
src/SingletonList.java \
src/TableNotFoundException.java \
diff --git a/src/GetRequest.java b/src/GetRequest.java
index 1f15a5d..ee3d1ee 100644
--- a/src/GetRequest.java
+++ b/src/GetRequest.java
@@ -39,7 +39,8 @@ import org.jboss.netty.buffer.ChannelBuffer;
*/
public final class GetRequest extends HBaseRpc
implements HBaseRpc.HasTable, HBaseRpc.HasKey,
- HBaseRpc.HasFamily, HBaseRpc.HasQualifiers {
+ HBaseRpc.HasFamily, HBaseRpc.HasQualifiers,
+ HBaseRpc.SupportsRpcTimeout {
private static final byte[] GET = new byte[] { 'g', 'e', 't' };
private static final byte[] EXISTS =
@@ -48,15 +49,29 @@ public final class GetRequest extends HBaseRpc
private byte[] family; // TODO(tsuna): Handle multiple families?
private byte[][] qualifiers;
private long lockid = RowLock.NO_LOCK;
+ private int rpctimeout = 0;
/**
* Constructor.
* <strong>These byte arrays will NOT be copied.</strong>
* @param table The non-empty name of the table to use.
* @param key The row key to get in that table.
+ * @param rpctimeout The RPC timeout in milliseconds for this get request.
+ * Value of 0 means RPC timeout is disabled.
*/
- public GetRequest(final byte[] table, final byte[] key) {
+ public GetRequest(final byte[] table, final byte[] key, int rpctimeout) {
super(GET, table, key);
+ this.rpctimeout = rpctimeout;
+ }
+
+ /**
+ * Constructor.
+ * <strong>These byte arrays will NOT be copied.</strong>
+ * @param table The non-empty name of the table to use.
+ * @param key The row key to get in that table.
+ */
+ public GetRequest(final byte[] table, final byte[] key) {
+ this(table, key, 0);
}
/**
@@ -182,6 +197,11 @@ public final class GetRequest extends HBaseRpc
return qualifiers;
}
+ @Override
+ public int rpctimeout() {
+ return rpctimeout;
+ }
+
public String toString() {
final String klass = method() == GET ? "GetRequest" : "Exists";
return super.toStringWithQualifiers(klass, family, qualifiers);
diff --git a/src/HBaseClient.java b/src/HBaseClient.java
index 2a85755..c56b2ca 100644
--- a/src/HBaseClient.java
+++ b/src/HBaseClient.java
@@ -1053,6 +1053,20 @@ public final class HBaseClient {
if (client != null && client.isAlive()) {
request.setRegion(region);
final Deferred<Object> d = request.getDeferred();
+ if (request instanceof HBaseRpc.SupportsRpcTimeout
+ && ((HBaseRpc.SupportsRpcTimeout) request).rpctimeout() != 0) {
+ final class RpcTimer implements TimerTask {
+ public void run(final Timeout timeout) {
+ request.callback(new RpcTimeoutException(request));
+ }
+ public String toString() {
+ return "RPC timer for " + request;
+ }
+ };
+ request.rpctimeout = timer.newTimeout(new RpcTimer(),
+ ((HBaseRpc.SupportsRpcTimeout) request).rpctimeout(),
+ MILLISECONDS);
+ }
client.sendRpc(request);
return d;
}
@@ -1060,6 +1074,8 @@ public final class HBaseClient {
return locateRegion(table, key).addBothDeferring(new RetryRpc());
}
+
+
/**
* Returns how many lookups in {@code -ROOT-} were performed.
* <p>
diff --git a/src/HBaseRpc.java b/src/HBaseRpc.java
index dd5654c..35ae91f 100644
--- a/src/HBaseRpc.java
+++ b/src/HBaseRpc.java
@@ -29,6 +29,7 @@ package org.hbase.async;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
+import org.jboss.netty.util.Timeout;
import com.stumbleupon.async.Deferred;
@@ -136,6 +137,15 @@ public abstract class HBaseRpc {
public long timestamp();
}
+ /**
+ * An RPC that supports RPC timeout
+ * @since 1.3.1
+ */
+ public interface SupportsRpcTimeout {
+ /** Returns the RPC timeout in milliseconds to be applied to this RPC. */
+ public int rpctimeout();
+ }
+
/*
* This class, although it's part of the public API, is mostly here to make
* it easier for this library to manipulate the HBase RPC protocol.
@@ -348,6 +358,11 @@ public abstract class HBaseRpc {
byte attempt; // package-private for RegionClient and HBaseClient only.
/**
+ * RPC timeout, null if this RPC does not support RPC timeouts.
+ */
+ Timeout rpctimeout;
+
+ /**
* Package private constructor for RPCs that aren't for any region.
* @param method The name of the method to invoke on the RegionServer.
*/
@@ -430,6 +445,15 @@ public abstract class HBaseRpc {
d.callback(result);
}
+ /*
+ * Cancels the RPC timeout.
+ */
+ final void cancelRpcTimout() {
+ if (rpctimeout != null) {
+ rpctimeout.cancel();
+ }
+ }
+
/** Checks whether or not this RPC has a Deferred without creating one. */
final boolean hasDeferred() {
return deferred != null;
diff --git a/src/RegionClient.java b/src/RegionClient.java
index 5b21653..d36b6f3 100644
--- a/src/RegionClient.java
+++ b/src/RegionClient.java
@@ -1109,6 +1109,16 @@ final class RegionClient extends ReplayingDecoder<VoidEnum> {
}
try {
+ if (rpc instanceof HBaseRpc.SupportsRpcTimeout) {
+ // In case timeout has expired, the response should just be silently
+ // ignored as no processing / handling of it can occur, because we
+ // already gave a RpcTimeoutException to the Deferred.
+ if (rpc.rpctimeout.isExpired()) {
+ return null;
+ } else {
+ rpc.cancelRpcTimout();
+ }
+ }
rpc.callback(decoded);
} catch (Exception e) {
LOG.error("Unexpected exception while handling RPC #" + rpcid
diff --git a/src/RpcTimeoutException.java b/src/RpcTimeoutException.java
new file mode 100644
index 0000000..db9dd6b
--- /dev/null
+++ b/src/RpcTimeoutException.java
@@ -0,0 +1,18 @@
+package org.hbase.async;
+
+public class RpcTimeoutException extends RecoverableException {
+
+ final HBaseRpc timedout_rpc;
+
+ RpcTimeoutException(HBaseRpc rpc) {
+ super("RPC Timed out " + rpc);
+ this.timedout_rpc = rpc;
+ }
+
+ public HBaseRpc getTimedOutRpc() {
+ return timedout_rpc;
+ }
+
+ private static final long serialVersionUID = -4253496236526944416L;
+
+}
--
1.7.7.5 (Apple Git-26)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment