FAQ
HBASE-15957 RpcClientImpl.close never ends in some circumstances

Signed-off-by: Enis Soztutar <enis@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4418ba2f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4418ba2f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4418ba2f

Branch: refs/heads/branch-1.1
Commit: 4418ba2f8f10ee08d82aee9f311e0e1b6bd7be5e
Parents: ac02348
Author: Sergey Soldatov <ssa@apache.org>
Authored: Sun Jun 5 23:46:03 2016 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Tue Jun 7 11:42:54 2016 -0700

----------------------------------------------------------------------
  .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 5 ++-
  .../hbase/ipc/IntegrationTestRpcClient.java | 35 ++++++++++++++++----
  2 files changed, 31 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4418ba2f/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 56ae06f..6d69595 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -1146,9 +1146,8 @@ public class RpcClientImpl extends AbstractRpcClient {
      }
      if (connsToClose != null) {
        for (Connection conn : connsToClose) {
- if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
- conn.close();
- }
+ conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+ conn.close();
        }
      }
      // wait until all connections are closed

http://git-wip-us.apache.org/repos/asf/hbase/blob/4418ba2f/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 1b425b8..c0b7a43 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -18,6 +18,7 @@

  package org.apache.hadoop.hbase.ipc;

+import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertTrue;
@@ -41,12 +42,6 @@ import org.apache.hadoop.hbase.CellScanner;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
-import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
-import org.apache.hadoop.hbase.ipc.RpcScheduler;
-import org.apache.hadoop.hbase.ipc.RpcServer;
  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -290,6 +285,7 @@ public class IntegrationTestRpcClient {
    static class SimpleClient extends Thread {
      AbstractRpcClient rpcClient;
      AtomicBoolean running = new AtomicBoolean(true);
+ AtomicBoolean sending = new AtomicBoolean(false);
      AtomicReference<Throwable> exception = new AtomicReference<>(null);
      Cluster cluster;
      String id;
@@ -319,6 +315,7 @@ public class IntegrationTestRpcClient {
            if (address == null) {
              throw new IOException("Listener channel is closed");
            }
+ sending.set(true);
            ret = (EchoResponseProto)
                rpcClient.callBlockingMethod(md, null, param, ret, user, address);
          } catch (Exception e) {
@@ -340,6 +337,9 @@ public class IntegrationTestRpcClient {
      void stopRunning() {
        running.set(false);
      }
+ boolean isSending() {
+ return sending.get();
+ }

      void rethrowException() throws Throwable {
        if (exception.get() != null) {
@@ -348,6 +348,29 @@ public class IntegrationTestRpcClient {
      }
    }

+ /*
+ Test that not started connections are successfully removed from connection pool when
+ rpc client is closing.
+ */
+ @Test (timeout = 30000)
+ public void testRpcWithWriteThread() throws IOException, InterruptedException {
+ LOG.info("Starting test");
+ Cluster cluster = new Cluster(1, 1);
+ cluster.startServer();
+ conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
+ for(int i = 0; i <1000; i++) {
+ AbstractRpcClient rpcClient = createRpcClient(conf, true);
+ SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
+ client.start();
+ while(!client.isSending()) {
+ Thread.sleep(1);
+ }
+ client.stopRunning();
+ rpcClient.close();
+ }
+ }
+
+
    @Test (timeout = 900000)
    public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
      for (int i = 0; i < numIterations; i++) {

Search Discussions

Discussion Posts

Previous

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 30 of 96 | next ›
Discussion Overview
groupcommits @
categorieshbase, hadoop
postedJun 1, '16 at 5:35p
activeJun 11, '16 at 5:04a
posts96
users9
websitehbase.apache.org

People

Translate

site design / logo © 2019 Grokbase