FAQ
As mentioned in previous emails, we are also working on a re-implementation
of the consumer. I would like to use this email thread to discuss the
details of the public API. I would also like us to be picky about this
public api now so it is as good as possible and we don't need to break it
in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>,
the hope is to get the api docs good enough so that it is self-explanatory.
You can also take a look at the configs
here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>

Some background info on implementation:

At a high level the primary difference in this consumer is that it removes
the distinction between the "high-level" and "low-level" consumer. The new
consumer API is non blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. We think
this is better compared to the blocking iterators since it effectively
decouples the threading strategy used for processing messages from the
consumer. It is worth noting that the consumer is entirely single threaded
and runs in the user thread. The advantage is that it can be easily
rewritten in less multi-threading-friendly languages. The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data.

The consumer provides a group management facility that supports the concept
of a group with multiple consumer instances (just like the current
consumer). This is done through a custom heartbeat and group management
protocol transparent to the user. At the same time, it allows users the
option to subscribe to a fixed set of partitions and not use group
management at all. The offset management strategy defaults to Kafka based
offset management and the API provides a way for the user to use a
customized offset store to manage the consumer's offsets.

A key difference in this consumer also is the fact that it does not depend
on zookeeper at all.

More details about the new consumer design are
here<https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>

Please take a look at the new
API<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>and
give us any thoughts you may have.

Thanks,
Neha

