Hello,

I have a serious problem with RabbitMQ. After a certain number of messages
have been received, the consumers simply stop receiving messages.
They are still connected but they no longer receive anything. I have
found other posts about this issue, but none has a working answer.

Here is the connection wrapper I use: http://pastebin.com/XVY3HNGq. I use
stormed-amqp, which is an asynchronous AMQP library for Python.

All errors are caught, none are raised during processing, and I'm pretty
sure the problem is not an application error (but I could be mistaken, of
course :).
On the same server there is a single exchange with 2 queues, each of which
handles hundreds of thousands of messages a day. The consumer of the first
queue (which works just fine) receives messages, processes them and
publishes them to the second queue for further processing.
It is the consumers of this second queue that stop after a certain number
of messages. I have tried with one or multiple consumers, but the problem
remains.

I have tried consuming messages without ack, and with ack and a qos prefetch
count of 100 (as suggested in several responses to this kind of problem).
When I run "rabbitmqctl list_queues name messages_ready
messages_unacknowledged", I see a lot of ready messages and, when using ack,
a number of unacknowledged messages equal to the prefetch count (100).

I really don't know what's happening!
Thank you for your answers.

Maxime Bouroumeau-Fuseau

  • Matthias Radestock at Jul 19, 2011 at 9:09 am
    Maxime,
    On 19/07/11 09:46, Maxime Bouroumeau-Fuseau wrote:
    > I have a serious problem with RabbitMQ. After a certain amount of
    > messages received, the consumers simply stop receiving messages.
    > They are still connected but they are not receiving messages anymore.
    > [...]
    > I have tried consuming messages without ack or with ack and a qos
    > prefetch count of 100 (as indicated in several responses to this kind of
    > problem).
    > When I do "rabbitmqctl list_queues name messages_ready
    > messages_unacknowledged", I have a lot of ready messages and the
    > "prefetch count" (100) value of unacknowledged messages when using ack.

    That indicates that, for whatever reason, the client is not accepting
    any more messages.
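
A toy model of the prefetch window may make this reading of the counters easier to follow. It is not RabbitMQ code, and the class and method names are made up for illustration. It only assumes the usual basic.qos behaviour: with acks enabled, the broker stops delivering on a channel once the number of unacknowledged messages reaches the prefetch count.

```python
from collections import deque


class ToyQueue:
    """Hypothetical stand-in for a broker queue with one acking consumer."""

    def __init__(self, prefetch_count):
        self.prefetch_count = prefetch_count
        self.ready = deque()   # plays the role of messages_ready
        self.unacked = set()   # plays the role of messages_unacknowledged

    def publish(self, msg):
        self.ready.append(msg)

    def deliver(self):
        """Push messages to the consumer until the prefetch window is full."""
        delivered = []
        while self.ready and len(self.unacked) < self.prefetch_count:
            msg = self.ready.popleft()
            self.unacked.add(msg)
            delivered.append(msg)
        return delivered

    def ack(self, msg):
        self.unacked.discard(msg)


queue = ToyQueue(prefetch_count=100)
for i in range(630):
    queue.publish(i)

# The consumer is sent 100 messages and then stalls without acking any.
queue.deliver()
stalled = (len(queue.ready), len(queue.unacked))
print(stalled)  # (530, 100)
```

In this model the unacknowledged count sits at exactly the prefetch count while ready messages pile up, which is the pattern described above: the broker is waiting for the client, not the other way round.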

    What version of the RabbitMQ server are you running? If it's >= 2.5.0,
    please run 'rabbitmqctl report > server_report.txt' and send us the
    output. Preferably when it's stuck while consuming messages without
    acks, since that scenario has the fewest places to get stuck. If your
    server version is < 2.5.0 then please upgrade.

    Regards,

    Matthias.
  • Maxime Bouroumeau-Fuseau at Jul 19, 2011 at 9:57 am
    Matthias,

    I was using stormed-amqp 0.1 and rabbitmq 2.3.1.
    I have updated to the latest master of stormed-amqp and to rabbitmq 2.5.1.

    The problem persists, but it now appears only after more messages have
    been received. Attached are report.txt, taken with a consumer without
    ack, and report_with_ack.txt, taken with a consumer using ack.

    Regards,

    Maxime Bouroumeau-Fuseau


    -------------- next part --------------
    report.txt (consumer without ack):
    {error_logger,{{2011,7,19},{11,50,50}},"inet_parse:~p:~p: erroneous line, SKIPPED~n",["/etc/hosts",8]}

    =INFO REPORT==== 19-Jul-2011::11:50:50 ===
    inet_parse:"/etc/hosts":8: erroneous line, SKIPPED
    Reporting server status on {{2011,7,19},{9,50,50}}

    Status of node rabbit at ns353204 ...
    [{pid,19099},
    {running_applications,[{rabbit,"RabbitMQ","2.5.1"},
    {mnesia,"MNESIA CXC 138 12","4.4.12"},
    {os_mon,"CPO CXC 138 46","2.2.4"},
    {sasl,"SASL CXC 138 11","2.1.8"},
    {stdlib,"ERTS CXC 138 10","1.16.4"},
    {kernel,"ERTS CXC 138 10","2.13.4"}]},
    {os,{unix,linux}},
    {erlang_version,"Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:30] [hipe] [kernel-poll:true]\n"},
    {memory,[{total,28389696},
    {processes,12546536},
    {processes_used,12533840},
    {system,15843160},
    {atom,1101009},
    {atom_used,1085489},
    {binary,516840},
    {code,11606637},
    {ets,749280}]}]

    Cluster status of node rabbit at ns353204 ...
    [{nodes,[{disc,[rabbit at ns353204]}]},{running_nodes,[rabbit at ns353204]}]

    Application environment of node rabbit at ns353204 ...
    [{auth_backends,[rabbit_auth_backend_internal]},
    {auth_mechanisms,['PLAIN','AMQPLAIN']},
    {backing_queue_module,rabbit_variable_queue},
    {cluster_nodes,[]},
    {collect_statistics,none},
    {default_permissions,[<<".*">>,<<".*">>,<<".*">>]},
    {default_user,<<"guest">>},
    {default_user_is_admin,true},
    {default_vhost,<<"/">>},
    {delegate_count,16},
    {frame_max,131072},
    {included_applications,[]},
    {msg_store_file_size_limit,16777216},
    {msg_store_index_module,rabbit_msg_store_ets_index},
    {persister_hibernate_after,10000},
    {persister_max_wrap_entries,500},
    {queue_index_max_journal_entries,262144},
    {server_properties,[]},
    {ssl_listeners,[]},
    {ssl_options,[]},
    {tcp_listen_options,[binary,
    {packet,raw},
    {reuseaddr,true},
    {backlog,128},
    {nodelay,true},
    {exit_on_close,false}]},
    {tcp_listeners,[5672]},
    {trace_vhosts,[]},
    {vm_memory_high_watermark,0.4}]

    Connections:
    pid address port peer_address peer_port ssl peer_cert_subject peer_cert_issuer peer_cert_validity auth_mechanism ssl_protocol ssl_key_exchange ssl_cipher ssl_hash protocol user vhost timeout frame_max client_properties recv_oct recv_cnt send_oct send_cnt send_pend state channels
    <rabbit at ns353204.3.428.0> 127.0.0.1 5672 127.0.0.1 42713 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 138 3 368 3 0 running 0
    <rabbit at ns353204.3.1621.0> 127.0.0.1 5672 127.0.0.1 54223 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 305 8 1248829 2934 8432 running 1
    <rabbit at ns353204.3.1145.0> 127.0.0.1 5672 127.0.0.1 47849 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 2167945 6497 384 4 0 running 1
    <rabbit at ns353204.3.1721.0> 127.0.0.1 5672 127.0.0.1 54866 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 881066 2245 840093 2296 0 running 2

    Channels:
    pid connection number user vhost transactional confirm consumer_count messages_unacknowledged messages_unconfirmed acks_uncommitted prefetch_count client_flow_blocked
    <rabbit at ns353204.3.1148.0> <rabbit at ns353204.3.1145.0> 1 guest / false false 0 0 0 0 0 false
    <rabbit at ns353204.3.1624.0> <rabbit at ns353204.3.1621.0> 1 guest / false false 1 0 0 0 0 false
    <rabbit at ns353204.3.1724.0> <rabbit at ns353204.3.1721.0> 1 guest / false false 1 0 0 0 0 false
    <rabbit at ns353204.3.1727.0> <rabbit at ns353204.3.1721.0> 2 guest / false false 0 0 0 0 0 false

    Queues on /:
    pid name durable auto_delete arguments owner_pid exclusive_consumer_pid exclusive_consumer_tag messages_ready messages_unacknowledged messages consumers memory backing_queue_status
    <rabbit at ns353204.3.295.0> thot.tracker true false [] 0 0 0 1 142896 [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {pending_acks,0}, {outstanding_txns,0}, {target_ram_count,infinity}, {ram_msg_count,0}, {ram_ack_count,0}, {ram_index_count,0}, {next_seq_id,368}, {persistent_count,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]
    <rabbit at ns353204.3.460.0> thot.processor true false [] 863 0 863 1 1801808 [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,863}, {len,863}, {pending_acks,0}, {outstanding_txns,0}, {target_ram_count,infinity}, {ram_msg_count,863}, {ram_ack_count,0}, {ram_index_count,0}, {next_seq_id,2067}, {persistent_count,0}, {avg_ingress_rate,14.365453956325828}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]

    Exchanges on /:
    name type durable auto_delete internal arguments
    amq.direct direct true false false []
    thot direct true false false []
    amq.topic topic true false false []
    amq.rabbitmq.trace topic true false false []
    amq.rabbitmq.log topic true false false []
    amq.fanout fanout true false false []
    amq.headers headers true false false []
    direct true false false []
    amq.match headers true false false []

    Bindings on /:
    source_name source_kind destination_name destination_kind routing_key arguments
    exchange thot.processor queue thot.processor []
    exchange thot.tracker queue thot.tracker []
    thot exchange thot.processor queue thot.processor []
    thot exchange thot.tracker queue thot.tracker []

    Consumers on /:
    queue_name channel_pid consumer_tag ack_required
    thot.tracker <rabbit at ns353204.3.1724.0> amq.ctag-MrgQjPfwELAqNOvL4ONfXQ== false
    thot.processor <rabbit at ns353204.3.1624.0> amq.ctag-YbiqDaXfDod+eChM9ViPtQ== false

    Permissions on /:
    user configure write read
    guest .* .* .*

    End of server status report
    ...done.
    -------------- next part --------------
    report_with_ack.txt (consumer with ack, prefetch count 100):
    {error_logger,{{2011,7,19},{11,55,25}},"inet_parse:~p:~p: erroneous line, SKIPPED~n",["/etc/hosts",8]}

    =INFO REPORT==== 19-Jul-2011::11:55:25 ===
    inet_parse:"/etc/hosts":8: erroneous line, SKIPPED
    Reporting server status on {{2011,7,19},{9,55,25}}

    Status of node rabbit at ns353204 ...
    [{pid,19099},
    {running_applications,[{rabbit,"RabbitMQ","2.5.1"},
    {mnesia,"MNESIA CXC 138 12","4.4.12"},
    {os_mon,"CPO CXC 138 46","2.2.4"},
    {sasl,"SASL CXC 138 11","2.1.8"},
    {stdlib,"ERTS CXC 138 10","1.16.4"},
    {kernel,"ERTS CXC 138 10","2.13.4"}]},
    {os,{unix,linux}},
    {erlang_version,"Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:30] [hipe] [kernel-poll:true]\n"},
    {memory,[{total,27474056},
    {processes,11786984},
    {processes_used,11775288},
    {system,15687072},
    {atom,1101817},
    {atom_used,1086017},
    {binary,357392},
    {code,11606637},
    {ets,749928}]}]

    Cluster status of node rabbit at ns353204 ...
    [{nodes,[{disc,[rabbit at ns353204]}]},{running_nodes,[rabbit at ns353204]}]

    Application environment of node rabbit at ns353204 ...
    [{auth_backends,[rabbit_auth_backend_internal]},
    {auth_mechanisms,['PLAIN','AMQPLAIN']},
    {backing_queue_module,rabbit_variable_queue},
    {cluster_nodes,[]},
    {collect_statistics,none},
    {default_permissions,[<<".*">>,<<".*">>,<<".*">>]},
    {default_user,<<"guest">>},
    {default_user_is_admin,true},
    {default_vhost,<<"/">>},
    {delegate_count,16},
    {frame_max,131072},
    {included_applications,[]},
    {msg_store_file_size_limit,16777216},
    {msg_store_index_module,rabbit_msg_store_ets_index},
    {persister_hibernate_after,10000},
    {persister_max_wrap_entries,500},
    {queue_index_max_journal_entries,262144},
    {server_properties,[]},
    {ssl_listeners,[]},
    {ssl_options,[]},
    {tcp_listen_options,[binary,
    {packet,raw},
    {reuseaddr,true},
    {backlog,128},
    {nodelay,true},
    {exit_on_close,false}]},
    {tcp_listeners,[5672]},
    {trace_vhosts,[]},
    {vm_memory_high_watermark,0.4}]

    Connections:
    pid address port peer_address peer_port ssl peer_cert_subject peer_cert_issuer peer_cert_validity auth_mechanism ssl_protocol ssl_key_exchange ssl_cipher ssl_hash protocol user vhost timeout frame_max client_properties recv_oct recv_cnt send_oct send_cnt send_pend state channels
    <rabbit at ns353204.3.2466.0> 127.0.0.1 5672 127.0.0.1 55538 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 23697 747 522926 1222 0 running 1
    <rabbit at ns353204.3.428.0> 127.0.0.1 5672 127.0.0.1 42713 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 138 3 368 3 0 running 0
    <rabbit at ns353204.3.1145.0> 127.0.0.1 5672 127.0.0.1 47849 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 3475460 10414 384 4 0 running 1
    <rabbit at ns353204.3.1721.0> 127.0.0.1 5672 127.0.0.1 54866 false AMQPLAIN {0,9,1} guest / 0 65536 [{"client","stormed-amqp"}] 2419305 6153 2306728 6274 0 running 2

    Channels:
    pid connection number user vhost transactional confirm consumer_count messages_unacknowledged messages_unconfirmed acks_uncommitted prefetch_count client_flow_blocked
    <rabbit at ns353204.3.1148.0> <rabbit at ns353204.3.1145.0> 1 guest / false false 0 0 0 0 0 false
    <rabbit at ns353204.3.1724.0> <rabbit at ns353204.3.1721.0> 1 guest / false false 1 0 0 0 0 false
    <rabbit at ns353204.3.1727.0> <rabbit at ns353204.3.1721.0> 2 guest / false false 0 0 0 0 0 false
    <rabbit at ns353204.3.2469.0> <rabbit at ns353204.3.2466.0> 1 guest / false false 1 100 0 0 100 false

    Queues on /:
    pid name durable auto_delete arguments owner_pid exclusive_consumer_pid exclusive_consumer_tag messages_ready messages_unacknowledged messages consumers memory backing_queue_status
    <rabbit at ns353204.3.295.0> thot.tracker true false [] 0 0 0 1 142896 [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {pending_acks,0}, {outstanding_txns,0}, {target_ram_count,infinity}, {ram_msg_count,0}, {ram_ack_count,0}, {ram_index_count,0}, {next_seq_id,368}, {persistent_count,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]
    <rabbit at ns353204.3.460.0> thot.processor true false [] 530 100 630 1 1114112 [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,530}, {len,530}, {pending_acks,100}, {outstanding_txns,0}, {target_ram_count,infinity}, {ram_msg_count,530}, {ram_ack_count,100}, {ram_index_count,0}, {next_seq_id,4947}, {persistent_count,0}, {avg_ingress_rate,12.14102738766987}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]

    Exchanges on /:
    name type durable auto_delete internal arguments
    amq.direct direct true false false []
    thot direct true false false []
    amq.topic topic true false false []
    amq.rabbitmq.trace topic true false false []
    amq.rabbitmq.log topic true false false []
    amq.fanout fanout true false false []
    amq.headers headers true false false []
    direct true false false []
    amq.match headers true false false []

    Bindings on /:
    source_name source_kind destination_name destination_kind routing_key arguments
    exchange thot.processor queue thot.processor []
    exchange thot.tracker queue thot.tracker []
    thot exchange thot.processor queue thot.processor []
    thot exchange thot.tracker queue thot.tracker []

    Consumers on /:
    queue_name channel_pid consumer_tag ack_required
    thot.tracker <rabbit at ns353204.3.1724.0> amq.ctag-MrgQjPfwELAqNOvL4ONfXQ== false
    thot.processor <rabbit at ns353204.3.2469.0> amq.ctag-7kDGEwDPOKGAVaXBbI+HzQ== true

    Permissions on /:
    user configure write read
    guest .* .* .*

    End of server status report
    ...done.
  • Matthias Radestock at Jul 19, 2011 at 10:30 am
    Maxime,
    On 19/07/11 10:57, Maxime Bouroumeau-Fuseau wrote:
    > I was using stormed-amqp 0.1 and rabbitmq 2.3.1.
    > I have updated to the latest master of stormed-amqp and to rabbitmq 2.5.1.
    >
    > The problem persists but appears after more messages have been received.
    > You'll find attached report.txt which is with a consumer without ack and
    > report_with_ack.txt

    We can see that the 'thot.processor' queue has 863 messages ready, and
    one consumer, on channel <rabbit at ns353204.3.1624.0>. That channel is
    associated with connection <rabbit at ns353204.3.1621.0>. That connection
    has a 'send_pend' count of 8432.

    This indicates that the client is not reading from the rabbit connection
    socket. So this is either a problem with the client library or with the
    app. I am not much of a python hacker and am not familiar with
    stormed-amqp, but I suggest the next step is to figure out what the
    consumer process is doing when it's stuck, i.e. somehow get a stack trace
    from it.
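
The same check can be scripted rather than read off the full report. The sketch below assumes that `rabbitmqctl list_connections pid send_pend state` is available on your server version and prints one tab-separated row per connection in the order requested. The helper name `stuck_connections` is hypothetical, and the sample text is hand-written from the values in the report above (this archive renders "@" as " at ", so the pids are written here with "@").

```python
def stuck_connections(listing):
    """Return (pid, send_pend) pairs for connections with a non-empty send queue.

    `listing` is assumed to be the text printed by
    `rabbitmqctl list_connections pid send_pend state`.
    """
    stuck = []
    for line in listing.splitlines():
        fields = line.split("\t")
        # Skip banner lines such as "Listing connections ..." and "...done."
        if len(fields) != 3 or not fields[1].isdigit():
            continue
        pid, send_pend, _state = fields
        if int(send_pend) > 0:
            stuck.append((pid, int(send_pend)))
    return stuck


# Hand-written sample based on the report values, not captured output.
sample = (
    "Listing connections ...\n"
    "<rabbit@ns353204.3.428.0>\t0\trunning\n"
    "<rabbit@ns353204.3.1621.0>\t8432\trunning\n"
    "<rabbit@ns353204.3.1145.0>\t0\trunning\n"
    "...done.\n"
)
print(stuck_connections(sample))
```

A single non-zero sample can be transient on a busy connection, so it is probably worth running the command a few times: a send_pend value that stays non-zero while the queue is not draining points at a client that has stopped reading.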


    Regards,

    Matthias.
  • Ask Solem at Jul 19, 2011 at 11:04 am


    > This indicates that the client is not reading from the rabbit connection
    > socket. So this is either a problem with the client library or with the
    > app. I am not much of a python hacker and am not familiar with
    > stormed-amqp, but I suggest the next step is to figure out what the
    > consumer process is doing when it's stuck, i.e. somehow get a stack
    > trace from it.

    In Celery we have a signal handler that prints the stack trace of all
    active threads. I cut the code out into a paste for you here:

    http://pastie.org/2236433

    Just call "install_cry_handler" somewhere when your process starts,
    and then you can 'kill -USR1 <pid>' to make the process dump out
    the stack traces.
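
In case the paste is no longer reachable, here is a minimal sketch of the same idea. It is not the Celery code: the function names `format_all_stacks` and `install_stack_dump_handler` are made up for this example, and it assumes a Unix system where SIGUSR1 exists.

```python
import signal
import sys
import threading
import traceback


def format_all_stacks():
    """Return the current stack of every thread as one string."""
    names = {t.ident: t.name for t in threading.enumerate()}
    chunks = []
    for thread_id, frame in sys._current_frames().items():
        chunks.append("Thread %s (%s)\n" % (names.get(thread_id, "?"), thread_id))
        chunks.extend(traceback.format_stack(frame))
    return "".join(chunks)


def install_stack_dump_handler(signum=signal.SIGUSR1, stream=None):
    """Write all thread stacks to `stream` (default: stderr) on `signum`."""
    def handler(_signum, _frame):
        (stream or sys.stderr).write(format_all_stacks())

    # signal.signal() must be called from the main thread.
    signal.signal(signum, handler)
```

Call `install_stack_dump_handler()` once at startup, then send the signal with 'kill -USR1 <pid>' while the consumer is stuck. On Python 3.3 and later, the standard library's `faulthandler.register(signal.SIGUSR1, all_threads=True)` should give a similar dump without custom code.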

    Also, ktrace/strace/dtruss can help you find out whether the process
    is stuck in a system call. If you ktrace the running process,
    it should interrupt it.

    I've had a similar issue in the past, which I worked around by
    adding a socket timeout.
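
For a blocking socket, the timeout workaround looks roughly like the sketch below. It uses `socket.socketpair()` as a stand-in for the client/broker connection, and the 0.2 second value is arbitrary. An event-loop based client such as stormed-amqp does not block in `recv()` this way, so it would need its own equivalent (for example a periodic check that data is still arriving); that part is an assumption and is not shown here.

```python
import socket

# socketpair() stands in for the client/broker connection. Nothing is ever
# written to `broker_side`, which mimics a peer that has gone quiet.
client_side, broker_side = socket.socketpair()
client_side.settimeout(0.2)  # seconds; pick a value that suits the application

try:
    client_side.recv(4096)
    outcome = "data"
except socket.timeout:
    # Without the timeout this recv() would block forever.
    outcome = "timed out"
finally:
    client_side.close()
    broker_side.close()

print(outcome)  # timed out
```

When the timeout fires, the application can log the event, reconnect, or re-issue the consume, instead of hanging silently on a dead read.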
  • Maxime Bouroumeau-Fuseau at Jul 19, 2011 at 12:56 pm
    Matthias,

    Thank you for your response. I will try to investigate more in the
    application layer in the next few days.

    Regards,

    Maxime Bouroumeau-Fuseau



Discussion Overview
group: rabbitmq-discuss
categories: rabbitmq
posted: Jul 19, '11 at 8:46a
active: Jul 19, '11 at 12:56p
posts: 6
users: 3
website: rabbitmq.com
irc: #rabbitmq
