In my program, messages coming from rabbit are handled on different
threads, and once the message is handled it is ack'd. New messages
are also sometimes generated, so I need to send them as well. I know
that channels aren't thread-safe, so I wrote a wrapper class around
QueueingConsumer that maintains two internal BlockingQueues, one for
delivery tags to ack and one for messages to send. I have public
methods ack and publish that take dtags or message representations and
enqueue them, and I overrode the nextDelivery(long) method to look
like this:

public QueueingConsumer.Delivery nextDelivery(long timeout)
    throws ShutdownSignalException, InterruptedException
{
    QueueingConsumer.Delivery delivery = super.nextDelivery(timeout);
    this.processAcks();
    this.processPublishes();
    return delivery;
}

processAcks() iterates through the acks BlockingQueue and calls

this.getChannel().basicAck(l.longValue(), false);

while processPublishes() iterates through the publishes BlockingQueue and calls

this.getChannel().basicPublish(m.exchange, m.key, ...)
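
For illustration, here is a minimal sketch of how a drain method such as processAcks() might be written. The post does not show the method bodies, so this is an assumption rather than the poster's code: `AckSink` and `PendingAcks` are hypothetical names, and the real method would call `basicAck` on the channel instead of the stand-in interface. It uses the non-blocking `poll()` so that an empty queue never stalls the consumer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the part of the channel used here; the real
// code would call this.getChannel().basicAck(tag, false) instead.
interface AckSink {
    void ack(long deliveryTag);
}

class PendingAcks {
    private final BlockingQueue<Long> acks = new LinkedBlockingQueue<>();

    // Called from worker threads once a message has been handled.
    void ack(long deliveryTag) {
        acks.add(deliveryTag);
    }

    // Called only from the consumer thread. poll() returns null when the
    // queue is empty, so this never waits for new entries to arrive.
    int processAcks(AckSink sink) {
        int sent = 0;
        Long tag;
        while ((tag = acks.poll()) != null) {
            sink.ack(tag);
            sent++;
        }
        return sent;
    }
}
```

A publish queue could be drained the same way. Note that this only keeps the queue handling itself from blocking; the call made on the channel for each entry can still block.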

The problem I'm having is that, although all these functions are
always called in the thread that the consumer is running in, the
program hangs. My stack trace looks like this:

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:485)
com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:300)
com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:285)
com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:403)
com.foo.rabbit.AckingQueueingConsumer.processPublishes(AckingQueueingConsumer.java:136)
com.foo.rabbit.AckingQueueingConsumer.nextDelivery(AckingQueueingConsumer.java:40)
[handler stuff]
java.lang.Thread.run(Thread.java:619)

It seems to have been stuck there for quite a while. Am I doing
something obviously wrong, or is this something complicated? I'm
reasonably sure that the channel is only being used in the thread
where the consumer is subscribed, so I'm hoping that I missed some
caveat about doing publishes right after a nextDelivery or something.
Any tips would be very appreciated.


  • Emile Joubert at Jul 29, 2010 at 2:09 pm
    Hi Tsuraan,

    Channel.flow has been asserted and therefore the basic.publish call has
    blocked. We are aware that our API documentation is incorrect in
    claiming that basic.publish is safe to use in the given context. This
    will be updated.

    You need to enable consumption of messages while publishers are
    potentially blocked. This will allow memory pressure on the broker to be
    relieved and publishers to resume eventually. In your example
    consumption and publication happen in the same thread and therefore
    your application gets completely stuck as soon as publication blocks.

    You could also consider using separate channels for publishing and
    consuming messages.

    Regards

    Emile







    On 27/07/10 21:53, tsuraan wrote:
    [original message snipped]
    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-discuss at lists.rabbitmq.com
    https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Tsuraan at Jul 29, 2010 at 4:11 pm

    > Channel.flow has been asserted and therefore the basic.publish call has
    > blocked. We are aware that our API documentation is incorrect in
    > claiming that basic.publish is safe to use in the given context. This
    > will be updated.

    So this probably only happens when rabbit thinks it's overloaded?

    > You need to enable consumption of messages while publishers are
    > potentially blocked. This will allow memory pressure on the broker to be
    > relieved and publishers to resume eventually. In your example
    > consumption and publication happen in the same thread and therefore
    > your application gets completely stuck as soon as publication blocks.

    So is it safe to publish messages on a channel when some other thread
    is consuming on that channel? If I acquire a lock on the channel in a
    different thread, and the channel blocks in basicPublish, will that
    still allow new messages to be delivered to the consumer?

    > You could also consider using separate channels for publishing and
    > consuming messages.

    I'm currently doing acks and publishes in a transaction, but I'm not
    sure if that's entirely necessary. It's certainly convenient and
    safe, but I don't know that I really need to do it. Are acks
    guaranteed not to block the way that publishes do? I obviously can't
    send my acks on a different channel :)
  • Emile Joubert at Jul 30, 2010 at 1:32 pm
    Hi Tsuraan,
    On 29/07/10 17:11, tsuraan wrote:
    >> Channel.flow has been asserted and therefore the basic.publish call has
    >> blocked. We are aware that our API documentation is incorrect in
    >> claiming that basic.publish is safe to use in the given context. This
    >> will be updated.
    >
    > So this probably only happens when rabbit thinks it's overloaded?

    Correct

    >> You need to enable consumption of messages while publishers are
    >> potentially blocked. This will allow memory pressure on the broker to be
    >> relieved and publishers to resume eventually. In your example
    >> consumption and publication happen in the same thread and therefore
    >> your application gets completely stuck as soon as publication blocks.
    >
    > So is it safe to publish messages on a channel when some other thread
    > is consuming on that channel? If I acquire a lock on the channel in a
    > different thread, and the channel blocks in basicPublish, will that
    > still allow new messages to be delivered to the consumer?

    In general, channels should not be shared by multiple threads, which is
    what you are proposing. In your case it makes sense to publish and
    consume on different channels to prevent this.

    Even so, the channel.flow event from the broker that blocks the producer
    only affects messages travelling in one direction. AMQP permits the
    broker to deliver messages to consumers on the same channel.

    >> You could also consider using separate channels for publishing and
    >> consuming messages.
    >
    > I'm currently doing acks and publishes in a transaction, but I'm not
    > sure if that's entirely necessary. It's certainly convenient and
    > safe, but I don't know that I really need to do it. Are acks
    > guaranteed not to block the way that publishes do? I obviously can't
    > send my acks on a different channel :)

    Acks are guaranteed not to block.



    Regards

    Emile
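
The advice in this thread (keep consuming while publishes may be blocked, and publish on a separate channel) can be sketched roughly as follows. This is only an illustration under stated assumptions: `PublishChannel`, `BackgroundPublisher`, and `FlowDemo` are hypothetical names, not part of the RabbitMQ client, and the blocked broker is simulated with a `CountDownLatch` rather than a real channel.flow. In real code, `publish` would presumably wrap `basicPublish` on a second channel created from the same connection and used only by the publisher thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a channel that is used only for publishing.
interface PublishChannel {
    void publish(String message) throws InterruptedException;
}

class BackgroundPublisher {
    private final BlockingQueue<String> outbox = new LinkedBlockingQueue<>();
    private final Thread worker;

    BackgroundPublisher(PublishChannel channel) {
        worker = new Thread(() -> {
            try {
                while (true) {
                    // take() and publish() block only this thread, so a
                    // stalled publish cannot stall the consumer loop.
                    channel.publish(outbox.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit on shutdown
            }
        }, "publisher");
        worker.setDaemon(true);
        worker.start();
    }

    // Safe to call from any thread; returns without waiting for the broker.
    void enqueue(String message) {
        outbox.add(message);
    }

    void shutdown() {
        worker.interrupt();
    }
}

class FlowDemo {
    // Simulates publishes that block until "flow" is released, and counts
    // how many deliveries the consumer loop still handles in the meantime.
    static int consumeWhilePublisherBlocked(int deliveries) {
        CountDownLatch flowReleased = new CountDownLatch(1);
        BackgroundPublisher publisher =
            new BackgroundPublisher(msg -> flowReleased.await());
        int handled = 0;
        for (int i = 0; i < deliveries; i++) {
            publisher.enqueue("reply-" + i); // returns immediately
            handled++;                        // consumer keeps making progress
        }
        flowReleased.countDown();
        publisher.shutdown();
        return handled;
    }
}
```

Had `publish` been called inline from the consumer loop, as in the original nextDelivery override, the loop would have stopped at the first blocked publish. Acks would still be sent from the consumer thread on the consuming channel.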
