FAQ
Hello everyone

I have tried to test the parallel processing with Camel. The split was easy
and worked out of the box. I have some surprises and I'm wondering whenever
I'm understanding correctly the expected behaviour with the multicast. In
the Code below, I try to send messages to two routes. The "even" route takes
more time than the "odd" one. I would like to get the odd number I'm sending
before the even number. I was expecting the odd number to be processed
faster than the other, am I correct?

I'm currently getting the exchange in the same order I sent, is it the
expected behavior?

Thanks and have a nice day

Christophe

I'm using camel 2.12:



       val croute = new org.apache.camel.builder.RouteBuilder {
         override def configure(): Unit = {
           from("direct:input").multicast().parallelProcessing().
             to("direct:even", "direct:odd").end()

           from("direct:odd").filter(body.isEqualTo(1)).process(new Processor
{
             override def process(exchange: Exchange): Unit = {
               println(exchange.getIn.getBody)
             }
           }).to("mock:output")

           from("direct:even").filter(body.isEqualTo(0)).
             process(new Processor {
             override def process(exchange: Exchange): Unit = {
               println(exchange.getIn.getBody)
               Thread.sleep(1000)
               //or to avoid side effects on the Thread: (1 to
1000000).foreach(x => scala.util.Random.nextDouble())
               }
             }
           ).
             to("mock:output")
         }
       }

//.....
  val producer = camelContext.createProducerTemplate()
List(0, 1, 0, 1, 0, 1, 0, 1, 0, 1).foreach(x =>
producer.sendBody("direct:input", x))




--
View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Search Discussions

  • Claus Ibsen at Apr 1, 2015 at 12:12 pm
    You need to enable streaming mode to have out of order aggregation -
    even for multicast. The work is done in parallel, but in non streaming
    mode the aggregation happens in fixed order.

    On Wed, Apr 1, 2015 at 1:52 PM, Christophe Pache wrote:
    Hello everyone

    I have tried to test the parallel processing with Camel. The split was easy
    and worked out of the box. I have some surprises and I'm wondering whenever
    I'm understanding correctly the expected behaviour with the multicast. In
    the Code below, I try to send messages to two routes. The "even" route takes
    more time than the "odd" one. I would like to get the odd number I'm sending
    before the even number. I was expecting the odd number to be processed
    faster than the other, am I correct?

    I'm currently getting the exchange in the same order I sent, is it the
    expected behavior?

    Thanks and have a nice day

    Christophe

    I'm using camel 2.12:



    val croute = new org.apache.camel.builder.RouteBuilder {
    override def configure(): Unit = {
    from("direct:input").multicast().parallelProcessing().
    to("direct:even", "direct:odd").end()

    from("direct:odd").filter(body.isEqualTo(1)).process(new Processor
    {
    override def process(exchange: Exchange): Unit = {
    println(exchange.getIn.getBody)
    }
    }).to("mock:output")

    from("direct:even").filter(body.isEqualTo(0)).
    process(new Processor {
    override def process(exchange: Exchange): Unit = {
    println(exchange.getIn.getBody)
    Thread.sleep(1000)
    //or to avoid side effects on the Thread: (1 to
    1000000).foreach(x => scala.util.Random.nextDouble())
    }
    }
    ).
    to("mock:output")
    }
    }

    //.....
    val producer = camelContext.createProducerTemplate()
    List(0, 1, 0, 1, 0, 1, 0, 1, 0, 1).foreach(x =>
    producer.sendBody("direct:input", x))




    --
    View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146.html
    Sent from the Camel - Users mailing list archive at Nabble.com.


    --
    Claus Ibsen
    -----------------
    Red Hat, Inc.
    Email: cibsen@redhat.com
    Twitter: davsclaus
    Blog: http://davsclaus.com
    Author of Camel in Action: http://www.manning.com/ibsen
    hawtio: http://hawt.io/
    fabric8: http://fabric8.io/
  • Christophe Pache at Apr 1, 2015 at 12:38 pm
    Thanks Claus for your answer! I did test but did not succeed into making it
    work as I wanted to (change the order of received message). Following your
    advice, I've just added the streaming configuration to the multicast eip
    such as following, is that correct?

           val croute = new org.apache.camel.builder.RouteBuilder {
             override def configure(): Unit = {
               from("direct:input").multicast().parallelProcessing().streaming().
                 to("direct:even", "direct:odd").end()

               from("direct:odd").filter(body.isEqualTo(1)).to("mock:output")

               from("direct:even").filter(body.isEqualTo(0)).
                 process(new Processor {
                 override def process(exchange: Exchange): Unit = {
                   Thread.sleep(1000)
                   }
                 }
               ).
                 to("mock:output")
             }
           }



    --
    View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146p5765149.html
    Sent from the Camel - Users mailing list archive at Nabble.com.
  • Claus Ibsen at Apr 1, 2015 at 12:59 pm
    You need to be aware that multicast is sending a copy of the same
    incoming message. So the filter is either true or false for all the
    messages as they are from the same copy.

    So in one case they are all odd, and in another case they are all
    even. And then they process about the "same time" and the order may
    appear as the same.

    If you want one of them to sleep for 1 sec and not the other you need
    to change your code.
    On Wed, Apr 1, 2015 at 2:38 PM, Christophe Pache wrote:
    Thanks Claus for your answer! I did test but did not succeed into making it
    work as I wanted to (change the order of received message). Following your
    advice, I've just added the streaming configuration to the multicast eip
    such as following, is that correct?

    val croute = new org.apache.camel.builder.RouteBuilder {
    override def configure(): Unit = {
    from("direct:input").multicast().parallelProcessing().streaming().
    to("direct:even", "direct:odd").end()

    from("direct:odd").filter(body.isEqualTo(1)).to("mock:output")

    from("direct:even").filter(body.isEqualTo(0)).
    process(new Processor {
    override def process(exchange: Exchange): Unit = {
    Thread.sleep(1000)
    }
    }
    ).
    to("mock:output")
    }
    }



    --
    View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146p5765149.html
    Sent from the Camel - Users mailing list archive at Nabble.com.


    --
    Claus Ibsen
    -----------------
    Red Hat, Inc.
    Email: cibsen@redhat.com
    Twitter: davsclaus
    Blog: http://davsclaus.com
    Author of Camel in Action: http://www.manning.com/ibsen
    hawtio: http://hawt.io/
    fabric8: http://fabric8.io/
  • Christophe Pache at Apr 1, 2015 at 1:15 pm
    Thanks, You are true.

    Sorry, I should have pasted the code where I send exchanges again.

    List(0, 1, 0, 1, 0, 1, 0, 1, 0, 1).foreach(x =>sendBody("direct:input", x))

    I'm sending 10 exchanges. I hope I'll receive 10 in the end and I'm not
    expecting aggregation in fact. I've controlled each sub route sends 5
    exchanges to the final mock endpoint.
    I receive 10 exchanges but this is just the order which interests me on this
    case.
    From what I understand, this is the aggregation of results that I should
    make parallel to get the odd results faster than the even one. I'm fine with
    that, I would just find a way to be sure the processing (in the sub routes)
    is done in parallel.

    I've had a look to MultiCastParallelAndStreamCachingTest.java
    <https://github.com/apache/camel/blob/master/camel-core/src/test/java/org/apache/camel/processor/MultiCastParallelAndStreamCachingTest.java
    but did not find tests that validate it yet.
    My latest try was to print iteratively in both sub route to prove the print
    may occur at the same time, but did not succeed:

           val croute = new org.apache.camel.builder.RouteBuilder {
             override def configure(): Unit = {
               from("direct:input").multicast().parallelProcessing().streaming().
                 to("direct:even", "direct:odd").end()

               from("direct:odd").filter(body.isEqualTo(1)).
                 process(new Processor {
                 override def process(exchange: Exchange): Unit = {
                   (1 to 10).foreach(x => {
                     Thread.sleep(100)
                     println("odd")
                   })
                 }
               }
                 ).to("mock:output")

               from("direct:even").filter(body.isEqualTo(0)).
                 process(new Processor {
                 override def process(exchange: Exchange): Unit = {
                   (1 to 10).foreach(x => {
                     Thread.sleep(100)
                     println("even")
                   })
                   }
                 }
               ).
                 to("mock:output")
             }
           }





    --
    View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146p5765151.html
    Sent from the Camel - Users mailing list archive at Nabble.com.
  • Alexey-s at Apr 1, 2015 at 1:48 pm
    An incident that is necessary to call a method .end()

    from("...")
         .filter()
             .to("myprocess")
             .to("mock.out")
         .end();


    Your account is perceived as

    from("...")
         .filter()
             .to("myprocess")
         .end()
         .to("mock.out");




    --
    View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146p5765154.html
    Sent from the Camel - Users mailing list archive at Nabble.com.
  • Christophe Pache at Apr 1, 2015 at 2:01 pm
    Thanks, Alexey


    I added the end and then it worked.
    Well, Claus was true also: Now, I've understood the parallel processing
    is only on one exchange in one EIP at a time.

    My sample works, thanks guys!!!

    have a nice day


    Le 01. 04. 15 15:47, alexey-s [via Camel] a écrit :
    An incident that is necessary to call a method .end()

    from("...")
    .filter()
    .to("myprocess")
    .to("mock.out")
    .end();


    Your account is perceived as

    from("...")
    .filter()
    .to("myprocess")
    .end()
    .to("mock.out");


    ------------------------------------------------------------------------
    If you reply to this email, your message will be added to the discussion
    below:
    http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146p5765154.html

    To unsubscribe from parallelProcessing with multicast, click here
    <http://camel.465427.n5.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=5765146&code=Y2hwYWNoZUBnbWFpbC5jb218NTc2NTE0NnwtMTU5MDMwMjIzOQ==>.
    NAML
    <http://camel.465427.n5.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>



    --
    View this message in context: http://camel.465427.n5.nabble.com/parallelProcessing-with-multicast-tp5765146p5765157.html
    Sent from the Camel - Users mailing list archive at Nabble.com.

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupusers @
categoriescamel
postedApr 1, '15 at 11:54a
activeApr 1, '15 at 2:01p
posts7
users3
websitecamel.apache.org

People

Translate

site design / logo © 2021 Grokbase