Hi,

I have multiple queue consumers running at the same time, all listening
to a single queue. Messages are sent to the default exchange with the
queue name as the routing key. If the RabbitMQ service goes down (I
simply stop the service) while one of the consumers is busy processing a
message and has not yet acked it, the message is immediately sent to all
the other consumers, so the same message is processed multiple times
concurrently. Is this the right behavior for RabbitMQ?

Here is my queue design:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-ha-policy", "all");
channel.queueDeclare(JOB_EXEC_QUEUE, true, false, false, args);

I use a durable HA queue, but I am not running in a clustered
environment.
===========================
Message publishing:

channel.basicPublish("", JOB_EXEC_QUEUE,
        MessageProperties.PERSISTENT_BASIC, message.getBytes());
===========================
Consumer Code:

channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(JOB_EXEC_QUEUE, consumer);

QueueingConsumer.Delivery delivery;
while (true) {
    try {
        delivery = consumer.nextDelivery();
    } catch (InterruptedException ie) {
        continue;
    }

    // process the message

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

I stop the service while one consumer is in the middle of processing a
message.

Am I doing something wrong? Is this expected behavior for RabbitMQ?

Thanks,
Reza


  • Simon MacMullen at Oct 21, 2011 at 11:37 am

    On 19/10/11 21:02, rasadoll wrote:
    > I have multiple queue consumers running at the same time, all listening
    > to a single queue. Messages are sent to the default exchange with the
    > queue name as the routing key. If the RabbitMQ service goes down (I
    > simply stop the service) while one of the consumers is busy processing a
    > message and has not yet acked it, the message is immediately sent to all
    > the other consumers, so the same message is processed multiple times
    > concurrently. Is this the right behavior for RabbitMQ?

    No. RabbitMQ should really only deliver a given message from a queue to
    a single consumer.

    I haven't been able to replicate the behaviour you describe.

    Cheers, Simon

    --
    Simon MacMullen
    RabbitMQ, VMware
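
The rule Simon states (one message, one consumer at a time) can be illustrated with a toy in-memory model. This is only a sketch and not RabbitMQ code: `ToyQueue`, `deliverTo`, `ack` and `connectionLost` are hypothetical names invented for illustration, and the model covers only a consumer connection dropping, not the broker restarting and recovering a durable queue from disk. It shows an unacked message going back to the queue, flagged as redelivered, and then being handed to exactly one other consumer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a queue with manual acks. All names are hypothetical.
class ToyQueue {
    /** What a consumer receives: a tag to ack with, the payload, the redelivered flag. */
    static final class Delivery {
        final long tag;
        final String body;
        final boolean redelivered;

        Delivery(long tag, String body, boolean redelivered) {
            this.tag = tag;
            this.body = body;
            this.redelivered = redelivered;
        }
    }

    /** A message held by the queue, either ready or delivered but not yet acked. */
    private static final class Message {
        final String body;
        boolean redelivered;
        String heldBy; // consumer currently holding it, or null while ready

        Message(String body) { this.body = body; }
    }

    private final Deque<Message> ready = new ArrayDeque<>();
    private final Map<Long, Message> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(new Message(body));
    }

    /** Hands the head message to this one consumer, or returns null if nothing is ready. */
    Delivery deliverTo(String consumer) {
        Message m = ready.pollFirst();
        if (m == null) return null;
        m.heldBy = consumer;
        long tag = nextTag++;
        unacked.put(tag, m);
        return new Delivery(tag, m.body, m.redelivered);
    }

    /** An acked message is forgotten for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** The consumer vanished: its unacked messages return to the queue, marked redelivered. */
    void connectionLost(String consumer) {
        Iterator<Message> it = unacked.values().iterator();
        while (it.hasNext()) {
            Message m = it.next();
            if (consumer.equals(m.heldBy)) {
                it.remove();
                m.heldBy = null;
                m.redelivered = true;
                ready.addFirst(m);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.publish("job-1");

        Delivery first = queue.deliverTo("consumer-A");      // A is processing, no ack yet
        Delivery whileHeld = queue.deliverTo("consumer-B");  // nothing ready for B

        queue.connectionLost("consumer-A");                  // A disappears before acking
        Delivery again = queue.deliverTo("consumer-B");      // B alone gets the requeued message
        Delivery afterTaken = queue.deliverTo("consumer-C"); // nothing left for C

        System.out.println(first.body + " redelivered=" + first.redelivered);
        System.out.println("B while A holds it: " + whileHeld);
        System.out.println(again.body + " redelivered=" + again.redelivered);
        System.out.println("C after B took it: " + afterTaken);
        queue.ack(again.tag);
    }
}
```

In this model a second copy can only reach another consumer after the first holder has lost the message, which matches the behavior Simon says to expect. Several consumers working on the same unacked message at once, as originally reported, is not something the model can produce.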
  • Rasadoll at Oct 21, 2011 at 4:53 pm
    Hi Simon,

    This problem happens only if the consumers run as separate processes (in
    different JVMs). If they all run in the same JVM, everything works
    fine.

    Thanks,
    Reza
  • Rasadoll at Oct 21, 2011 at 5:50 pm
    It's strange, because I cannot reproduce it anymore. I'll post more
    info if I can figure out under what circumstances it happens.
    Thanks,
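
A message that was delivered but not yet acked when the connection dropped can legitimately be delivered again later, so a consumer may want to guard against running the same job twice. Below is a minimal sketch of such a guard, assuming each message carries a unique ID chosen by the publisher (the code in this thread does not set one). `DuplicateGuard` and `firstTime` are hypothetical names, not part of the RabbitMQ client. The set lives in one JVM's memory, so consumers running as separate processes would need a shared store instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers which message IDs this process has already seen.
class DuplicateGuard {
    // Thread-safe set (Java 8+), so several consumer threads can share one guard.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that presents this ID. */
    boolean firstTime(String messageId) {
        return seen.add(messageId);
    }

    public static void main(String[] args) {
        DuplicateGuard guard = new DuplicateGuard();
        String[] arrivals = {"job-1", "job-2", "job-1"}; // job-1 arrives twice
        for (String id : arrivals) {
            if (guard.firstTime(id)) {
                System.out.println("processing " + id);
            } else {
                System.out.println("skipping duplicate " + id);
            }
        }
    }
}
```

The sketch records the ID before the work is done. If processing can fail, a real consumer would have to decide whether to forget the ID again so that a later redelivery is retried.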

Discussion Overview
group: rabbitmq-discuss
categories: rabbitmq
posted: Oct 19, '11 at 8:02p
active: Oct 21, '11 at 5:50p
posts: 4
users: 2
website: rabbitmq.com
irc: #rabbitmq
