I am experimenting with RabbitMQ (version 1.8.1) on a RHEL VM.
I have started the broker and can publish and consume events until the broker's resident memory usage peaks at about 400M. Memory keeps growing with the number of messages published, even though the consumer is consuming events. After that, the broker has to be restarted to bring memory usage back to 0 before more messages can be sent.

Attached are the code snippets for the publisher and consumer.
=================================================================
Publisher :
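import java.util.Arrays;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// numMessages, sleepInterval and timeToSleep are set elsewhere (not shown).
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
System.out.println("done creating conn");

String exchangeName = "testExchange";
String queueName = "testQueueNew";
String routingKey = "";
// Declare the exchange and queue, then bind them with the empty routing key.
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, true, null);
channel.queueBind(queueName, exchangeName, routingKey);

// 2000-byte payload filled with 'A'.
byte[] messageBodyBytes = new byte[2000];
Arrays.fill(messageBodyBytes, (byte) 'A');

for (int i = 0; i < numMessages; i++) {
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
    // Pause and report progress every sleepInterval messages.
    if (sleepInterval != 0 && i % sleepInterval == 0) {
        Thread.sleep(timeToSleep);
        System.out.println("Published Message " + i);
    }
}

channel.close();
conn.close();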
==================================================================
Consumer :
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

// numMessages is set elsewhere (not shown).
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();

String exchangeName = "testExchange";
String routingKey = "";
String queueName = "testQueue";
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueBind(queueName, exchangeName, routingKey);

// noAck = false: every delivery must be acknowledged explicitly.
boolean noAck = false;
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, noAck, consumer);
int count = 0;
while (count < numMessages) {
    QueueingConsumer.Delivery delivery;
    try {
        delivery = consumer.nextDelivery();
    } catch (InterruptedException ie) {
        // Nothing was delivered, so retry without counting this iteration.
        continue;
    }
    // (process the message components ...)
    byte[] body = delivery.getBody();
    String resp = new String(body);
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    count++;
}

channel.close();
conn.close();

===================================================================================
Any help is appreciated.

Thanks,
Avanti


  • Alexandru Scvorţov at Aug 10, 2010 at 9:31 am
    Hi Avanti,

    If the memory usage is steadily increasing, you're probably not ack'ing
    the messages.

    Try running:
    % rabbitmqctl list_queues name messages

    That should list your queues and tell you how many messages (ready
    plus unacknowledged) each one holds. If that number is steadily
    increasing for a queue, you're not ack'ing properly.

    If that's the case, you can either set noAck to true (i.e. messages
    don't need to be acknowledged), or explicitly ack them (like you're
    doing now); just make sure that that code is being run with the right
    arguments.

    Cheers,
    Alex
    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-discuss at lists.rabbitmq.com
    https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
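To make the ack/noAck point in the reply above concrete, here is a minimal in-memory sketch. It is a toy model written for illustration, not the RabbitMQ client API: the class `AckModel` and its methods are hypothetical names. It assumes the broker holds every delivered message until it is acknowledged, unless the consumer subscribed with noAck set to true.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a single queue; NOT the RabbitMQ client API.
class AckModel {
    private final Queue<byte[]> ready = new ArrayDeque<>();
    private final Map<Long, byte[]> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(byte[] body) {
        ready.add(body);
    }

    // Deliver one message. Returns its delivery tag, or -1 if nothing is ready.
    long deliver(boolean noAck) {
        byte[] body = ready.poll();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!noAck) {
            unacked.put(tag, body); // held until the consumer acks it
        }
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    // Ready plus unacknowledged, like the "messages" column of list_queues.
    int messages() {
        return ready.size() + unacked.size();
    }

    // Publish n messages, deliver each one, and optionally ack it.
    static int run(int n, boolean noAck, boolean sendAck) {
        AckModel q = new AckModel();
        for (int i = 0; i < n; i++) {
            q.publish(new byte[2000]);
            long tag = q.deliver(noAck);
            if (sendAck && !noAck) {
                q.ack(tag);
            }
        }
        return q.messages();
    }

    public static void main(String[] args) {
        System.out.println("noAck=false, acks sent: " + run(1000, false, true));
        System.out.println("noAck=false, no acks:   " + run(1000, false, false));
        System.out.println("noAck=true:             " + run(1000, true, false));
    }
}
```

In this model the count only keeps growing in the middle case, where deliveries need an ack and none is sent. That is the pattern the `rabbitmqctl list_queues name messages` check is meant to catch.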
  • Avanti Nadgir at Aug 10, 2010 at 11:35 pm
    Alexandru,

    That was the issue. Acks were not being sent for the messages in all of the queues (I had multiple queues bound with the same routing key).
    Thank you very much for the pointers.

    Avanti
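One possible reading of the diagnosis above, sketched as a toy model: with a direct exchange, every queue bound with the same routing key gets its own copy of each message, so a queue that nobody consumes from keeps filling up. The class `DirectExchangeModel` and its methods are hypothetical and are not the RabbitMQ client API. The queue names are taken from the snippets in the original post, and which queue went unconsumed is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange; NOT the RabbitMQ client API.
class DirectExchangeModel {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages waiting in that queue
    private final Map<String, List<byte[]>> queues = new HashMap<>();

    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Each queue bound with the routing key receives its own copy.
    void publish(String routingKey, byte[] body) {
        List<String> targets =
                bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
    }

    // Consume and acknowledge everything waiting in one queue.
    int drain(String queue) {
        List<byte[]> waiting = queues.get(queue);
        int n = waiting.size();
        waiting.clear();
        return n;
    }

    int depth(String queue) {
        return queues.get(queue).size();
    }

    // Two queues share the empty routing key; only "testQueue" is consumed.
    static DirectExchangeModel demo(int numMessages) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("testQueueNew", "");
        exchange.bind("testQueue", "");
        for (int i = 0; i < numMessages; i++) {
            exchange.publish("", new byte[2000]);
        }
        exchange.drain("testQueue");
        return exchange;
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = demo(500);
        System.out.println("testQueue depth:    " + exchange.depth("testQueue"));
        System.out.println("testQueueNew depth: " + exchange.depth("testQueueNew"));
    }
}
```

In this sketch the consumed queue ends up empty while the other one holds every message that was published, which would match memory growing even though a consumer is running.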
    ----- Original Message -----
    From: "Alexandru Scvorţov" <alexandru at rabbitmq.com>
    To: "Avanti Nadgir" <avanti at yahoo-inc.com>
    Cc: rabbitmq-discuss at lists.rabbitmq.com
    Sent: Tuesday, August 10, 2010 2:31:42 AM GMT -08:00 US/Canada Pacific
    Subject: Re: [rabbitmq-discuss] New user : Help needed in understanding memory usage

Discussion Overview
group: rabbitmq-discuss
categories: rabbitmq
posted: Aug 9, '10 at 11:59p
active: Aug 10, '10 at 11:35p
posts: 3
users: 2
website: rabbitmq.com
irc: #rabbitmq
