Grokbase Groups Kafka users May 2016
Hi,

I am using the new Kafka consumer API (0.9.0.0). I created a topic with
100 partitions and started only one consumer to consume from it. Much of
the time, the consumer logs show a lot of rebalancing activity and no
messages are consumed because of it.

Is this a known issue? Please let me know if somebody can help with this.

My Consumer config:
         props.put("zookeeper.session.timeout.ms", "10000");
         props.put("rebalance.backoff.ms","10000");
         props.put("zookeeper.sync.time.ms","200");
         props.put("rebalance.max.retries","10");
         props.put("enable.auto.commit", "false");
         props.put("consumer.timeout.ms","20000");
         props.put("auto.offset.reset", "smallest");

Thanks,
Sahitya


  • Abhinav Solan at May 13, 2016 at 2:21 pm
    Hi Sahitya,

    Try reducing max.partition.fetch.bytes in your consumer.
    Also try increasing heartbeat.interval.ms; this might help delay the
    consumer rebalance if your inbound processing is taking more time than this.
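
    A minimal sketch of those two settings (the values are illustrative
    examples, not recommendations from this thread):

        props.put("max.partition.fetch.bytes", "262144");  // fetch less data per partition per request (default is 1 MB)
        props.put("heartbeat.interval.ms", "10000");        // larger heartbeat interval, typically kept well below session.timeout.ms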

    - Abhinav
  • Cees de Groot at May 13, 2016 at 4:06 pm
    What Abhinav said. To give some context: the common cause of frequent
    rebalances is that your consumer takes too long to process batches. As
    long as you don't call into the consumer library, heartbeats aren't
    sent, so if you take too long working through a batch, the broker
    thinks your consumer is gone and starts rebalancing. The message
    batch under processing never gets marked as done, so after
    rebalancing, things start over from the same spot.

    So the solution is to either make the batches smaller or the heartbeat
    interval longer. There are fancier solutions for when this doesn't
    work, but it should do the trick for most normal cases.
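
    As a sketch of that failure mode (continuing the consumer sketch earlier
    in the thread; process() is a placeholder for the application's handler):

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                process(record);  // if working through the whole batch takes longer than the
                                  // session timeout, no heartbeat goes out and a rebalance starts
            }
            consumer.commitSync();  // never reached for that batch, so it is re-consumed after the rebalance
        }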

    --
    Cees de Groot
    Principal Software Engineer
    PagerDuty, Inc.
  • Sahitya agrawal at May 14, 2016 at 3:06 pm
    Thanks Cees and Abhinav, I will give this a try and report back whether
    it helps in my case.

    Regards,
    Sahitya Agrawal
  • Jason Gustafson at May 16, 2016 at 6:52 pm
    To pile on a little bit, the API is designed to ensure consumer liveness so
    that partitions cannot be held indefinitely by a defunct process. Since
    heartbeating and message processing are done in the same thread, the
    consumer needs to demonstrate "progress" by calling poll() often enough not
    to get kicked out. What "often enough" means is dictated by the session
    timeout, which is 30s by default. If you fail to call poll() before the
    session timer expires, then the broker will assume that the member is dead
    and begin a rebalance. If you need more time to handle messages, increase
    session.timeout.ms in your configuration. The only downside to a higher
    timeout in general is that it will take longer to detect other kinds of
    failures (such as process crashes or network partitions).
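
    For example, a minimal tweak along these lines (the value is illustrative;
    the broker-side group.max.session.timeout.ms setting caps how high the
    client can set it):

        props.put("session.timeout.ms", "60000");  // more headroom per poll() before the member is considered dead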

    This was the initial design, but it hasn't worked out quite as well as we
    would have liked, at least not in all situations. The first problem in 0.9
    is that you don't have a direct way to control the amount of data that can
    be returned in a call to poll(), which makes it difficult to estimate the
    session timeout. You can set max.partition.fetch.bytes and, based on an
    estimate for the total number of partitions that you need to read, try to
    come up with a guess, but this is kind of hard in practice. So in 0.10
    we've introduced a new setting max.poll.records, which lets you set an
    explicit bound on the number of messages that need to be handled on each
    poll iteration. The hope is that you can set this to a reasonably low
    value so that you never risk a session timeout.
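
    In config terms (illustrative values only; max.poll.records is available
    from 0.10 onward):

        // 0.9: only an indirect bound, via bytes fetched per partition
        props.put("max.partition.fetch.bytes", "131072");
        // 0.10+: a direct bound on the number of records returned by each poll()
        props.put("max.poll.records", "100");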

    It's also worthwhile understanding a little bit about how the rebalance
    mechanism works. After a consumer group is created, each consumer begins
    sending heartbeat messages to a special broker known as the coordinator.
    When a new consumer joins the group (or when the session timeout of an
    existing member expires), the other members find out about it through the
    error code in the heartbeat response. The group coordination protocol
    basically implements a synchronization barrier. When a rebalance begins,
    all members of the group have to join the barrier for it to complete. So if
    you want to reduce the impact from rebalancing, then you need to ensure
    that all members can join the barrier as soon as possible after it begins.
    For this, we expose heartbeat.interval.ms, but note that we can't actually
    send heartbeats any faster than the poll() interval itself because
    everything is done from the same thread. So if you want to always have fast
    rebalances, then the target for setting the processing bound should be the
    heartbeat interval instead of the session timeout.
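
    One way to observe the rebalance barrier from the application side is the
    rebalance-listener overload of subscribe(); a minimal sketch (the topic
    name is a placeholder, and it assumes java.util.Collection,
    org.apache.kafka.common.TopicPartition and
    org.apache.kafka.clients.consumer.ConsumerRebalanceListener are imported):

        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // invoked when a rebalance begins: a good place to commit progress before ownership moves
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // invoked once the group has completed the barrier and assignments are final
            }
        });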

    We've made some other small improvements to make unexpected rebalancing
    less of a problem in practice. For example, we modified the protocol
    behavior to allow offset commits to serve as effective heartbeats, which
    wasn't the case in 0.9. However, we're still encountering situations where
    there's really no clear way to estimate the session timeout other than
    somewhat exhaustive testing. Even max.poll.records doesn't help when the
    impact of a single message can vary disproportionately (as is sometimes the
    case in Kafka Streams, which uses the consumer internally). You could set a
    ridiculously large session timeout in these cases, but that also guarantees
    a long time to recover from hard failures. I think this basically means
    that these use cases need a separate notion of liveness, which they have a
    bit more control over. For example, we can expose a method in the consumer
    which applications can call from any thread to signal that they're still
    alive. I'm working on a KIP right now to address this problem, so look for
    it in the next few weeks.

    Thanks,
    Jason
  • Sahitya agrawal at May 31, 2016 at 5:59 am
    Hi,

    I still face the same issue sometimes. My Kafka consumer throws this
    exception after failing to claim any partition.

    java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
            at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
            at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
            at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


    Regards,
    Sahitya Agrawal
