Hi all,

I have a process that consumes from a queue with BasicConsume, but if
there is a problem on the consumer that slows down message processing,
RabbitMQ will carry on delivering messages until the process runs out
of memory and crashes. In order to fix this, I created a second thread
that watches for too many messages building up in the client and calls
ChannelFlow(false) on the channel. Once the consumer has caught up
again, it calls ChannelFlow(true) to re-enable flow on the channel.
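
In rough outline, the watcher thread does something like this (a
simplified sketch; the class name, thresholds and polling interval are
illustrative, not the real code):

using System.Threading;
using RabbitMQ.Client;

class FlowWatcher
{
    private readonly IModel model;
    private int pending;   // incremented on delivery, decremented once processed

    public FlowWatcher(IModel model) { this.model = model; }

    public void OnDelivered() { Interlocked.Increment(ref pending); }
    public void OnProcessed() { Interlocked.Decrement(ref pending); }

    // Runs on its own thread, separate from the connection thread that
    // dispatches deliveries to the consumer.
    public void Run()
    {
        var flowOn = true;
        while (true)
        {
            var backlog = Thread.VolatileRead(ref pending);
            if (flowOn && backlog > 10000)          // too far behind: pause deliveries
            {
                model.ChannelFlow(false);
                flowOn = false;
            }
            else if (!flowOn && backlog < 1000)     // caught up: resume deliveries
            {
                model.ChannelFlow(true);
                flowOn = true;
            }
            Thread.Sleep(100);
        }
    }
}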

Unfortunately, since switching the broker to Windows Server 2008 and
upgrading both the server and client to 2.5.1, the broker is
occasionally terminating my connection on the ChannelFlow call. The
exception text from the .NET client is:

The AMQP operation was interrupted: AMQP close-reason,
initiated by Peer, code=541, text="INTERNAL_ERROR", classId=0,
methodId=0, cause=

When I was running on RHEL 6 I didn't experience any problems, but I
hadn't tested it thoroughly, so it might just be less likely to occur
on that setup. The clients are all running on Windows and using the
.NET client.

When this error occurs, the following is logged in the RabbitMQ server log:


=ERROR REPORT==== 29-Jul-2011::13:25:28 ===
** Generic server <0.2462.52> terminating
** Last message in was {'$gen_cast',{method,{'channel.flow',false},none}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.2459.52>,
<0.2461.52>,<0.2459.52>,undefined,
#Fun<rabbit_channel_sup.0.15412730>,none,
{set,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
4089,
{[],[]},
{[],[]},
{user,<<"guest">>,true,rabbit_auth_backend_internal,
{internal_user,<<"guest">>,
<<99,244,141,154,192,80,146,251,179,199,206,114,71,
177,54,91,30,76,32,168>>,
true}},
<<"/">>,<<"testq">>,
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],
[[<<"amq.ctag-KeIdUjHd5LF+Zcti3NHf1A==">>|
{{amqqueue,
{resource,<<"/">>,queue,<<"testq">>},
false,false,none,[],<0.26681.51>},
#Ref<0.0.8.186885>}]],
[],[],[],[]}}},
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],
[[#Ref<0.0.8.186885>|
<<"amq.ctag-KeIdUjHd5LF+Zcti3NHf1A==">>]],
[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
<0.2457.52>,
{state,fine,{1311942332995907,#Ref<0.0.8.186868>}},
false,1,
{0,nil},
{0,nil},
[],
[{<<"basic.nack">>,bool,true},
{<<"publisher_confirms">>,bool,true},
{<<"consumer_cancel_notify">>,bool,true},
{<<"exchange_exchange_bindings">>,bool,true}],
none}
** Reason for termination ==
** {{badmatch,{error,already_present}},
[{rabbit_channel_sup,'-start_limiter_fun/1-fun-0-',2},
{rabbit_channel,start_limiter,1},
{rabbit_channel,handle_method,3},
{rabbit_channel,handle_cast,2},
{gen_server2,handle_msg,2},
{proc_lib,init_p_do_apply,3}]}



Here is the code I'm using that will trigger the error:


// ---------------------------- Start code ------------------------------------

using System;
using System.Threading;

using RabbitMQ.Client;

namespace test
{
    class Consumer : IBasicConsumer
    {
        private int count;

        public Consumer(IModel model) { this.Model = model; }

        public void HandleBasicConsumeOk(string consumerTag) { }
        public void HandleBasicCancelOk(string consumerTag) { }
        public void HandleBasicCancel(string consumerTag) { }
        public void HandleModelShutdown(IModel model, ShutdownEventArgs reason) { }

        // Auto-acked delivery handler: just count what arrives.
        public void HandleBasicDeliver(string consumerTag, ulong deliveryTag,
            bool redelivered, string exchange, string routingKey,
            IBasicProperties properties, byte[] body)
        {
            var c = Interlocked.Increment(ref count);
            if (c % 1000 == 0) Console.WriteLine("Consumed {0}", c);
        }

        public IModel Model { get; set; }

        public int Count { get { return count; } }
    }

    class Program
    {
        static void Main()
        {
            var cf = new ConnectionFactory
            {
                HostName = "vmdevrmq1.apdev.local",
                UserName = "guest",
                Password = "guest",
                Port = AmqpTcpEndpoint.UseDefaultPort
            };

            var conn = cf.CreateConnection();

            // Publisher thread: floods the queue as fast as it can.
            new Thread(
                x =>
                {
                    var model = conn.CreateModel();
                    model.QueueDeclare("testq", false, false, false, null);

                    var props = model.CreateBasicProperties();
                    for (var i = 0; ; i++)
                    {
                        if (i % 1000 == 0)
                        {
                            Console.WriteLine("Published {0}", i);
                        }
                        model.BasicPublish("", "testq", false, false,
                            props, new byte[] { });
                    }
                }).Start();

            // Consumer channel, auto-acking (noAck = true).
            var consumerModel = conn.CreateModel();
            consumerModel.QueueDeclare("testq", false, false, false, null);

            var consumer = new Consumer(consumerModel);
            consumerModel.BasicConsume("testq", true, null, consumer);

            // Toggle channel.flow as fast as possible from this thread;
            // this is what eventually triggers the INTERNAL_ERROR.
            bool flowEnabled = true;
            while (true)
            {
                flowEnabled = !flowEnabled;
                Console.WriteLine("Setting Channel Flow: {0}", flowEnabled);
                consumerModel.ChannelFlow(flowEnabled);
            }
        }
    }
}

// ---------------------------- End code ------------------------------------

I accept that this might be an error in my usage, in particular the
way I'm calling ChannelFlow on my own thread at the same time as the
connection thread is consuming, but I didn't know how else to do it
(calling it from the connection thread inside the consumer causes
deadlocks), and I also wouldn't have expected an internal error.
Another approach would be explicit acks and basic.qos, but when the
ChannelFlow approach worked, it was much faster.
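
For reference, the acking variant of the consumer would be along these
lines (an illustrative sketch, not the exact code I benchmarked):

// Consume with noAck = false so the broker waits for explicit acks
// (basic.qos has no effect when auto-acking).
consumerModel.BasicConsume("testq", false, null, consumer);

// ...and in Consumer.HandleBasicDeliver, ack each message after it has
// been processed:
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag,
    bool redelivered, string exchange, string routingKey,
    IBasicProperties properties, byte[] body)
{
    var c = Interlocked.Increment(ref count);
    if (c % 1000 == 0) Console.WriteLine("Consumed {0}", c);
    Model.BasicAck(deliveryTag, false);   // false = ack this delivery only
}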

Thanks!
Cameron

  • Matthew Sackman at Jul 29, 2011 at 1:51 pm
    Hi,
    On Fri, Jul 29, 2011 at 02:21:46PM +0100, Cameron Harris wrote:
    I have a process that consumes from a queue with BasicConsume, but if
    there is a problem on the consumer that slows down message processing,
    RabbitMQ will carry on delivering messages until the process runs out
    of memory and crashes. In order to fix this...
    ...you should just use basic.qos with a non-zero prefetch count.
    I created a second thread
    that watches for too many messages building up in the client and calls
    ChannelFlow(false) on the channel. After the consumer catches up
    again, it ChannelFlow(true) to re-enable the channel flow.

    Unfortunately, since switching the broker to Windows Server 2008 and
    upgrading both the server and client to 2.5.1, the broker is
    occasionally terminating my connection on the ChannelFlow call. The
    exception text from the .NET client is:

    The AMQP operation was interrupted: AMQP close-reason,
    initiated by Peer, code=541, text="INTERNAL_ERROR", classId=0,
    methodId=0, cause=

    When I was running on RHEL 6 I didn't experience any problems, but I
    hadn't tested it thoroughly, so it might just be less likely to occur
    on that setup. The clients are all running on Windows and using the
    .NET client.
    none}
    ** Reason for termination ==
    ** {{badmatch,{error,already_present}},
    [{rabbit_channel_sup,'-start_limiter_fun/1-fun-0-',2},
    {rabbit_channel,start_limiter,1},
    That is odd, and does point to a bug in our code, I think... yes, we're
    doing the wrong thing with our limiter.

    Anyway, if you do just set a qos prefetch before consuming from the
    queue, you'll find that'll solve the problem. I'll file a bug for the
    other thing.
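
    For example, something along these lines before the basic.consume,
    using the consumerModel and consumer from your code (the prefetch
    count of 500 is arbitrary; tune it for your workload, and note that
    qos only takes effect when noAck is false):

    consumerModel.BasicQos(0, 500, false);   // prefetchSize 0 (no byte limit), prefetchCount 500
    consumerModel.BasicConsume("testq", false, null, consumer);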

    Matthew
  • Cameron Harris at Jul 29, 2011 at 3:39 pm
    ...you should just use basic.qos with a non-zero prefetch count.
    basic.qos requires explicit acks, doesn't it? I found that using
    basic.qos and explicit acks had a much lower throughput than
    auto-acking + channel.flow.
    I'll file a bug for the other thing.
    Thanks!

    Cameron Harris
  • Matthew Sackman at Jul 29, 2011 at 3:41 pm

    On Fri, Jul 29, 2011 at 04:39:08PM +0100, Cameron Harris wrote:
    ...you should just use basic.qos with a non-zero prefetch count.
    basic.qos requires explicit acks, doesn't it? I found that using
    basic.qos and explicit acks had a much lower throughput than
    auto-acking + channel.flow.
    Ahh yes, that's true. I'd failed to spot that your code was using
    noAck=true. What kind of throughput numbers are you after?


    Matthew
  • Alexandru Scvorţov at Jul 29, 2011 at 3:50 pm

    I'll file a bug for the other thing.
    Thanks!
    This has been fixed on a bug branch (bug24285 if you're curious) and is
    pending QA. With a bit of luck, it should be in the next release.

    Cheers,
    Alex
    On Fri, Jul 29, 2011 at 04:39:08PM +0100, Cameron Harris wrote:
    ...you should just use basic.qos with a non-zero prefetch count.
    basic.qos requires explicit acks, doesn't it? I found that using
    basic.qos and explicit acks had a much lower throughput than
    auto-acking + channel.flow.
    I'll file a bug for the other thing.
    Thanks!

    Cameron Harris
  • Cameron Harris at Jul 29, 2011 at 3:57 pm

    On 29 July 2011 16:41, Matthew Sackman wrote:
    Ahh yes, that's true. I'd failed to spot that your code was using
    noAck=true. What kind of throughput numbers are you after?
    Varies from app to app, but for certain things we're after as much
    throughput as we can get on our hardware (e.g. at least 50,000 messages
    per second for one of our applications). The lower the message
    throughput, the more complicated our client code needs to be, so we're
    trying to push it as high as our hardware allows.

    Cameron
  • Matthew Sackman at Jul 29, 2011 at 4:01 pm

    On Fri, Jul 29, 2011 at 04:57:15PM +0100, Cameron Harris wrote:
    Varies from app to app, but for certain things we're after as much
    throughput as we can get on our hardware (e.g. at least 50,000 messages
    per second for one of our applications). The lower the message
    throughput, the more complicated our client code needs to be, so we're
    trying to push it as high as our hardware allows.
    Interesting - ok, yes, that certainly is sufficiently high as to be
    beyond the capability of rabbit on non-exotic hardware with acks turned
    on. Ultimately you'll find that a single queue will become your
    bottleneck as queues are individually single-threaded. Thus you might
    find that you can scale better by having multiple queues for the same
    purpose and spreading your publishes across them. Of course, you need to
    be careful about which ordering guarantees you may start to lose...
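
    As a rough illustration (the queue names and the hash-based choice
    of queue are just one possible scheme), the publish side might
    become something like:

    // Illustrative only: spread publishes across four queues instead of one.
    // Ordering is then only preserved within each individual queue.
    const int QueueCount = 4;
    var publishModel = conn.CreateModel();       // one channel per publishing thread
    var props = publishModel.CreateBasicProperties();
    for (var i = 0; i < QueueCount; i++)
        publishModel.QueueDeclare("testq-" + i, false, false, false, null);

    var key = "some-message-key";                // whatever identifies the stream
    var index = (key.GetHashCode() & 0x7fffffff) % QueueCount;
    publishModel.BasicPublish("", "testq-" + index, props, new byte[] { });

    with one consumer (on its own channel) per queue on the other side.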

    Matthew
