Hello,

I have a pretty simple setup where I use the RabbitMQ client library
to pull objects from a RabbitMQ server.

Unfortunately, there seems to be some memory leak in my program
because it quickly consumes all of the heap memory with byte arrays.
I've increased the max memory for the JVM to 2 GB and it consumes all
of that too.

Here is the code I use to pull data from the queue. Having used a few
Java profilers, it looks like the leak is associated with the
QueueingConsumer.Delivery object, because each Delivery object has a
member byte[] called _body.

Does anyone have any idea how I can ensure these arrays are garbage
collected / destroyed?

Thanks in advance!!!

Property values:
queue.user=guest
queue.password=guest
queue.virtualHost=/
queue.host=127.0.0.1
queue.hostPort=5672
queue.heartBeat=0
queue.exchange=myExchange
queue.queueName=dtt
queue.routingKey=dttRoute

Code:

public void consume() throws IOException, ClassNotFoundException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(PropertyUtil
            .getProperyFile().getProperty("queue.user"));
    factory.setPassword(PropertyUtil
            .getProperyFile().getProperty("queue.password"));
    factory.setVirtualHost(PropertyUtil
            .getProperyFile().getProperty("queue.virtualHost"));
    factory.setRequestedHeartbeat(Integer.parseInt(PropertyUtil
            .getProperyFile().getProperty("queue.heartBeat")));
    factory.setHost(PropertyUtil
            .getProperyFile().getProperty("queue.host"));
    factory.setPort(Integer.parseInt(PropertyUtil
            .getProperyFile().getProperty("queue.hostPort")));

    boolean durable = true;
    boolean noAck = false;

    Connection conn = factory.newConnection();

    Channel channel = conn.createChannel();

    channel.basicQos(2);
    channel.exchangeDeclare(getExchangeName(), "direct", durable);
    channel.queueDeclare(getQueueName(), durable, false, false, null);
    channel.queueBind(getQueueName(), getExchangeName(),
            getRoutingKey());

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(getQueueName(), noAck, consumer);

    boolean runInfinite = true;

    java.sql.Connection sqlConn = null;
    sqlConn = DatabaseUtil.getConnection();

    long start = 0;
    long end = 0;

    while (runInfinite) {
        start = System.currentTimeMillis();
        QueueingConsumer.Delivery delivery = null;
        try {
            delivery = consumer.nextDelivery();

        } catch (Exception ie) {
            log.error(ie.getMessage(), ie);
            continue;
        }

        byte[] body = delivery.getBody();

        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(body));

        Status s = (Status) in.readObject();
        in.close();
        in = null;

        try {

            try {
                process(s, sqlConn);

            } catch (Exception e) {
                try {
                    sqlConn = DatabaseUtil.getConnection();
                } catch (Exception e1) {
                    log.error(e.getMessage(), e1);
                    throw e1;
                }
            }

            // only send an acknowledgement when the processing completed
            // successfully
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
                    false);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }


        end = System.currentTimeMillis();
        delivery = null;

        log.debug("Processing time: " + (end - start) + " ms");

    }
    try {
        sqlConn.close();
    } catch (SQLException e) {
        // TODO Auto-generated catch block
        log.error(e.getMessage(), e);
    }

    channel.close();
    conn.close();

}


  • Benjamin Bennett at Dec 12, 2010 at 5:51 pm
    Having had a quick look at it: since the profiler says a reference is
    still out there, make sure this line is not what is holding it:

    byte[] body = delivery.getBody();

    If the body variable is still referencing the array returned by
    delivery.getBody(), it will prevent it from being gc'ed.
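
As a hedged illustration of the point about references: a local such as `body` is just one more reference to the same array, and the array only becomes eligible for collection once nothing reachable refers to it. The sketch below uses a hypothetical `FakeDelivery` stand-in, not the real `QueueingConsumer.Delivery`, and assumes (as the stand-in does) that `getBody()` hands back the stored array rather than a copy.

```java
public class ReferenceDemo {

    /** Hypothetical stand-in for QueueingConsumer.Delivery; not the real class. */
    public static final class FakeDelivery {
        private final byte[] body;

        public FakeDelivery(byte[] body) {
            this.body = body;
        }

        /** Returns the stored array itself, not a copy. */
        public byte[] getBody() {
            return body;
        }
    }

    public static void main(String[] args) {
        FakeDelivery delivery = new FakeDelivery(new byte[] {1, 2, 3});
        byte[] body = delivery.getBody();

        // Both expressions refer to the same array object.
        System.out.println(body == delivery.getBody());

        // Dropping the local removes one reference; 'delivery' still reaches the array.
        body = null;
        System.out.println(delivery.getBody().length);

        // Only now does nothing in this method reach the array any more.
        delivery = null;
    }
}
```

Locals declared inside a loop body stop being reachable at the end of each iteration anyway, so explicitly nulling them rarely changes what the collector can reclaim.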


    _______________________________________________
    rabbitmq-discuss mailing list
    rabbitmq-discuss at lists.rabbitmq.com
    https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
  • Drenz at Dec 12, 2010 at 7:36 pm
    I've updated the code to explicitly set body = null after I am done
    using it. See below.

    However, the memory continues to build up. This is the only part of
    the code that runs, so it has to be something in here. It almost
    looks like the QueueingConsumer object might be keeping these
    references around. Is that possible?

    Thanks

    byte[] body = delivery.getBody();

    ObjectInputStream in = new ObjectInputStream(
            new ByteArrayInputStream(body));
    Status s = (Status) in.readObject();

    body = null;

    in.close();

    in = null;
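
For comparison, here is a minimal sketch of the same deserialization step written with try-with-resources (Java 7 or later), which closes the stream even when `readObject()` throws and makes the manual `= null` assignments unnecessary. `BodyDecoder`, `encode`, and the nested `Status` class are hypothetical names used only for this illustration; the poster's real `Status` class is not shown in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BodyDecoder {

    /** Hypothetical stand-in for the poster's Status class. */
    public static class Status implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String text;

        public Status(String text) {
            this.text = text;
        }
    }

    /** Deserializes one message body; the stream is closed even if readObject throws. */
    public static Status decode(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return (Status) in.readObject();
        }
    }

    /** Helper used only to build a sample body for the demo below. */
    public static byte[] encode(Status status) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(status);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] body = encode(new Status("ok"));
        System.out.println(decode(body).text); // prints "ok"
    }
}
```

Because `body` and `in` are confined to `decode`, nothing in this step can keep the byte array alive after the method returns.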




  • Matthias Radestock at Dec 12, 2010 at 7:56 pm
    David,

    drenz wrote:
    I've updated the code to explicitly set body = null after I am done
    using it. See below.

    However, the memory continues to build up. This is the only part of
    the code that runs, so it has to be something in here. It almost
    looks like the QueueingConsumer object might be keeping these
    references around. Is that possible?

    I suspect the leak is in the application code rather than the rabbit
    java client libs.

    I suggest you try removing *all* application code from the receive loop,
    leaving it looking like this:

    while (runInfinite) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        byte[] body = delivery.getBody();
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }

    Let us know whether you still see a leak with that.

    Regards,

    Matthias.
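
One way to judge whether the stripped-down loop still leaks is to log heap usage every so often and watch whether it keeps climbing after garbage collections. The following is only a sketch under stated assumptions: `HeapProbe` and its methods are hypothetical names, the byte array allocation stands in for `delivery.getBody()`, and the sampling interval of 1000 messages is arbitrary. It uses the standard `java.lang.management.MemoryMXBean`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class HeapProbe {

    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    /** Returns the number of heap bytes currently in use, as reported by the JVM. */
    public static long usedHeapBytes() {
        return MEMORY.getHeapMemoryUsage().getUsed();
    }

    /** Builds one log line pairing a message count with current heap usage in MB. */
    public static String sample(long messageCount) {
        return "messages=" + messageCount + " usedHeapMB=" + (usedHeapBytes() / (1024 * 1024));
    }

    public static void main(String[] args) {
        for (long n = 1; n <= 3000; n++) {
            // Stand-in for delivery.getBody(); the array is dropped at the end of each iteration.
            byte[] body = new byte[64 * 1024];

            // Sample every 1000 messages; a steady upward trend suggests something retains data.
            if (n % 1000 == 0) {
                System.out.println(sample(n));
            }
        }
    }
}
```

In the real consumer, the `sample` call would sit inside the receive loop next to `basicAck`. If usage stays flat with the stripped loop but grows once the application code is restored, the leak is in the application code.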
  • Benjamin Bennett at Dec 12, 2010 at 8:02 pm
    I don't know if it is possible, but I would attach the client source to
    your debugger. That might give you some idea of what is going on.
    I had a quick look at the QueueingConsumer code and it looks pretty
    straightforward.
    What profiler are you using? In YourKit, the memory inspections would
    give you some idea of which class is holding the reference.
    Something is holding onto the bytes so they cannot be gc'd.
    The other item is the new ByteArrayInputStream(): I wonder if you can
    assign it to a variable and set that to null after you read. Looking at
    the javadocs, that shouldn't be the issue, but I would still try it to
    be 100% sure.


  • Drenz at Jan 3, 2011 at 10:56 pm
    Thanks for the help guys! I've resolved the issue.

    Not surprisingly, this was my own fault. I had a long running database
    connection in another layer of the code that was eating up memory.
    Nothing was wrong with rabbitmq.

    Thanks,

    Dave


Discussion Overview
group: rabbitmq-discuss
categories: rabbitmq
posted: Dec 12, '10 at 3:58p
active: Jan 3, '11 at 10:56p
posts: 6
users: 3
website: rabbitmq.com
irc: #rabbitmq
