Skip to content

Instantly share code, notes, and snippets.

@mingmasplace
Created November 11, 2014 07:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mingmasplace/ce544ab21bbc4ff17564 to your computer and use it in GitHub Desktop.
Save mingmasplace/ce544ab21bbc4ff17564 to your computer and use it in GitHub Desktop.
HADOOP-11252 patch
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index e4ee78a..bedfc12 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -208,6 +208,11 @@
"ipc.client.tcpnodelay";
/** Defalt value for IPC_CLIENT_TCPNODELAY_KEY */
public static final boolean IPC_CLIENT_TCPNODELAY_DEFAULT = true;
+ public static final String IPC_CLIENT_CALL_TIMEOUT_MS_KEY =
+ "ipc.client.call.timeout.ms";
+ /** Default value for IPC_CLIENT_CALL_TIMEOUT_MS_KEY */
+ public static final int IPC_CLIENT_CALL_TIMEOUT_MS_DEFAULT = 0;
+
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String IPC_SERVER_LISTEN_QUEUE_SIZE_KEY =
"ipc.server.listen.queue.size";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 84fe552..b47460b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -239,7 +239,8 @@ final public static int getTimeout(Configuration conf) {
if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true)) {
return getPingInterval(conf);
}
- return -1;
+ return conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CALL_TIMEOUT_MS_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_CALL_TIMEOUT_MS_DEFAULT);
}
/**
* set the connection timeout value in configuration
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 40f6515..c861899 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -347,7 +347,8 @@ public RpcErrorCodeProto getRpcErrorCodeProto() {
long clientVersion,
InetSocketAddress addr, Configuration conf,
long connTimeout) throws IOException {
- return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout);
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf,
+ Client.getTimeout(conf), null, connTimeout);
}
/**
@@ -485,7 +486,8 @@ public RpcErrorCodeProto getRpcErrorCodeProto() {
Configuration conf,
SocketFactory factory) throws IOException {
return getProtocolProxy(
- protocol, clientVersion, addr, ticket, conf, factory, 0, null);
+ protocol, clientVersion, addr, ticket, conf, factory,
+ Client.getTimeout(conf), null);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index c1b1bfb..59b3159 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -38,6 +38,7 @@
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -51,6 +52,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -1013,6 +1015,93 @@ public void testRpcMetrics() throws Exception {
}
}
+  /**
+   * Starts a single-handler server, issues two long sleep() calls through one
+   * proxy and checks whether the second call fails with a timeout.
+   * @param callTimeout value stored under IPC_CLIENT_CALL_TIMEOUT_MS_KEY; a
+   *        positive value must yield a SocketTimeoutException, otherwise none
+   */
+  private void testRPCTimeout(int callTimeout)
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CALL_TIMEOUT_MS_KEY,
+        callTimeout);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0)
+        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
+        .build();
+    server.start();
+    TestProtocol proxy = null;
+    try {
+      // Created inside try so the server is stopped even if this fails.
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      proxy =
+          RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
+      // Start a sleep RPC call to consume the only handler thread.
+      // No isAlive() wait here: it is already true right after start(), and
+      // had the thread died early the old loop would never have terminated.
+      SleepRPCClient client1 = new SleepRPCClient(proxy);
+      client1.start();
+
+      // Start another sleep RPC call to make the rpc call queue full.
+      SleepRPCClient client2 = new SleepRPCClient(proxy);
+      client2.start();
+
+      if (callTimeout > 0) {
+        // If RPC didn't timeout, the test will fail due to unit test timeout
+        while (!client2.receivedSocketTimeoutException()) {
+          Thread.sleep(callTimeout);
+        }
+      } else {
+        // callTimeout * 2 is <= 0 here, so wait a fixed grace period instead
+        // to give a spurious timeout the chance to surface before asserting.
+        Thread.sleep(2000);
+        assertTrue("RPC timed out",
+            !client2.receivedSocketTimeoutException());
+      }
+
+      client1.interrupt();
+      client1.join(2000);
+      client2.interrupt();
+      client2.join(2000);
+    } finally {
+      if (proxy != null) RPC.stopProxy(proxy);
+      server.stop();
+    }
+  }
+
+ @Test (timeout=60000)
+ public void testZeroRPCTimeout() throws Exception {
+ testRPCTimeout(0); // non-positive timeout: the helper asserts that no SocketTimeoutException occurs
+ }
+
+ @Test (timeout=60000)
+ public void testPositiveRPCTimeout() throws Exception {
+ testRPCTimeout(1000); // positive timeout: the helper waits until the second call gets a SocketTimeoutException
+ }
+
+  /** Issues one sleep() RPC; the test thread polls whether it timed out. */
+  private static class SleepRPCClient extends Thread {
+    private final TestProtocol proxy;
+    private volatile boolean gotSocketTimeoutException = false;
+    public SleepRPCClient(TestProtocol proxy) {
+      this.proxy = proxy;
+    }
+    @Override
+    public void run() {
+      try {
+        proxy.sleep(100000);
+      } catch (SocketTimeoutException se) {
+        gotSocketTimeoutException = true;  // volatile: read by another thread
+      } catch (Exception ex) { // ignored: only a timeout matters to the test
+      }
+    }
+    public boolean receivedSocketTimeoutException() {
+      return gotSocketTimeoutException;
+    }
+  }
+
public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment