On Feb 26, 2010, at 6:42 AM, Matthias Radestock wrote:

John Mann wrote:
From the client perspective, receiving messages that I've already
acknowledged is confusing.
Here is a possible trace, as observed at the server, with delivery tags and acks included:

-> publish(A)
<- deliver(1, A)
-> publish(B)
<- deliver(2, B)
-> recover
<- deliver(3, A)
<- deliver(4, B)
-> ack(2)
-> ack(3)

As you can see, the ack(2) isn't received by the server until after it has re-delivered both messages.
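The trace above can be replayed against a toy model of the server's bookkeeping. This is only a sketch: `RecoverRaceDemo` and `ToyChannel` are hypothetical names, the model covers a single channel, and it assumes an ack for a tag the server no longer tracks is silently ignored (a real broker may treat that differently).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one channel's unacked-delivery bookkeeping (not the real broker). */
public class RecoverRaceDemo {

    /** Hypothetical stand-in for the server side of a channel. */
    static class ToyChannel {
        long nextTag = 1;
        // delivery tag -> message body, kept in delivery order
        final Map<Long, String> unacked = new LinkedHashMap<>();
        final List<String> log = new ArrayList<>();

        void deliver(String body) {
            long tag = nextTag++;
            unacked.put(tag, body);
            log.add("deliver(" + tag + ", " + body + ")");
        }

        /** Re-deliver everything the server still considers unacked, under fresh tags. */
        void recover() {
            List<String> pending = new ArrayList<>(unacked.values());
            unacked.clear();
            for (String body : pending) {
                deliver(body);
            }
        }

        /** Assumption of this model: an ack for an unknown tag is ignored. */
        void ack(long tag) {
            boolean known = unacked.remove(tag) != null;
            log.add("ack(" + tag + ")" + (known ? "" : " [stale]"));
        }
    }

    /** Replays the server-side order of events from the trace. */
    static ToyChannel replayTrace() {
        ToyChannel ch = new ToyChannel();
        ch.deliver("A");  // deliver(1, A)
        ch.deliver("B");  // deliver(2, B)
        ch.recover();     // arrives while the client's ack(2) is still in flight
        ch.ack(2);        // too late: tag 2 was already replaced by tag 4
        ch.ack(3);
        return ch;
    }

    public static void main(String[] args) {
        replayTrace().log.forEach(System.out::println);
    }
}
```

In this model B is still pending under tag 4 at the end, even though the client acknowledged it as tag 2, which is the confusing re-delivery described above.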
Ah! I get it.
Is there a way to recover unacknowledged messages without also
receiving previously acknowledged messages?
'recover' should be a synchronous operation, and such an operation is introduced in the 0-9-1 version of the AMQP spec, which rabbit will implement soon.

The recover-ok reply of the synchronous operation acts as a marker in the delivery stream - all unacked messages received by the client before it are recovered.

There is still a problem though: there is no easy way to tell in the client API which messages were received before or after the recover-ok. I will raise a bug for that.
Yup, that's my problem. I knew that the basicRecoverAsync method was asynchronous (the name helps! :) ), but I didn't know what to wait on.
Can you elaborate on what you are trying to accomplish?
I have a component (RabbitMQConsumer) that is responsible for listening for messages from a RabbitMQ queue. Upon receiving a message, the RabbitMQConsumer invokes a method on a configurable listener class that implements the RawConsumer interface.

Here is the meat of the RabbitMQConsumer class:

...
while (isRunning()) {
    try {
        deliverToRawConsumer(consumer.nextDelivery());
    } catch (InterruptedException e) {
        break;
    }
}
...

private void deliverToRawConsumer(QueueingConsumer.Delivery delivery) {
    String rawMessage = new String(delivery.getBody());

    try {
        rawConsumer.consume(rawMessage);

        acknowledgeMessageDelivery(delivery);
    }
    catch (Exception e) {
        LOG.error("Problem consuming message.", e);
        LOG.info("Not acknowledging message delivery.");
    }
}

By design, the RabbitMQConsumer does not acknowledge a message if the RawConsumer throws an exception.

The problem is that the behavior of the RawConsumer class can change during runtime. This means that it can throw an exception while consuming some messages, then later in time it can "recover". When it recovers, I need to be able to retry all of the previously unacknowledged messages.
Would closing and re-opening the channel be an option?
I think I can do that. Are there any pitfalls in this approach? I don't want to lose any messages.
PS: could we continue this discussion on rabbitmq-discuss?
Good idea. Done.

Thanks Matthias.

-John


  • Matthias Radestock at Feb 26, 2010 at 4:08 pm
    John,

    John Mann wrote:
    By design, the RabbitMQConsumer does not acknowledge a message if the
    RawConsumer throws an exception.

    The problem is that the behavior of the RawConsumer class can change
    during runtime. This means that it can throw an exception while
    consuming some messages, then later in time it can "recover". When
    it recovers, I need to be able to retry all of the previously
    unacknowledged messages.
    Would closing and re-opening the channel be an option?
    I think I can do that. Are there any pitfalls in this approach? I
    don't want to lose any messages.
    Closing the channel will result in all unacknowledged messages getting
    requeued, so no messages will be lost.

    Another option is to cancel the consumer before initiating the recover.
    The sequence of client actions would be something like this:
    1) send basic.cancel
    2) wait for basic.cancel-ok
    3) send basic.recover
    4) send basic.consume

    The hook for step 2 is provided by overriding the handleCancelOk method
    on the consumer class.

    Note that the basic.recover in step 3 can be async or sync - it doesn't
    matter.

    While waiting for the basic.cancel-ok in step 2 you have a choice of
    either continuing to process and ack messages, or throwing them away and
    letting the server resend them to the client as part of the recovery.
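The four steps above could be sketched roughly as follows. `MiniChannel` is a hypothetical, cut-down stand-in for the client's channel so the example runs without a broker; the real client's methods take more arguments and throw checked exceptions. The sketch also assumes the cancel call blocks until basic.cancel-ok has arrived, and that recover is issued with requeue set to true.

```java
import java.util.ArrayList;
import java.util.List;

public class CancelRecoverConsume {

    /**
     * Hypothetical stand-in for the client channel. The three methods mirror
     * basic.cancel, basic.recover and basic.consume; signatures are simplified.
     */
    interface MiniChannel {
        void basicCancel(String consumerTag);  // assumed to block until cancel-ok
        void basicRecover(boolean requeue);
        String basicConsume(String queue);     // returns the new consumer tag
    }

    /** Steps 1-4, run on the application's own thread. */
    static String retryUnacked(MiniChannel ch, String queue, String consumerTag) {
        ch.basicCancel(consumerTag);    // steps 1 and 2: cancel, wait for cancel-ok
        ch.basicRecover(true);          // step 3: ask for unacked messages again
        return ch.basicConsume(queue);  // step 4: re-establish the consumer
    }

    /** Recording fake that only notes the order of calls. */
    static class RecordingChannel implements MiniChannel {
        final List<String> calls = new ArrayList<>();

        public void basicCancel(String consumerTag) {
            calls.add("cancel " + consumerTag);
        }

        public void basicRecover(boolean requeue) {
            calls.add("recover requeue=" + requeue);
        }

        public String basicConsume(String queue) {
            calls.add("consume " + queue);
            return "ctag-new";
        }
    }

    public static void main(String[] args) {
        RecordingChannel ch = new RecordingChannel();
        String newTag = retryUnacked(ch, "work", "ctag-old");
        System.out.println(ch.calls + " -> " + newTag);
    }
}
```

Because no deliveries can arrive between the cancel-ok and the new consume, anything received after step 4 can be treated as post-recovery.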


    Regards,

    Matthias.
  • Tony Garnock-Jones at Feb 27, 2010 at 12:50 am

    Matthias Radestock wrote:
    1) send basic.cancel
    2) wait for basic.cancel-ok
    The hook for step 2 is provided by overriding the handleCancelOk method
    on the consumer class.
    Note also that basicCancel() doesn't return to the caller until the
    basic.cancel-ok has been received and completely processed by the client
    library and the Consumer object.

    Regards,
    Tony
  • John Mann at Mar 3, 2010 at 11:42 am
    Thanks for all of the help. I got it working!

    I had no luck with calling basicCancel() and then waiting for the handleCancelOk() method to be called. I realized after stepping through the source with a debugger that a Channel disassociates the Consumer when basicCancel() is called:

    ChannelN.java: line 645:

    Consumer callback = _consumers.remove(consumerTag);

    The only way that I knew I could reestablish this association was by calling Channel.basicConsume(). Unfortunately this didn't work. Messages after that point were never delivered.

    I finally used the approach of closing and re-opening the Channel. This worked as expected.

    If anyone would like to see my solution code, let me know.

    Thanks.

    -John
  • Matthias Radestock at Mar 3, 2010 at 1:00 pm
    John,

    John Mann wrote:
    I had no luck with calling basicCancel() and then waiting for the
    handleCancelOk() method to be called. I realized after stepping through
    the source with a debugger that a Channel disassociates the Consumer
    when basicCancel() is called:

    ChannelN.java: line 645:

    Consumer callback = _consumers.remove(consumerTag);
    That code is part of an rpc continuation. It gets called when the
    cancel-ok arrives, and the call to handleCancelOk happens right after
    the above line.
    The only way that I knew I could reestablish this association was by
    calling Channel.basicConsume(). Unfortunately this didn't work.
    Messages after that point were never delivered.
    Here is the sequence of client actions I proposed:

    1) send basic.cancel
    2) wait for basic.cancel-ok
    3) send basic.recover
    4) send basic.consume

    So you do indeed need a basic.consume to re-establish the consumer.

    One needs to be very careful with the threading here though. For
    example, the basic.consume must not be issued from the handleCancelOk
    callback. Doing so would most likely cause a deadlock. Perhaps that's
    what you were seeing.
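One way to keep channel calls out of the callback is to have the callback only signal, and do the follow-up work on the application thread. The sketch below uses a `CountDownLatch` for that; `LatchingConsumer` is a hypothetical class whose `handleCancelOk` merely mirrors the name of the client's hook, and the extra thread in `main` stands in for whichever library thread delivers the cancel-ok.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CancelOkHandoff {

    /** Hypothetical consumer: only the cancel-ok hook matters here. */
    static class LatchingConsumer {
        private final CountDownLatch cancelled = new CountDownLatch(1);

        // Record the event and return at once, without calling back
        // into the channel from inside the callback.
        void handleCancelOk(String consumerTag) {
            cancelled.countDown();
        }

        /** Returns true if cancel-ok was seen within the timeout. */
        boolean awaitCancelOk(long timeout, TimeUnit unit) {
            try {
                return cancelled.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        LatchingConsumer consumer = new LatchingConsumer();

        // Stand-in for the library thread that delivers cancel-ok.
        Thread libraryThread = new Thread(() -> consumer.handleCancelOk("ctag-1"));
        libraryThread.start();

        boolean seen = consumer.awaitCancelOk(5, TimeUnit.SECONDS);
        // Only here, back on the application thread, would basic.recover
        // and basic.consume be issued.
        System.out.println("cancel-ok seen: " + seen);
    }
}
```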

    Anyway, I'm glad the channel close/re-open works as expected.


    Regards,

    Matthias.
  • John Mann at Mar 3, 2010 at 2:16 pm

    On Wed, Mar 3, 2010 at 8:00 AM, Matthias Radestock wrote:
    John,

    John Mann wrote:
    I had no luck with calling basicCancel() and then waiting for the
    handleCancelOk() method to be called. I realized after stepping through the
    source with a debugger that a Channel disassociates the Consumer when
    basicCancel() is called:

    ChannelN.java: line 645:

    Consumer callback = _consumers.remove(consumerTag);
    That code is part of an rpc continuation. It gets called when the cancel-ok
    arrives, and the call to handleCancelOk happens right after the above line.
    The only way that I knew I could reestablish this association was by
    calling Channel.basicConsume(). Unfortunately this didn't work. Messages
    after that point were never delivered.
    Here is the sequence of client actions I proposed:

    1) send basic.cancel
    2) wait for basic.cancel-ok
    3) send basic.recover
    4) send basic.consume

    So you do indeed need a basic.consume to re-establish the consumer.

    One needs to be very careful with the threading here though. For example,
    the basic.consume must not be issued from the handleCancelOk callback. Doing
    so would most likely cause a deadlock. Perhaps that's what you were seeing.

    Anyway, I'm glad the channel close/re-open works as expected.
    That's exactly what I was seeing.
    Anyway, I'm glad the channel close/re-open works as expected.
    Me too. Thanks again for your help.



    --
    -JC
