I have a rails 3.1 app (using thin server which is Event Machine
based) using the AMQP gem to connect to two server cluster on EC2. the
broker is running on ubuntu 11.10 and is version 2.6.1 with erlang
version 5.8.3.

I have an initializer that connects to and subscribes to a specifc
queue. when i place a single message into the queue then start the
server it consumes the message properly and the management console
shows a consumer attached to the queue, all good so far. then when i
place another message into the queue it gets consumed, but subsequent
to that message being consumed the broker log shows a "connection
closed abruptly" error and of course the consumer gets cut off.

I'm sure I'm doing something wrong in the initializer but can't figure
out what as its really pretty vanilla. Any help greatly appreciated...

here's the initializer code:

-------------------------------------------------------
connect_params = YAML::load_file(File.join(Rails.root, "config",
"rabbitmq.yml"))[Rails.env]

EventMachine.next_tick {
AMQP.connect(connect_params['uri'], :auto_delete => false) do |
connection, open_ok|
puts "Connected to MQ Server to Retreive API Submissions..."
AMQP::Channel.new(connection) do |channel, open_ok|
puts "Channel ##{channel.id} is open..."
exchange = channel.direct('inbound', :durable => true)
channel.on_error do |ch, close|
puts "Handling channel-level error"
connection.close
end
channel.queue('dropbox', :durable => true) do |queue,
declare_ok|
queue.bind(exchange).subscribe do |metadata, payload|
message_h = '#{metadata.headers}'
message_p = '#{payload}'
Resque.enqueue(ApiIntake, message_h, message_p)
end
end
end
end
}

