Grokbase Groups Kafka users May 2014
Greetings,

I'm looking for some feedback on using advertised.host.name and
advertised.port on Kafka 0.8.1.1 through a load balancer. The brokers are
fronted with haproxy to support our cluster mirroring configuration. The
setup has been working as expected, with producer, consumer, and broker
connections all going through haproxy. However, I sometimes get errors
when attempting to create a new topic:

2014-05-08 19:00:49,757 - WARN [Controller-0-to-broker-0-send-thread:Logging$class@89] - [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost,port:13000
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:376)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-05-08 19:00:49,769 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send UpdateMetadata request with correlation id 7 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-05-08 19:00:49,770 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests


When this error occurs, the new topic is registered in zookeeper but not
written to disk by the broker. The topic will, however, be written to disk
the next time the Kafka broker is restarted. I did not experience this
behavior in other clusters that are not fronted by a load balancer, and I
also do not get this error when Kafka is initially started.
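
For reference, topics are being created with the stock admin script; a
typical invocation (the topic name, partition count, and replication
factor here are just illustrative) looks something like:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 16 --topic Test2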

To help simplify troubleshooting, I set up a single host with Kafka,
zookeeper, and haproxy running on it, with these relevant configurations:

Kafka:
advertised.host.name = localhost
advertised.port = 13000

Zookeeper:
port = default

Haproxy:
listen kafka_13000 0.0.0.0:13000
mode tcp
option tcpka
timeout client 5m
timeout server 5m
timeout connect 5m
server h-kafka01-1b localhost:9092
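
In server.properties terms, the Kafka side of the above boils down to the
following (a sketch showing only the settings relevant here; everything
else is stock):

# config/server.properties
broker.id=0
port=9092
advertised.host.name=localhost
advertised.port=13000
zookeeper.connect=localhost:2181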

Here are the network sockets Kafka creates on start-up:

[root@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
java  25532  kafka  18u  IPv6  14717680  0t0  TCP *:44398 (LISTEN)
java  25532  kafka  23u  IPv6  14717684  0t0  TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java  25532  kafka  38u  IPv6  14717692  0t0  TCP *:9092 (LISTEN)
java  25532  kafka  39u  IPv6  14717694  0t0  TCP localhost.localdomain:45037->localhost.localdomain:13000 (ESTABLISHED)
java  25532  kafka  40u  IPv6  14717698  0t0  TCP localhost.localdomain:9092->localhost.localdomain:46448 (ESTABLISHED)


After the 5m timeout configured in haproxy elapses, the connection
through port 13000 is closed (from kafka.log):

2014-05-08 19:05:40,904 - INFO [kafka-processor-9092-0:Logging$class@68] - Closing socket connection to /127.0.0.1.


Looking again at the network sockets, the controller-to-broker connection
is in a CLOSE_WAIT state:

[root@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
java  25532  kafka  18u  IPv6  14717680  0t0  TCP *:44398 (LISTEN)
java  25532  kafka  23u  IPv6  14717684  0t0  TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java  25532  kafka  38u  IPv6  14717692  0t0  TCP *:9092 (LISTEN)
java  25532  kafka  39u  IPv6  14717694  0t0  TCP localhost.localdomain:45037->localhost.localdomain:13000 (CLOSE_WAIT)


At this point, attempting to create a topic fails with:
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.

The Linux kernel will remove a socket in the CLOSE_WAIT state after the TCP
keepalive expires, which defaults to 2 hours:

[root@dp-robin01-dev.sea1.office.priv kafka]# ss -o | grep 13000
CLOSE-WAIT  1  0  ::ffff:127.0.0.1:45040  ::ffff:127.0.0.1:13000  timer:(keepalive,46sec,0)
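
For anyone reproducing this, these are the kernel knobs involved (a
sketch; 7200 seconds is the stock 2-hour default, and the lowered value
below is just an example):

# inspect the current keepalive settings
sysctl net.ipv4.tcp_keepalive_time net.ipv4.tcp_keepalive_intvl net.ipv4.tcp_keepalive_probes
# lower the idle time before keepalive probes start
sysctl -w net.ipv4.tcp_keepalive_time=300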


Kafka sockets after the controller-to-broker connection has been
completely removed:

[root@dp-robin01-dev.sea1.office.priv kafka]# lsof -i -P | grep -i kafka
java  25532  kafka  18u  IPv6  14717680  0t0  TCP *:44398 (LISTEN)
java  25532  kafka  23u  IPv6  14717684  0t0  TCP localhost.localdomain:58093->localhost.localdomain:2181 (ESTABLISHED)
java  25532  kafka  38u  IPv6  14717692  0t0  TCP *:9092 (LISTEN)


Now when attempting to create a new topic, Kafka detects that the
controller-to-broker connection is down, reconnects successfully, and is
able to write the topic to disk:

2014-05-08 21:02:47,685 - INFO [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [Test2,1],[Test2,14],[Test2,3],[Test2,12],[Test2,0],[Test2,13],[Test2,4],[Test2,6],[Test2,9],[Test2,15],[Test2,2],[Test2,7],[Test2,11],[Test2,5],[Test2,8],[Test2,10]
2014-05-08 21:02:47,796 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send LeaderAndIsr request with correlation id 11 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:92)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-05-08 21:02:47,802 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests


It seems that the controller isn't able to properly handle a connection in
a CLOSE_WAIT state. The exception thrown when the socket is in CLOSE_WAIT
differs from the one thrown when the socket does not exist at all.
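
To summarize the pattern (my interpretation of the logs above):

socket state    exception on controller send                outcome
CLOSE_WAIT      EOFException, then ClosedChannelException   topic registered in zookeeper only
fully removed   IOException: Broken pipe                    controller reconnects; topic written to disk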

I can somewhat work around this issue by lowering the kernel TCP keepalive
settings and increasing my haproxy timeouts, but that's not very desirable
and wouldn't work 100% of the time. I've looked through the broker
configuration documentation, and changing controller.socket.timeout.ms did
not produce any meaningful results.
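
Concretely, the haproxy half of that workaround looks something like this
(timeout values are illustrative only), combined with lowering the
keepalive sysctls shown earlier:

listen kafka_13000 0.0.0.0:13000
mode tcp
option tcpka
timeout client 24h
timeout server 24h
timeout connect 5s
server h-kafka01-1b localhost:9092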

Any feedback / suggestions would be greatly appreciated.

Thank you,
Robin


  • Robin Yamaguchi at May 15, 2014 at 12:02 am
    It seems like this mailing list wasn't updating through the web archives
    for a few days last week, so I wanted to send this out again in case it
    wasn't seen. My apologies for the repost.

    In further troubleshooting, I've also observed that if a broker is shut
    down while a connection is in CLOSE_WAIT, this error is generated on the
    broker that is still up:

    2014-05-13 20:57:35,794 - INFO [ZkClient-EventThread-12-localhost:2181:Logging$class@68] - [Controller 0]: New leader and ISR for partition [Test3,10] is {"leader":0,"leader_epoch":2,"isr":[0]}
    2014-05-13 20:57:35,796 - WARN [Controller-0-to-broker-0-send-thread:Logging$class@89] - [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:localhost,port:13000
    java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:376)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    2014-05-13 20:57:35,799 - ERROR [Controller-0-to-broker-0-send-thread:Logging$class@103] - [Controller-0-to-broker-0-send-thread], Controller 0 epoch 4 failed to send UpdateMetadata request with correlation id 9 to broker id:0,host:localhost,port:13000. Reconnecting to broker.
    java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    2014-05-13 20:57:35,800 - INFO [Controller-0-to-broker-0-send-thread:Logging$class@68] - [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:localhost,port:13000 for sending state change requests


    This WARN is logged for every partition:

    2014-05-13 20:57:35,806 - WARN [ZkClient-EventThread-12-localhost:2181:Logging$class@83] - [Channel manager on controller 0]: Not sending request Name: StopReplicaRequest; Version: 0; CorrelationId: 9; ClientId: ; DeletePartitions: false; ControllerId: 0; ControllerEpoch: 4; Partitions: [Test2,8] to broker 1, since it is offline.


    This sequence, including the ERROR, is then logged continuously for every partition:

    2014-05-13 20:57:45,839 - INFO [kafka-scheduler-3:Logging$class@68] - Partition [Test1,6] on broker 0: Shrinking ISR for partition [Test1,6] from 0,1 to 0
    2014-05-13 20:57:45,841 - ERROR [kafka-scheduler-3:Logging$class@97] - Conditional update of path /brokers/topics/Test1/partitions/6/state with data {"controller_epoch":4,"leader":0,"version":1,"leader_epoch":3,"isr":[0]} and expected version 6 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/Test1/partitions/6/state
    2014-05-13 20:57:45,841 - INFO [kafka-scheduler-3:Logging$class@68] - Partition [Test1,6] on broker 0: Cached zkVersion [6] not equal to that in zookeeper, skip updating ISR



  • Jun Rao at May 14, 2014 at 8:06 am
    Which version of Kafka are you using?

    Thanks,

    Jun

  • Robin Yamaguchi at May 14, 2014 at 4:23 am
    0.8.1.1

  • Jun Rao at May 14, 2014 at 4:44 pm
    Any error in the controller and state-change log?

    Thanks,

    Jun

  • Robin Yamaguchi at May 16, 2014 at 11:27 pm
    The only errors logged are in the controller log:

    [2014-05-16 20:42:31,846] DEBUG [TopicChangeListener on Controller 1]:
    Topic change listener fired for path /brokers/topics with children
    Test2,Test1,Test3,Test4
    (kafka.controller.PartitionStateMachine$TopicChangeListener)
    [2014-05-16 20:42:31,897] INFO [TopicChangeListener on Controller 1]: New
    topics: [Set(Test4)], deleted topics: [Set()], new partition replica
    assignment [Map([Test4,10] -> List(0, 1), [Test4,15] -> List(1, 0),
    [Test4,5] -> List(1, 0), [Test4,9] -> List(1, 0), [Test4,14] -> List(0, 1),
    [Test4,2] -> List(0, 1), [Test4,1] -> List(1, 0), [Test4,4] -> List(0, 1),
    [T
    est4,0] -> List(0, 1), [Test4,12] -> List(0, 1), [Test4,8] -> List(0, 1),
    [Test4,6] -> List(0, 1), [Test4,7] -> List(1, 0), [Test4,13] -> List(1, 0),
    [Test4,3] -> List(1, 0), [Test4,11] -> List(1, 0))]
    (kafka.controller.PartitionStateMachine$TopicChangeListener)
    [2014-05-16 20:42:31,908] INFO [Controller 1]: New topic creation callback
    for
    [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11]
    (kafka.controller.KafkaController)
    [2014-05-16 20:42:31,911] INFO [Controller 1]: New partition creation
    callback for
    [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11]
    (kafka.controller.KafkaController)
    [2014-05-16 20:42:31,912] INFO [Partition state machine on Controller 1]:
    Invoking state change to NewPartition for partitions
    [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,069] INFO [Replica state machine on controller 1]:
    Invoking state change to NewReplica for replicas
    [Topic=Test4,Partition=8,Replica=0],[Topic=Test4,Partition=11,Replica=0],[Topic=Test4,Partition=15,Replica=0],[Topic=Test4,Partition=5,Replica=1],[Topic=Test4,Partition=12,Replica=1],[Topic=Test4,Partition=10,Replica=0],[Topic=Test4,Partition=3,Replica=1],[Topic=Test4,Partition=4,Replica=1],[Topic=Test4,Partition=7,Replica=1],[Topic=Test4,Partition=10,Replica=1],[Topic=Test4,Partition=0,Replica=1],[Topic=Test4,Partition=8,Replica=1],[Topic=Test4,Partition=14,Replica=0],[Topic=Test4,Partition=1,Replica=0],[Topic=Test4,Partition=2,Replica=1],[Topic=Test4,Partition=4,Replica=0],[Topic=Test4,Partition=1,Replica=1],[Topic=Test4,Partition=13,Replica=1],[Topic=Test4,Partition=13,Replica=0],[Topic=Test4,Partition=6,Replica=1],[Topic=Test4,Partition=14,Replica=1],[Topic=Test4,Partition=3,Replica=0],[Topic=Test4,Partition=5,Replica=0],[Topic=Test4,Partition=9,Replica=1],[Topic=Test4,Partition=2,Replica=0],[Topic=Test4,Partition=11,Replica=1],[Topic=Test4,Partition=9,Replica=0],[Topic=Test4,Partition=0,Replica=0],[Topic=Test4,Partition=7,Replica=0],[Topic=Test4,Partition=12,Replica=0],[Topic=Test4,Partition=15,Replica=1],[Topic=Test4,Partition=6,Replica=0]
    (kafka.controller.ReplicaStateMachine)
    [2014-05-16 20:42:32,091] INFO [Partition state machine on Controller 1]:
    Invoking state change to OnlinePartition for partitions
    [Test4,3],[Test4,14],[Test4,2],[Test4,9],[Test4,10],[Test4,6],[Test4,1],[Test4,8],[Test4,0],[Test4,5],[Test4,15],[Test4,13],[Test4,4],[Test4,7],[Test4,12],[Test4,11]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,092] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,3] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,093] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,3] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,100] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,14] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,100] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,14] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,103] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,2] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,103] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,2] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,107] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,9] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,107] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,9] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,110] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,10] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,110] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,10] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,114] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,6] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,114] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,6] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,117] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,1] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,117] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,1] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,121] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,8] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,121] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,8] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,124] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,0] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,124] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,0] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,130] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,5] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,130] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,5] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,135] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,15] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,135] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,15] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,146] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,13] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,146] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,13] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,150] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,4] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,150] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,4] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,153] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,7] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,153] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,7] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,157] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,12] are: [List(0, 1)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,157] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,12] to
    (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,161] DEBUG [Partition state machine on Controller 1]:
    Live assigned replicas for partition [Test4,11] are: [List(1, 0)]
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,161] DEBUG [Partition state machine on Controller 1]:
    Initializing leader and isr for partition [Test4,11] to
    (Leader:1,ISR:1,0,LeaderEpoch:0,ControllerEpoch:13)
    (kafka.controller.PartitionStateMachine)
    [2014-05-16 20:42:32,167] WARN [Controller-1-to-broker-1-send-thread],
    Controller 1 fails to send a request to broker
    id:1,host:localhost,port:13001 (kafka.controller.RequestSendThread)
    java.io.EOFException: Received -1 when reading from channel, socket has
    likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:376)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    [2014-05-16 20:42:32,171] WARN [Controller-1-to-broker-0-send-thread],
    Controller 1 fails to send a request to broker
    id:0,host:localhost,port:13000 (kafka.controller.RequestSendThread)
    java.io.EOFException: Received -1 when reading from channel, socket has
    likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:376)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    [2014-05-16 20:42:32,179] ERROR [Controller-1-to-broker-1-send-thread],
    Controller 1 epoch 13 failed to send UpdateMetadata request with
    correlation id 11 to broker id:1,host:localhost,port:13001. Reconnecting to
    broker. (kafka.controller.RequestSendThread)
    java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    [2014-05-16 20:42:32,179] INFO [Controller-1-to-broker-1-send-thread],
    Controller 1 connected to id:1,host:localhost,port:13001 for sending state
    change requests (kafka.controller.RequestSendThread)
    [2014-05-16 20:42:32,182] ERROR [Controller-1-to-broker-0-send-thread],
    Controller 1 epoch 13 failed to send UpdateMetadata request with
    correlation id 11 to broker id:0,host:localhost,port:13000. Reconnecting to
    broker. (kafka.controller.RequestSendThread)
    java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
    [2014-05-16 20:42:32,183] INFO [Controller-1-to-broker-0-send-thread],
    Controller 1 connected to id:0,host:localhost,port:13000 for sending state
    change requests (kafka.controller.RequestSendThread)
    [2014-05-16 20:42:32,185] INFO [Replica state machine on controller 1]:
    Invoking state change to OnlineReplica for replicas
    [Topic=Test4,Partition=8,Replica=0],[Topic=Test4,Partition=11,Replica=0],[Topic=Test4,Partition=15,Replica=0],[Topic=Test4,Partition=5,Replica=1],[Topic=Test4,Partition=12,Replica=1],[Topic=Test4,Partition=10,Replica=0],[Topic=Test4,Partition=3,Replica=1],[Topic=Test4,Partition=4,Replica=1],[Topic=Test4,Partition=7,Replica=1],[Topic=Test4,Partition=10,Replica=1],[Topic=Test4,Partition=0,Replica=1],[Topic=Test4,Partition=8,Replica=1],[Topic=Test4,Partition=14,Replica=0],[Topic=Test4,Partition=1,Replica=0],[Topic=Test4,Partition=2,Replica=1],[Topic=Test4,Partition=4,Replica=0],[Topic=Test4,Partition=1,Replica=1],[Topic=Test4,Partition=13,Replica=1],[Topic=Test4,Partition=13,Replica=0],[Topic=Test4,Partition=6,Replica=1],[Topic=Test4,Partition=14,Replica=1],[Topic=Test4,Partition=3,Replica=0],[Topic=Test4,Partition=5,Replica=0],[Topic=Test4,Partition=9,Replica=1],[Topic=Test4,Partition=2,Replica=0],[Topic=Test4,Partition=11,Replica=1],[Topic=Test4,Partition=9,Replica=0],[Topic=Test4,Partition=0,Replica=0],[Topic=Test4,Partition=7,Replica=0],[Topic=Test4,Partition=12,Replica=0],[Topic=Test4,Partition=15,Replica=1],[Topic=Test4,Partition=6,Replica=0]
    (kafka.controller.ReplicaStateMachine)


    The state-change log contains no errors, and Kafka thinks the new topic is
    online and in ISR:

    [root@dp-robin01-dev.sea1.office.priv kafka01]# ./bin/kafka-topics.sh
    --describe --zookeeper localhost:2181 --topic Test4
    Topic:Test4 PartitionCount:16 ReplicationFactor:2 Configs:
    Topic: Test4 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 5 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 7 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 9 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 11 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 13 Leader: 1 Replicas: 1,0 Isr: 1,0
    Topic: Test4 Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: Test4 Partition: 15 Leader: 1 Replicas: 1,0 Isr: 1,0


    But partitions were not written to disk:

    [root@dp-robin01-dev.sea1.office.priv kafka01]# ls /data/kafka
    recovery-point-offset-checkpoint Test1-0 Test1-10 Test1-12 Test1-14
      Test1-2 Test1-4 Test1-6 Test1-8 Test2-0 Test2-10 Test2-12 Test2-14
      Test2-2 Test2-4 Test2-6 Test2-8 Test3-0 Test3-10 Test3-12 Test3-14
      Test3-2 Test3-4 Test3-6 Test3-8
    replication-offset-checkpoint Test1-1 Test1-11 Test1-13 Test1-15
      Test1-3 Test1-5 Test1-7 Test1-9 Test2-1 Test2-11 Test2-13 Test2-15
      Test2-3 Test2-5 Test2-7 Test2-9 Test3-1 Test3-11 Test3-13 Test3-15
      Test3-3 Test3-5 Test3-7 Test3-9

    [root@dp-robin01-dev.sea1.office.priv kafka01]# ls /data/kafka01/
    recovery-point-offset-checkpoint Test1-0 Test1-10 Test1-12 Test1-14
      Test1-2 Test1-4 Test1-6 Test1-8 Test2-0 Test2-10 Test2-12 Test2-14
      Test2-2 Test2-4 Test2-6 Test2-8 Test3-0 Test3-10 Test3-12 Test3-14
      Test3-2 Test3-4 Test3-6 Test3-8
    replication-offset-checkpoint Test1-1 Test1-11 Test1-13 Test1-15
      Test1-3 Test1-5 Test1-7 Test1-9 Test2-1 Test2-11 Test2-13 Test2-15
      Test2-3 Test2-5 Test2-7 Test2-9 Test3-1 Test3-11 Test3-13 Test3-15
      Test3-3 Test3-5 Test3-7 Test3-9
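
    A quick way to see the inconsistency is to compare the partition count
    zookeeper reports against what's on disk, for example:

    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Test4 | grep -c 'Partition: '
    ls /data/kafka | grep -c Test4

    The first command counts 16 partitions, while the second finds no
    Test4-* directories at all.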


    On Wed, May 14, 2014 at 7:57 AM, Jun Rao wrote:

    Any error in the controller and state-change log?

    Thanks,

    Jun


    On Tue, May 13, 2014 at 9:16 PM, Robin Yamaguchi <robin@hasoffers.com> wrote:
    0.8.1.1

    On Tue, May 13, 2014 at 9:02 PM, Jun Rao wrote:

    Which version of Kafka are you using?

    Thanks,

    Jun


  • Jun Rao at May 19, 2014 at 4:32 am
    Perhaps you will have to lower the tcp keepalive interval to keep the
    socket connection from being killed.
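
    For example, something along these lines (a sketch; haproxy's "option
    tcpka" only enables the keepalive probes, so their timing comes from the
    kernel sysctls):

    # idle time before the first keepalive probe is sent
    sysctl -w net.ipv4.tcp_keepalive_time=120
    # spacing and count of the follow-up probes
    sysctl -w net.ipv4.tcp_keepalive_intvl=30
    sysctl -w net.ipv4.tcp_keepalive_probes=3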

    Thanks,

    Jun

