Hi

I have a synchronization issue which I wasn't able to sort out with the core
Camel components. I was wondering whether someone has a better solution.

I had a synchronous route:
<when>
  <condition>daily close message</condition>
  <to uri="direct:doAggregation"/>
</when>
<otherwise>
  <to uri="direct:storeMessages"/>
</otherwise>

Storing messages happens all day. The order of the stored messages doesn't
matter, but the position of the close message does. We shouldn't begin to
process it until all messages are stored, and we shouldn't store new messages
until the aggregation has happened.

The service began to receive a large inflow, so we decided to make storing
parallel.

The route reads a durable, so we weren't able to consume asynchronously. This
is what I came up with:

<when>
  <condition>daily close message</condition>
  <to uri="direct:doAggregation"/>
</when>
<otherwise>
  <to uri="seda:storeMessages"/>
</otherwise>

But now I have a race condition. When the daily report arrives, there could be
messages still being processed. Earlier this couldn't happen, as the same
thread was responsible for both scenarios. So the report should wait until all
messages have been consumed.

I've implemented a bean for synchronizing. It has one method where workers
register themselves and another where they unregister. In a third one, the
aggregator checks whether there are registered workers and, if there are, it
waits until all workers have unregistered. (Maybe a processor would be
better, but I'm not really confident in the solution.)
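
For readers who want something concrete, a bean like the one described might look roughly like the sketch below. Only the three method names (taken from the route further down) come from the post; the counter field, the `outstanding()` accessor and the use of `wait`/`notifyAll` are assumptions made for illustration, not the actual implementation.

```java
/** Hypothetical sketch of the synchronizing bean; internals are assumed. */
class WorkerSynch {
    // Number of work items registered but not yet unregistered.
    private int outstanding = 0;

    /** Called before a message is handed over to a worker. */
    public synchronized void registerWork() {
        outstanding++;
    }

    /** Called when a worker has finished, whether it succeeded or failed. */
    public synchronized void unRegisterWork() {
        if (outstanding > 0) {
            outstanding--;
        }
        if (outstanding == 0) {
            notifyAll(); // wake up an aggregator that may be waiting
        }
    }

    /** Blocks the calling (aggregator) thread until no work is registered. */
    public synchronized void waitForOutStandingWork() throws InterruptedException {
        while (outstanding > 0) {
            wait();
        }
    }

    /** Illustrative accessor, not mentioned in the post. */
    public synchronized int outstanding() {
        return outstanding;
    }
}
```

Because the single consumer thread is the one blocked in `waitForOutStandingWork`, no new store message can be registered while the close message waits.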

So now the solution looks like this:

<when>
  <condition>daily close message</condition>
  <bean id="workerSynch" methodName="waitForOutStandingWork"/>
  <to uri="direct:doAggregation"/>
</when>
<otherwise>
  <doTry>
    <bean id="workerSynch" methodName="registerWork"/>
  </doTry>
  <to uri="seda:myWork"/>
</otherwise>

<from uri="seda:myWork"/>
<doTry>
  <to uri="direct:storeMessages"/>
  <doFinally>
    <bean id="workerSynch" methodName="unRegisterWork"/>
  </doFinally>
</doTry>

But I'm not satisfied with this. I'm worried that the synchronization is
spread across route definitions and that someone will break it later.

I'm not 100% sure that this is bulletproof. For example, what happens if <to
uri="seda:myWork"/> throws an exception? Then the work is never unregistered.
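
One way to address both worries, sketched here under assumptions, is to keep register, hand-off and unregister in a single place, so a failed hand-off rolls the registration back. The class name `StoreGate` and its methods are hypothetical, and a plain `ExecutorService` stands in for the seda queue; this is not something Camel provides.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: registration and release live in one method. */
class StoreGate {
    private final ExecutorService workers; // stand-in for the seda consumers
    private int outstanding = 0;

    StoreGate(ExecutorService workers) {
        this.workers = workers;
    }

    /** Registers the task, queues it, and releases the registration once. */
    void submit(Runnable storeTask) {
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable release = () -> {
            if (released.compareAndSet(false, true)) {
                unregister();
            }
        };
        register();
        try {
            workers.execute(() -> {
                try {
                    storeTask.run();
                } finally {
                    release.run(); // runs whether storing succeeded or threw
                }
            });
        } catch (RuntimeException e) {
            release.run(); // hand-off failed, so no worker will ever release
            throw e;
        }
    }

    /** Blocks until every submitted task has finished. */
    synchronized void awaitIdle() throws InterruptedException {
        while (outstanding > 0) {
            wait();
        }
    }

    synchronized int outstanding() {
        return outstanding;
    }

    private synchronized void register() {
        outstanding++;
    }

    private synchronized void unregister() {
        outstanding--;
        if (outstanding == 0) {
            notifyAll();
        }
    }
}
```

The close-message branch would call `awaitIdle()` before aggregating, and the store branch would call `submit(...)`, so nobody editing a route can forget one half of the pair.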

I think the real solution is to treat this as a new requirement. Previously
we said: process each message one by one. Now we have a new requirement:
process messages unless it is a close-down message. Then the close-down
message should get some sort of business logic where it can verify itself and
act accordingly.

But I'd like to ask: is there a way to sort this out at the function level,
like "don't process until the other route's work queue is empty", instead of
implementing new functionality?

--
View this message in context: http://camel.465427.n5.nabble.com/Synchronizing-event-with-asynchronous-route-tp5501946p5501946.html
Sent from the Camel - Users mailing list archive at Nabble.com.


  • Claus Ibsen at Feb 22, 2012 at 10:43 am
    Camel has an in-flight registry which you can query for the number of
    in-progress messages.
    Maybe that can help you avoid using sync locks.



    --
    Claus Ibsen
    -----------------
    FuseSource
    Email: cibsen@fusesource.com
    Web: http://fusesource.com
    Twitter: davsclaus, fusenews
    Blog: http://davsclaus.blogspot.com/
    Author of Camel in Action: http://www.manning.com/ibsen/
  • Edvicif at Feb 22, 2012 at 12:35 pm
    You mean
    http://camel.apache.org/maven/camel-2.7.0/camel-core/apidocs/org/apache/camel/impl/DefaultInflightRepository.html
    DefaultInflightRepository, which is used during shutdown?
    I'll take a look.

    --
    View this message in context: http://camel.465427.n5.nabble.com/Synchronizing-event-with-asynchronous-route-tp5501946p5504805.html
    Sent from the Camel - Users mailing list archive at Nabble.com.
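
The in-flight registry suggested in the reply could be polled along the lines of the sketch below. It is plain Java with no Camel dependency: the count comes in through an `IntSupplier`, which in a Camel application would presumably be something like `() -> camelContext.getInflightRepository().size()`. The class name `InflightWaiter`, the method `awaitAtMost` and all of its parameters are hypothetical. The `allowed` threshold is there on the assumption that the close message itself may be counted as in flight, in which case waiting for zero would never finish.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical helper that polls an in-flight counter. */
class InflightWaiter {
    /**
     * Polls inflightCount until it is at most allowed or the timeout elapses.
     * Returns true if the threshold was reached in time, false on timeout.
     */
    static boolean awaitAtMost(IntSupplier inflightCount, int allowed,
                               long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (inflightCount.getAsInt() > allowed) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: messages are still in progress
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }
}
```

Polling avoids explicit locks in the routes, at the cost of a small delay and a timeout that has to be chosen. It only works here because the consuming thread is blocked while it polls, so no new store messages are taken in during the wait.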

Discussion Overview
group: users @
categories: camel
posted: Feb 21, '12 at 12:11p
active: Feb 22, '12 at 12:35p
posts: 3
users: 2
website: camel.apache.org