Search Discussions

  • Jay Kreps at Feb 10, 2014 at 8:20 pm
    A few items:
    1. ConsumerRebalanceCallback
        a. onPartitionsRevoked would be a better name.
        b. We should discuss the possibility of splitting this into two
    interfaces. The motivation would be that in Java 8 single method interfaces
    can directly take methods which might be more intuitive.
        c. If we stick with a single interface I would prefer the name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it would be more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known positions from the
    log? E.g. the functionality in the OffsetRequest...? That would make the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
    unit)? Is it Long because it allows null? If so should we just add a poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as it is really
    hard to read code that has boolean parameters without named arguments. Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the starting
    position, but surely that isn't the case, right? Surely it controls the
    fetch position for that partition and can be called at any time? Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed position but the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was writing out
    actual code for different scenarios against the API. I think it might be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks. I'm not sure if it would be useful to
    collect these kinds of scenarios from people. I know they have sporadically
    popped up on the mailing list.

    -Jay

    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede wrote:

    As mentioned in previous emails, we are also working on a re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Pradeep Gollakota at Feb 10, 2014 at 10:12 pm
    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as well. poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be used with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId, PartitionId, etc?
    Seems like it would be easier to read code with these classes as opposed to
    string and longs.

    - Pradeep

    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps wrote:

    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this into two
    interfaces. The motivation would be that in Java 8 single method interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it would be more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known positions from the
    log? E.g. the functionality in the OffsetRequest...? That would make the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
    unit)? Is it Long because it allows null? If so should we just add a poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as it is really
    hard to read code that has boolean parameters without named arguments. Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the starting
    position, but surely that isn't the case, right? Surely it controls the
    fetch position for that partition and can be called at any time? Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed position but the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was writing out
    actual code for different scenarios against the API. I think it might be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks. I'm not sure if it would be useful to
    collect these kinds of scenarios from people. I know they have sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Neha Narkhede at Feb 10, 2014 at 11:09 pm
    Thanks for the feedback.

    Mattijs -

    - Constructors link to
    http://kafka.apache.org/documentation.html#consumerconfigs for valid
    configurations, which lists zookeeper.connect rather than
    metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
    Fixed it to just point to ConsumerConfig for now until we finalize the new
    configs
    - Docs for poll(long) mention consumer.commit(true), which I can't find in
    the Consumer docs. For a simple consumer setup, that call is something that
    would make a lot of sense.
    Missed changing the examples to use consumer.commit(true, offsets). The
    suggestions by Jay would change it to commit(offsets) and
    commitAsync(offsets), which will hopefully make it easier to understand
    those commit APIs.
    - Love the addition of MockConsumer, awesome for unittesting :)
    I'm not quite satisfied with what it does as of right now, but we will
    surely improve it as we start writing the consumer.

    Jay -

    1. ConsumerRebalanceCallback
         a. Makes sense. Renamed to onPartitionsRevoked
         b. Ya, it will be good to make it forward compatible with Java 8
    capabilities. We can change it to PartitionsAssignedCallback and
              PartitionsRevokedCallback or RebalanceBeginCallback and
    RebalanceEndCallback?
         c. Ya, I thought about that but then didn't name it just
    RebalanceCallback since there could be a conflict with a controller side
    rebalance callback if/when we have one. However, you can argue that at that
    time we can name it ControllerRebalanceCallback instead of polluting a user
    facing API. So agree with you here.
    2. Ya, that is a good idea. Changed to subscribe(String topic,
    int...partitions).
    3. lastCommittedOffset() is not necessarily a local access since the
    consumer can potentially ask for the last committed offsets of partitions
    that the consumer does not consume and maintain the offsets for. That's the
    reason it is batched right now.
    4. Yes, look at
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
    5. Sure, but that is not part of the consumer API right? I think you're
    suggesting looking at OffsetRequest to see if it would do that properly?
    6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
    negative timeout will poll indefinitely?
    7. Good point. Changed to commit(...) and commitAsync(...)
    8. To commit the current position for all partitions owned by the consumer,
    you can use commit(). If you don't use group management, then
    commit(customListOfPartitions)
    9. Forgot to include unsubscribe. Done now.
    10. positions() can be called at any time and affects the next fetch on the
    next poll(). Fixed the places that said "starting fetch offsets"
    11. Can we not look that up by going through the messages returned and
    getting the offset from the ConsumerRecord?

    One thing that I really found helpful for the API design was writing out
    actual code for different scenarios against the API. I think it might be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks
    The javadocs include examples for almost all possible scenarios of consumer
    usage, that I could come up with. Will add more to the javadocs as I get
    more feedback from our users. The advantage of having the examples in the
    javadoc itself is to that the usage is self explanatory to new users.

    Pradeep -

    2. Changed to poll(long, TimeUnit) and a negative value for the timeout
    would block in the poll forever until there is new data
    3. We don't have hierarchical topics support. Would you mind explaining
    what you meant?
    4. I'm not so sure that we need a class to express a topic which is a
    string and a separate class for just partition id. We do have a class for
    TopicPartition which uniquely identifies a partition of a topic

    Thanks,
    Neha

    On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota wrote:

    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as well. poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be used with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId, PartitionId, etc?
    Seems like it would be easier to read code with these classes as opposed to
    string and longs.

    - Pradeep

    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps wrote:

    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this into two
    interfaces. The motivation would be that in Java 8 single method
    interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be
    subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it would be more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known positions from the
    log? E.g. the functionality in the OffsetRequest...? That would make the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
    unit)? Is it Long because it allows null? If so should we just add a poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as it is really
    hard to read code that has boolean parameters without named arguments. Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the starting
    position, but surely that isn't the case, right? Surely it controls the
    fetch position for that partition and can be called at any time? Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed position but the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was writing out
    actual code for different scenarios against the API. I think it might be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks. I'm not sure if it would be useful to
    collect these kinds of scenarios from people. I know they have
    sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Pradeep Gollakota at Feb 10, 2014 at 11:38 pm
    WRT to hierarchical topics, I'm referring to
    KAFKA-1175<https://issues.apache.org/jira/browse/KAFKA-1175>.
    I would just like to think through the implications for the Consumer API if
    and when we do implement hierarchical topics. For example, in the
    proposal<https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#>written
    by Jay, he says that initially wildcard subscriptions are not going
    to be supported. But does that mean that they will be supported in v2? If
    that's the case, that would change the semantics of the Consumer API.

    As to having classes for Topic, PartitionId, etc. it looks like I was
    referring to the TopicPartition and TopicPartitionOffset classes (I didn't
    realize these were already there). I was only looking at the confluence
    page which shows List[(String, Int, Long)] instead of
    List[TopicParitionOffset] (as is shown in the javadoc). However, I did
    notice that we're not being consistent in the Java version. E.g. we have
    commit(TopicPartitionOffset... offsets) and
    lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
    other hand we have subscribe(String topic, int... partitions). I agree that
    creating a class for TopicId today would probably not make too much sense
    today. But with hierarchical topics, I may change my mind. This is exactly
    what was done in the HBase API in 0.96 when namespaces were added. 0.96
    HBase API introduced a class called 'TableName' to represent the namespace
    and table name.

    On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede wrote:

    Thanks for the feedback.

    Mattijs -

    - Constructors link to
    http://kafka.apache.org/documentation.html#consumerconfigs for valid
    configurations, which lists zookeeper.connect rather than
    metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
    Fixed it to just point to ConsumerConfig for now until we finalize the new
    configs
    - Docs for poll(long) mention consumer.commit(true), which I can't find in
    the Consumer docs. For a simple consumer setup, that call is something that
    would make a lot of sense.
    Missed changing the examples to use consumer.commit(true, offsets). The
    suggestions by Jay would change it to commit(offsets) and
    commitAsync(offsets), which will hopefully make it easier to understand
    those commit APIs.
    - Love the addition of MockConsumer, awesome for unittesting :)
    I'm not quite satisfied with what it does as of right now, but we will
    surely improve it as we start writing the consumer.

    Jay -

    1. ConsumerRebalanceCallback
    a. Makes sense. Renamed to onPartitionsRevoked
    b. Ya, it will be good to make it forward compatible with Java 8
    capabilities. We can change it to PartitionsAssignedCallback and
    PartitionsRevokedCallback or RebalanceBeginCallback and
    RebalanceEndCallback?
    c. Ya, I thought about that but then didn't name it just
    RebalanceCallback since there could be a conflict with a controller side
    rebalance callback if/when we have one. However, you can argue that at that
    time we can name it ControllerRebalanceCallback instead of polluting a user
    facing API. So agree with you here.
    2. Ya, that is a good idea. Changed to subscribe(String topic,
    int...partitions).
    3. lastCommittedOffset() is not necessarily a local access since the
    consumer can potentially ask for the last committed offsets of partitions
    that the consumer does not consume and maintain the offsets for. That's the
    reason it is batched right now.
    4. Yes, look at

    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
    5. Sure, but that is not part of the consumer API right? I think you're
    suggesting looking at OffsetRequest to see if it would do that properly?
    6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
    negative timeout will poll indefinitely?
    7. Good point. Changed to commit(...) and commitAsync(...)
    8. To commit the current position for all partitions owned by the consumer,
    you can use commit(). If you don't use group management, then
    commit(customListOfPartitions)
    9. Forgot to include unsubscribe. Done now.
    10. positions() can be called at any time and affects the next fetch on the
    next poll(). Fixed the places that said "starting fetch offsets"
    11. Can we not look that up by going through the messages returned and
    getting the offset from the ConsumerRecord?

    One thing that I really found helpful for the API design was writing out
    actual code for different scenarios against the API. I think it might be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks
    The javadocs include examples for almost all possible scenarios of consumer
    usage, that I could come up with. Will add more to the javadocs as I get
    more feedback from our users. The advantage of having the examples in the
    javadoc itself is to that the usage is self explanatory to new users.

    Pradeep -

    2. Changed to poll(long, TimeUnit) and a negative value for the timeout
    would block in the poll forever until there is new data
    3. We don't have hierarchical topics support. Would you mind explaining
    what you meant?
    4. I'm not so sure that we need a class to express a topic which is a
    string and a separate class for just partition id. We do have a class for
    TopicPartition which uniquely identifies a partition of a topic

    Thanks,
    Neha


    On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <pradeepg26@gmail.com
    wrote:
    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as well. poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be used with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId, PartitionId, etc?
    Seems like it would be easier to read code with these classes as opposed to
    string and longs.

    - Pradeep

    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps wrote:

    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this into two
    interfaces. The motivation would be that in Java 8 single method
    interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be
    subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it would be more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known positions from the
    log? E.g. the functionality in the OffsetRequest...? That would make
    the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
    unit)? Is it Long because it allows null? If so should we just add a poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as it is really
    hard to read code that has boolean parameters without named arguments. Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the starting
    position, but surely that isn't the case, right? Surely it controls the
    fetch position for that partition and can be called at any time? Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed position but
    the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was writing
    out
    actual code for different scenarios against the API. I think it might
    be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks. I'm not sure if it would be
    useful
    to
    collect these kinds of scenarios from people. I know they have
    sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about
    this
    public api now so it is as good as possible and we don't need to
    break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer.
    The
    new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it
    effectively
    decouples the threading strategy used for processing messages from
    the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers
    it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group
    management
    protocol transparent to the user. At the same time, it allows users
    the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Jay Kreps at Feb 11, 2014 at 4:05 pm
    Hey Pradeep,

    That wiki is fairly old and it predated more flexible subscription
    mechanisms. In the high-level consumer you currently have wildcard
    subscription and in the new proposed interface you can actually subscribe
    based on any logic you want to create a "union" of streams. Personally I
    think this gives you everything you would want with a hierarchy and more
    actual flexibility (since you can define groupings however you want). What
    do you think?

    -Jay

    On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota wrote:

    WRT to hierarchical topics, I'm referring to
    KAFKA-1175<https://issues.apache.org/jira/browse/KAFKA-1175>.
    I would just like to think through the implications for the Consumer API if
    and when we do implement hierarchical topics. For example, in the
    proposal<
    https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
    written
    by Jay, he says that initially wildcard subscriptions are not going
    to be supported. But does that mean that they will be supported in v2? If
    that's the case, that would change the semantics of the Consumer API.

    As to having classes for Topic, PartitionId, etc. it looks like I was
    referring to the TopicPartition and TopicPartitionOffset classes (I didn't
    realize these were already there). I was only looking at the confluence
    page which shows List[(String, Int, Long)] instead of
    List[TopicParitionOffset] (as is shown in the javadoc). However, I did
    notice that we're not being consistent in the Java version. E.g. we have
    commit(TopicPartitionOffset... offsets) and
    lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
    other hand we have subscribe(String topic, int... partitions). I agree that
    creating a class for TopicId today would probably not make too much sense
    today. But with hierarchical topics, I may change my mind. This is exactly
    what was done in the HBase API in 0.96 when namespaces were added. 0.96
    HBase API introduced a class called 'TableName' to represent the namespace
    and table name.


    On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    Thanks for the feedback.

    Mattijs -

    - Constructors link to
    http://kafka.apache.org/documentation.html#consumerconfigs for valid
    configurations, which lists zookeeper.connect rather than
    metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
    Fixed it to just point to ConsumerConfig for now until we finalize the new
    configs
    - Docs for poll(long) mention consumer.commit(true), which I can't find in
    the Consumer docs. For a simple consumer setup, that call is something that
    would make a lot of sense.
    Missed changing the examples to use consumer.commit(true, offsets). The
    suggestions by Jay would change it to commit(offsets) and
    commitAsync(offsets), which will hopefully make it easier to understand
    those commit APIs.
    - Love the addition of MockConsumer, awesome for unittesting :)
    I'm not quite satisfied with what it does as of right now, but we will
    surely improve it as we start writing the consumer.

    Jay -

    1. ConsumerRebalanceCallback
    a. Makes sense. Renamed to onPartitionsRevoked
    b. Ya, it will be good to make it forward compatible with Java 8
    capabilities. We can change it to PartitionsAssignedCallback and
    PartitionsRevokedCallback or RebalanceBeginCallback and
    RebalanceEndCallback?
    c. Ya, I thought about that but then didn't name it just
    RebalanceCallback since there could be a conflict with a controller side
    rebalance callback if/when we have one. However, you can argue that at that
    time we can name it ControllerRebalanceCallback instead of polluting a user
    facing API. So agree with you here.
    2. Ya, that is a good idea. Changed to subscribe(String topic,
    int...partitions).
    3. lastCommittedOffset() is not necessarily a local access since the
    consumer can potentially ask for the last committed offsets of partitions
    that the consumer does not consume and maintain the offsets for. That's the
    reason it is batched right now.
    4. Yes, look at

    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
    5. Sure, but that is not part of the consumer API right? I think you're
    suggesting looking at OffsetRequest to see if it would do that properly?
    6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
    negative timeout will poll indefinitely?
    7. Good point. Changed to commit(...) and commitAsync(...)
    8. To commit the current position for all partitions owned by the consumer,
    you can use commit(). If you don't use group management, then
    commit(customListOfPartitions)
    9. Forgot to include unsubscribe. Done now.
    10. positions() can be called at any time and affects the next fetch on the
    next poll(). Fixed the places that said "starting fetch offsets"
    11. Can we not look that up by going through the messages returned and
    getting the offset from the ConsumerRecord?

    One thing that I really found helpful for the API design was writing out
    actual code for different scenarios against the API. I think it might be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks
    The javadocs include examples for almost all possible scenarios of consumer
    usage, that I could come up with. Will add more to the javadocs as I get
    more feedback from our users. The advantage of having the examples in the
    javadoc itself is to that the usage is self explanatory to new users.

    Pradeep -

    2. Changed to poll(long, TimeUnit) and a negative value for the timeout
    would block in the poll forever until there is new data
    3. We don't have hierarchical topics support. Would you mind explaining
    what you meant?
    4. I'm not so sure that we need a class to express a topic which is a
    string and a separate class for just partition id. We do have a class for
    TopicPartition which uniquely identifies a partition of a topic

    Thanks,
    Neha


    On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as well. poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be used
    with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId, PartitionId,
    etc?
    Seems like it would be easier to read code with these classes as
    opposed
    to
    string and longs.

    - Pradeep

    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps wrote:

    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this into two
    interfaces. The motivation would be that in Java 8 single method
    interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be
    subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it would be more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known positions
    from
    the
    log? E.g. the functionality in the OffsetRequest...? That would make
    the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
    unit)? Is it Long because it allows null? If so should we just add a poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as it is really
    hard to read code that has boolean parameters without named
    arguments.
    Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the
    current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the
    starting
    position, but surely that isn't the case, right? Surely it controls
    the
    fetch position for that partition and can be called at any time? Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed position but
    the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was writing
    out
    actual code for different scenarios against the API. I think it might
    be
    good to do that for this too--i.e. enumerate the various use cases
    and
    code
    that use case up to see how it looks. I'm not sure if it would be
    useful
    to
    collect these kinds of scenarios from people. I know they have
    sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss
    the
    details of the public API. I would also like us to be picky about
    this
    public api now so it is as good as possible and we don't need to
    break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look
    at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer.
    The
    new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records.
    We
    think
    this is better compared to the blocking iterators since it
    effectively
    decouples the threading strategy used for processing messages from
    the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the
    brokers
    it
    communicates with, for high throughput. The consumer also allows
    long
    poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group
    management
    protocol transparent to the user. At the same time, it allows users
    the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Pradeep Gollakota at Feb 11, 2014 at 7:45 pm
    Hi Jay,

    I apologize for derailing the conversation about the consumer API. We
    should start a new discussion about hierarchical topics, if we want to keep
    talking about it. My final thought on the matter is that, hierarchical
    topics is still an important feature to have in Kafka, because it gives us
    flexibility to do namespace level access controls.

    Getting back to the topic of the Consumer API:

        1. Any thoughts on consistency for method arguments and return types?
        2. lastCommittedOffsets() method returns a
    List<TopicPartitionOffset>where as the confluence page suggested a
    Map<TopicPartition,
        Long>. I would think that a Map is the more appropriate return type.


    On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps wrote:

    Hey Pradeep,

    That wiki is fairly old and it predated more flexible subscription
    mechanisms. In the high-level consumer you currently have wildcard
    subscription and in the new proposed interface you can actually subscribe
    based on any logic you want to create a "union" of streams. Personally I
    think this gives you everything you would want with a hierarchy and more
    actual flexibility (since you can define groupings however you want). What
    do you think?

    -Jay


    On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <pradeepg26@gmail.com
    wrote:
    WRT to hierarchical topics, I'm referring to
    KAFKA-1175<https://issues.apache.org/jira/browse/KAFKA-1175>.
    I would just like to think through the implications for the Consumer API if
    and when we do implement hierarchical topics. For example, in the
    proposal<
    https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
    written
    by Jay, he says that initially wildcard subscriptions are not going
    to be supported. But does that mean that they will be supported in v2? If
    that's the case, that would change the semantics of the Consumer API.

    As to having classes for Topic, PartitionId, etc. it looks like I was
    referring to the TopicPartition and TopicPartitionOffset classes (I didn't
    realize these were already there). I was only looking at the confluence
    page which shows List[(String, Int, Long)] instead of
    List[TopicParitionOffset] (as is shown in the javadoc). However, I did
    notice that we're not being consistent in the Java version. E.g. we have
    commit(TopicPartitionOffset... offsets) and
    lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
    other hand we have subscribe(String topic, int... partitions). I agree that
    creating a class for TopicId today would probably not make too much sense
    today. But with hierarchical topics, I may change my mind. This is exactly
    what was done in the HBase API in 0.96 when namespaces were added. 0.96
    HBase API introduced a class called 'TableName' to represent the namespace
    and table name.


    On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    Thanks for the feedback.

    Mattijs -

    - Constructors link to
    http://kafka.apache.org/documentation.html#consumerconfigs for valid
    configurations, which lists zookeeper.connect rather than
    metadata.broker.list, the value for BROKER_LIST_CONFIG in
    ConsumerConfig.
    Fixed it to just point to ConsumerConfig for now until we finalize the new
    configs
    - Docs for poll(long) mention consumer.commit(true), which I can't find in
    the Consumer docs. For a simple consumer setup, that call is something that
    would make a lot of sense.
    Missed changing the examples to use consumer.commit(true, offsets). The
    suggestions by Jay would change it to commit(offsets) and
    commitAsync(offsets), which will hopefully make it easier to understand
    those commit APIs.
    - Love the addition of MockConsumer, awesome for unittesting :)
    I'm not quite satisfied with what it does as of right now, but we will
    surely improve it as we start writing the consumer.

    Jay -

    1. ConsumerRebalanceCallback
    a. Makes sense. Renamed to onPartitionsRevoked
    b. Ya, it will be good to make it forward compatible with Java 8
    capabilities. We can change it to PartitionsAssignedCallback and
    PartitionsRevokedCallback or RebalanceBeginCallback and
    RebalanceEndCallback?
    c. Ya, I thought about that but then didn't name it just
    RebalanceCallback since there could be a conflict with a controller
    side
    rebalance callback if/when we have one. However, you can argue that at that
    time we can name it ControllerRebalanceCallback instead of polluting a user
    facing API. So agree with you here.
    2. Ya, that is a good idea. Changed to subscribe(String topic,
    int...partitions).
    3. lastCommittedOffset() is not necessarily a local access since the
    consumer can potentially ask for the last committed offsets of
    partitions
    that the consumer does not consume and maintain the offsets for. That's the
    reason it is batched right now.
    4. Yes, look at
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
    5. Sure, but that is not part of the consumer API right? I think you're
    suggesting looking at OffsetRequest to see if it would do that
    properly?
    6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
    negative timeout will poll indefinitely?
    7. Good point. Changed to commit(...) and commitAsync(...)
    8. To commit the current position for all partitions owned by the consumer,
    you can use commit(). If you don't use group management, then
    commit(customListOfPartitions)
    9. Forgot to include unsubscribe. Done now.
    10. positions() can be called at any time and affects the next fetch on the
    next poll(). Fixed the places that said "starting fetch offsets"
    11. Can we not look that up by going through the messages returned and
    getting the offset from the ConsumerRecord?

    One thing that I really found helpful for the API design was writing
    out
    actual code for different scenarios against the API. I think it might
    be
    good to do that for this too--i.e. enumerate the various use cases and code
    that use case up to see how it looks
    The javadocs include examples for almost all possible scenarios of consumer
    usage, that I could come up with. Will add more to the javadocs as I
    get
    more feedback from our users. The advantage of having the examples in
    the
    javadoc itself is to that the usage is self explanatory to new users.

    Pradeep -

    2. Changed to poll(long, TimeUnit) and a negative value for the timeout
    would block in the poll forever until there is new data
    3. We don't have hierarchical topics support. Would you mind explaining
    what you meant?
    4. I'm not so sure that we need a class to express a topic which is a
    string and a separate class for just partition id. We do have a class
    for
    TopicPartition which uniquely identifies a partition of a topic

    Thanks,
    Neha


    On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as well. poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be used
    with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId, PartitionId,
    etc?
    Seems like it would be easier to read code with these classes as
    opposed
    to
    string and longs.

    - Pradeep


    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kreps@gmail.com>
    wrote:
    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this into two
    interfaces. The motivation would be that in Java 8 single method
    interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be
    subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it would
    be
    more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known positions
    from
    the
    log? E.g. the functionality in the OffsetRequest...? That would
    make
    the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout,
    TimeUnit
    unit)? Is it Long because it allows null? If so should we just add
    a
    poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as it is really
    hard to read code that has boolean parameters without named
    arguments.
    Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the
    current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the
    starting
    position, but surely that isn't the case, right? Surely it controls
    the
    fetch position for that partition and can be called at any time? Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed position
    but
    the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was
    writing
    out
    actual code for different scenarios against the API. I think it
    might
    be
    good to do that for this too--i.e. enumerate the various use cases
    and
    code
    that use case up to see how it looks. I'm not sure if it would be
    useful
    to
    collect these kinds of scenarios from people. I know they have
    sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss
    the
    details of the public API. I would also like us to be picky about
    this
    public api now so it is as good as possible and we don't need to
    break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look
    at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that
    it
    removes
    the distinction between the "high-level" and "low-level"
    consumer.
    The
    new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of
    records.
    We
    think
    this is better compared to the blocking iterators since it
    effectively
    decouples the threading strategy used for processing messages
    from
    the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be
    easily
    rewritten in less multi-threading-friendly languages. The
    consumer
    batches
    data and multiplexes I/O over TCP connections to each of the
    brokers
    it
    communicates with, for high throughput. The consumer also allows
    long
    poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports
    the
    concept
    of a group with multiple consumer instances (just like the
    current
    consumer). This is done through a custom heartbeat and group
    management
    protocol transparent to the user. At the same time, it allows
    users
    the
    option to subscribe to a fixed set of partitions and not use
    group
    management at all. The offset management strategy defaults to
    Kafka
    based
    offset management and the API provides a way for the user to use
    a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does
    not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Pradeep Gollakota at Feb 11, 2014 at 8:59 pm
    Updated thoughts.

        1.

        subscribe(String topic, int... paritions) and unsubscribe(String topic,
        int... partitions) should be subscribe(TopicPartition...
    topicPartitions)and unsubscribe(TopicPartition...
        topicPartitons)
         2.

        Does it make sense to provide a convenience method to subscribe to
        topics at a particular offset directly? E.g.
    subscribe(TopicPartitionOffset...
        offsets)
         3.

        The javadoc makes no mention of what would happen if positions() is
        called with a TopicPartitionOffset to which the Consumer is not
        subscribed to.
         4.

        The javadoc makes no mention of what would happen if positions() is
        called with two different offsets for a single TopicPartition
         5. The javadoc shows lastCommittedOffsets() return type as
        List<TopicPartitionOffset>. This should either be Map<TopicPartition,
        Long> or Map<TopicPartition, TopicPartitionOffset>
        6. It seems like #4 can be avoided by using Map<TopicPartition,
    Long> or Map<TopicPartition,
        TopicPartitionOffset> as the argument type.
        7. To address #3, maybe we can return List<TopicPartitionOffset> that
        are invalid.


    On Tue, Feb 11, 2014 at 12:04 PM, Neha Narkhede wrote:

    Pradeep,

    To be clear, we want to get feedback on the APIs from the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    since
    the wiki will be slightly behind on the APIs.

    1. Regarding consistency, do you have specific feedback on which APIs
    should have different arguments/return types?
    2. lastCommittedOffsets() does what you said in the javadoc.

    Thanks,
    Neha


    On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota <pradeepg26@gmail.com
    wrote:
    Hi Jay,

    I apologize for derailing the conversation about the consumer API. We
    should start a new discussion about hierarchical topics, if we want to keep
    talking about it. My final thought on the matter is that, hierarchical
    topics is still an important feature to have in Kafka, because it gives us
    flexibility to do namespace level access controls.

    Getting back to the topic of the Consumer API:

    1. Any thoughts on consistency for method arguments and return types?
    2. lastCommittedOffsets() method returns a
    List<TopicPartitionOffset>where as the confluence page suggested a
    Map<TopicPartition,
    Long>. I would think that a Map is the more appropriate return type.


    On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps wrote:

    Hey Pradeep,

    That wiki is fairly old and it predated more flexible subscription
    mechanisms. In the high-level consumer you currently have wildcard
    subscription and in the new proposed interface you can actually
    subscribe
    based on any logic you want to create a "union" of streams. Personally
    I
    think this gives you everything you would want with a hierarchy and
    more
    actual flexibility (since you can define groupings however you want). What
    do you think?

    -Jay


    On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    WRT to hierarchical topics, I'm referring to
    KAFKA-1175<https://issues.apache.org/jira/browse/KAFKA-1175>.
    I would just like to think through the implications for the Consumer
    API
    if
    and when we do implement hierarchical topics. For example, in the
    proposal<
    https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
    written
    by Jay, he says that initially wildcard subscriptions are not going
    to be supported. But does that mean that they will be supported in
    v2?
    If
    that's the case, that would change the semantics of the Consumer API.

    As to having classes for Topic, PartitionId, etc. it looks like I was
    referring to the TopicPartition and TopicPartitionOffset classes (I didn't
    realize these were already there). I was only looking at the
    confluence
    page which shows List[(String, Int, Long)] instead of
    List[TopicParitionOffset] (as is shown in the javadoc). However, I
    did
    notice that we're not being consistent in the Java version. E.g. we
    have
    commit(TopicPartitionOffset... offsets) and
    lastCommittedOffsets(TopicPartition... partitions) on the one hand.
    On
    the
    other hand we have subscribe(String topic, int... partitions). I
    agree
    that
    creating a class for TopicId today would probably not make too much
    sense
    today. But with hierarchical topics, I may change my mind. This is exactly
    what was done in the HBase API in 0.96 when namespaces were added.
    0.96
    HBase API introduced a class called 'TableName' to represent the namespace
    and table name.


    On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    Thanks for the feedback.

    Mattijs -

    - Constructors link to
    http://kafka.apache.org/documentation.html#consumerconfigs for
    valid
    configurations, which lists zookeeper.connect rather than
    metadata.broker.list, the value for BROKER_LIST_CONFIG in
    ConsumerConfig.
    Fixed it to just point to ConsumerConfig for now until we finalize
    the
    new
    configs
    - Docs for poll(long) mention consumer.commit(true), which I can't
    find
    in
    the Consumer docs. For a simple consumer setup, that call is
    something
    that
    would make a lot of sense.
    Missed changing the examples to use consumer.commit(true, offsets).
    The
    suggestions by Jay would change it to commit(offsets) and
    commitAsync(offsets), which will hopefully make it easier to
    understand
    those commit APIs.
    - Love the addition of MockConsumer, awesome for unittesting :)
    I'm not quite satisfied with what it does as of right now, but we
    will
    surely improve it as we start writing the consumer.

    Jay -

    1. ConsumerRebalanceCallback
    a. Makes sense. Renamed to onPartitionsRevoked
    b. Ya, it will be good to make it forward compatible with Java
    8
    capabilities. We can change it to PartitionsAssignedCallback and
    PartitionsRevokedCallback or RebalanceBeginCallback and
    RebalanceEndCallback?
    c. Ya, I thought about that but then didn't name it just
    RebalanceCallback since there could be a conflict with a controller
    side
    rebalance callback if/when we have one. However, you can argue that
    at
    that
    time we can name it ControllerRebalanceCallback instead of
    polluting
    a
    user
    facing API. So agree with you here.
    2. Ya, that is a good idea. Changed to subscribe(String topic,
    int...partitions).
    3. lastCommittedOffset() is not necessarily a local access since
    the
    consumer can potentially ask for the last committed offsets of
    partitions
    that the consumer does not consume and maintain the offsets for.
    That's
    the
    reason it is batched right now.
    4. Yes, look at
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
    5. Sure, but that is not part of the consumer API right? I think
    you're
    suggesting looking at OffsetRequest to see if it would do that
    properly?
    6. Good point. Changed to poll(long timeout, TimeUnit) and poll
    with
    a
    negative timeout will poll indefinitely?
    7. Good point. Changed to commit(...) and commitAsync(...)
    8. To commit the current position for all partitions owned by the consumer,
    you can use commit(). If you don't use group management, then
    commit(customListOfPartitions)
    9. Forgot to include unsubscribe. Done now.
    10. positions() can be called at any time and affects the next
    fetch
    on
    the
    next poll(). Fixed the places that said "starting fetch offsets"
    11. Can we not look that up by going through the messages returned
    and
    getting the offset from the ConsumerRecord?

    One thing that I really found helpful for the API design was
    writing
    out
    actual code for different scenarios against the API. I think it
    might
    be
    good to do that for this too--i.e. enumerate the various use cases
    and
    code
    that use case up to see how it looks
    The javadocs include examples for almost all possible scenarios of consumer
    usage, that I could come up with. Will add more to the javadocs as
    I
    get
    more feedback from our users. The advantage of having the examples
    in
    the
    javadoc itself is to that the usage is self explanatory to new
    users.
    Pradeep -

    2. Changed to poll(long, TimeUnit) and a negative value for the
    timeout
    would block in the poll forever until there is new data
    3. We don't have hierarchical topics support. Would you mind
    explaining
    what you meant?
    4. I'm not so sure that we need a class to express a topic which
    is a
    string and a separate class for just partition id. We do have a
    class
    for
    TopicPartition which uniquely identifies a partition of a topic

    Thanks,
    Neha


    On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as
    well.
    poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be
    used
    with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId,
    PartitionId,
    etc?
    Seems like it would be easier to read code with these classes as
    opposed
    to
    string and longs.

    - Pradeep


    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kreps@gmail.com
    wrote:
    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this into
    two
    interfaces. The motivation would be that in Java 8 single
    method
    interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the
    name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be
    subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it
    would
    be
    more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting
    position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known
    positions
    from
    the
    log? E.g. the functionality in the OffsetRequest...? That would
    make
    the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout,
    TimeUnit
    unit)? Is it Long because it allows null? If so should we just
    add
    a
    poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as
    it
    is
    really
    hard to read code that has boolean parameters without named
    arguments.
    Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit the
    current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the
    starting
    position, but surely that isn't the case, right? Surely it
    controls
    the
    fetch position for that partition and can be called at any
    time?
    Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed
    position
    but
    the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was
    writing
    out
    actual code for different scenarios against the API. I think it
    might
    be
    good to do that for this too--i.e. enumerate the various use
    cases
    and
    code
    that use case up to see how it looks. I'm not sure if it would
    be
    useful
    to
    collect these kinds of scenarios from people. I know they have
    sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to
    discuss
    the
    details of the public API. I would also like us to be picky
    about
    this
    public api now so it is as good as possible and we don't need
    to
    break
    it
    in the future.

    The best way to get a feel for the API is actually to take a
    look
    at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is
    that
    it
    removes
    the distinction between the "high-level" and "low-level"
    consumer.
    The
    new
    consumer API is non blocking and instead of returning a
    blocking
    iterator,
    the consumer provides a poll() API that returns a list of
    records.
    We
    think
    this is better compared to the blocking iterators since it
    effectively
    decouples the threading strategy used for processing messages
    from
    the
    consumer. It is worth noting that the consumer is entirely
    single
    threaded
    and runs in the user thread. The advantage is that it can be
    easily
    rewritten in less multi-threading-friendly languages. The
    consumer
    batches
    data and multiplexes I/O over TCP connections to each of the
    brokers
    it
    communicates with, for high throughput. The consumer also
    allows
    long
    poll
    to reduce the end-to-end message latency for low throughput
    data.
    The consumer provides a group management facility that
    supports
    the
    concept
    of a group with multiple consumer instances (just like the
    current
    consumer). This is done through a custom heartbeat and group
    management
    protocol transparent to the user. At the same time, it allows
    users
    the
    option to subscribe to a fixed set of partitions and not use
    group
    management at all. The offset management strategy defaults to
    Kafka
    based
    offset management and the API provides a way for the user to
    use
    a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it
    does
    not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Guozhang Wang at Feb 11, 2014 at 11:31 pm
    Hi Pradeep:

    1. I think TopicPartition is designed as an internal class and the plan was
    not to expose it to users just for simplicity. We probably will change the
    commit APIs not exposing them.

    2. We have thought about that before, and finally decide to make it as

    subscribe(topic, partition)
    positions(partition, offset)

    Does this look good to you?

    3. We will update the javadoc accordingly: an exception should be thrown.

    4. I think this is related to how are we going to deal with 1).

    5. Agree.

    On Tue, Feb 11, 2014 at 12:58 PM, Pradeep Gollakota wrote:

    Updated thoughts.

    1.

    subscribe(String topic, int... paritions) and unsubscribe(String topic,
    int... partitions) should be subscribe(TopicPartition...
    topicPartitions)and unsubscribe(TopicPartition...
    topicPartitons)
    2.

    Does it make sense to provide a convenience method to subscribe to
    topics at a particular offset directly? E.g.
    subscribe(TopicPartitionOffset...
    offsets)
    3.

    The javadoc makes no mention of what would happen if positions() is
    called with a TopicPartitionOffset to which the Consumer is not
    subscribed to.
    4.

    The javadoc makes no mention of what would happen if positions() is
    called with two different offsets for a single TopicPartition
    5. The javadoc shows lastCommittedOffsets() return type as
    List<TopicPartitionOffset>. This should either be Map<TopicPartition,
    Long> or Map<TopicPartition, TopicPartitionOffset>
    6. It seems like #4 can be avoided by using Map<TopicPartition,
    Long> or Map<TopicPartition,
    TopicPartitionOffset> as the argument type.
    7. To address #3, maybe we can return List<TopicPartitionOffset> that
    are invalid.



    On Tue, Feb 11, 2014 at 12:04 PM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    Pradeep,

    To be clear, we want to get feedback on the APIs from the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    since
    the wiki will be slightly behind on the APIs.

    1. Regarding consistency, do you have specific feedback on which APIs
    should have different arguments/return types?
    2. lastCommittedOffsets() does what you said in the javadoc.

    Thanks,
    Neha


    On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    Hi Jay,

    I apologize for derailing the conversation about the consumer API. We
    should start a new discussion about hierarchical topics, if we want to keep
    talking about it. My final thought on the matter is that, hierarchical
    topics is still an important feature to have in Kafka, because it gives us
    flexibility to do namespace level access controls.

    Getting back to the topic of the Consumer API:

    1. Any thoughts on consistency for method arguments and return
    types?
    2. lastCommittedOffsets() method returns a
    List<TopicPartitionOffset>where as the confluence page suggested a
    Map<TopicPartition,
    Long>. I would think that a Map is the more appropriate return type.


    On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps wrote:

    Hey Pradeep,

    That wiki is fairly old and it predated more flexible subscription
    mechanisms. In the high-level consumer you currently have wildcard
    subscription and in the new proposed interface you can actually
    subscribe
    based on any logic you want to create a "union" of streams.
    Personally
    I
    think this gives you everything you would want with a hierarchy and
    more
    actual flexibility (since you can define groupings however you want). What
    do you think?

    -Jay


    On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    WRT to hierarchical topics, I'm referring to
    KAFKA-1175<https://issues.apache.org/jira/browse/KAFKA-1175>.
    I would just like to think through the implications for the
    Consumer
    API
    if
    and when we do implement hierarchical topics. For example, in the
    proposal<
    https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#
    written
    by Jay, he says that initially wildcard subscriptions are not going
    to be supported. But does that mean that they will be supported in
    v2?
    If
    that's the case, that would change the semantics of the Consumer
    API.
    As to having classes for Topic, PartitionId, etc. it looks like I
    was
    referring to the TopicPartition and TopicPartitionOffset classes (I didn't
    realize these were already there). I was only looking at the
    confluence
    page which shows List[(String, Int, Long)] instead of
    List[TopicParitionOffset] (as is shown in the javadoc). However, I
    did
    notice that we're not being consistent in the Java version. E.g. we
    have
    commit(TopicPartitionOffset... offsets) and
    lastCommittedOffsets(TopicPartition... partitions) on the one hand.
    On
    the
    other hand we have subscribe(String topic, int... partitions). I
    agree
    that
    creating a class for TopicId today would probably not make too much
    sense
    today. But with hierarchical topics, I may change my mind. This is exactly
    what was done in the HBase API in 0.96 when namespaces were added.
    0.96
    HBase API introduced a class called 'TableName' to represent the namespace
    and table name.


    On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    Thanks for the feedback.

    Mattijs -

    - Constructors link to
    http://kafka.apache.org/documentation.html#consumerconfigs for
    valid
    configurations, which lists zookeeper.connect rather than
    metadata.broker.list, the value for BROKER_LIST_CONFIG in
    ConsumerConfig.
    Fixed it to just point to ConsumerConfig for now until we
    finalize
    the
    new
    configs
    - Docs for poll(long) mention consumer.commit(true), which I
    can't
    find
    in
    the Consumer docs. For a simple consumer setup, that call is
    something
    that
    would make a lot of sense.
    Missed changing the examples to use consumer.commit(true,
    offsets).
    The
    suggestions by Jay would change it to commit(offsets) and
    commitAsync(offsets), which will hopefully make it easier to
    understand
    those commit APIs.
    - Love the addition of MockConsumer, awesome for unittesting :)
    I'm not quite satisfied with what it does as of right now, but we
    will
    surely improve it as we start writing the consumer.

    Jay -

    1. ConsumerRebalanceCallback
    a. Makes sense. Renamed to onPartitionsRevoked
    b. Ya, it will be good to make it forward compatible with
    Java
    8
    capabilities. We can change it to PartitionsAssignedCallback and
    PartitionsRevokedCallback or RebalanceBeginCallback and
    RebalanceEndCallback?
    c. Ya, I thought about that but then didn't name it just
    RebalanceCallback since there could be a conflict with a
    controller
    side
    rebalance callback if/when we have one. However, you can argue
    that
    at
    that
    time we can name it ControllerRebalanceCallback instead of
    polluting
    a
    user
    facing API. So agree with you here.
    2. Ya, that is a good idea. Changed to subscribe(String topic,
    int...partitions).
    3. lastCommittedOffset() is not necessarily a local access since
    the
    consumer can potentially ask for the last committed offsets of
    partitions
    that the consumer does not consume and maintain the offsets for.
    That's
    the
    reason it is batched right now.
    4. Yes, look at
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
    5. Sure, but that is not part of the consumer API right? I think
    you're
    suggesting looking at OffsetRequest to see if it would do that
    properly?
    6. Good point. Changed to poll(long timeout, TimeUnit) and poll
    with
    a
    negative timeout will poll indefinitely?
    7. Good point. Changed to commit(...) and commitAsync(...)
    8. To commit the current position for all partitions owned by the consumer,
    you can use commit(). If you don't use group management, then
    commit(customListOfPartitions)
    9. Forgot to include unsubscribe. Done now.
    10. positions() can be called at any time and affects the next
    fetch
    on
    the
    next poll(). Fixed the places that said "starting fetch offsets"
    11. Can we not look that up by going through the messages
    returned
    and
    getting the offset from the ConsumerRecord?

    One thing that I really found helpful for the API design was
    writing
    out
    actual code for different scenarios against the API. I think it
    might
    be
    good to do that for this too--i.e. enumerate the various use
    cases
    and
    code
    that use case up to see how it looks
    The javadocs include examples for almost all possible scenarios
    of
    consumer
    usage, that I could come up with. Will add more to the javadocs
    as
    I
    get
    more feedback from our users. The advantage of having the
    examples
    in
    the
    javadoc itself is to that the usage is self explanatory to new
    users.
    Pradeep -

    2. Changed to poll(long, TimeUnit) and a negative value for the
    timeout
    would block in the poll forever until there is new data
    3. We don't have hierarchical topics support. Would you mind
    explaining
    what you meant?
    4. I'm not so sure that we need a class to express a topic which
    is a
    string and a separate class for just partition id. We do have a
    class
    for
    TopicPartition which uniquely identifies a partition of a topic

    Thanks,
    Neha


    On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <
    pradeepg26@gmail.com
    wrote:
    Couple of very quick thoughts.

    1. +1 about renaming commit(...) and commitAsync(...)
    2. I'd also like to extend the above for the poll() method as
    well.
    poll()
    and pollWithTimeout(long, TimeUnit)?
    3. Have you guys given any thought around how this API would be
    used
    with
    hierarchical topics?
    4. Would it make sense to add classes such as TopicId,
    PartitionId,
    etc?
    Seems like it would be easier to read code with these classes
    as
    opposed
    to
    string and longs.

    - Pradeep


    On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <
    jay.kreps@gmail.com
    wrote:
    A few items:
    1. ConsumerRebalanceCallback
    a. onPartitionsRevoked would be a better name.
    b. We should discuss the possibility of splitting this
    into
    two
    interfaces. The motivation would be that in Java 8 single
    method
    interfaces
    can directly take methods which might be more intuitive.
    c. If we stick with a single interface I would prefer the
    name
    RebalanceCallback as its more concise
    2. Should subscribe(String topic, int partition) should be
    subscribe(String
    topic, int...partition)?
    3. Is lastCommittedOffset call just a local access? If so it
    would
    be
    more
    convenient not to batch it.
    4. How are we going to handle the earliest/latest starting
    position
    functionality we currently have. Does that remain a config?
    5. Do we need to expose the general ability to get known
    positions
    from
    the
    log? E.g. the functionality in the OffsetRequest...? That
    would
    make
    the
    ability to change position a little easier.
    6. Should poll(java.lang.Long timeout) be poll(long timeout,
    TimeUnit
    unit)? Is it Long because it allows null? If so should we
    just
    add
    a
    poll()
    that polls indefinitely?
    7. I recommend we remove the boolean parameter from commit as
    it
    is
    really
    hard to read code that has boolean parameters without named
    arguments.
    Can
    we make it something like commit(...) and commitAsync(...)?
    8. What about the common case where you just want to commit
    the
    current
    position for all partitions?
    9. How do you unsubscribe?
    10. You say in a few places that positions() only impacts the
    starting
    position, but surely that isn't the case, right? Surely it
    controls
    the
    fetch position for that partition and can be called at any
    time?
    Otherwise
    it is a pretty weird api, right?
    11. How do I get my current position? Not the committed
    position
    but
    the
    offset of the next message that will be given to me?

    One thing that I really found helpful for the API design was
    writing
    out
    actual code for different scenarios against the API. I think
    it
    might
    be
    good to do that for this too--i.e. enumerate the various use
    cases
    and
    code
    that use case up to see how it looks. I'm not sure if it
    would
    be
    useful
    to
    collect these kinds of scenarios from people. I know they
    have
    sporadically
    popped up on the mailing list.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to
    discuss
    the
    details of the public API. I would also like us to be picky
    about
    this
    public api now so it is as good as possible and we don't
    need
    to
    break
    it
    in the future.

    The best way to get a feel for the API is actually to take
    a
    look
    at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is
    that
    it
    removes
    the distinction between the "high-level" and "low-level"
    consumer.
    The
    new
    consumer API is non blocking and instead of returning a
    blocking
    iterator,
    the consumer provides a poll() API that returns a list of
    records.
    We
    think
    this is better compared to the blocking iterators since it
    effectively
    decouples the threading strategy used for processing
    messages
    from
    the
    consumer. It is worth noting that the consumer is entirely
    single
    threaded
    and runs in the user thread. The advantage is that it can
    be
    easily
    rewritten in less multi-threading-friendly languages. The
    consumer
    batches
    data and multiplexes I/O over TCP connections to each of
    the
    brokers
    it
    communicates with, for high throughput. The consumer also
    allows
    long
    poll
    to reduce the end-to-end message latency for low throughput
    data.
    The consumer provides a group management facility that
    supports
    the
    concept
    of a group with multiple consumer instances (just like the
    current
    consumer). This is done through a custom heartbeat and
    group
    management
    protocol transparent to the user. At the same time, it
    allows
    users
    the
    option to subscribe to a fixed set of partitions and not
    use
    group
    management at all. The offset management strategy defaults
    to
    Kafka
    based
    offset management and the API provides a way for the user
    to
    use
    a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it
    does
    not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha


    --
    -- Guozhang
  • Imran Rashid at Feb 11, 2014 at 10:06 pm
    Hi,

    thanks for sharing this and getting feedback. Sorry I am probably missing
    something basic, but I'm not sure how a multi-threaded consumer would
    work. I can imagine its either:

    a) I just have one thread poll kafka. If I want to process msgs in
    multiple threads, than I deal w/ that after polling, eg. stick them into a
    blocking queue or something, and have more threads that read from the queue.

    b) each thread creates its own KafkaConsumer. They are all registered the
    same way, and I leave it to kafka to figure out what data to give to each
    one.


    (a) certainly makes things simple, but I worry about throughput -- is that
    just as good as having one thread trying to consumer each partition?

    (b) makes it a bit of a pain to figure out how many threads to use. I
    assume there is no point in using more threads than there are partitions,
    so first you've got to figure out how many partitions there are in each
    topic. Might be nice if there were some util functions to simplify this.


    Also, since the initial call to subscribe doesn't give the partition
    assignment, does that mean the first call to poll() will always call the
    ConsumerRebalanceCallback?

    probably a short code-sample would clear up all my questions. I'm
    imagining pseudo-code like:


    int numPartitions = ...
    int numThreads = min(maxThreads, numPartitions);
    //maybe should be something even more complicated, to take into account how
    many other active consumers there are right now for the given group

    List<MyConsumer> consumers = new ArrayList<MyConsumer>();
    for (int i = 0; i < numThreads; i++) {
       MyConsumer c = new MyConsumer();
       c.subscribe(...);
       //if subscribe is expensive, then this should already happen in another
    thread
       consumers.add(c);
    }

    // if each subscribe() happened in a different thread, we should put a
    barrier in here, so everybody subscribes before they begin polling

    //now launch a thread per consumer, where they each poll



    If I'm on the right track, I'd like to expand this example, showing how
    each "MyConsumer" can keep track of its partitions & offsets, even in the
    face of rebalances. As Jay said, I think a minimal code example could
    really help us see the utility & faults of the api.

    overall I really like what I see, seems like a big improvement!

    thanks,
    Imran


    On Mon, Feb 10, 2014 at 12:54 PM, Neha Narkhede wrote:

    As mentioned in previous emails, we are also working on a re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Jay Kreps at Feb 13, 2014 at 9:24 pm
    Hey guys,

    One thing that bugs me is the lack of symmetric for the different position
    calls. The way I see it there are two positions we maintain: the fetch
    position and the last commit position. There are two things you can do to
    these positions: get the current value or change the current value. But the
    names somewhat obscure this:
       Fetch position:
         - No get
         - set by positions(TopicOffsetPosition...)
       Committed position:
         - get by List<TopicOffsetPosition> lastCommittedPosition(
    TopicPartition...)
         - set by commit or commitAsync

    The lastCommittedPosition is particular bothersome because:
    1. The name is weird and long
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(TopicPosition).

    What if we made it:
        long position(TopicPartition tp)
        void seek(TopicOffsetPosition p)
        long committed(TopicPartition tp)
        void commit(TopicOffsetPosition...);

    This still isn't terribly consistent, but I think it is better.

    I would also like to shorten the name TopicOffsetPosition. Offset and
    Position are duplicative of each other. So perhaps we could call it a
    PartitionOffset or a TopicPosition or something like that. In general class
    names that are just a concatenation of the fields (e.g.
    TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
    really describe it just enumerates. But that is more of a nit pick.

    -Jay

    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede wrote:

    As mentioned in previous emails, we are also working on a re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Neha Narkhede at Feb 13, 2014 at 9:55 pm
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(
    TopicPosition).

    This was discussed in the previous emails. There is a choice between
    returning a map or a list. Some people found the map to be more usable.

    What if we made it:
        long position(TopicPartition tp)
        void seek(TopicOffsetPosition p)
        long committed(TopicPartition tp)
        void commit(TopicOffsetPosition...);

    This is fine, but TopicOffsetPosition doesn't make sense. Offset and
    Position is confusing. Also both fetch and commit positions are related to
    partitions, not topics. Some more options are TopicPartitionPosition or
    TopicPartitionOffset. And we should use either position everywhere in Kafka
    or offset but having both is confusing.

        void seek(TopicOffsetPosition p)
        long committed(TopicPartition tp)

    Whether these are batched or not really depends on how flexible we want
    these APIs to be. The question is whether we allow a consumer to fetch or
    set the offsets for partitions that it doesn't own or consume. For example,
    if I choose to skip group management and do my own partition assignment but
    choose Kafka based offset management. I could imagine a use case where I
    want to change the partition assignment on the fly, and to do that, I would
    need to fetch the last committed offsets of partitions that I currently
    don't consume.

    If we want to allow this, these APIs would be more performant if batched.
    And would probably look like -
        Map<TopicPartition, Long> positions(TopicPartition... tp)
        void seek(TopicOffsetPosition... p)
        Map<TopicPartition, Long> committed(TopicPartition... tp)
        void commit(TopicOffsetPosition...)

    These are definitely more clunky than the non batched ones though.

    Thanks,
    Neha


    On Thu, Feb 13, 2014 at 1:24 PM, Jay Kreps wrote:

    Hey guys,

    One thing that bugs me is the lack of symmetric for the different position
    calls. The way I see it there are two positions we maintain: the fetch
    position and the last commit position. There are two things you can do to
    these positions: get the current value or change the current value. But the
    names somewhat obscure this:
    Fetch position:
    - No get
    - set by positions(TopicOffsetPosition...)
    Committed position:
    - get by List<TopicOffsetPosition> lastCommittedPosition(
    TopicPartition...)
    - set by commit or commitAsync

    The lastCommittedPosition is particular bothersome because:
    1. The name is weird and long
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(TopicPosition).

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

    This still isn't terribly consistent, but I think it is better.

    I would also like to shorten the name TopicOffsetPosition. Offset and
    Position are duplicative of each other. So perhaps we could call it a
    PartitionOffset or a TopicPosition or something like that. In general class
    names that are just a concatenation of the fields (e.g.
    TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
    really describe it just enumerates. But that is more of a nit pick.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Jay Kreps at Feb 14, 2014 at 5:22 am
    Hey Neha,

    I actually wasn't proposing the name TopicOffsetPosition, that was just a
    typo. I meant TopicPartitionOffset, and I was just referencing what was in
    the javadoc. So to restate my proposal without the typo, using just the
    existing classes (that naming is a separate question):
        long position(TopicPartition tp)
        void seek(TopicPartitionOffset p)
        long committed(TopicPartition tp)
        void commit(TopicPartitionOffset...);

    So I may be unclear on committed() (AKA lastCommittedOffset). Is it
    returning the in-memory value from the last commit by this consumer, or is
    it doing a remote fetch, or both? I think you are saying both, i.e. if you
    have committed on a partition it returns you that value but if you haven't
    it does a remote lookup?

    The other argument for making committed batched is that commit() is
    batched, so there is symmetry.

    position() and seek() are always in memory changes (I assume) so there is
    no need to batch them.

    So taking all that into account what if we revise it to
        long position(TopicPartition tp)
        void seek(TopicPartitionOffset p)
        Map<TopicPartition, Long> committed(TopicPartition tp);
        void commit(TopicPartitionOffset...);

    This is not symmetric between position/seek and commit/committed but it is
    convenient. Another option for naming would be position/reposition instead
    of position/seek.

    With respect to the name TopicPartitionOffset, what I was trying to say is
    that I recommend we change that to something shorter. I think TopicPosition
    or ConsumerPosition might be better. Position does not refer to the
    variables in the object, it refers to the meaning of the object--it
    represents a position within a topic. The offset field in that object is
    still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset
    would all be workable too. Basically I am just objecting to concatenating
    three nouns together. :-)

    -Jay




    On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede wrote:

    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(
    TopicPosition).

    This was discussed in the previous emails. There is a choice between
    returning a map or a list. Some people found the map to be more usable.

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

    This is fine, but TopicOffsetPosition doesn't make sense. Offset and
    Position is confusing. Also both fetch and commit positions are related to
    partitions, not topics. Some more options are TopicPartitionPosition or
    TopicPartitionOffset. And we should use either position everywhere in Kafka
    or offset but having both is confusing.

    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)

    Whether these are batched or not really depends on how flexible we want
    these APIs to be. The question is whether we allow a consumer to fetch or
    set the offsets for partitions that it doesn't own or consume. For example,
    if I choose to skip group management and do my own partition assignment but
    choose Kafka based offset management. I could imagine a use case where I
    want to change the partition assignment on the fly, and to do that, I would
    need to fetch the last committed offsets of partitions that I currently
    don't consume.

    If we want to allow this, these APIs would be more performant if batched.
    And would probably look like -
    Map<TopicPartition, Long> positions(TopicPartition... tp)
    void seek(TopicOffsetPosition... p)
    Map<TopicPartition, Long> committed(TopicPartition... tp)
    void commit(TopicOffsetPosition...)

    These are definitely more clunky than the non batched ones though.

    Thanks,
    Neha


    On Thu, Feb 13, 2014 at 1:24 PM, Jay Kreps wrote:

    Hey guys,

    One thing that bugs me is the lack of symmetric for the different position
    calls. The way I see it there are two positions we maintain: the fetch
    position and the last commit position. There are two things you can do to
    these positions: get the current value or change the current value. But the
    names somewhat obscure this:
    Fetch position:
    - No get
    - set by positions(TopicOffsetPosition...)
    Committed position:
    - get by List<TopicOffsetPosition> lastCommittedPosition(
    TopicPartition...)
    - set by commit or commitAsync

    The lastCommittedPosition is particular bothersome because:
    1. The name is weird and long
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(TopicPosition).

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

    This still isn't terribly consistent, but I think it is better.

    I would also like to shorten the name TopicOffsetPosition. Offset and
    Position are duplicative of each other. So perhaps we could call it a
    PartitionOffset or a TopicPosition or something like that. In general class
    names that are just a concatenation of the fields (e.g.
    TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
    really describe it just enumerates. But that is more of a nit pick.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Neha Narkhede at Feb 14, 2014 at 6:40 am
    I think you are saying both, i.e. if you
    have committed on a partition it returns you that value but if you haven't
    it does a remote lookup?

    Correct.

    The other argument for making committed batched is that commit() is
    batched, so there is symmetry.

    position() and seek() are always in memory changes (I assume) so there is
    no need to batch them.

    I'm not as sure as you are about that assumption being true. Basically in
    my example above, the batching argument for committed() also applies to
    position() since one purpose of fetching a partition's offset is to use it
    to set the position of the consumer to that offset. Since that might lead
    to a remote OffsetRequest call, I think we probably would be better off
    batching it.

    Another option for naming would be position/reposition instead
    of position/seek.

    I think position/seek is better since it aligns with Java file APIs.

    I also think your suggestion about ConsumerPosition makes sense.

    Thanks,
    Neha
    On Feb 13, 2014 9:22 PM, "Jay Kreps" wrote:

    Hey Neha,

    I actually wasn't proposing the name TopicOffsetPosition, that was just a
    typo. I meant TopicPartitionOffset, and I was just referencing what was in
    the javadoc. So to restate my proposal without the typo, using just the
    existing classes (that naming is a separate question):
    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    long committed(TopicPartition tp)
    void commit(TopicPartitionOffset...);

    So I may be unclear on committed() (AKA lastCommittedOffset). Is it
    returning the in-memory value from the last commit by this consumer, or is
    it doing a remote fetch, or both? I think you are saying both, i.e. if you
    have committed on a partition it returns you that value but if you haven't
    it does a remote lookup?

    The other argument for making committed batched is that commit() is
    batched, so there is symmetry.

    position() and seek() are always in memory changes (I assume) so there is
    no need to batch them.

    So taking all that into account what if we revise it to
    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

    This is not symmetric between position/seek and commit/committed but it is
    convenient. Another option for naming would be position/reposition instead
    of position/seek.

    With respect to the name TopicPartitionOffset, what I was trying to say is
    that I recommend we change that to something shorter. I think TopicPosition
    or ConsumerPosition might be better. Position does not refer to the
    variables in the object, it refers to the meaning of the object--it
    represents a position within a topic. The offset field in that object is
    still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset
    would all be workable too. Basically I am just objecting to concatenating
    three nouns together. :-)

    -Jay





    On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkhede@gmail.com
    wrote:
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(
    TopicPosition).

    This was discussed in the previous emails. There is a choice between
    returning a map or a list. Some people found the map to be more usable.

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

    This is fine, but TopicOffsetPosition doesn't make sense. Offset and
    Position is confusing. Also both fetch and commit positions are related to
    partitions, not topics. Some more options are TopicPartitionPosition or
    TopicPartitionOffset. And we should use either position everywhere in Kafka
    or offset but having both is confusing.

    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)

    Whether these are batched or not really depends on how flexible we want
    these APIs to be. The question is whether we allow a consumer to fetch or
    set the offsets for partitions that it doesn't own or consume. For example,
    if I choose to skip group management and do my own partition assignment but
    choose Kafka based offset management. I could imagine a use case where I
    want to change the partition assignment on the fly, and to do that, I would
    need to fetch the last committed offsets of partitions that I currently
    don't consume.

    If we want to allow this, these APIs would be more performant if batched.
    And would probably look like -
    Map<TopicPartition, Long> positions(TopicPartition... tp)
    void seek(TopicOffsetPosition... p)
    Map<TopicPartition, Long> committed(TopicPartition... tp)
    void commit(TopicOffsetPosition...)

    These are definitely more clunky than the non batched ones though.

    Thanks,
    Neha


    On Thu, Feb 13, 2014 at 1:24 PM, Jay Kreps wrote:

    Hey guys,

    One thing that bugs me is the lack of symmetric for the different position
    calls. The way I see it there are two positions we maintain: the fetch
    position and the last commit position. There are two things you can do
    to
    these positions: get the current value or change the current value. But the
    names somewhat obscure this:
    Fetch position:
    - No get
    - set by positions(TopicOffsetPosition...)
    Committed position:
    - get by List<TopicOffsetPosition> lastCommittedPosition(
    TopicPartition...)
    - set by commit or commitAsync

    The lastCommittedPosition is particular bothersome because:
    1. The name is weird and long
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want). I
    recommend that if this is an in-memory check we just do one at a time. E.g.
    long committedPosition(TopicPosition).

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

    This still isn't terribly consistent, but I think it is better.

    I would also like to shorten the name TopicOffsetPosition. Offset and
    Position are duplicative of each other. So perhaps we could call it a
    PartitionOffset or a TopicPosition or something like that. In general class
    names that are just a concatenation of the fields (e.g.
    TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
    really describe it just enumerates. But that is more of a nit pick.

    -Jay


    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
    neha.narkhede@gmail.com
    wrote:
    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about
    this
    public api now so it is as good as possible and we don't need to
    break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer.
    The
    new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it
    effectively
    decouples the threading strategy used for processing messages from
    the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers
    it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group
    management
    protocol transparent to the user. At the same time, it allows users
    the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Pradeep Gollakota at Feb 14, 2014 at 12:15 am
    Hi Neha,

        6. It seems like #4 can be avoided by using Map<TopicPartition,
    Long> or Map<TopicPartition, TopicPartitionOffset> as the argument type.
    How? lastCommittedOffsets() is independent of positions(). I'm not sure I
    understood your suggestion.
    I think of subscription as you're subscribing to a Set of TopicPartitions.
    Because the argument to positions() is TopicPartitionOffset ... it's
    conceivable that the method can be called with two offsets for the same
    TopicPartition. One way to handle this, is to accept either the first or
    the last offset for a TopicPartition. However, if the argument type is
    changed to Map<TopicPartition, Long> it precludes the possibility of
    getting duplicate offsets of the same TopicPartition.

        7. To address #3, maybe we can return List<TopicPartitionOffset> that are
    invalid.
    I don't particularly see the advantage of returning a list of invalid
    partitions from position(). It seems a bit awkward to return a list to

    indicate what is obviously a bug. Prefer throwing an error since the user
    should just fix that logic.
    I'm not sure if an Exception is needed or desirable here. I don't see this
    as a catastrophic failure or a non-recoverable failure. Even if we just
    write the bad offsets to a log file and call it a day, I'm ok with that.
    But my main goal is to communicate to the API users somehow that they've
    provided bad offests which are simply being ignored.

    Hi Jay,

    I would also like to shorten the name TopicOffsetPosition. Offset and
    Position are duplicative of each other. So perhaps we could call it a
    PartitionOffset or a TopicPosition or something like that. In general class
    names that are just a concatenation of the fields (e.g.
    TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
    really describe it just enumerates. But that is more of a nit pick.

        1. Did you mean to say TopicPartitionOffset instead of
        TopicOffsetPosition?
        2. +1 on PartitionOffset

    The lastCommittedPosition is particular bothersome because:
    1. The name is weird and long
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want).
    This is sort of what I was talking about in my previous email. My
    suggestion was to change the return type to Map<TopicPartition, Long>.

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

        1. Absolutely love the idea of position(TopicPartition tp).
        2. I think we also need to provide a method for accessing all positions
        positions() which maybe returns a Map<TopicPartition, Long>?
        3. What is the difference between position(TopicPartition tp) and
    committed(TopicPartition
        tp)?
        4. +1 on commit(PartitionOffset...)
        5. +1 on seek(PartitionOffset p)
        6. We should also provide a seek(PartitionOffset... offsets)

    Finally, in all the methods where we're using varargs, we should use an
    appropriate Collection data structure. For example, for the
    subscribe(TopicPartition...
    partitions) method, I think a more accurate API would be
    subscribe(Set<TopicPartition>
    partitions). This allows for the code to be self-documenting.
  • Tom Brown at Feb 14, 2014 at 6:31 am
    Conceptually, do the position methods only apply to topics you've
    subscribed to, or do they apply to all topics in the cluster?

    E.g., could I retrieve or set the committed position of any partition?

    The positive use case for having access to all partition information would
    be to setup an active monitoring system (that can feed the positions to a
    pretty GUI, for instance).

    A downside is that you could have invalid partition offsets committed
    (perhaps being reset to 0 by an overzealous client).

    --Tom

    On Thu, Feb 13, 2014 at 5:15 PM, Pradeep Gollakota wrote:

    Hi Neha,

    6. It seems like #4 can be avoided by using Map<TopicPartition,
    Long> or Map<TopicPartition, TopicPartitionOffset> as the argument type.
    How? lastCommittedOffsets() is independent of positions(). I'm not sure I
    understood your suggestion.
    I think of subscription as you're subscribing to a Set of TopicPartitions.
    Because the argument to positions() is TopicPartitionOffset ... it's
    conceivable that the method can be called with two offsets for the same
    TopicPartition. One way to handle this, is to accept either the first or
    the last offset for a TopicPartition. However, if the argument type is
    changed to Map<TopicPartition, Long> it precludes the possibility of
    getting duplicate offsets of the same TopicPartition.

    7. To address #3, maybe we can return List<TopicPartitionOffset> that
    are
    invalid.
    I don't particularly see the advantage of returning a list of invalid
    partitions from position(). It seems a bit awkward to return a list to

    indicate what is obviously a bug. Prefer throwing an error since the user
    should just fix that logic.
    I'm not sure if an Exception is needed or desirable here. I don't see this
    as a catastrophic failure or a non-recoverable failure. Even if we just
    write the bad offsets to a log file and call it a day, I'm ok with that.
    But my main goal is to communicate to the API users somehow that they've
    provided bad offests which are simply being ignored.

    Hi Jay,

    I would also like to shorten the name TopicOffsetPosition. Offset and
    Position are duplicative of each other. So perhaps we could call it a
    PartitionOffset or a TopicPosition or something like that. In general class
    names that are just a concatenation of the fields (e.g.
    TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
    really describe it just enumerates. But that is more of a nit pick.

    1. Did you mean to say TopicPartitionOffset instead of
    TopicOffsetPosition?
    2. +1 on PartitionOffset

    The lastCommittedPosition is particular bothersome because:
    1. The name is weird and long
    2. It returns a list of results. But how can you use the list? The only way
    to use the list is to make a map of tp=>offset and then look up results in
    this map (or do a for loop over the list for the partition you want).
    This is sort of what I was talking about in my previous email. My
    suggestion was to change the return type to Map<TopicPartition, Long>.

    What if we made it:
    long position(TopicPartition tp)
    void seek(TopicOffsetPosition p)
    long committed(TopicPartition tp)
    void commit(TopicOffsetPosition...);

    1. Absolutely love the idea of position(TopicPartition tp).
    2. I think we also need to provide a method for accessing all positions
    positions() which maybe returns a Map<TopicPartition, Long>?
    3. What is the difference between position(TopicPartition tp) and
    committed(TopicPartition
    tp)?
    4. +1 on commit(PartitionOffset...)
    5. +1 on seek(PartitionOffset p)
    6. We should also provide a seek(PartitionOffset... offsets)

    Finally, in all the methods where we're using varargs, we should use an
    appropriate Collection data structure. For example, for the
    subscribe(TopicPartition...
    partitions) method, I think a more accurate API would be
    subscribe(Set<TopicPartition>
    partitions). This allows for the code to be self-documenting.
  • Jun Rao at Feb 22, 2014 at 12:37 am
    Looks good overall. Some comments below.

    1. The using of ellipsis: This may make passing a list of items from a
    collection to the api a bit harder. Suppose that you have a list of topics
    stored in

    ArrayList<String> topics;

    If you want subscribe to all topics in one call, you will have to do:

    String[] topicArray = new String[topics.size()];
    consumer.subscribe(topics.toArray(topicArray));

    2. It would be good to document that the following apis are mutually
    exclusive. Also, if the partition level subscription is specified, there is
    no group management. Finally, unsubscribe() can only be used to cancel
    subscriptions with the same pattern. For example, you can't unsubscribe at
    the partition level if the subscription is done at the topic level.

    *subscribe*(java.lang.String... topics)
    *subscribe*(java.lang.String topic, int... partitions)

    3.commit(): The following comment in the doc should probably say "commit
    offsets for partitions assigned to this consumer".

      If no partitions are specified, commits offsets for the subscribed list of
    topics and partitions to Kafka.

    4. There is inconsistency in specifying partitions. Sometimes we use
    TopicPartition and some other times we use String and int (see
    examples below).

    void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions)

    public void *subscribe*(java.lang.String topic, int... partitions)

    Thanks,

    Jun

    On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede wrote:

    As mentioned in previous emails, we are also working on a re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    ,
    the hope is to get the api docs good enough so that it is self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
    and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Chris Riccomini at Mar 3, 2014 at 6:19 pm
    Hey Guys,

    Sorry for the late follow up. Here are my questions/thoughts on the API:

    1. Why is the config String->Object instead of String->String?

    2. Are these Java docs correct?

       KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
       A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    3. Would like to have a method:

       poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.

    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    9. What does SESSION_TIMEOUT_MS default to?

    10. Is this consumer thread-safe?

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?


    Cheers,
    Chris
    On 2/10/14 10:54 AM, "Neha Narkhede" wrote:

    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/
    doc/kafka/clients/consumer/KafkaConsumer.html>,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
    /kafka/clients/consumer/ConsumerConfig.html>

    Some background info on implementation:

    At a high level the primary difference in this consumer is that it removes
    the distinction between the "high-level" and "low-level" consumer. The new
    consumer API is non blocking and instead of returning a blocking iterator,
    the consumer provides a poll() API that returns a list of records. We
    think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the
    concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not depend
    on zookeeper at all.

    More details about the new consumer design are
    here<https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+
    Rewrite+Design>

    Please take a look at the new
    API<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
    kafka/clients/consumer/KafkaConsumer.html>and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Chris Riccomini at Mar 3, 2014 at 6:28 pm
    Hey Guys,

    Also, for reference, we'll be looking to implement new Samza consumers
    which have these APIs:

    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/system/SystemConsumer.html

    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/checkpoint/CheckpointManager.html


    Question (3) below is a result of having Samza's SystemConsumers poll
    allow specific topic/partitions to be specified.

    The split between consumer and checkpoint manager is the reason for
    question (12) below.

    Cheers,
    Chris
    On 3/3/14 10:19 AM, "Chris Riccomini" wrote:

    Hey Guys,

    Sorry for the late follow up. Here are my questions/thoughts on the API:

    1. Why is the config String->Object instead of String->String?

    2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.

    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    9. What does SESSION_TIMEOUT_MS default to?

    10. Is this consumer thread-safe?

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?


    Cheers,
    Chris
    On 2/10/14 10:54 AM, "Neha Narkhede" wrote:

    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
    /
    doc/kafka/clients/consumer/KafkaConsumer.html>,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
    c
    /kafka/clients/consumer/ConsumerConfig.html>

    Some background info on implementation:

    At a high level the primary difference in this consumer is that it
    removes
    the distinction between the "high-level" and "low-level" consumer. The
    new
    consumer API is non blocking and instead of returning a blocking
    iterator,
    the consumer provides a poll() API that returns a list of records. We
    think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single
    threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer
    batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long
    poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the
    concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer
    +
    Rewrite+Design>

    Please take a look at the new
    API<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
    /
    kafka/clients/consumer/KafkaConsumer.html>and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Neha Narkhede at Mar 25, 2014 at 12:30 am
    Hey Chris,

    Really sorry for the late reply, wonder how this fell through the cracks.
    Anyhow, thanks for the great feedback! Here are my comments -

    1. Why is the config String->Object instead of String->String?

    This is probably more of a feedback about the new config management that
    we adopted in the new clients. I think it is more convenient to write
    configs.put("a", 42);
    instead of
    configs.put("a", Integer.toString(42));

    2. Are these Java docs correct?

       KafkaConsumer(java.util.Map<
    java.lang.String,java.lang.Object> configs)
       A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    Fixed.

    3. Would like to have a method:

       poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.

    The subscribe and unsubscribe will be very lightweight in-memory operations,
    so it shouldn't be a problem to just use those APIs directly.
    Let me know if you think otherwise.

    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    How about adding a third option "disable" to "auto.offset.reset"?
    What this says is that never automatically reset the offset, either if one
    is not found or if the offset
    falls out of range. Presumably, you would want to turn this off when you
    want to control the offsets
    yourself and use custom rewind/replay logic to reset the consumer's offset.
    In this case, you would
    want to turn this feature off so Kafka does not accidentally reset the
    offset to something else.

    I'm not so sure when you would want to make the distinction regarding
    startup and offset falling out
    of range. Presumably, if you don't trust Kafka to reset the offset, then
    you can always turn this off
    and use commit/commitAsync and seek() to set the consumer to the right
    offset on startup and every
    time your consumer falls out of range.

    Does that make sense?

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    Fixed.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    Good point, clarified that. Take a look again to see if it makes sense now.

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    Throw a TimeoutException. Clarified that in the
    docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
    .

    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    Applies to all requests. Clarified that in the docs.

    9. What does SESSION_TIMEOUT_MS default to?

    Defaults are largely TODO, but session.timeout.ms currently defaults to
    1000.

    10. Is this consumer thread-safe?

    It should be. Updated the
    docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to
    clarify that.

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?

    For #11 and #12, I updated the
    docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to
    include actual usage examples.
    Could you take a look and see if answers your questions?

    Thanks,
    Neha


    On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini wrote:

    Hey Guys,

    Also, for reference, we'll be looking to implement new Samza consumers
    which have these APIs:

    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/system/SystemConsumer.html

    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/checkpoint/CheckpointManager.html


    Question (3) below is a result of having Samza's SystemConsumers poll
    allow specific topic/partitions to be specified.

    The split between consumer and checkpoint manager is the reason for
    question (12) below.

    Cheers,
    Chris
    On 3/3/14 10:19 AM, "Chris Riccomini" wrote:

    Hey Guys,

    Sorry for the late follow up. Here are my questions/thoughts on the API:

    1. Why is the config String->Object instead of String->String?

    2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.

    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    9. What does SESSION_TIMEOUT_MS default to?

    10. Is this consumer thread-safe?

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?


    Cheers,
    Chris
    On 2/10/14 10:54 AM, "Neha Narkhede" wrote:

    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
    /
    doc/kafka/clients/consumer/KafkaConsumer.html>,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
    c
    /kafka/clients/consumer/ConsumerConfig.html>

    Some background info on implementation:

    At a high level the primary difference in this consumer is that it
    removes
    the distinction between the "high-level" and "low-level" consumer. The
    new
    consumer API is non blocking and instead of returning a blocking
    iterator,
    the consumer provides a poll() API that returns a list of records. We
    think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single
    threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer
    batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long
    poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the
    concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer
    +
    Rewrite+Design>

    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
    /
    kafka/clients/consumer/KafkaConsumer.html>and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Neha Narkhede at Mar 25, 2014 at 12:40 am
    I took some time to write some example code using the new consumer APIs to
    cover a range of use cases. This exercise was very useful (thanks for the
    suggestion, Jay!) since I found several improvements to the APIs to make
    them more usable. Here are some of the
    changes<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/>I
    made -

    1. Added usage examples to the KafkaConsumer
    javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.
    I find it useful for the examples to be in the javadoc vs some wiki. Please
    go through these examples and suggest improvements. The goal would be to
    document a limited set of examples that cover every major use case.
    2. All APIs that either accept or return offsets are changed to
    Map<TopicPartition,Long> instead of TopicPartitionOffset... In all the
    examples that I wrote, it was much easier to deal with offsets and pass
    them around in the consumer APIs if they were maps instead of lists
    3. Due to the above change, I had to introduce
    commit()<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29>and
    commitAsync() APIs explicitly, in addition to
    commit(Map<TopicPartition,Long> offsets) and
    commitAsync(Map<TopicPartition,Long> offsets), since the no-argument case
    would not be covered automatically with Map as the input parameter to the
    commit APIs
    4. Offset rewind logic is funky with group management. I took a stab and it
    and wrote examples to cover the various offset rewind uses cases I could
    think of. I'm not so sure I like it, so I encourage people to take a look
    at the examples and provide feedback. This feedback is very critical in
    finalizing the consumer APIs as we might have to add/change APIs to make
    offset rewind intuitive and easy to use. (Please see the 3rd and 4th
    examples here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
    )

    Once I have feedback on the above, I will go ahead and submit a review
    board for the new APIs and javadoc.

    Thanks
    Neha

    On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede wrote:

    Hey Chris,

    Really sorry for the late reply, wonder how this fell through the cracks.
    Anyhow, thanks for the great feedback! Here are my comments -


    1. Why is the config String->Object instead of String->String?

    This is probably more of a feedback about the new config management that
    we adopted in the new clients. I think it is more convenient to write
    configs.put("a", 42);
    instead of
    configs.put("a", Integer.toString(42));

    2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<
    java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    Fixed.


    3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.

    The subscribe and unsubscribe will be very lightweight in-memory
    operations,
    so it shouldn't be a problem to just use those APIs directly.
    Let me know if you think otherwise.

    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    How about adding a third option "disable" to "auto.offset.reset"?
    What this says is that never automatically reset the offset, either if one
    is not found or if the offset
    falls out of range. Presumably, you would want to turn this off when you
    want to control the offsets
    yourself and use custom rewind/replay logic to reset the consumer's
    offset. In this case, you would
    want to turn this feature off so Kafka does not accidentally reset the
    offset to something else.

    I'm not so sure when you would want to make the distinction regarding
    startup and offset falling out
    of range. Presumably, if you don't trust Kafka to reset the offset, then
    you can always turn this off
    and use commit/commitAsync and seek() to set the consumer to the right
    offset on startup and every
    time your consumer falls out of range.

    Does that make sense?

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    Fixed.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    Good point, clarified that. Take a look again to see if it makes sense now.

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    Throw a TimeoutException. Clarified that in the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
    .


    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    Applies to all requests. Clarified that in the docs.

    9. What does SESSION_TIMEOUT_MS default to?

    Defaults are largely TODO, but session.timeout.ms currently defaults to
    1000.

    10. Is this consumer thread-safe?

    It should be. Updated the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to clarify that.

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?

    For #11 and #12, I updated the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to include actual usage examples.
    Could you take a look and see if answers your questions?

    Thanks,
    Neha


    On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini wrote:

    Hey Guys,

    Also, for reference, we'll be looking to implement new Samza consumers
    which have these APIs:


    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/system/SystemConsumer.html<http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html>


    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/checkpoint/CheckpointManager.html<http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html>


    Question (3) below is a result of having Samza's SystemConsumers poll
    allow specific topic/partitions to be specified.

    The split between consumer and checkpoint manager is the reason for
    question (12) below.

    Cheers,
    Chris
    On 3/3/14 10:19 AM, "Chris Riccomini" wrote:

    Hey Guys,

    Sorry for the late follow up. Here are my questions/thoughts on the API:

    1. Why is the config String->Object instead of String->String?

    2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.
    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's
    per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    9. What does SESSION_TIMEOUT_MS default to?

    10. Is this consumer thread-safe?

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the
    consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?


    Cheers,
    Chris
    On 2/10/14 10:54 AM, "Neha Narkhede" wrote:

    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look at the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
    /
    doc/kafka/clients/consumer/KafkaConsumer.html>,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
    c
    /kafka/clients/consumer/ConsumerConfig.html>

    Some background info on implementation:

    At a high level the primary difference in this consumer is that it
    removes
    the distinction between the "high-level" and "low-level" consumer. The
    new
    consumer API is non blocking and instead of returning a blocking
    iterator,
    the consumer provides a poll() API that returns a list of records. We
    think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single
    threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer
    batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long
    poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the
    concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer
    +
    Rewrite+Design>

    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
    /
    kafka/clients/consumer/KafkaConsumer.html>and
    give us any thoughts you may have.

    Thanks,
    Neha
  • Neha Narkhede at Mar 27, 2014 at 3:29 pm
    If people don't have any more thoughts on this, I will go ahead and submit
    a reviewboard to https://issues.apache.org/jira/browse/KAFKA-1328.

    Thanks,
    Neha

    On Mon, Mar 24, 2014 at 5:39 PM, Neha Narkhede wrote:

    I took some time to write some example code using the new consumer APIs to
    cover a range of use cases. This exercise was very useful (thanks for the
    suggestion, Jay!) since I found several improvements to the APIs to make
    them more usable. Here are some of the changes<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/>I made -

    1. Added usage examples to the KafkaConsumer javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.
    I find it useful for the examples to be in the javadoc vs some wiki. Please
    go through these examples and suggest improvements. The goal would be to
    document a limited set of examples that cover every major use case.
    2. All APIs that either accept or return offsets are changed to
    Map<TopicPartition,Long> instead of TopicPartitionOffset... In all the
    examples that I wrote, it was much easier to deal with offsets and pass
    them around in the consumer APIs if they were maps instead of lists
    3. Due to the above change, I had to introduce commit()<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29>and commitAsync() APIs explicitly, in addition to
    commit(Map<TopicPartition,Long> offsets) and
    commitAsync(Map<TopicPartition,Long> offsets), since the no-argument case
    would not be covered automatically with Map as the input parameter to the
    commit APIs
    4. Offset rewind logic is funky with group management. I took a stab and
    it and wrote examples to cover the various offset rewind uses cases I could
    think of. I'm not so sure I like it, so I encourage people to take a look
    at the examples and provide feedback. This feedback is very critical in
    finalizing the consumer APIs as we might have to add/change APIs to make
    offset rewind intuitive and easy to use. (Please see the 3rd and 4th
    examples here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
    )

    Once I have feedback on the above, I will go ahead and submit a review
    board for the new APIs and javadoc.

    Thanks
    Neha

    On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede wrote:

    Hey Chris,

    Really sorry for the late reply, wonder how this fell through the cracks.
    Anyhow, thanks for the great feedback! Here are my comments -


    1. Why is the config String->Object instead of String->String?

    This is probably more of a feedback about the new config management that
    we adopted in the new clients. I think it is more convenient to write
    configs.put("a", 42);
    instead of
    configs.put("a", Integer.toString(42));

    2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<
    java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    Fixed.


    3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.

    The subscribe and unsubscribe will be very lightweight in-memory
    operations,
    so it shouldn't be a problem to just use those APIs directly.
    Let me know if you think otherwise.

    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    How about adding a third option "disable" to "auto.offset.reset"?
    What this says is that never automatically reset the offset, either if
    one is not found or if the offset
    falls out of range. Presumably, you would want to turn this off when you
    want to control the offsets
    yourself and use custom rewind/replay logic to reset the consumer's
    offset. In this case, you would
    want to turn this feature off so Kafka does not accidentally reset the
    offset to something else.

    I'm not so sure when you would want to make the distinction regarding
    startup and offset falling out
    of range. Presumably, if you don't trust Kafka to reset the offset, then
    you can always turn this off
    and use commit/commitAsync and seek() to set the consumer to the right
    offset on startup and every
    time your consumer falls out of range.

    Does that make sense?

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    Fixed.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    Good point, clarified that. Take a look again to see if it makes sense
    now.

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    Throw a TimeoutException. Clarified that in the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
    .


    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    Applies to all requests. Clarified that in the docs.

    9. What does SESSION_TIMEOUT_MS default to?

    Defaults are largely TODO, but session.timeout.ms currently defaults to
    1000.

    10. Is this consumer thread-safe?

    It should be. Updated the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to clarify that.

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the consumer
    's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?

    For #11 and #12, I updated the docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to include actual usage examples.
    Could you take a look and see if answers your questions?

    Thanks,
    Neha



    On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini <criccomini@linkedin.com
    wrote:
    Hey Guys,

    Also, for reference, we'll be looking to implement new Samza consumers
    which have these APIs:


    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/system/SystemConsumer.html<http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html>


    http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
    g/apache/samza/checkpoint/CheckpointManager.html<http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html>


    Question (3) below is a result of having Samza's SystemConsumers poll
    allow specific topic/partitions to be specified.

    The split between consumer and checkpoint manager is the reason for
    question (12) below.

    Cheers,
    Chris
    On 3/3/14 10:19 AM, "Chris Riccomini" wrote:

    Hey Guys,

    Sorry for the late follow up. Here are my questions/thoughts on the API:

    1. Why is the config String->Object instead of String->String?

    2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as
    configuration and a ConsumerRebalanceCallback implementation

    There is no ConsumerRebalanceCallback parameter.

    3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
    TopicPartition... topicAndPartitionsToPoll)

    I see I can effectively do this by just fiddling with subscribe and
    unsubscribe before each poll. Is this a low-overhead operation? Can I just
    unsubscribe from everything after each poll, then re-subscribe to a topic
    the next iteration. I would probably be doing this in a fairly tight loop.
    4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
    are use cases for decoupling "what to do when no offset exists" from "what
    to do when I'm out of range". I might want to start from smallest the
    first time I run, but fail if I ever get offset out of range.

    5. ENABLE_JMX could use Java docs, even though it's fairly
    self-explanatory.

    6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
    across all topic/partitions is useful. I believe it's
    per-topic/partition,
    right? That is, setting to 2 megs with two TopicAndPartitions would result
    in 4 megs worth of data coming in per fetch, right?

    7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
    Retry, or throw exception?

    8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
    fetch requests?

    9. What does SESSION_TIMEOUT_MS default to?

    10. Is this consumer thread-safe?

    11. How do you use a different offset management strategy? Your email
    implies that it's pluggable, but I don't see how. "The offset management
    strategy defaults to Kafka based offset management and the API provides a
    way for the user to use a customized offset store to manage the
    consumer's
    offsets."

    12. If I wish to decouple the consumer from the offset checkpointing, is
    it OK to use Joel's offset management stuff directly, rather than through
    the consumer's commit API?


    Cheers,
    Chris
    On 2/10/14 10:54 AM, "Neha Narkhede" wrote:

    As mentioned in previous emails, we are also working on a
    re-implementation
    of the consumer. I would like to use this email thread to discuss the
    details of the public API. I would also like us to be picky about this
    public api now so it is as good as possible and we don't need to break
    it
    in the future.

    The best way to get a feel for the API is actually to take a look at
    the
    javadoc<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
    /
    doc/kafka/clients/consumer/KafkaConsumer.html>,
    the hope is to get the api docs good enough so that it is
    self-explanatory.
    You can also take a look at the configs
    here<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
    c
    /kafka/clients/consumer/ConsumerConfig.html>

    Some background info on implementation:

    At a high level the primary difference in this consumer is that it
    removes
    the distinction between the "high-level" and "low-level" consumer. The
    new
    consumer API is non blocking and instead of returning a blocking
    iterator,
    the consumer provides a poll() API that returns a list of records. We
    think
    this is better compared to the blocking iterators since it effectively
    decouples the threading strategy used for processing messages from the
    consumer. It is worth noting that the consumer is entirely single
    threaded
    and runs in the user thread. The advantage is that it can be easily
    rewritten in less multi-threading-friendly languages. The consumer
    batches
    data and multiplexes I/O over TCP connections to each of the brokers it
    communicates with, for high throughput. The consumer also allows long
    poll
    to reduce the end-to-end message latency for low throughput data.

    The consumer provides a group management facility that supports the
    concept
    of a group with multiple consumer instances (just like the current
    consumer). This is done through a custom heartbeat and group management
    protocol transparent to the user. At the same time, it allows users the
    option to subscribe to a fixed set of partitions and not use group
    management at all. The offset management strategy defaults to Kafka
    based
    offset management and the API provides a way for the user to use a
    customized offset store to manage the consumer's offsets.

    A key difference in this consumer also is the fact that it does not
    depend
    on zookeeper at all.

    More details about the new consumer design are
    here<
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer
    +
    Rewrite+Design>

    Please take a look at the new
    API<
    http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
    /
    kafka/clients/consumer/KafkaConsumer.html>and
    give us any thoughts you may have.

    Thanks,
    Neha

Related Discussions

Discussion Navigation
viewthread | post
Discussion Overview
groupdev @
categorieskafka
postedFeb 10, '14 at 6:55p
activeMar 27, '14 at 3:29p
posts22
users8
websitekafka.apache.org
irc#kafka

People

Translate

site design / logo © 2021 Grokbase