FAQ

[Kafka-users] Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

Rajasekar Elango
Aug 27, 2013 at 3:54 pm
Hello everyone,

We recently increased the number of partitions from 4 to 16, and since then the
console producer mostly fails with LeaderNotAvailableException and exits
after 3 tries.

Here are the last few lines of the console producer log:

No partition metadata for topic test-41 due to kafka.common.LeaderNotAvailableException}] for topic [test-41]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-41 (kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,372] INFO Fetching metadata from broker id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1 topic(s) Set(test-41) (kafka.client.ClientUtils$)
[2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for producing (kafka.producer.SyncProducer)
[2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true (kafka.producer.SyncProducer)
[2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.Producer.send(Producer.scala:74)
        at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
        at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
[2013-08-27 08:29:30,383] INFO Shutting down producer (kafka.producer.Producer)
[2013-08-27 08:29:30,384] INFO Closing all sync producers (kafka.producer.ProducerPool)


Also, this happens only for new topics (we have auto.create.topics.enable set to
true); if I retry sending the message to an existing topic, it works fine. Is there
any tweaking I need to do on the broker or the producer to scale with the number
of partitions?

--
Thanks in advance for help,
Raja.

11 responses

  • Guozhang Wang at Aug 27, 2013 at 4:45 pm
    Hello Rajasekar,

    In 0.8, producers keep a cache of the partition -> leader_broker_id map,
    which is used to determine which broker each message should be sent to.
    Right after new partitions are added, the producer's cache has not been
    populated yet, hence it throws this exception. The producer then tries to
    refresh its cache by asking the brokers "who are the leaders of these new
    partitions I did not know about before?". At first the brokers do not know
    this either; they only learn it from the controller, which propagates the
    leader information after all the leader elections have finished.

    If the number of retries is set to 3, it is possible that the producer gives
    up before the leader info has propagated to the brokers, and hence to the
    producer. Could you try increasing the producer's message.send.max.retries
    and see if it eventually succeeds on retry?
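
    For reference, a minimal sketch of what raising these settings looks like on
    a standalone 0.8 Scala producer, using the broker address and topic name from
    the log above. The retry values chosen here are arbitrary illustrations, and
    this is not the console producer itself, which at the time did not expose
    these settings:

        import java.util.Properties
        import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

        // Broker address and topic taken from the log above; adjust to your cluster.
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:6667")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        // Give the cluster time to finish leader election for the new partitions:
        // retry more times and back off a little longer between attempts.
        props.put("message.send.max.retries", "10")   // default is 3
        props.put("retry.backoff.ms", "500")          // default is 100 ms

        val producer = new Producer[String, String](new ProducerConfig(props))
        producer.send(new KeyedMessage[String, String]("test-41", "hello"))
        producer.close()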

    Guozhang

  • Neha Narkhede at Aug 27, 2013 at 4:52 pm
    As Guozhang said, your producer might be giving up before leader election
    completes for the new topic. To confirm whether your producer gave up too
    soon, you can run the state change log merge tool for this topic and see
    when leader election finished for all partitions:

    ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs <location of all state change logs> --topic <topic>

    Note that this tool requires you to supply the state change logs from all
    brokers in the cluster.


    Thanks,
    Neha

  • Rajasekar Elango at Aug 27, 2013 at 7:38 pm
    Thanks Neha & Guozhang,

    When I ran StateChangeLogMerger, I see these two messages repeated 16 times,
    once for each partition:

    [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
    [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for partition [test-60,13] in /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data. (kafka.log.LogManager)

    I am also seeing .log and .index files created for this topic in the data
    dir, and the list-topic command shows leaders, replicas, and ISRs for all
    partitions. Do you still think increasing the number of retries would help,
    or is it some other issue? Also, the console producer doesn't seem to have
    an option to set the number of retries. Is there a way to configure retries
    for the console producer?

    Thanks,
    Raja.

  • Guozhang Wang at Aug 27, 2013 at 8:04 pm
    Hello Rajasekar,

    The "Removing fetcher" log entries are normal when partitions are added;
    they indicate that some leader changes have happened, so the brokers are
    closing their fetchers to the old leaders.

    I just realized that the console producer does not expose the
    message.send.max.retries option yet. Could you file a JIRA for this, and I
    will follow up to add the option? For now you can hard-code the default
    value from 3 to a larger number.
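
    For illustration only, here is roughly what exposing these two settings
    through a joptsimple option parser (as the console tools use) could look
    like; the option names, object name, and structure shown are assumptions
    for this sketch, not the actual patch:

        import java.util.Properties
        import joptsimple.OptionParser

        // Hypothetical sketch: parse two retry-related options and map them onto
        // the corresponding 0.8 producer properties.
        object RetryOptionsSketch {
          def main(args: Array[String]): Unit = {
            val parser = new OptionParser
            val maxRetriesOpt = parser.accepts("message-send-max-retries",
                "Number of times to retry a send before giving up.")
              .withRequiredArg
              .ofType(classOf[java.lang.Integer])
              .defaultsTo(3)    // the current hard-coded default
            val retryBackoffMsOpt = parser.accepts("retry-backoff-ms",
                "Backoff between metadata refreshes and retry attempts.")
              .withRequiredArg
              .ofType(classOf[java.lang.Integer])
              .defaultsTo(100)

            val options = parser.parse(args: _*)

            // These properties would then be handed to ProducerConfig,
            // alongside the options the console producer already sets.
            val props = new Properties()
            props.put("message.send.max.retries", options.valueOf(maxRetriesOpt).toString)
            props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
            println(props)
          }
        }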

    Guozhang


  • Rajasekar Elango at Aug 27, 2013 at 9:39 pm
    Thanks Guozhang, changing the max retries to 5 worked. Since I am changing
    the console producer code anyway, I can also submit a patch adding both
    message.send.max.retries and retry.backoff.ms to the console producer. Can
    you let me know the process for submitting a patch?

    Thanks,
    Raja.

  • Guozhang Wang at Aug 28, 2013 at 12:33 am
    Cool! You can follow the process of creating a JIRA here:

    http://kafka.apache.org/contributing.html

    And submit a patch here:

    https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow

    It would be great if you could also add an entry for this issue to the FAQ,
    since I think it is a common question:

    https://cwiki.apache.org/confluence/display/KAFKA/FAQ

    Guozhang

  • Rajasekar Elango at Aug 28, 2013 at 3:37 pm
    Guozhang,

    The documentation says I need to work off of trunk. Can you confirm whether
    I should be working on trunk or a different branch?

    Thanks,
    Raja.

    On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang wrote:

    Cool! You can follow the process of creating a JIRA here:

    http://kafka.apache.org/contributing.html

    And submit patch here:

    https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow

    It will be great if you can also add an entry for this issue in FAQ since I
    think this is a common question:

    https://cwiki.apache.org/confluence/display/KAFKA/FAQ

    Guozhang


    On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango <rel...@...com
    wrote:
    Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
    console producer code, I can also submit patch adding both
    message.send.max.retries
    and retry.backoff.ms to console producer. Can you let me know process for
    submitting patch?

    Thanks,
    Raja.

    On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang wrote:

    Hello Rajasekar,

    The remove fetcher log entry is normal under addition of partitions, since
    they indicate that some leader changes have happened so brokers are closing
    the fetchers to the old leaders.

    I just realized that the console Producer does not have the
    message.send.max.retries options yet. Could you file a JIRA for this
    and
    I
    will followup to add this option? As for now you can hard modify the
    default value from 3 to a larger number.

    Guozhang


    On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
    wrote:
    Thanks Neha & Guozhang,

    When I ran StateChangeLogMerger, I am seeing this message repeated 16 times
    for each partition:

    [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing
    fetcher for partition [test-60,13]
    (kafka.server.ReplicaFetcherManager)
    [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log
    for
    partition [test-60,13] in
    /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
    (kafka.log.LogManager)

    I am also seeing .log and .index files created for this topic in data dir.
    Also list topic command shows leaders, replicas and isrs for all
    partitions. Do you still think increasing num of retries would help
    or
    is
    it some other issue..? Also console Producer doesn't seem to have
    option
    to set num of retries. Is there a way to configure num of retries for
    console producer ?

    Thanks,
    Raja.


    On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede <
    neh...@...com
    wrote:
    As Guozhang said, your producer might give up sooner than the
    leader
    election completes for the new topic. To confirm if your producer
    gave
    up
    too soon, you can run the state change log merge tool for this
    topic
    and
    see when the leader election finished for all partitions

    ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs <location
    to all state change logs> --topic <topic>

    Note that this tool requires you to give the state change logs for
    all
    brokers in the cluster.


    Thanks,
    Neha


    On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang <wan...@...com
    wrote:
    Hello Rajasekar,

    In 0.8 producers keep a cache of the partition ->
    leader_broker_id
    map
    which is used to determine to which brokers should the messages
    be
    sent.
    After new partitions are added, the cache on the producer has not populated
    yet hence it will throw this exception. The producer will then
    try
    to
    refresh its cache by asking the brokers "who are the leaders of
    these
    new
    partitions that I do not know of before". The brokers at the
    beginning
    also
    do not know this information, and will only get this information
    from
    controller which will only propagation the leader information
    after
    the
    leader elections have all been finished.

    If you set num.retries to 3 then it is possible that producer
    gives
    up
    too
    soon before the leader info ever propagated to producers, hence
    to
    producers also. Could you try to increase producer.num.retries
    and
    see
    if
    the producer can eventually succeed in re-trying?

    Guozhang


    On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango <
    rel...@...com
    wrote:
    Hello everyone,

    We recently increased number of partitions from 4 to 16 and
    after
    that
    console producer mostly fails with LeaderNotAvailableException
    and
    exits
    after 3 tries:

    Here is last few lines of console producer log:

    No partition metadata for topic test-41 due to
    kafka.common.LeaderNotAvailableException}] for topic [test-41]:
    class
    kafka.common.LeaderNotAvailableException
    (kafka.producer.BrokerPartitionInfo)
    [2013-08-27 08:29:30,271] ERROR Failed to collate messages by
    topic,
    partition due to: Failed to fetch topic metadata for topic:
    test-41
    (kafka.producer.async.DefaultEventHandler)
    [2013-08-27 08:29:30,271] INFO Back off for 100 ms before
    retrying
    send.
    Remaining retries = 0
    (kafka.producer.async.DefaultEventHandler)
    [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer
    is
    enabled
    (kafka.producer.SyncProducerConfig)
    [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
    id:0,host:localhost,port:6667,secure:true with correlation id 8
    for 1
    topic(s) Set(test-41) (kafka.client.ClientUtils$)
    [2013-08-27 08:29:30,373] INFO begin ssl handshake for
    localhost/
    127.0.0.1:6667//127.0.0.1:36640(kafka.security.SSLSocketChannel)
    [2013-08-27 08:29:30,375] INFO finished ssl handshake for
    localhost/
    127.0.0.1:6667//127.0.0.1:36640(kafka.security.SSLSocketChannel)
    [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true
    for
    producing (kafka.producer.SyncProducer)
    [2013-08-27 08:29:30,380] INFO Disconnecting from
    localhost:6667:true
    (kafka.producer.SyncProducer)
    [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer
    is
    enabled
    (kafka.producer.SyncProducerConfig)
    [2013-08-27 08:29:30,381] ERROR Failed to send requests for
    topics
    test-41
    with correlation ids in [0,8]
    (kafka.producer.async.DefaultEventHandler)
    kafka.common.FailedToSendMessageException: Failed to send
    messages
    after
    3
    tries.
    at
    kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:74)
    at
    kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
    at
    kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
    [2013-08-27 08:29:30,383] INFO Shutting down producer
    (kafka.producer.Producer)
    [2013-08-27 08:29:30,384] INFO Closing all sync producers
    (kafka.producer.ProducerPool)


    Also, this happens only for new topics (we have
    auto.create.topic
    set
    to
    true), If retry sending message to existing topic, it works
    fine.
    Is
    there
    any tweaking I need to do to broker or to producer to scale
    based
    on
    number
    of partitions?

    --
    Thanks in advance for help,
    Raja.


    --
    -- Guozhang


    --
    Thanks,
    Raja.


    --
    -- Guozhang


    --
    Thanks,
    Raja.


    --
    -- Guozhang


    --
    Thanks,
    Raja.
  • Neha Narkhede at Aug 28, 2013 at 4:50 pm
    Rajasekar,

    We are trying to limit patches on 0.8 to critical bug fixes and broken
    tooling. If a patch involves significant code changes, we would encourage
    taking it on trunk. If you just want to fix the console producer to take
    the retry arguments, I think that is small enough to consider taking on the
    0.8 branch, since it affects the usability of the console producer.

    Thanks,
    Neha

  • Rajasekar Elango at Aug 28, 2013 at 4:58 pm
    Thanks, this is a small fix to ConsoleProducer.scala only. I will use the
    0.8 branch.

    Thanks,
    Raja.

    On Wed, Aug 28, 2013 at 12:49 PM, Neha Narkhede wrote:

    Rajasekar,

    We are trying to minimize the number of patches in 0.8 to critical bug
    fixes or broken tooling. If the patch involves significant code changes, we
    would encourage taking it on trunk. If you want to just fix the console
    producer to take the retry argument, I would think it is small enough to
    consider taking it on 0.8 branch since it affects the usability of the
    console producer.

    Thanks,
    Neha


    On Wed, Aug 28, 2013 at 8:36 AM, Rajasekar Elango <rel...@...com
    wrote:
    Guozhang ,

    *The documentation says I need to work off of trunk. Can you confirm If I
    should be working in trunk or different branch.*
    *
    *
    *Thanks,*
    *Raja.*

    On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang wrote:

    Cool! You can follow the process of creating a JIRA here:

    http://kafka.apache.org/contributing.html

    And submit patch here:

    https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow

    It will be great if you can also add an entry for this issue in FAQ since I
    think this is a common question:

    https://cwiki.apache.org/confluence/display/KAFKA/FAQ

    Guozhang


    On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango <
    rel...@...com
    wrote:
    Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
    console producer code, I can also submit patch adding both
    message.send.max.retries
    and retry.backoff.ms to console producer. Can you let me know
    process
    for
    submitting patch?

    Thanks,
    Raja.


    On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang <wan...@...com>
    wrote:
    Hello Rajasekar,

    The remove fetcher log entry is normal under addition of
    partitions,
    since
    they indicate that some leader changes have happened so brokers are closing
    the fetchers to the old leaders.

    I just realized that the console Producer does not have the
    message.send.max.retries options yet. Could you file a JIRA for
    this
    and
    I
    will followup to add this option? As for now you can hard modify
    the
    default value from 3 to a larger number.

    Guozhang


    On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
    wrote:
    Thanks Neha & Guozhang,

    When I ran StateChangeLogMerger, I am seeing this message
    repeated
    16
    times
    for each partition:

    [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker
    1]
    Removing
    fetcher for partition [test-60,13]
    (kafka.server.ReplicaFetcherManager)
    [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created
    log
    for
    partition [test-60,13] in
    /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
    (kafka.log.LogManager)

    I am also seeing .log and .index files created for this topic in
    data
    dir.
    Also list topic command shows leaders, replicas and isrs for all
    partitions. Do you still think increasing num of retries would
    help
    or
    is
    it some other issue..? Also console Producer doesn't seem to
    have
    option
    to set num of retries. Is there a way to configure num of retries
    for
    console producer ?

    Thanks,
    Raja.


    On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede <
    neh...@...com
    wrote:
    As Guozhang said, your producer might give up sooner than the
    leader
    election completes for the new topic. To confirm if your
    producer
    gave
    up
    too soon, you can run the state change log merge tool for this
    topic
    and
    see when the leader election finished for all partitions

    ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
    --logs
    <location
    to all state change logs> --topic <topic>

    Note that this tool requires you to give the state change logs
    for
    all
    brokers in the cluster.


    Thanks,
    Neha


    On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang <wan...@...com> wrote:
    Hello Rajasekar,

    In 0.8, producers keep a cache of the partition -> leader_broker_id map,
    which is used to determine to which brokers the messages should be sent.
    After new partitions are added, the cache on the producer has not been
    populated yet, hence it will throw this exception. The producer will then
    try to refresh its cache by asking the brokers "who are the leaders of these
    new partitions that I do not know of yet". The brokers at the beginning also
    do not know this information, and will only get it from the controller,
    which will only propagate the leader information after the leader elections
    have all finished.

    If you set num.retries to 3, then it is possible that the producer gives up
    too soon, before the leader info has ever propagated to the brokers, and
    hence to the producers. Could you try to increase producer.num.retries and
    see if the producer can eventually succeed in retrying?

    Guozhang
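
    (For anyone hitting the same exception from their own 0.8 producer rather
    than the console tool, a minimal sketch of raising the retry settings
    through ProducerConfig; the broker list, topic name and values are
    placeholders, and the SSL setup visible in the logs above is ignored:)

      import java.util.Properties
      import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

      object RetryingProducerSketch {
        def main(args: Array[String]): Unit = {
          val props = new Properties()
          props.put("metadata.broker.list", "localhost:6667")              // placeholder broker list
          props.put("serializer.class", "kafka.serializer.StringEncoder")  // plain string messages
          props.put("message.send.max.retries", "10")                      // give leader election time to finish
          props.put("retry.backoff.ms", "200")                             // metadata is refreshed before each retry

          val producer = new Producer[String, String](new ProducerConfig(props))
          producer.send(new KeyedMessage[String, String]("test-41", "hello after adding partitions"))
          producer.close()
        }
      }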




    --
    -- Guozhang


    --
    Thanks,
    Raja.
  • Guozhang Wang at Aug 28, 2013 at 5:11 pm
    I think this patch can be made in trunk. You can mark it as 0.8.1.

    Guozhang
    On Wednesday, August 28, 2013, Rajasekar Elango wrote:
    Guozhang,

    The documentation says I need to work off of trunk. Can you confirm if I
    should be working in trunk or a different branch?

    Thanks,
    Raja.

    --
    -- Guozhang
  • Rajasekar Elango at Aug 29, 2013 at 4:16 pm
    Created JIRA <https://issues.apache.org/jira/browse/KAFKA-1035> and
    attached a patch to it. Please review.

    On Wed, Aug 28, 2013 at 1:11 PM, Guozhang Wang wrote:

    I think this patch can be made in trunk. You can mark it as 0.8.1.

    Guozhang

    --
    Thanks,
    Raja.
