FAQ
Good day,

While writing some code using the Ruby AMQP gem against RabbitMQ, I've
noticed that if I publish a message and quickly close the connection, even
though I've received a publish-ok response from the server, the message
fails to be queued by the broker.

The code in question looks like:

require 'amqp'
EM.run do
  client = AMQP.connect(:host => '127.0.0.1')
  channel = AMQP::Channel.new(client)
  channel.on_error { puts 'channel error'; EM.stop }
  exchange = channel.topic 'foo', :durable => true
  exchange.publish('hello world', :routing_key => "some_topic",
                   :persistent => true, :nowait => false) { puts 'publish-ok'; EM.stop }
end

I should clarify that there is a binding in place to route messages with
routing_key "some_topic" from the "foo" exchange to a queue.

If I remove the EM.stop call in the publish-ok callback, allowing the
connection to exist after the publish, then the message is queued.

Is this expected behavior?

I would expect that if the broker has responded with a publish-ok, then
regardless of what happens to the connection from that point forward, it
will do its best to deliver that message to a queue.

--
*Elias Levy*
Fellow, Technical Research Group
SourceFire

  • Simon MacMullen at May 6, 2011 at 9:43 am
    Hi Elias.
    On 05/05/11 22:33, Elias Levy wrote:
    While writing some code using the Ruby AMQP gem against RabbitMQ, I've
    noticed that if I publish a message and quickly close the connection,
    even though I've received a publish-ok response from the server, the
    message fails to be queued by the broker.
    I'm not at all familiar with the Ruby client, but I should point out
    that unlike many of the other AMQP methods, basic.publish does not have
    a corresponding basic.publish-ok method; it's always asynchronous. So I
    imagine the post-publish callback fires immediately.

    In order to be able to know when the broker has taken responsibility for
    a message you can either wrap the publish in a transaction (when you see
    tx.commit-ok you know the server has the message) or use the rather more
    lightweight publish confirms:

    http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

    ...but I'm not sure if they are supported by the Ruby gem yet (Jakub,
    can you comment?)

    Cheers, Simon

    --
    Simon MacMullen
    Staff Engineer, RabbitMQ
    SpringSource, a division of VMware
  • David Wragg at May 6, 2011 at 11:08 am

    Another way to solve the problem is to perform a synchronous AMQP method
    rather than abruptly closing the connection. If the synchronous method
    completes successfully, you can be sure that your published messages
    have reached the broker (it doesn't give you all the guarantees of
    transactions, but it is much lighter weight).

    An easy way to do this with all versions of the AMQP gem (even 0.6.7) is
    to use the AMQP#close callback. E.g., add something like this to your
    code:

    client.close { puts "Closed ok" ; EM.stop }

    David

    --
    David Wragg
    Staff Engineer, RabbitMQ
    VMware, Inc.
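
The difference between stopping the event loop abruptly and closing the connection first can be pictured with a small toy model. This is only a sketch: ToyBroker and ToyConnection are hypothetical classes written for illustration, not the amqp gem's API, and the model assumes that an abrupt stop discards whatever is still sitting in the client's outgoing buffer.

```ruby
# Toy model only: these classes are hypothetical and are NOT the amqp gem's API.
class ToyBroker
  attr_reader :queued

  def initialize
    @queued = []
  end

  # The broker only sees frames that were actually written to the socket.
  def receive(frame)
    @queued << frame
  end
end

class ToyConnection
  def initialize(broker)
    @broker = broker
    @outgoing = [] # frames accepted by the client but not yet on the wire
  end

  # Like an asynchronous basic.publish: buffer the frame and return at once.
  # The block runs immediately; it says nothing about the broker.
  def publish(payload)
    @outgoing << payload
    yield if block_given?
  end

  # Abrupt shutdown (the EM.stop case): buffered frames are simply dropped.
  def abort!
    @outgoing.clear
  end

  # Graceful close: flush everything first, then report completion,
  # standing in for the close / close-ok round trip.
  def close
    @broker.receive(@outgoing.shift) until @outgoing.empty?
    yield if block_given?
  end
end

lost = ToyBroker.new
conn = ToyConnection.new(lost)
conn.publish('hello world') { conn.abort! }

kept = ToyBroker.new
conn = ToyConnection.new(kept)
conn.publish('hello world')
conn.close { puts 'Closed ok' }

puts "abrupt stop: #{lost.queued.inspect}"
puts "graceful close: #{kept.queued.inspect}"
```

In the first run the publish block fires even though nothing reached the broker, which mirrors the behaviour reported in the original question. In the second run the close callback fires only after the buffered message has been handed over.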
  • Michael Nacos at Jul 4, 2011 at 8:24 pm
    There seems to be some support for tx_select, tx_commit, and tx_rollback in the
    master branch of the amqp gem, but not in the 0.7.x-stable branch, which is
    what most people are using.

  • Jakub Šťastný at Jul 4, 2011 at 8:30 pm
    AMQP 0.8 is available as an RC; it's far more stable than 0.7, so I would
    recommend just using the RC.

    Jakub

    http://www.flickr.com/photos/jakub-stastny
    http://twitter.com/botanicus



  • Michael Nacos at Jul 8, 2011 at 1:58 pm
    So, what's currently the best way to publish messages consumed from one
    queue into another with the amqp gem? This is quite important in processing
    scenarios where no messages may be lost. In particular, is the following code
    safe? Will the pseudo-synchronous *publish* call return only after the
    message has been accepted in the second queue (:nowait => false)? Otherwise,
    what would be the point of wrapping the publish call with @channel.tx_select
    / @channel.tx_commit?

    # consumer which publishes each message to another queue
    # ------------------------------------------------------

    @consume_from = 'first'
    @deliver_to = 'second'

    EM.run do
      AMQP.connect do |connection|
        @channel = MQ.new(connection)
        # making sure the relevant queues exist
        @queue1 = @channel.queue(@consume_from, :passive => false, :durable => true)
        @queue2 = @channel.queue(@deliver_to, :passive => false, :durable => true)
        # setting up the consumer loop
        @queue1.subscribe(:ack => true) do |metadata, data|
          @channel.default_exchange.publish(data,
                                            :routing_key => @deliver_to,
                                            :persistent => true,
                                            :nowait => false)
          puts '.'
          metadata.ack
        end
      end
    end


  • Michael Klishin at Jul 8, 2011 at 2:19 pm
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    so, what's currently the best way to publish messages consumed from one
    queue into another with the amqp gem?
    Just republish them or take

    This is quite important in processing scenaria when no messages may be
    lost. In particular, is the following code safe? Will the pseudo-synchronous
    *publish* call return only after the message has been accepted in the
    second queue (:nowait => false)? otherwise, what would be the point of
    wrapping the publish call with @channel.tx_select / @channel.tx_commit ?
    Exchange#publish returns as soon as your data has been placed into an outgoing
    data buffer. This is documented in detail (including why knowing that your
    message was received is impossible without receiving an acknowledgement for
    it):

    http://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/Exchanges.textile#Publishing_callback__Reliable_delivery_in_distributed_environments

    An example of the publisher confirms extension in action:
    https://github.com/ruby-amqp/amqp/blob/master/examples/extensions/rabbitmq/publisher_confirmations_with_transient_messages.rb

    I am not sure what "(:nowait => false)" is referring to, can you please
    explain?
  • Michael Klishin at Jul 8, 2011 at 2:30 pm
    2011/7/8 Michael Klishin <michael.s.klishin at gmail.com>
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    so, what's currently the best way to publish messages consumed from one
    queue into another with the amqp gem?
    Just republish them or take
    Disregard this part. I now understand the question better. With a multi-step
    workflow, when one application (A1) consumes messages from one queue,
    processes them and publishes the original message or a different one to
    another queue, you need to use a combination of features:

    1. Explicit message acknowledgements

    http://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/Queues.textile#Message_acknowledgements


    2. Publisher confirmations

    The only way to know that your republished message has been received by the
    broker is to receive an acknowledgement from it.
    I now realize that the documentation section does not provide any code examples.
    I will put something together later today.

    http://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/Exchanges.textile#Using_Publisher_Confirms_extension_to_AMQP_0_9_1


    So A1 would consume a message, process it, republish it and, as soon as
    the publisher confirmation arrives, acknowledge the original one.
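
The bookkeeping behind "acknowledge the original once the confirmation arrives" can be sketched in plain Ruby. ConfirmTracker is a hypothetical helper, not part of the amqp gem. It assumes that all republishing happens on a single channel in confirm mode, that the broker numbers publishes on that channel from 1, and that a confirmation with multiple set covers every publish up to and including its delivery tag. Broker nacks are not handled here.

```ruby
# Hypothetical helper, not part of the amqp gem: it only does the bookkeeping.
class ConfirmTracker
  def initialize
    @next_publish_tag = 1 # confirm tags start at 1 once confirm mode is on
    @pending = {}         # publish tag => delivery tag of the consumed message
  end

  # Call right after republishing a message on the confirm-mode channel.
  # Returns the tag the broker will later use to confirm that publish.
  def published(consumed_delivery_tag)
    tag = @next_publish_tag
    @pending[tag] = consumed_delivery_tag
    @next_publish_tag += 1
    tag
  end

  # Call from the broker-ack callback. Returns the delivery tags of the
  # consumed messages that are now safe to acknowledge on the first queue.
  def confirmed(publish_tag, multiple)
    tags = if multiple
             @pending.keys.select { |t| t <= publish_tag }
           else
             [publish_tag]
           end
    tags.sort.map { |t| @pending.delete(t) }.compact
  end

  def pending_count
    @pending.size
  end
end

tracker = ConfirmTracker.new
tracker.published(101) # republished the message consumed with delivery tag 101
tracker.published(102)
tracker.published(103)

safe_first  = tracker.confirmed(2, true)  # broker confirms publishes 1 and 2
safe_second = tracker.confirmed(3, false) # broker confirms publish 3 alone

puts "ack after first confirm:  #{safe_first.inspect}"
puts "ack after second confirm: #{safe_second.inspect}"
puts "still pending: #{tracker.pending_count}"
```

In a real consumer, published would be called inside the subscribe block right after republishing, and confirmed would be called from the broker-ack callback with the values of basic_ack.delivery_tag and basic_ack.multiple. Nothing blocks: the original message simply stays unacknowledged until its entry comes back from confirmed.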
  • Jakub Šťastný at Jul 8, 2011 at 2:28 pm
    Basic.Publish isn't pseudo-synchronous, it's really
    asynchronous. Exchange#publish doesn't take a nowait option (so no, your code
    isn't safe); it simply sends the data and that's it. Then you have no idea
    whether everything went OK or not. That's why you can use either
    transactions (yes, you'd have to wrap the code in one, as you've mentioned) or
    publisher confirms (which I'd personally prefer, as transactions are really
    slow and fairly tricky). The documentation for publisher confirms in the AMQP
    gem is here:
    http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Durability.textile

    Jakub

    http://www.flickr.com/photos/jakub-stastny
    http://twitter.com/botanicus



  • Michael Nacos at Jul 8, 2011 at 3:16 pm
    So if the publish call returns immediately with no guarantees, what's the
    point of adding a @channel.tx_commit right after it? The question becomes:
    how do I get a reliable publish ack to base the tx_commit/metadata.ack on?

    I saw this in the docs for publisher confirms:

    # define a callback that will be executed when message is acknowledged
    channel.on_ack do |basic_ack|
      puts "Received an acknowledgement: delivery_tag = #{basic_ack.delivery_tag}, multiple = #{basic_ack.multiple}"
    end

    which is great, but how does it fit within the consume loop? I need
    execution to block until it's safe to issue the metadata.ack call. Is this
    something I can achieve with eventmachine?

  • Stastny Jakub at Jul 8, 2011 at 3:32 pm

    On 8 Jul 2011, at 16:16, Michael Nacos wrote:

    so if the publish call returns immediately with no guarantees, what's the point of adding a @channel.tx_commit right after it?
    Transactions are mainly about atomicity: if you publish several messages inside a transaction, either all of them are saved or none are (and you get an error back). Using a transaction for a single message means you get the error back, so you'll know if something went wrong (I think; I don't have much experience with them).

    The docs:

    "The Tx class allows publish and ack operations to be batched into atomic units of work. The intention is that all publish and ack requests issued within a transaction will complete successfully or none of them will. Servers SHOULD implement atomic transactions at least where all publish or ack requests affect a single queue. Transactions that cover multiple queues may be non atomic, given that queues can be created and destroyed asynchronously, and such events do not form part of any transaction."
    the question becomes how do I get a reliable publish ack to base the tx_commit/metadata.ack on

    I saw this in the docs for publisher confirms:

    # define a callback that will be executed when message is acknowledged
    channel.on_ack do |basic_ack|
    puts "Received an acknowledgement: delivery_tag = #{basic_ack.delivery_tag}, multiple = #{basic_ack.multiple}"
    end
    which is great, but how does it fit within the consume loop? I need execution to block until it's safe to issue the metadata.ack call. Is this something I can achieve with eventmachine?
    I'm not sure I understand. This is a callback; like #subscribe, it doesn't block. The #subscribe block is likewise a callback, called when a message is delivered to the client. EventMachine is an implementation of the reactor pattern (meaning it's asynchronous), so it works based on events (when I get this, do that...). BTW, just to make sure: do you know that this on_ack is for the ack that the *broker* sends? So you don't call metadata.ack for this (you ack messages when they've actually been processed on the consumer).

    Cheers,

    Jakub
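
The all-or-nothing behaviour of the Tx class quoted above can be illustrated with a toy model. ToyTxChannel is a hypothetical class, not the amqp gem or the broker. It only models staging publishes until a commit, and it assumes the AMQP 0-9-1 rule that a new transaction starts immediately after a commit or rollback.

```ruby
# Toy model only: hypothetical class, not the amqp gem or the broker.
class ToyTxChannel
  attr_reader :queue

  def initialize
    @queue = []    # messages the "broker" has actually accepted
    @staged = nil  # nil means no transaction is open
  end

  def tx_select
    @staged = []
  end

  # Inside a transaction, publishes are only staged; outside, they are
  # applied immediately.
  def publish(payload)
    (@staged || @queue) << payload
  end

  # Apply every staged publish in one step, then start a fresh transaction.
  def tx_commit
    raise 'no transaction selected' unless @staged
    @queue.concat(@staged)
    @staged = []
    :commit_ok
  end

  # Discard every staged publish.
  def tx_rollback
    raise 'no transaction selected' unless @staged
    @staged = []
    :rollback_ok
  end
end

channel = ToyTxChannel.new
channel.tx_select
channel.publish('a')
channel.publish('b')
channel.tx_rollback          # neither 'a' nor 'b' is queued
channel.publish('c')
channel.publish('d')
result = channel.tx_commit   # 'c' and 'd' are queued together

puts "commit result: #{result}"
puts "queue: #{channel.queue.inspect}"
```

The point for the republishing scenario is that the commit result, not the return of publish, is the signal that the broker has taken responsibility for the staged messages.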
  • Michael Nacos at Jul 8, 2011 at 4:01 pm
    I forgot to mention my example code hangs if I introduce @channel.tx_select
    and @channel.tx_commit around the publish call. I'm using amqp 0.8.0rc13 +
    rabbit 2.4.1 & 2.5.1 -- perhaps this is related to how transactions work and
    the fact I'm re-using the same channel -- should I even be doing this?

    I've got plenty of experience with transactions and atomicity, but in a
    database context, not amqp/rabbit. I think there is considerable complexity
    (made worse by my lack of familiarity with eventmachine and the amqp
    protocol): in this 'rebroadcaster' scenario is it better:

    - to use the same connection for 'consume' and 'publish' or different
    ones?
    - to use the same channel or different ones? (do amqp transactions
    operate on a per-channel basis?)

    Jakub, I understand eventmachine is event-driven, this is how the subscribe
    call works. But within its asynchronous consume loop, I need to defer the
    queue1 ack until messages have been published in queue2. Sorry, this is
    probably lack of familiarity with eventmachine. Do you mean I could have two
    handlers within the same EM.run block, a 'subscribe' one and a 'on_ack' one
    handling the publish confirms within which the queue1 ack is done? How do I
    know which queue1 message I'm getting the on_ack notification for?

    Michael
    On 8 July 2011 16:32, Stastny Jakub wrote:

    On 8 Jul 2011, at 16:16, Michael Nacos wrote:

    so if the publish call returns immediately with no guarantees, what's the
    point of adding a @channel.tx_commit right after it?


    Transactions are mainly about atomicity, so if you publish more messages,
    running them in a transaction will result in saving all or nothing (and
    getting an error back). Using it for one item will result in getting the
    error back, so you'll know if it went wrong (I think, I don't have much
    experience with them).

    The docs:

    "The Tx class allows publish and ack operations to be batched into atomic
    units of work. The intention is that all publish and ack requests issued
    within a transaction will complete successfully or none of them will.
    Servers SHOULD implement atomic transactions at least where all publish or
    ack requests affect a single queue. Transactions that cover multiple queues
    may be non-atomic, given that queues can be created and destroyed
    asynchronously, and such events do not form part of any transaction."

    the question becomes how do I get a reliable publish ack to base the
    tx_commit/metadata.ack on

    I saw this in the docs for publisher confirms:

    # define a callback that will be executed when message is acknowledged
    channel.on_ack do |basic_ack|
      puts "Received an acknowledgement: delivery_tag = #{basic_ack.delivery_tag}, multiple = #{basic_ack.multiple}"
    end

    which is great, but how does it fit within the consume loop? I need
    execution to block until it's safe to issue the metadata.ack call. Is this
    something I can achieve with eventmachine?


    I'm not sure if I understand ... this is a callback; it doesn't block, and
    neither does #subscribe, which is a callback called when a message is
    delivered to the client. EventMachine is an implementation of the reactor
    pattern (meaning it's async), so it works based on events (when I get this,
    do that ...). BTW just to make sure, do you know that this on_ack is for the
    ack which the *broker* sends? So you don't call metadata.ack for this (you
    ack messages when they're actually processed on the consumer).

    Cheers,

    Jakub

    2011/7/8 Jakub Šťastný <stastny at 101ideas.cz>
    Basic.Publish isn't pseudo-synchronous, it's really
    asynchronous. Exchange#publish doesn't take a nowait option (so no, your code
    isn't safe), it simply sends the data and that's it. Then you have no idea
    whether everything went OK or not. That's why you can use either
    transactions (yes, you'd have to wrap the code in it as you've mentioned) or
    publisher confirms (which I'd personally prefer as transactions are really
    slow and fairly tricky). The documentation for publisher confirms in AMQP
    gem is here:
    http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/Durability.textile

    Jakub

    http://www.flickr.com/photos/jakub-stastny
    http://twitter.com/botanicus



    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    so, what's currently the best way to publish messages consumed from one
    queue into another with the amqp gem? This is quite important in processing
    scenarios when no messages may be lost. In particular, is the following code
    safe? Will the pseudo-synchronous *publish* call return only after the
    message has been accepted in the second queue (:nowait => false)? otherwise,
    what would be the point of wrapping the publish call with @channel.tx_select
    / @channel.tx_commit ?

    # consumer which publishes each message to another queue
    # ------------------------------------------------------

    @consume_from = 'first'
    @deliver_to = 'second'

    EM.run do
      AMQP.connect do |connection|
        @channel = MQ.new(connection)
        # making sure the relevant queues exist
        @queue1 = @channel.queue(@consume_from, :passive => false, :durable => true)
        @queue2 = @channel.queue(@deliver_to, :passive => false, :durable => true)
        # setting up the consumer loop
        @queue1.subscribe(:ack => true) do |metadata, data|
          @channel.default_exchange.publish(data,
            :routing_key => @deliver_to,
            :persistent => true,
            :nowait => false)
          puts '.'
          metadata.ack
        end
      end
    end


    2011/7/4 Jakub Šťastný <stastny at 101ideas.cz>
    AMQP 0.8 is available as RC, it's way more stable than 0.7, so I would
    recommend to just use the RC.

    Jakub

    http://www.flickr.com/photos/jakub-stastny
    http://twitter.com/botanicus



  • Michael Klishin at Jul 8, 2011 at 4:13 pm
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    I forgot to mention my example code hangs if I introduce @channel.tx_select
    and @channel.tx_commit around the publish call. I'm using amqp 0.8.0rc13 +
    rabbit 2.4.1 & 2.5.1 -- perhaps this is related to how transactions work and
    the fact I'm re-using the same channel -- should I even be doing this?


    No, it sounds like a bug. Can you post a small snippet of code that
    reproduces the issue?
  • Michael Nacos at Jul 8, 2011 at 4:37 pm
    the code is similar to the one I originally posted

    @queue1.subscribe(:ack => true) do |metadata, data|
      @channel.tx_select
      @channel.default_exchange.publish(data,
        :routing_key => @deliver_to,
        :persistent => true,
        :nowait => false)
      puts '.'
      @channel.tx_commit
      metadata.ack
      @channel.tx_commit # <-- if not present, there is always one unacked msg in @queue1
    end

    it wasn't hanging, but because of the tx_select, the ack didn't happen until
    the next time the consume callback was run, which I fixed by introducing
    another tx_commit right after the ack. mind you, with one tx_commit right
    after metadata.ack I lost messages.
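
The delayed ack described above is consistent with the Tx class text quoted earlier in the thread: once tx.select has been issued on a channel, acks are batched along with publishes, so an ack sent after a commit waits for the next commit. The following is only a toy in-memory model of that behaviour. ToyTxChannel and all of its methods are made up for illustration and are not the amqp gem's API, and a real broker gives no ordering guarantee between the operations inside one commit.

```ruby
# Toy in-memory model of a transactional channel. NOT the amqp gem API;
# every name here is hypothetical and exists only to show the buffering.
class ToyTxChannel
  attr_reader :queued, :acked

  def initialize
    @tx     = false
    @buffer = []  # operations held back until the next commit
    @queued = []  # message bodies the pretend broker has accepted
    @acked  = []  # delivery tags the pretend broker has processed
  end

  # Puts the channel into transactional mode; it stays that way.
  def tx_select
    @tx = true
  end

  def publish(body)
    submit([:publish, body])
  end

  def ack(tag)
    submit([:ack, tag])
  end

  # Applies everything buffered since the previous commit.
  def tx_commit
    @buffer.each { |op| apply(op) }
    @buffer.clear
  end

  # Discards everything buffered since the previous commit.
  def tx_rollback
    @buffer.clear
  end

  private

  def submit(op)
    if @tx
      @buffer << op
    else
      apply(op)
    end
  end

  def apply(op)
    kind, value = op
    if kind == :publish
      @queued << value
    else
      @acked << value
    end
  end
end

ch = ToyTxChannel.new
ch.tx_select
ch.publish('hello')
ch.tx_commit
ch.ack(1)                    # buffered: the channel is still transactional
acked_before = ch.acked.dup  # nothing acked yet
ch.tx_commit
acked_after = ch.acked.dup   # the ack is applied by the second commit
```

In this model the ack only takes effect at the second commit, which matches the "one unacked msg" observation when that second tx_commit is missing.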
  • Matthias Radestock at Jul 11, 2011 at 12:35 pm
    Michael,
    On 08/07/11 17:37, Michael Nacos wrote:
    the code is similar to the one I originally posted

    @queue1.subscribe(:ack => true) do |metadata, data|
      @channel.tx_select
      @channel.default_exchange.publish(data,
        :routing_key => @deliver_to,
        :persistent => true,
        :nowait => false)
      puts '.'
      @channel.tx_commit
      metadata.ack
      @channel.tx_commit # <-- if not present, there is always one unacked msg in @queue1
    end

    it wasn't hanging, but because of the tx_select, the ack didn't happen
    until the next time the consume callback was run, which I fixed by
    introducing another tx_commit right after the ack. mind you, with one
    tx_commit right after metadata.ack I lost messages.
    Under what circumstances did messages get lost? When killing the broker?

    I could see that happening if the broker is killed during the tx.commit
    - the 'ack' may get processed before the 'publish', and if the broker is
    nuked in-between the latter would get lost.

    'confirms' would be a better way to handle the above scenario, i.e. wait
    for confirmation of the 'publish' and only then send the ack.

    Regards,

    Matthias.
  • Michael Klishin at Jul 8, 2011 at 4:17 pm
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    Do you mean I could have two handlers within the same EM.run block, a
    'subscribe' one and a 'on_ack' one handling the publish confirms within
    which the queue1 ack is done? How do I know which queue1 message I'm getting
    the on_ack notification for?

    You need to integrate both consumer and publishing confirms code into an
    object that can keep track of any additional state you need. An example of
    such integration:

    http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/GettingStarted.textile#Integration_with_objects

    (this example does not keep track of any state but demonstrates
    Object#method and Method#to_proc combination technique)
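
One way to answer "which queue1 message is this on_ack for?" is to keep a small table keyed by publish sequence number. The sketch below is plain Ruby and does not touch the amqp gem. ConfirmTracker is a hypothetical helper, not something the gem provides. It assumes the broker numbers confirmed publishes from 1 on a channel once confirm mode is enabled, and that an ack with multiple set covers every sequence number up to and including its delivery tag.

```ruby
# Hypothetical helper, not part of the amqp gem: pairs each republished
# message with the consumer delivery that produced it.
class ConfirmTracker
  def initialize
    @next_seq = 0   # sequence number of the most recent publish
    @pending  = {}  # seq => token for the incoming delivery (a tag or metadata)
  end

  # Call immediately before each publish on the confirm-mode channel.
  # Returns the sequence number the broker is expected to confirm.
  def track(incoming)
    @next_seq += 1
    @pending[@next_seq] = incoming
    @next_seq
  end

  # Call from the confirm handler with the broker's delivery tag and
  # multiple flag. Returns the incoming deliveries that are now safe to
  # acknowledge on the source queue, oldest first.
  def confirm(seq, multiple = false)
    seqs = multiple ? @pending.keys.select { |s| s <= seq } : [seq]
    seqs.sort.map { |s| @pending.delete(s) }.compact
  end

  # Number of publishes still waiting for a confirm.
  def outstanding
    @pending.size
  end
end

tracker = ConfirmTracker.new
tracker.track(101)  # incoming delivery 101 republished as seq 1
tracker.track(102)  # seq 2
tracker.track(103)  # seq 3

first  = tracker.confirm(2, true)  # broker confirms seq 1 and 2 together
second = tracker.confirm(3)        # broker confirms seq 3 alone
```

In a consumer, the subscribe block would call track with the metadata object just before publishing, and the on_ack block would pass basic_ack.delivery_tag and basic_ack.multiple to confirm, then call ack on each returned metadata.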
  • Michael Klishin at Jul 8, 2011 at 4:18 pm
    2011/7/8 Michael Nacos
    (do amqp transactions operate on a per-channel basis?)


    They do.
  • Michael Klishin at Jul 8, 2011 at 3:56 pm
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    which is great, but how does it fit within the consume loop? I need
    execution to block until it's safe to issue the metadata.ack call. Is this
    something I can achieve with eventmachine?


    Yes. You can use EventMachine::Deferrable for that. I can't explain how
    without sitting down to write a full example.
  • Michael Nacos at Jul 8, 2011 at 4:02 pm
    btw, thanks everybody for taking the time... excellent response times. I'll
    look into EventMachine::Deferrable, too

  • Michael Klishin at Jul 8, 2011 at 2:41 pm
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    so, what's currently the best way to publish messages consumed from one
    queue into another with the amqp gem? This is quite important in processing
    scenarios when no messages may be lost.

    Michael,

    Does this piece of code look like what you would like to have? An entity
    that consumes messages from one queue and republishes them safely into
    another queue, with a way to transform the message?

    https://gist.github.com/1071983

    This imaginary AMQP::Rebroadcaster is not that difficult to design and add
    to the amqp gem for RC14 (will be out in a few weeks after
    some more testing with real world apps).
  • Michael Nacos at Jul 8, 2011 at 3:17 pm
    Michael, something like this would be fantastic (and less code to maintain
    on my part). any chance of it getting into the RC14?
  • Michael Klishin at Jul 8, 2011 at 4:12 pm
    2011/7/8 Michael Nacos <m.nacos at gmail.com>
    Michael, something like this would be fantastic (and less code to maintain
    on my part). any chance of it getting into the RC14?

    Yes.

Discussion Overview
group: rabbitmq-discuss
categories: rabbitmq
posted: May 5, '11 at 9:33p
active: Jul 11, '11 at 12:35p
posts: 22
users: 7
website: rabbitmq.com
irc: #rabbitmq