Search Discussions

  • Michael Klishin at Nov 16, 2011 at 3:39 pm
    2011/11/16 p2w <tpletch001 at gmail.com>
    I have a rails 3.1 app (using thin server which is Event Machine
    based) using the AMQP gem to connect to two server cluster on EC2. the
    broker is running on ubuntu 11.10 and is version 2.6.1 with erlang
    version 5.8.3.
    What application server are you using? If it is something that forks
    (Unicorn or Passenger), you need to connect
    after that happens. Otherwise only one thread will be inherited (but not
    EventMachine's one). This is how fork(2) works, unfortunately.

    One more thing you need to do is to set up connection exception handler as
    described here:
    http://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/ErrorHandling.textile
  • Michael Klishin at Nov 16, 2011 at 3:40 pm
    2011/11/16 p2w <tpletch001 at gmail.com>
    AMQP.connect(connect_params['uri'], :auto_delete => false) d

    One more note: AMQP.connect does not take :auto_delete option. It only
    makes sense for
    queues and exchanges.
  • Michael Klishin at Nov 16, 2011 at 3:52 pm
    2011/11/16 p2w <tpletch001 at gmail.com>
    I'm sure I'm doing something wrong in the initializer but can't figure
    out what as its really pretty vanilla. Any help greatly appreciated...
    I apologize for not noticing that you are using Thin. One thing that may
    cause abrupt TCP connection termination is uncaught exception on the
    EventMachine
    thread (in this case, your applications' main thread). In some cases it may
    be swallowed by
    various "helpful" libraries. Can you try removing more and more
    code from your consumer handlers to see at what point connection becomes
    stable?

    Also, with at least a couple of most recent point amqp gem releases you can
    simplify your initializer
    a little bit by renaming config file to amqp.yml and using
    AMQP::Integration::Rails:

    https://gist.github.com/3d995824090df94558b0

    Sorry that it is undocumented yet.
  • P2w at Nov 16, 2011 at 5:11 pm
    Michael,

    Thanks for the heads up on all accounts. I added the following to my
    initializer per the documentation:
    ---------------------------------------------------------
    connection.on_tcp_connection_loss do |connection, settings|
    puts "recovering RabbitMQ TCP Connection"
    connection.reconnect(false, 10)
    ---------------------------------------------------------

    What's interesting is that if there is absolutely no activity in the
    app through the application interface itself, then this exception code
    reconnects the consumer consistently after the TCP exception
    disconnects the client. However as soon as any activity occurs in the
    app interface the reconnect fails. This makes me pretty sure this is
    an issue with Thin/EM blocking AMQP at thread level. I can replicate
    this consistently.

    I'm going to make sure I am up to date on the EM and AMQP gems, then
    try the AMQP::Integration::Rails...
    On Nov 16, 9:52?am, Michael Klishin wrote:
    2011/11/16 p2w <tpletch... at gmail.com>
    I'm sure I'm doing something wrong in the initializer but can't figure
    out what as its really pretty vanilla. Any help greatly appreciated...
    I apologize for not noticing that you are using Thin. One thing that may
    cause abrupt TCP connection termination is uncaught exception on the
    EventMachine
    thread (in this case, your applications' main thread). In some cases it may
    be swallowed by
    various "helpful" libraries. Can you try removing more and more
    code from your consumer handlers to see at what point connection becomes
    stable?

    Also, with at least a couple of most recent point amqp gem releases you can
    simplify your initializer
    a little bit by renaming config file to amqp.yml and using
    AMQP::Integration::Rails:

    https://gist.github.com/3d995824090df94558b0

    Sorry that it is undocumented yet.
    --
    MK

    http://github.com/michaelklishinhttp://twitter.com/michaelklishin

    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Michael Klishin at Nov 16, 2011 at 6:18 pm

    Le 16/nov/2011 ? 21:11, p2w a ?crit :

    This makes me pretty sure this is
    an issue with Thin/EM blocking AMQP at thread level. I can replicate
    this consistently.
    With Thin you do not typically start EventMachine in a separate thread. This is the case with Unicorn and Passenger but almost never with Thin.

    This can be an EventMachine issue. We depend on EventMachine for TCP connection failure notifications. Maybe try EventMachine 1.0 pre-releases? amqp gem will work with that and we run CI against that version against 7 Rubies or so.

    MK

    http://github.com/michaelklishin
    http://twitter.com/michaelklishin
  • P2w at Nov 16, 2011 at 6:47 pm
    Right, the doc indicates that for Thin you do an
    EventMachine.next_tick {.......}, which was my first pass approach...

    separately, when i try to use the approach in that gist you pointed me
    at it is not reading the amqp.yml file properly. is there a particular
    format it wants? I am using uri's in that file....can you let me know
    if it wants something specific? here is the current format of that
    file i looked at the doc and it should be working...but it keeps
    trying to connect to localhost...:

    development:
    uri: amqp://###:####@mq.#########.com/test
    test:
    uri: amqp://###:####@mq.#########.com/test
    sandbox:
    uri: amqp://###:####@mq.#########.com/sandbox
    staging:
    uri: amqp://###:####@mq.#########.com/staging
    production:
    uri: amqp://###:####@mq.#########.com/barzmedia
    On Nov 16, 12:18?pm, Michael Klishin wrote:
    Le 16/nov/2011 ? 21:11, p2w a ?crit :

    This makes me pretty sure this is
    an issue with Thin/EM blocking AMQP at thread level. I can replicate
    this consistently.
    With Thin you do not typically start EventMachine in a separate thread. This is the case with Unicorn and Passenger but almost never with Thin.

    This can be an EventMachine issue. We depend on EventMachine for TCP connection failure notifications. Maybe try EventMachine 1.0 pre-releases? amqp gem will work with that and we run CI against that version against 7 Rubies or so.

    MK

    http://github.com/michaelklishinhttp://twitter.com/michaelklishin

    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Michael Klishin at Nov 16, 2011 at 7:23 pm

    Le 16/nov/2011 ? 22:47, p2w a ?crit :

    separately, when i try to use the approach in that gist you pointed me
    at it is not reading the amqp.yml file properly. is there a particular
    format it wants? I am using uri's in that file....can you let me know
    if it wants something specific? here is the current format of that
    file i looked at the doc and it should be working...but it keeps
    trying to connect to localhost...:
    URI support is only available in master, I believe. Here is a couple of examples:
    https://gist.github.com/b26e4af5970c76130b7d

    URI support was committed in
    https://github.com/ruby-amqp/amqp/commit/50d15f19971daf2f289cb527922298a4a94ab947

    Feel free to switch to master, it is identical to the tip of 0.8.x-stable branch right now.

    MK

    http://github.com/michaelklishin
    http://twitter.com/michaelklishin
  • P2w at Nov 16, 2011 at 7:29 pm
    Cool.

    Hey I just noticed something in the new code. You are doing an
    AMQP.start inside the EventMachine.next_tick. In my original code I am
    doing a connect because the doc indicates that Thin already starts the
    EM reactor.

    Could that be an issue?

    On Nov 16, 1:23?pm, Michael Klishin wrote:
    Le 16/nov/2011 ? 22:47, p2w a ?crit :

    separately, when i try to use the approach in that gist you pointed me
    at it is not reading the amqp.yml file properly. is there a particular
    format it wants? I am using uri's in that file....can you let me know
    if it wants something specific? here is the current format of that
    file i looked at the doc and it should be working...but it keeps
    trying to connect to localhost...:
    URI support is only available in master, I believe. Here is a couple of examples:https://gist.github.com/b26e4af5970c76130b7d

    URI support was committed inhttps://github.com/ruby-amqp/amqp/commit/50d15f19971daf2f289cb5279222...

    Feel free to switch to master, it is identical to the tip of 0.8.x-stable branch right now.

    MK

    http://github.com/michaelklishinhttp://twitter.com/michaelklishin

    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Michael Klishin at Nov 16, 2011 at 7:44 pm

    Le 16/nov/2011 ? 23:29, p2w a ?crit :

    Cool.

    Hey I just noticed something in the new code. You are doing an
    AMQP.start inside the EventMachine.next_tick. In my original code I am
    doing a connect because the doc indicates that Thin already starts the
    EM reactor.

    Could that be an issue?
    I doubt it. EventMachine is smart enough to detect that the reactor is running. In 1.0 it should
    be smart enough to also detect when forking has killed it.


    MK

    http://github.com/michaelklishin
    http://twitter.com/michaelklishin
  • P2w at Nov 16, 2011 at 8:53 pm
    OK, I think I have some more concrete behavior to work off of.

    First of all some of the earlier issues were due to orphaned
    connections on the server. I'm not sure how that happens and why those
    idle connections were not killed of by the broker, but i'll sort that
    out later.

    Once those connections and channels were deleted i was able to do the
    following with 100% reproducibility: 1) publish a message to the
    server, 2) start the server and consume the message, pass it to the
    Resque work queue, do some stuff there. 3) bounce back and forth
    between doing things in the app gui and publishing/consuming messages
    on the rabbitmq server.

    Now here's where it gets interesting. if i then stop all activities
    the connection is closed after a couple minutes and is not reconnected
    by the recovery code in the initializer. I am unable to tell whether
    the connection is severed at the broker or the client side. i do see
    in the doc where there is a timeout setting in some of the code
    samples but i am not seeing anything in the doc on how it works or
    what the recommended params are.

    for reference here is the initializer code as it sits right now:

    ------------------------------------------------------------------------------------------------------------------
    connect_params = YAML::load_file(File.join(Rails.root, "config",
    "amqp.yml"))[Rails.env]

    EventMachine.next_tick {
    AMQP.connect(connect_params['uri']) do |connection, open_ok|
    puts "Connected to MQ Server to Retreive API Submissions..."
    AMQP::Channel.new(connection, :auto_recovery => true) do |channel,
    open_ok|
    puts "Channel ##{channel.id} is open..."
    exchange = channel.direct('inbound', :durable => true)
    channel.on_error do |ch, close|
    puts "Handling channel-level error"
    connection.close
    end
    channel.queue('dropbox', :durable => true) do |queue,
    declare_ok|
    queue.bind(exchange).subscribe do |metadata, payload|
    message_h = metadata.headers
    message_p = payload
    puts message_h
    puts message_p
    Resque.enqueue(ApiIntake, message_h, message_p)
    end
    end
    end
    connection.on_tcp_connection_loss do |connection, settings|
    # reconnect in 2 seconds, with enforcement
    connection.reconnect(true, 2)
    end
    end
    }
    ----------------------------------------------------------------------------

    So I guess the next question is how is inactivity handled at the amqp
    client side?

    On Nov 16, 1:44?pm, Michael Klishin wrote:
    Le 16/nov/2011 ? 23:29, p2w a ?crit :

    Cool.
    Hey I just noticed something in the new code. You are doing an
    AMQP.start inside the EventMachine.next_tick. In my original code I am
    doing a connect because the doc indicates that Thin already starts the
    EM reactor.
    Could that be an issue?
    I doubt it. EventMachine is smart enough to detect that the reactor is running. In 1.0 it should
    be smart enough to also detect when forking has killed it.

    MK

    http://github.com/michaelklishinhttp://twitter.com/michaelklishin

    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Michael Klishin at Nov 17, 2011 at 7:29 am

    Le 17/nov/2011 ? 00:53, p2w a ?crit :

    there is a timeout setting in some of the code
    samples but i am not seeing anything in the doc on how it works or
    what the recommended params are.
    That must be initial connection timeout, not inactivity timeout. amqp gem does not close connections
    on inactivity. EventMachine may do that, I am not sure. Event missing heartbeats detection will result in an event
    handler called, not automatic connection termination.

    MK

    http://github.com/michaelklishin
    http://twitter.com/michaelklishin
  • P2w at Nov 16, 2011 at 5:52 pm
    Michael,

    OK, so I was a little premature in the statement that reproducing
    behavior was one-hundred percent consistent. It's still failing
    intermittently....

    On Nov 16, 9:52?am, Michael Klishin wrote:
    2011/11/16 p2w <tpletch... at gmail.com>
    I'm sure I'm doing something wrong in the initializer but can't figure
    out what as its really pretty vanilla. Any help greatly appreciated...
    I apologize for not noticing that you are using Thin. One thing that may
    cause abrupt TCP connection termination is uncaught exception on the
    EventMachine
    thread (in this case, your applications' main thread). In some cases it may
    be swallowed by
    various "helpful" libraries. Can you try removing more and more
    code from your consumer handlers to see at what point connection becomes
    stable?

    Also, with at least a couple of most recent point amqp gem releases you can
    simplify your initializer
    a little bit by renaming config file to amqp.yml and using
    AMQP::Integration::Rails:

    https://gist.github.com/3d995824090df94558b0

    Sorry that it is undocumented yet.
    --
    MK

    http://github.com/michaelklishinhttp://twitter.com/michaelklishin

    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-disc... at lists.rabbitmq.comhttps://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Michael Klishin at Nov 16, 2011 at 6:19 pm

    Le 16/nov/2011 ? 21:52, p2w a ?crit :

    Michael,

    OK, so I was a little premature in the statement that reproducing
    behavior was one-hundred percent consistent. It's still failing
    intermittently....
    This may be a concurrency issue. Can you try putting together a script that reproduces it? (even if it is just 1 time out of 100)

    MK

    http://github.com/michaelklishin
    http://twitter.com/michaelklishin

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
grouprabbitmq-discuss @
categoriesrabbitmq
postedNov 16, '11 at 3:25p
activeNov 17, '11 at 7:29a
posts14
users2
websiterabbitmq.com
irc#rabbitmq

2 users in discussion

Michael Klishin: 8 posts P2w: 6 posts

People

Translate

site design / logo © 2022 Grokbase