On Feb 26, 2010, at 6:42 AM, Matthias Radestock wrote:
John Mann wrote:
From the client perspective, receiving messages that I've already
acknowledged is confusing.
Here is a possible trace, as observed at the server, with delivery tags and acks included:
-> publish(A)
<- deliver(1, A)
-> publish(B)
<- deliver(2, B)
-> recover
<- deliver(3, A)
<- deliver(4, B)
-> ack(2)
-> ack(3)
As you can see, the ack(2) isn't received by the server until after it has re-delivered both messages.
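To make the ordering concrete, here is a minimal sketch that replays the trace against a toy model of the server's per-channel state. The RecoverTrace class and its methods are hypothetical and written only for illustration; they are not RabbitMQ's implementation. In particular, the model assumes that an ack for a tag that is no longer outstanding is simply ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of one channel's server-side state. Illustrative only; this is
// not how RabbitMQ is implemented.
class RecoverTrace {
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private final List<String> log = new ArrayList<>();
    private long nextTag = 1;

    // -> publish(msg): the server delivers the message with a fresh tag.
    void publish(String msg) {
        log.add("-> publish(" + msg + ")");
        deliver(msg);
    }

    // -> recover: everything still unacked is re-delivered under new tags.
    void recover() {
        log.add("-> recover");
        List<String> pending = new ArrayList<>(unacked.values());
        unacked.clear();
        for (String msg : pending) {
            deliver(msg);
        }
    }

    // -> ack(tag). ASSUMPTION: a tag that is no longer outstanding is ignored.
    void ack(long tag) {
        log.add("-> ack(" + tag + ")");
        unacked.remove(tag);
    }

    private void deliver(String msg) {
        long tag = nextTag++;
        unacked.put(tag, msg);
        log.add("<- deliver(" + tag + ", " + msg + ")");
    }

    List<String> log() { return log; }

    Map<Long, String> unacked() { return unacked; }

    public static void main(String[] args) {
        RecoverTrace server = new RecoverTrace();
        server.publish("A");
        server.publish("B");
        server.recover(); // no ack has arrived yet, so A and B both go out again
        server.ack(2);    // arrives too late to prevent the re-delivery of B
        server.ack(3);
        for (String line : server.log()) {
            System.out.println(line);
        }
    }
}
```

At the moment recover is processed, the unacked set still holds both A and B, which is why both are sent again even though the client had already issued ack(2) on its side.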
Is there a way to recover unacknowledged messages without also
receiving previously acknowledged messages?
'recover' should be a synchronous operation, and such an operation is introduced in the 0-9-1 version of the AMQP spec, which rabbit will implement soon.
The recover-ok reply of the synchronous operation acts as a marker in the delivery stream - all unacked messages received by the client before it are recovered.
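As a rough illustration of the marker idea, the sketch below walks an ordered stream of frames and collects the delivery tags that arrive after recover-ok. The Frame type and the tagsAfterRecoverOk helper are hypothetical names made up for this example and are not part of the RabbitMQ Java client; the sketch also assumes that re-deliveries follow the recover-ok in the stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RecoverMarker {
    // Hypothetical frame type for illustration; not a client library class.
    static final class Frame {
        final String kind; // "deliver" or "recover-ok"
        final long tag;    // delivery tag, unused for recover-ok
        final String body;

        private Frame(String kind, long tag, String body) {
            this.kind = kind;
            this.tag = tag;
            this.body = body;
        }

        static Frame deliver(long tag, String body) {
            return new Frame("deliver", tag, body);
        }

        static Frame recoverOk() {
            return new Frame("recover-ok", 0, null);
        }
    }

    // Returns the tags of deliveries seen after the first recover-ok.
    // Deliveries before the marker are the ones the recover supersedes.
    static List<Long> tagsAfterRecoverOk(List<Frame> stream) {
        List<Long> after = new ArrayList<>();
        boolean seenMarker = false;
        for (Frame f : stream) {
            if ("recover-ok".equals(f.kind)) {
                seenMarker = true;
            } else if (seenMarker && "deliver".equals(f.kind)) {
                after.add(f.tag);
            }
        }
        return after;
    }

    public static void main(String[] args) {
        List<Frame> stream = Arrays.asList(
            Frame.deliver(1, "A"),
            Frame.deliver(2, "B"),
            Frame.recoverOk(),
            Frame.deliver(3, "A"),
            Frame.deliver(4, "B"));
        System.out.println(tagsAfterRecoverOk(stream));
    }
}
```

A client that can see the marker could stop acking tags from before it and only act on the deliveries that follow.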
There is still a problem though: there is no easy way to tell in the client API which messages were received before or after the recover-ok. I will raise a bug for that.
Yup, that's my problem. I knew that the basicRecoverAsync method was asynchronous (the name helps! :) ), but I didn't know what to wait on.
Can you elaborate on what you are trying to accomplish?
Here is the meat of the RabbitMQConsumer class:
...
    while (isRunning()) {
        try {
            deliverToRawConsumer(consumer.nextDelivery());
        } catch (InterruptedException e) {
            break;
        }
    }
...
    private void deliverToRawConsumer(QueueingConsumer.Delivery delivery) {
        String rawMessage = new String(delivery.getBody());
        try {
            rawConsumer.consume(rawMessage);
            acknowledgeMessageDelivery(delivery);
        } catch (Exception e) {
            LOG.error("Problem consuming message.", e);
            LOG.info("Not acknowledging message delivery.");
        }
    }
By design, the RabbitMQConsumer does not acknowledge a message if the RawConsumer throws an exception.
The problem is that the behavior of the RawConsumer class can change during runtime. This means that it can throw an exception while consuming some messages, then later in time it can "recover". When it recovers, I need to be able to retry all of the previously unacknowledged messages.
Would closing and re-opening the channel be an option?
I think I can do that. Are there any pitfalls in this approach? I don't want to lose any messages.
PS: could we continue this discussion on rabbitmq-discuss?
Good idea. Done.
Thanks Matthias.
-John