FAQ
Repository: camel
Updated Branches:
   refs/heads/camel-2.16.x f17b51d18 -> 7d1f61759
   refs/heads/camel-2.17.x 900cf6947 -> df2a31a4b
   refs/heads/master ddb852cdf -> 7c4dd0b4f


CAMEL-10050: Routing slip no longer caches error handlers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c4dd0b4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c4dd0b4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c4dd0b4

Branch: refs/heads/master
Commit: 7c4dd0b4f6ecd4840e4ccdbf1d7c28f2e8cb5691
Parents: ddb852c
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun Jun 12 14:21:46 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun Jun 12 14:21:46 2016 +0200

----------------------------------------------------------------------
  .../camel/processor/DefaultErrorHandler.java | 4 ++
  .../org/apache/camel/processor/RoutingSlip.java | 65 +++++++++-----------
  .../util/AsyncProcessorConverterHelper.java | 10 ++-
  .../camel/issues/RoutingSlipMemoryLeakTest.java | 15 +----
  4 files changed, 44 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
index 7f94887..f6dc784 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
@@ -69,4 +69,8 @@ public class DefaultErrorHandler extends RedeliveryErrorHandler {
          return "DefaultErrorHandler[" + output + "]";
      }

+ public Processor getDeadLetterProcessor() {
+ return deadLetter;
+ }
+
  }

http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index d2d46af..c081d46 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -17,8 +17,6 @@
  package org.apache.camel.processor;

  import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;

  import org.apache.camel.AsyncCallback;
  import org.apache.camel.AsyncProcessor;
@@ -44,7 +42,6 @@ import org.apache.camel.spi.RouteContext;
  import org.apache.camel.support.ServiceSupport;
  import org.apache.camel.util.AsyncProcessorHelper;
  import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.KeyValueHolder;
  import org.apache.camel.util.MessageHelper;
  import org.apache.camel.util.ObjectHelper;
  import org.apache.camel.util.ServiceHelper;
@@ -73,20 +70,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
      protected Expression expression;
      protected String uriDelimiter;
      protected final CamelContext camelContext;
- private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
-
- /**
- * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges
- * <p/>
- * This is similar to how multicast processor does.
- */
- static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
-
- PreparedErrorHandler(String key, Processor value) {
- super(key, value);
- }
-
- }

      /**
       * The iterator to be used for retrieving the next routing slip(s) to be used.
@@ -336,16 +319,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
              // this is needed to support redelivery on that output alone and not doing redelivery
              // for the entire routingslip/dynamic-router block again which will start from scratch again

- // create key for cache
- final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
-
- // lookup cached first to reuse and preserve memory
- answer = errorHandlers.get(key);
- if (answer != null) {
- log.trace("Using existing error handler for: {}", processor);
- return answer;
- }
-
              log.trace("Creating error handler for: {}", processor);
              ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
              // create error handler (create error handler directly to keep it light weight,
@@ -356,9 +329,6 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
                  // must start the error handler
                  ServiceHelper.startServices(answer);

- // add to cache
- errorHandlers.putIfAbsent(key, answer);
-
              } catch (Exception e) {
                  throw ObjectHelper.wrapRuntimeCamelException(e);
              }
@@ -379,13 +349,13 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace

                  // rework error handling to support fine grained error handling
                  RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
- asyncProducer = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
+ AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);

                  // set property which endpoint we send to
                  exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
                  exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());

- return asyncProducer.process(exchange, new AsyncCallback() {
+ boolean answer = target.process(exchange, new AsyncCallback() {
                      public void done(boolean doneSync) {
                          // we only have to handle async completion of the routing slip
                          if (doneSync) {
@@ -445,9 +415,33 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace

                          // copy results back to the original exchange
                          ExchangeHelper.copyResults(original, current);
+
+ if (target instanceof DeadLetterChannel) {
+ Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
+ try {
+ ServiceHelper.stopService(deadLetter);
+ } catch (Exception e) {
+ log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
+ }
+ }
+
                          callback.done(false);
                      }
                  });
+
+ // stop error handler if we completed synchronously
+ if (answer) {
+ if (target instanceof DeadLetterChannel) {
+ Processor deadLetter = ((DeadLetterChannel) target).getDeadLetter();
+ try {
+ ServiceHelper.stopService(deadLetter);
+ } catch (Exception e) {
+ log.warn("Error stopping DeadLetterChannel error handler on routing slip. This exception is ignored.", e);
+ }
+ }
+ }
+
+ return answer;
              }
          });

@@ -471,14 +465,11 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
      }

      protected void doStop() throws Exception {
- ServiceHelper.stopServices(producerCache, errorHandlers);
+ ServiceHelper.stopServices(producerCache);
      }

      protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
-
- // only clear error handlers when shutting down
- errorHandlers.clear();
+ ServiceHelper.stopAndShutdownServices(producerCache);
      }

      public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {

http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
index 6b1862e..14319ed 100644
--- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorConverterHelper.java
@@ -123,13 +123,21 @@ public final class AsyncProcessorConverterHelper {
                  return false;
              }

+ if (processor == null) {
+ return false;
+ }
+
              ProcessorToAsyncProcessorBridge that = (ProcessorToAsyncProcessorBridge) o;
              return processor.equals(that.processor);
          }

          @Override
          public int hashCode() {
- return processor.hashCode();
+ if (processor != null) {
+ return processor.hashCode();
+ } else {
+ return 0;
+ }
          }
      }


http://git-wip-us.apache.org/repos/asf/camel/blob/7c4dd0b4/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
index 7ead2b3..7129ad5 100644
--- a/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
+++ b/camel-core/src/test/java/org/apache/camel/issues/RoutingSlipMemoryLeakTest.java
@@ -16,9 +16,6 @@
   */
  package org.apache.camel.issues;

-import java.lang.reflect.Field;
-import java.util.Map;
-
  import org.apache.camel.ContextTestSupport;
  import org.apache.camel.builder.RouteBuilder;
  import org.apache.camel.processor.RoutingSlip;
@@ -40,8 +37,7 @@ public class RoutingSlipMemoryLeakTest extends ContextTestSupport {
              template.sendBody("direct:start", "message " + i);
          }
          RoutingSlip routingSlip = context.getProcessor("memory-leak", RoutingSlip.class);
- Map errorHandlers = getRoutingSlipErrorHandlers(routingSlip);
- assertEquals("Error handlers cache must contain only one value", 1, errorHandlers.size());
+ assertNotNull(routingSlip);
      }

      @Override
@@ -49,19 +45,14 @@ public class RoutingSlipMemoryLeakTest extends ContextTestSupport {
          return new RouteBuilder() {
              @Override
              public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
                  from("direct:start")
                      .routingSlip(method(SlipProvider.class)).id("memory-leak");
              }
          };
      }

- private Map<?, ?> getRoutingSlipErrorHandlers(RoutingSlip routingSlip) throws Exception {
- Field errorHandlersField = routingSlip.getClass().getDeclaredField("errorHandlers");
- errorHandlersField.setAccessible(true);
- Map errorHandlers = (Map) errorHandlersField.get(routingSlip);
- return errorHandlers;
- }
-
      public static class SlipProvider {

          public String computeSlip(String body) {

Search Discussions

Discussion Posts

Follow ups

Related Discussions

Discussion Navigation
viewthread | post
posts ‹ prev | 1 of 3 | next ›
Discussion Overview
groupcommits @
categoriescamel
postedJun 12, '16 at 12:23p
activeJun 12, '16 at 12:23p
posts3
users1
websitecamel.apache.org

1 user in discussion

Davsclaus: 3 posts

People

Translate

site design / logo © 2017 Grokbase