Created
November 11, 2014 07:27
-
-
Save mingmasplace/ce544ab21bbc4ff17564 to your computer and use it in GitHub Desktop.
HADOOP-11252 patch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java | |
index e4ee78a..bedfc12 100644 | |
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java | |
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java | |
@@ -208,6 +208,11 @@ | |
"ipc.client.tcpnodelay"; | |
/** Defalt value for IPC_CLIENT_TCPNODELAY_KEY */ | |
public static final boolean IPC_CLIENT_TCPNODELAY_DEFAULT = true; | |
+ public static final String IPC_CLIENT_CALL_TIMEOUT_MS_KEY = | |
+ "ipc.client.call.timeout.ms"; | |
+ /** Default value for IPC_CLIENT_CALL_TIMEOUT_MS_KEY */ | |
+ public static final int IPC_CLIENT_CALL_TIMEOUT_MS_DEFAULT = 0; | |
+ | |
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */ | |
public static final String IPC_SERVER_LISTEN_QUEUE_SIZE_KEY = | |
"ipc.server.listen.queue.size"; | |
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java | |
index 84fe552..b47460b 100644 | |
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java | |
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java | |
@@ -239,7 +239,8 @@ final public static int getTimeout(Configuration conf) { | |
if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true)) { | |
return getPingInterval(conf); | |
} | |
- return -1; | |
+ return conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CALL_TIMEOUT_MS_KEY, | |
+ CommonConfigurationKeys.IPC_CLIENT_CALL_TIMEOUT_MS_DEFAULT); | |
} | |
/** | |
* set the connection timeout value in configuration | |
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java | |
index 40f6515..c861899 100644 | |
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java | |
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java | |
@@ -347,7 +347,8 @@ public RpcErrorCodeProto getRpcErrorCodeProto() { | |
long clientVersion, | |
InetSocketAddress addr, Configuration conf, | |
long connTimeout) throws IOException { | |
- return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, null, connTimeout); | |
+ return waitForProtocolProxy(protocol, clientVersion, addr, conf, | |
+ Client.getTimeout(conf), null, connTimeout); | |
} | |
/** | |
@@ -485,7 +486,8 @@ public RpcErrorCodeProto getRpcErrorCodeProto() { | |
Configuration conf, | |
SocketFactory factory) throws IOException { | |
return getProtocolProxy( | |
- protocol, clientVersion, addr, ticket, conf, factory, 0, null); | |
+ protocol, clientVersion, addr, ticket, conf, factory, | |
+ Client.getTimeout(conf), null); | |
} | |
/** | |
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java | |
index c1b1bfb..59b3159 100644 | |
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java | |
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java | |
@@ -38,6 +38,7 @@ | |
import java.net.ConnectException; | |
import java.net.InetAddress; | |
import java.net.InetSocketAddress; | |
+import java.net.SocketTimeoutException; | |
import java.util.Arrays; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.CyclicBarrier; | |
@@ -51,6 +52,7 @@ | |
import org.apache.hadoop.HadoopIllegalArgumentException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.CommonConfigurationKeys; | |
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic; | |
import org.apache.hadoop.io.UTF8; | |
import org.apache.hadoop.io.Writable; | |
import org.apache.hadoop.io.retry.RetryPolicies; | |
@@ -1013,6 +1015,93 @@ public void testRpcMetrics() throws Exception { | |
} | |
} | |
+ /** | |
+  * Runs two long sleep RPCs against a single-handler server and checks | |
+  * whether the second one fails with a SocketTimeoutException. | |
+  * @param callTimeout ipc.client.call.timeout.ms value; a timeout is | |
+  *          expected only when it is positive | |
+  */ | |
+ private void testRPCTimeout(int callTimeout) throws Exception { | |
+   Configuration conf = new Configuration(); | |
+   conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CALL_TIMEOUT_MS_KEY, | |
+       callTimeout); | |
+   final Server server = new RPC.Builder(conf) | |
+       .setProtocol(TestProtocol.class).setInstance(new TestImpl()) | |
+       .setBindAddress(ADDRESS).setPort(0) | |
+       .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true) | |
+       .build(); | |
+   server.start(); | |
+   TestProtocol proxy = null; | |
+ | |
+   try { | |
+     // Created inside the try so the server is stopped even if this throws. | |
+     InetSocketAddress addr = NetUtils.getConnectAddress(server); | |
+     proxy = | |
+         RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); | |
+ | |
+     // Start a sleep RPC call to consume the only handler thread. | |
+     SleepRPCClient client1 = new SleepRPCClient(proxy); | |
+     client1.start(); | |
+ | |
+     // Start a second sleep RPC call; this is the one checked for a timeout. | |
+     SleepRPCClient client2 = new SleepRPCClient(proxy); | |
+     client2.start(); | |
+ | |
+     if (callTimeout > 0) { | |
+       // If RPC didn't timeout, the test will fail due to unit test timeout. | |
+       // An interrupt is propagated rather than swallowed so the loop ends. | |
+       while (!client2.receivedSocketTimeoutException()) { | |
+         Thread.sleep(callTimeout); | |
+       } | |
+     } else { | |
+       // callTimeout is not positive here, so sleeping callTimeout*2 would | |
+       // not wait at all; wait a fixed interval before checking instead. | |
+       Thread.sleep(2000); | |
+       assertTrue("RPC timed out", | |
+           !client2.receivedSocketTimeoutException()); | |
+     } | |
+ | |
+     client1.interrupt(); | |
+     client1.join(2000); | |
+     client2.interrupt(); | |
+     client2.join(2000); | |
+   } finally { | |
+     if (proxy != null) RPC.stopProxy(proxy); | |
+     server.stop(); | |
+   } | |
+ } | |
+ | |
+ @Test (timeout=60000) | |
+ public void testZeroRPCTimeout() throws Exception { | |
+ testRPCTimeout(0); // 0 ms: helper asserts no SocketTimeoutException was seen | |
+ } | |
+ | |
+ @Test (timeout=60000) | |
+ public void testPositiveRPCTimeout() throws Exception { | |
+ testRPCTimeout(1000); // 1000 ms: helper waits until a SocketTimeoutException is seen | |
+ } | |
+ | |
+ /** Issues one long sleep RPC and records whether it timed out. */ | |
+ private static class SleepRPCClient extends Thread { | |
+   private final TestProtocol proxy; | |
+   // volatile: set by this thread, polled from the test thread | |
+   private volatile boolean gotSocketTimeoutException = false; | |
+   public SleepRPCClient(TestProtocol proxy) { | |
+     this.proxy = proxy; | |
+   } | |
+   @Override | |
+   public void run() { | |
+     try { | |
+       proxy.sleep(100000); | |
+     } catch (SocketTimeoutException se) { | |
+       gotSocketTimeoutException = true; | |
+     } catch (Exception ignored) { /* not a timeout; nothing to record */ } | |
+   } | |
+   public boolean receivedSocketTimeoutException() { | |
+     return gotSocketTimeoutException; | |
+   } | |
+ } | |
+ | |
public static void main(String[] args) throws IOException { | |
new TestRPC().testCallsInternal(conf); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment