FAQ
Repository: camel
Updated Branches:
   refs/heads/master 431ee2bdb -> 7ca0082d5


Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7ca0082d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7ca0082d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7ca0082d

Branch: refs/heads/master
Commit: 7ca0082d5b092e5a9e49a7b5bb36361973c5f726
Parents: af737a1
Author: Andrea Cosentino <ancosen@gmail.com>
Authored: Sun Apr 3 12:19:34 2016 +0200
Committer: Andrea Cosentino <ancosen@gmail.com>
Committed: Sun Apr 3 12:19:54 2016 +0200

----------------------------------------------------------------------
  .../camel/component/nats/NatsConsumer.java | 40 ++++++++++----------
  .../camel/component/nats/NatsProducer.java | 6 +--
  .../component/nats/NatsConsumerLoadTest.java | 9 ++---
  .../nats/NatsConsumerMaxMessagesTest.java | 3 +-
  4 files changed, 28 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 8be0aea..31441e6 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -21,6 +21,12 @@ import java.util.Properties;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.TimeoutException;

+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+import io.nats.client.Message;
+import io.nats.client.MessageHandler;
+import io.nats.client.Subscription;
+
  import org.apache.camel.Exchange;
  import org.apache.camel.Processor;
  import org.apache.camel.impl.DefaultConsumer;
@@ -28,12 +34,6 @@ import org.apache.camel.util.ObjectHelper;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

-import io.nats.client.Connection;
-import io.nats.client.ConnectionFactory;
-import io.nats.client.Message;
-import io.nats.client.MessageHandler;
-import io.nats.client.Subscription;
-
  public class NatsConsumer extends DefaultConsumer {

      private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
@@ -114,11 +114,10 @@ public class NatsConsumer extends DefaultConsumer {
          @Override
          public void run() {
              try {
- if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
+ if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
                      sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() {
-
- @Override
- public void onMessage(Message msg) {
+ @Override
+ public void onMessage(Message msg) {
                              LOG.debug("Received Message: {}", msg);
                              Exchange exchange = getEndpoint().createExchange();
                              exchange.getIn().setBody(msg);
@@ -130,15 +129,14 @@ public class NatsConsumer extends DefaultConsumer {
                                  getExceptionHandler().handleException("Error during processing", exchange, e);
                              }
                          }
- });
+ });
                      if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
- sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
+ sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
                      }
- } else {
+ } else {
                      sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
-
- @Override
- public void onMessage(Message msg) {
+ @Override
+ public void onMessage(Message msg) {
                              LOG.debug("Received Message: {}", msg);
                              Exchange exchange = getEndpoint().createExchange();
                              exchange.getIn().setBody(msg);
@@ -150,14 +148,14 @@ public class NatsConsumer extends DefaultConsumer {
                                  getExceptionHandler().handleException("Error during processing", exchange, e);
                              }
                          }
- });
+ });
                      if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
- sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
+ sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
                      }
- }
- } catch (Throwable e) {
- getExceptionHandler().handleException("Error during processing", e);
                  }
+ } catch (Throwable e) {
+ getExceptionHandler().handleException("Error during processing", e);
+ }
          }
      }


http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 2e92f44..318b4e0 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -20,14 +20,14 @@ import java.io.IOException;
  import java.util.Properties;
  import java.util.concurrent.TimeoutException;

+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+
  import org.apache.camel.Exchange;
  import org.apache.camel.impl.DefaultProducer;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

-import io.nats.client.Connection;
-import io.nats.client.ConnectionFactory;
-
  public class NatsProducer extends DefaultProducer {

      private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
index 87d0c3e..d1a0350 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
@@ -17,9 +17,11 @@
  package org.apache.camel.component.nats;

  import java.io.IOException;
-import java.util.Properties;
  import java.util.concurrent.TimeoutException;

+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+
  import org.apache.camel.EndpointInject;
  import org.apache.camel.builder.RouteBuilder;
  import org.apache.camel.component.mock.MockEndpoint;
@@ -27,9 +29,6 @@ import org.apache.camel.test.junit4.CamelTestSupport;
  import org.junit.Ignore;
  import org.junit.Test;

-import io.nats.client.Connection;
-import io.nats.client.ConnectionFactory;
-
  @Ignore("Require a running Nats server")
  public class NatsConsumerLoadTest extends CamelTestSupport {

@@ -54,7 +53,7 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
          return new RouteBuilder() {
              @Override
              public void configure() throws Exception {
- from("direct:send").to("nats://localhost:4222?topic=test");
+ from("direct:send").to("nats://localhost:4222?topic=test");
                  from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
              }
          };

http://git-wip-us.apache.org/repos/asf/camel/blob/7ca0082d/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
index 5ee94d9..7f4d434 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
@@ -33,7 +33,8 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport {

      @Test
      public void testMaxConsumer() throws InterruptedException, IOException {
- mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}");
+ mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}",
+ "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}");
          mockResultEndpoint.setExpectedMessageCount(5);
          template.sendBody("direct:send", "test");
          template.sendBody("direct:send", "test1");

Search Discussions

  • Acosentino at Apr 3, 2016 at 10:20 am
    Added camel-nats docs to gitbook


    Project: http://git-wip-us.apache.org/repos/asf/camel/repo
    Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af737a18
    Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af737a18
    Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af737a18

    Branch: refs/heads/master
    Commit: af737a1822affe32cf2598b86a32b5d748f13efa
    Parents: 66f0fe8
    Author: Andrea Cosentino <ancosen@gmail.com>
    Authored: Sun Apr 3 12:11:21 2016 +0200
    Committer: Andrea Cosentino <ancosen@gmail.com>
    Committed: Sun Apr 3 12:19:54 2016 +0200

    ----------------------------------------------------------------------
      components/camel-nats/src/main/docs/nats.adoc | 98 ++++++++++++++++++++++
      docs/user-manual/en/SUMMARY.md | 1 +
      2 files changed, 99 insertions(+)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/camel/blob/af737a18/components/camel-nats/src/main/docs/nats.adoc
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/docs/nats.adoc b/components/camel-nats/src/main/docs/nats.adoc
    new file mode 100644
    index 0000000..a8716d6
    --- /dev/null
    +++ b/components/camel-nats/src/main/docs/nats.adoc
    @@ -0,0 +1,98 @@
    +[[NATS-NATSComponent]]
    +NATS Component
    +~~~~~~~~~~~~~~
    +
    +*Available since Camel 2.17.0*
    +
    +http://nats.io/[NATS] is a fast and reliable messaging platform.
    +
    +Maven users will need to add the following dependency to
    +their `pom.xml` for this component.
    +
    +[source,xml]
    +------------------------------------------------------------
    +<dependency>
    + <groupId>org.apache.camel</groupId>
    + <artifactId>camel-nats</artifactId>
    + <!-- use the same version as your Camel core version -->
    + <version>x.y.z</version>
    +</dependency>
    +------------------------------------------------------------
    +
    +[[NATS-URIformat]]
    +URI format
    +^^^^^^^^^^
    +
    +[source,java]
    +----------------------
    +nats:servers[?options]
    +----------------------
    +
    +Where *servers* represents the list of NATS servers.
    +
    +[[NATS-Options]]
    +Options
    +^^^^^^^
    +
    +
    +// component options: START
    +The Nats component has no options.
    +// component options: END
    +
    +
    +
    +// endpoint options: START
    +The Nats component supports 17 endpoint options which are listed below:
    +
    +[width="100%",cols="2s,1,1m,1m,5",options="header"]
    +|=======================================================================
    +| Name | Group | Default | Java Type | Description
    +| servers | common | | String | *Required* URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers.
    +| maxReconnectAttempts | common | 3 | int | Max reconnection attempts
    +| noRandomizeServers | common | false | boolean | Whether or not randomizing the order of servers for the connection attempts
    +| pedantic | common | false | boolean | Whether or not running in pedantic mode (this affects performace)
    +| pingInterval | common | 4000 | int | Ping interval to be aware if connection is still alive (in milliseconds)
    +| reconnect | common | true | boolean | Whether or not using reconnection feature
    +| reconnectTimeWait | common | 2000 | int | Waiting time before attempts reconnection (in milliseconds)
    +| ssl | common | false | boolean | Whether or not using SSL
    +| topic | common | | String | *Required* The name of topic we want to use
    +| verbose | common | false | boolean | Whether or not running in verbose mode
    +| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
    +| maxMessages | consumer | | String | Stop receiving messages from a topic we are subscribing to after maxMessages
    +| poolSize | consumer | 10 | int | Consumer pool size
    +| queueName | consumer | | String | The Queue name if we are using nats for a queue configuration
    +| exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
    +| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
    +| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
    +|=======================================================================
    +// endpoint options: END
    +
    +
    +[[NATS-Headers]]
    +Headers
    +^^^^^^^
    +
    +[width="100%",cols="10%,10%,80%",options="header",]
    +|=======================================================================
    +|Name |Type |Description
    +
    +|CamelNatsMessageTimestamp |long |The timestamp of a consumed message.
    +
    +|CamelNatsSubscriptionId |Integer |The subscription ID of a consumer.
    +|=======================================================================

    +*Producer example:*
    +
    +[source,java]
    +-----------------------------------------------------------
    +from("direct:send").to("nats://localhost:4222?topic=test");
    +-----------------------------------------------------------
    +

    +
    +*Consumer example:*
    +
    +[source,java]
    +----------------------------------------------------------------------------------------
    +from("nats://localhost:4222?topic=test&maxMessages=5&queueName=test").to("mock:result");
    +----------------------------------------------------------------------------------------

    http://git-wip-us.apache.org/repos/asf/camel/blob/af737a18/docs/user-manual/en/SUMMARY.md
    ----------------------------------------------------------------------
    diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
    index 51bd3f0..146fb02 100644
    --- a/docs/user-manual/en/SUMMARY.md
    +++ b/docs/user-manual/en/SUMMARY.md
    @@ -153,6 +153,7 @@
          * [JSON](json.adoc)
          * [Metrics](metrics.adoc)
          * [Mock](mock.adoc)
    + * [NATS](nats.adoc)
          * [Properties](properties.adoc)
          * [Quickfix](quickfix.adoc)
          * [SJMS](sjms.adoc)
  • Acosentino at Apr 3, 2016 at 10:20 am
    CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated


    Project: http://git-wip-us.apache.org/repos/asf/camel/repo
    Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66f0fe84
    Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66f0fe84
    Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66f0fe84

    Branch: refs/heads/master
    Commit: 66f0fe84c5a4dece014660ce4ebaf3e01fac94ec
    Parents: 431ee2b
    Author: Andrea Cosentino <ancosen@gmail.com>
    Authored: Sun Apr 3 12:02:38 2016 +0200
    Committer: Andrea Cosentino <ancosen@gmail.com>
    Committed: Sun Apr 3 12:19:54 2016 +0200

    ----------------------------------------------------------------------
      components/camel-nats/pom.xml | 6 +-
      .../camel/component/nats/NatsConfiguration.java | 12 +--
      .../camel/component/nats/NatsConsumer.java | 78 ++++++++++++++------
      .../camel/component/nats/NatsProducer.java | 12 ++-
      .../component/nats/NatsPropertiesConstants.java | 16 ++--
      .../component/nats/NatsConsumerLoadTest.java | 14 ++--
      .../nats/NatsConsumerMaxMessagesQueueTest.java | 2 +-
      .../nats/NatsConsumerMaxMessagesTest.java | 2 +-
      .../camel/component/nats/NatsConsumerTest.java | 2 +-
      .../camel/component/nats/NatsProducerTest.java | 1 +
      parent/pom.xml | 3 +-
      .../features/src/main/resources/features.xml | 2 +-
      12 files changed, 96 insertions(+), 54 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/pom.xml
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/pom.xml b/components/camel-nats/pom.xml
    index 6ca8079..c2fd1dc 100644
    --- a/components/camel-nats/pom.xml
    +++ b/components/camel-nats/pom.xml
    @@ -34,9 +34,9 @@
            <artifactId>camel-core</artifactId>
          </dependency>
          <dependency>
    - <groupId>com.github.tyagihas</groupId>
    - <artifactId>java_nats</artifactId>
    - <version>${java-nats-version}</version>
    + <groupId>io.nats</groupId>
    + <artifactId>jnats</artifactId>
    + <version>${jnats-version}</version>
          </dependency>
          <!-- testing -->
          <dependency>

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    index 260d1a7..1618eb9 100644
    --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    @@ -218,12 +218,12 @@ public class NatsConfiguration {
              return props;
          }

    - public Properties createSubProperties() {
    - Properties props = new Properties();
    - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
    - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
    - return props;
    - }
    +// public Properties createSubProperties() {
    +// Properties props = new Properties();
    +// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
    +// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
    +// return props;
    +// }

          private String splitServers() {
              StringBuilder servers = new StringBuilder();

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
    index 9c8a29d..8be0aea 100644
    --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
    +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
    @@ -19,15 +19,21 @@ package org.apache.camel.component.nats;
      import java.io.IOException;
      import java.util.Properties;
      import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.TimeoutException;

      import org.apache.camel.Exchange;
      import org.apache.camel.Processor;
      import org.apache.camel.impl.DefaultConsumer;
    -import org.nats.Connection;
    -import org.nats.MsgHandler;
    +import org.apache.camel.util.ObjectHelper;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

    +import io.nats.client.Connection;
    +import io.nats.client.ConnectionFactory;
    +import io.nats.client.Message;
    +import io.nats.client.MessageHandler;
    +import io.nats.client.Subscription;
    +
      public class NatsConsumer extends DefaultConsumer {

          private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
    @@ -35,7 +41,7 @@ public class NatsConsumer extends DefaultConsumer {
          private final Processor processor;
          private ExecutorService executor;
          private Connection connection;
    - private int sid;
    + private Subscription sid;

          public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
              super(endpoint, processor);
    @@ -67,7 +73,7 @@ public class NatsConsumer extends DefaultConsumer {
              connection.flush();

              try {
    - connection.unsubscribe(sid);
    + sid.unsubscribe();
              } catch (Exception e) {
                  getExceptionHandler().handleException("Error during unsubscribing", e);
              }
    @@ -83,14 +89,15 @@ public class NatsConsumer extends DefaultConsumer {
              executor = null;

              LOG.debug("Closing Nats Connection");
    - if (connection.isConnected()) {
    + if (!connection.isClosed()) {
                  connection.close();
              }
          }

    - private Connection getConnection() throws IOException, InterruptedException {
    + private Connection getConnection() throws IOException, InterruptedException, TimeoutException {
              Properties prop = getEndpoint().getNatsConfiguration().createProperties();
    - connection = Connection.connect(prop);
    + ConnectionFactory factory = new ConnectionFactory(prop);
    + connection = factory.createConnection();
              return connection;
          }

    @@ -107,23 +114,50 @@ public class NatsConsumer extends DefaultConsumer {
              @Override
              public void run() {
                  try {
    - sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), configuration.createSubProperties(), new MsgHandler() {
    - public void execute(String msg) {
    - LOG.debug("Received Message: {}", msg);
    - Exchange exchange = getEndpoint().createExchange();
    - exchange.getIn().setBody(msg);
    - exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
    - exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
    - try {
    - processor.process(exchange);
    - } catch (Exception e) {
    - getExceptionHandler().handleException("Error during processing", exchange, e);
    + if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
    + sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() {
    +
    + @Override
    + public void onMessage(Message msg) {
    + LOG.debug("Received Message: {}", msg);
    + Exchange exchange = getEndpoint().createExchange();
    + exchange.getIn().setBody(msg);
    + exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
    + exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
    + try {
    + processor.process(exchange);
    + } catch (Exception e) {
    + getExceptionHandler().handleException("Error during processing", exchange, e);
    + }
                              }
    + });
    + if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
    + sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
                          }
    - });
    - } catch (Throwable e) {
    - getExceptionHandler().handleException("Error during processing", e);
    - }
    + } else {
    + sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
    +
    + @Override
    + public void onMessage(Message msg) {
    + LOG.debug("Received Message: {}", msg);
    + Exchange exchange = getEndpoint().createExchange();
    + exchange.getIn().setBody(msg);
    + exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
    + exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
    + try {
    + processor.process(exchange);
    + } catch (Exception e) {
    + getExceptionHandler().handleException("Error during processing", exchange, e);
    + }
    + }
    + });
    + if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
    + sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
    + }
    + }
    + } catch (Throwable e) {
    + getExceptionHandler().handleException("Error during processing", e);
    + }
              }
          }


    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
    index 89b2b23..2e92f44 100644
    --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
    +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
    @@ -18,13 +18,16 @@ package org.apache.camel.component.nats;

      import java.io.IOException;
      import java.util.Properties;
    +import java.util.concurrent.TimeoutException;

      import org.apache.camel.Exchange;
      import org.apache.camel.impl.DefaultProducer;
    -import org.nats.Connection;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

    +import io.nats.client.Connection;
    +import io.nats.client.ConnectionFactory;
    +
      public class NatsProducer extends DefaultProducer {

          private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
    @@ -64,14 +67,15 @@ public class NatsProducer extends DefaultProducer {
              LOG.debug("Stopping Nats Producer");

              LOG.debug("Closing Nats Connection");
    - if (connection != null && connection.isConnected()) {
    + if (connection != null && !connection.isClosed()) {
                  connection.close();
              }
          }

    - private Connection getConnection() throws IOException, InterruptedException {
    + private Connection getConnection() throws TimeoutException, IOException {
              Properties prop = getEndpoint().getNatsConfiguration().createProperties();
    - connection = Connection.connect(prop);
    + ConnectionFactory factory = new ConnectionFactory(prop);
    + connection = factory.createConnection();
              return connection;
          }


    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    index 8c09ce8..2e09361 100644
    --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    @@ -18,14 +18,14 @@ package org.apache.camel.component.nats;

      public interface NatsPropertiesConstants {
          String NATS_PROPERTY_URI = "uri";
    - String NATS_PROPERTY_VERBOSE = "verbose";
    - String NATS_PROPERTY_PEDANTIC = "pedantic";
    - String NATS_PROPERTY_RECONNECT = "reconnect";
    - String NATS_PROPERTY_SSL = "ssl";
    - String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "max_reconnect_attempts";
    - String NATS_PROPERTY_RECONNECT_TIME_WAIT = "reconnect_time_wait";
    - String NATS_PROPERTY_PING_INTERVAL = "ping_interval";
    - String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "dont_randomize_servers";
    + String NATS_PROPERTY_VERBOSE = "io.nats.client.verbose";
    + String NATS_PROPERTY_PEDANTIC = "io.nats.client.pedantic";
    + String NATS_PROPERTY_RECONNECT = "io.nats.client.reconnect.allowed";
    + String NATS_PROPERTY_SSL = "io.nats.client.secure";
    + String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "io.nats.client.reconnect.max";
    + String NATS_PROPERTY_RECONNECT_TIME_WAIT = "io.nats.client.reconnect.wait";
    + String NATS_PROPERTY_PING_INTERVAL = "io.nats.client.pinginterval";
    + String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "io.nats.client.norandomize";
          String NATS_PROPERTY_QUEUE = "queue";
          String NATS_PROPERTY_MAX_MESSAGES = "max";
      }

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
    index 67d1e7c..87d0c3e 100644
    --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
    +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
    @@ -18,6 +18,7 @@ package org.apache.camel.component.nats;

      import java.io.IOException;
      import java.util.Properties;
    +import java.util.concurrent.TimeoutException;

      import org.apache.camel.EndpointInject;
      import org.apache.camel.builder.RouteBuilder;
    @@ -25,7 +26,9 @@ import org.apache.camel.component.mock.MockEndpoint;
      import org.apache.camel.test.junit4.CamelTestSupport;
      import org.junit.Ignore;
      import org.junit.Test;
    -import org.nats.Connection;
    +
    +import io.nats.client.Connection;
    +import io.nats.client.ConnectionFactory;

      @Ignore("Require a running Nats server")
      public class NatsConsumerLoadTest extends CamelTestSupport {
    @@ -34,11 +37,11 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
          protected MockEndpoint mockResultEndpoint;

          @Test
    - public void testLoadConsumer() throws InterruptedException, IOException {
    + public void testLoadConsumer() throws InterruptedException, IOException, TimeoutException {
              mockResultEndpoint.setExpectedMessageCount(10000);
    -
    - Connection connection = Connection.connect(new Properties());
    -
    + ConnectionFactory cf = new ConnectionFactory("nats://localhost:4222");
    + Connection connection = cf.createConnection();
    +
              for (int i = 0; i < 10000; i++) {
                  connection.publish("test", ("test" + i).getBytes());
              }
    @@ -51,6 +54,7 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
              return new RouteBuilder() {
                  @Override
                  public void configure() throws Exception {
    + from("direct:send").to("nats://localhost:4222?topic=test");
                      from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
                  }
              };

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
    index c637cef..b69a6b7 100644
    --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
    +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
    @@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport {

          @Test
          public void testMaxConsumer() throws InterruptedException, IOException {
    - mockResultEndpoint.expectedBodiesReceivedInAnyOrder("test", "test1");
    + mockResultEndpoint.expectedBodiesReceivedInAnyOrder("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}");
              mockResultEndpoint.setExpectedMessageCount(2);

              template.sendBody("direct:send", "test");

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
    index 6e7482e..5ee94d9 100644
    --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
    +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
    @@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport {

          @Test
          public void testMaxConsumer() throws InterruptedException, IOException {
    - mockResultEndpoint.expectedBodiesReceived("test", "test1", "test2", "test3", "test4");
    + mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}");
              mockResultEndpoint.setExpectedMessageCount(5);
              template.sendBody("direct:send", "test");
              template.sendBody("direct:send", "test1");

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
    index c689ade..ca63048 100644
    --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
    +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
    @@ -34,7 +34,7 @@ public class NatsConsumerTest extends CamelTestSupport {
          @Test
          public void testConsumer() throws InterruptedException, IOException {
              mockResultEndpoint.expectedMessageCount(1);
    - mockResultEndpoint.expectedBodiesReceived("test");
    + mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}");
              template.requestBody("direct:send", "test");

              mockResultEndpoint.assertIsSatisfied();

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
    index 8f10b4c..4a22551 100644
    --- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
    +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
    @@ -26,6 +26,7 @@ public class NatsProducerTest extends CamelTestSupport {

          @Test
          public void sendTest() throws Exception {
    +
              template.sendBody("direct:send", "pippo");
          }


    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/parent/pom.xml
    ----------------------------------------------------------------------
    diff --git a/parent/pom.xml b/parent/pom.xml
    index b6dde19..386a760 100644
    --- a/parent/pom.xml
    +++ b/parent/pom.xml
    @@ -267,8 +267,7 @@
          <java-apns-bundle-version>1.0.0.Beta6_1</java-apns-bundle-version>
          <java-apns-version>1.0.0.Beta6</java-apns-version>
          <java-ewah-version>0.7.9</java-ewah-version>
    - <java-nats-version>0.5.2</java-nats-version>
    - <java-nats-bundle-version>0.5.2_1</java-nats-bundle-version>
    + <jnats-version>0.4.0</jnats-version>
          <javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
          <javacrumbs-version>0.22</javacrumbs-version>
          <javassist-bundle-version>3.12.1.GA_3</javassist-bundle-version>

    http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/platforms/karaf/features/src/main/resources/features.xml
    ----------------------------------------------------------------------
    diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
    index dd084e9..e5c0ea9 100644
    --- a/platforms/karaf/features/src/main/resources/features.xml
    +++ b/platforms/karaf/features/src/main/resources/features.xml
    @@ -1152,7 +1152,7 @@
        </feature>
        <feature name='camel-nats' version='${project.version}' resolver='(obr)' start-level='50'>
          <feature version='${project.version}'>camel-core</feature>
    - <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.java_nats/${java-nats-bundle-version}</bundle>
    + <bundle dependency='true'>wrap:mvn:io.nats/jnats/${jnats-version}</bundle>
          <bundle>mvn:org.apache.camel/camel-nats/${project.version}</bundle>
        </feature>
        <feature name='camel-netty' version='${project.version}' resolver='(obr)' start-level='50'>
  • Acosentino at Apr 3, 2016 at 12:00 pm
    Repository: camel
    Updated Branches:
       refs/heads/master a1e92a054 -> 58f6c460d


    CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated


    Project: http://git-wip-us.apache.org/repos/asf/camel/repo
    Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/58f6c460
    Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/58f6c460
    Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/58f6c460

    Branch: refs/heads/master
    Commit: 58f6c460d544b72e1d13a8f7f009851633215318
    Parents: a1e92a0
    Author: Andrea Cosentino <ancosen@gmail.com>
    Authored: Sun Apr 3 13:59:36 2016 +0200
    Committer: Andrea Cosentino <ancosen@gmail.com>
    Committed: Sun Apr 3 13:59:36 2016 +0200

    ----------------------------------------------------------------------
      .../org/apache/camel/component/nats/NatsConfiguration.java | 9 +--------
      .../camel/component/nats/NatsPropertiesConstants.java | 2 +-
      2 files changed, 2 insertions(+), 9 deletions(-)
    ----------------------------------------------------------------------


    http://git-wip-us.apache.org/repos/asf/camel/blob/58f6c460/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    index 1618eb9..53a7638 100644
    --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
    @@ -206,7 +206,7 @@ public class NatsConfiguration {

          public Properties createProperties() {
              Properties props = new Properties();
    - addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URI, splitServers());
    + addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URL, splitServers());
              addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_VERBOSE, getVerbose());
              addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PEDANTIC, getPedantic());
              addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_SSL, getSsl());
    @@ -218,13 +218,6 @@ public class NatsConfiguration {
              return props;
          }

    -// public Properties createSubProperties() {
    -// Properties props = new Properties();
    -// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
    -// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
    -// return props;
    -// }
    -
          private String splitServers() {
              StringBuilder servers = new StringBuilder();
              String prefix = "nats://";

    http://git-wip-us.apache.org/repos/asf/camel/blob/58f6c460/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    ----------------------------------------------------------------------
    diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    index 2e09361..61e5431 100644
    --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
    @@ -17,7 +17,7 @@
      package org.apache.camel.component.nats;

      public interface NatsPropertiesConstants {
    - String NATS_PROPERTY_URI = "uri";
    + String NATS_PROPERTY_URL = "io.nats.client.url";
          String NATS_PROPERTY_VERBOSE = "io.nats.client.verbose";
          String NATS_PROPERTY_PEDANTIC = "io.nats.client.pedantic";
          String NATS_PROPERTY_RECONNECT = "io.nats.client.reconnect.allowed";

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupcommits @
categoriescamel
postedApr 3, '16 at 10:20a
activeApr 3, '16 at 12:00p
posts4
users1
websitecamel.apache.org

1 user in discussion

Acosentino: 4 posts

People

Translate

site design / logo © 2018 Grokbase