Kafka error sending fetch request


Contents

  1. Kafka Consumer dies after a reconnect following a network hiccup #1408
  2. Comments
  3. Versions used
  4. Expected Behavior
  5. Actual Behavior
  6. Relevant logs
  7. Consumer config
  8. Reproducible Test Case
  9. INVALID_FETCH_SESSION_EPOCH — Sending LeaveGroup request to coordinator #323
  10. Comments
  11. Footer
  12. Spring Kafka stops after group coordinator is lost #1367
  13. Comments
  14. Jack Vanlightly
  15. Scenario 1 — Fire-and-forget with a failed node and partition leader fail-over
  16. Scenario 2 — Acks=1 with a failed node and partition leader fail-over
  17. Scenario 3 — Acks=all with a failed node and partition leader fail-over (No message loss)
  18. Scenario 4 — Completely Isolate Leader from other Kafka nodes and Zookeeper with acks=1
  19. Scenario 5 — Completely Isolate Leader from other Kafka nodes and Zookeeper with acks=all (no message loss)
  20. Scenario 6 — Leader Isolated from Zookeeper only with Acks=1
  21. Scenario 7 — Leader Isolated from Zookeeper only with Acks=all (no message loss)

Kafka Consumer dies after a reconnect following a network hiccup #1408

Versions used

akka version: 2.6.15
akka-stream-kafka version: 2.1.1
scala version: 2.13.6

Expected Behavior

We’re using a committableSource to consume messages from Kafka. We read the messages, parse the JSON payload, and persist it into a database.
We’re also using DrainingControl for graceful shutdown.
I was hoping that reconnects after network glitches would be handled properly by either akka-stream-kafka or the underlying Kafka client library.

Actual Behavior

The consumer works as expected most of the time. However, it (very sporadically) dies after a short network hiccup (we see error logs from our database connection, so I’m pretty sure that it’s a short network issue that is resolved by a reconnect after a moment).
The first thing I see in the logs is that the consumer fails to do a successful commit against the broker, then loses the assigned partitions and has to rejoin the consumer group. It does so (as I expect), but shortly after that leaves the consumer group. To me, it looks as if the Kafka consumer is shutting down, while the service itself keeps running. I can’t see a reason why the consumer should be shutting down.

At this point I’m not sure whether it’s something we’re missing on our side (should we use something other than committableSource? Are we missing some configuration?), whether it’s a bug, or just bad luck. Or do we have to handle such problems ourselves by restarting the Kafka source in these cases, as mentioned in this issue?
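The "restart the source" idea can be sketched independently of any Kafka library. The following is a minimal illustration in Python, not the Alpakka Kafka API (Akka Streams offers RestartSource with backoff for this purpose). The names run_with_backoff and run_consumer are hypothetical, and the sketch assumes the consumer surfaces its failure as an exception, which may not be the case when the stream keeps running silently as described above.

```python
import time


def run_with_backoff(run_consumer, min_backoff=1.0, max_backoff=30.0,
                     max_restarts=5, sleep=time.sleep):
    """Call run_consumer() and restart it with exponential backoff on failure.

    run_consumer is a hypothetical callable that blocks while consuming and
    raises when the underlying consumer stops unexpectedly.
    Returns the number of restarts that were needed.
    """
    restarts = 0
    delay = min_backoff
    while True:
        try:
            run_consumer()
            return restarts  # clean shutdown, no further restart needed
        except Exception:
            if restarts >= max_restarts:
                raise  # give up and surface the last failure
            sleep(delay)
            restarts += 1
            delay = min(delay * 2, max_backoff)  # double the wait, capped
```

Capping the delay keeps a long outage from pushing the retry interval out indefinitely, while the restart limit prevents a permanently broken consumer from looping forever.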

What’s even more confusing is that we have two consumer groups that read from the same topic and do similar things when consuming the messages from Kafka. We observed this behavior twice over a couple of weeks, once for each consumer group; both times the other group kept consuming without any issues.

Relevant logs

I’m putting the (almost) full JSON from our logs here, so it’s clear when the logs were made, by which logger, and with what message (I replaced sensitive details with “...”). I’ll try to group them logically (at least in a way that makes sense to me):

consumer fails to commit and loses partition assignments after network issues

Consumer config

We’re consuming rather large messages, so fetch.max.bytes is set to something unusual, but I don’t think that’s relevant here.

Reproducible Test Case

I’m currently not able to reproduce the behavior. It happened to us twice over the course of several weeks, and I think it’s triggered by a network glitch, which is hard to simulate.


INVALID_FETCH_SESSION_EPOCH — Sending LeaveGroup request to coordinator #323

I’ve posted this on the forum (as have many other people, but the discussions received no responses and were closed):
https://discuss.elastic.co/t/kafka-invalid-fetch-session-epoch/187441

  • I’d like to fully understand why this keeps happening.
  • Is there a better way to handle this, so that after the consumer “Leaves” it can “Rejoin”?

I’ve tried checking the Logstash Monitoring API, but I don’t see anything there that indicates this plugin has stopped working after it “Leaves”, so I am stuck searching the log output for the LeaveGroup string to determine when I must restart Logstash.
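The manual log search described above could be automated with a few lines. This is only a sketch: the function name is hypothetical, and it assumes the substring LeaveGroup (taken from the issue title) is a reliable marker in your log output.

```python
def find_leave_group_events(lines, marker="LeaveGroup"):
    """Return (line_number, line) pairs for log lines that mention the marker."""
    return [
        (number, line.rstrip("\n"))
        for number, line in enumerate(lines, start=1)
        if marker in line
    ]
```

Passing an open file object as lines works, since iterating a file yields one line at a time; a non-empty result would be the signal to restart.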


Even trying this:

I also encountered the same situation.
Logstash cannot consume messages from Kafka at all.
Is there no workaround?

This seems related to this kafka bug https://issues.apache.org/jira/browse/KAFKA-8052
I created #327 to upgrade the client to 2.3.0, we’ll publish it soon.

input and output kafka plugins have been released with kafka client 2.3.0:

@jsvd is this a client side bug only or server side? I’m guessing it’s client side so upgrading logstash to use the 2.3.0 plugin with 2.1.0 server should fix the issue?

@cdenneen looking at apache/kafka#6582 seems to be the client side, so upgrading should fix it. That said I’m no kafka expert, I’ve just been scouring exception messages and hunting open/closed issues 😀

I updated the Logstash plugins today, and I no longer see the INVALID_FETCH_SESSION_EPOCH messages in the logs, so it looks good to me. But, contrary to what someone mentioned above, I was still able to consume at a high rate before the update, and from what I understand of KAFKA-8052 it would just cause retries.

@voiprodrigo You are right.
I misunderstood. Messages were actually consumed despite the session log.

I am getting the same error.

2019-08-22 14:15:03.458 INFO 3193 --- [pool-1-thread-1] o.a.k.c.FetchSessionHandler : [Consumer clientId=consumer-1, groupId=aaaa2] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException.
2019-08-22 14:15:33.659 INFO 3193 --- [pool-1-thread-1] o.a.k.c.FetchSessionHandler : [Consumer clientId=consumer-1, groupId=aaaa2] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: org.apache.kafka.common.errors.DisconnectException.

I upgraded to the latest version of Kafka and it is still not working; I am not even able to establish an initial connection.

Seeing this. What was the solution?

@ashishbhumireddy , did you solve this issue? I’ve got the same one.


Spring Kafka stops after group coordinator is lost #1367

I noticed that my Spring Kafka consumer suddenly fails when the group coordinator is lost. I’m not really sure why, and I don’t think increasing max.poll.interval.ms will do anything since it is already set to 300 seconds.

this is what I get prior to any issues:

then all within the same timeframe, it quits:

Is there any way to prevent this, or to have the consumer retry the connection? I have about 7 brokers listed in bootstrap.servers. This happened after running for about 2 weeks with no issues.


GitHub issues are for reporting bugs and asking for new features, ask questions on Stack Overflow tagged with [spring-kafka] and [apache-kafka], not here.

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

dont think increasing the max.poll.interval.ms will do anything since the time is set to 300 seconds.

Well, clearly this batch took too long to process; perhaps a temporary glitch, perhaps a downstream network problem, etc. Try increasing max.poll.interval.ms and/or reducing max.poll.records.
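The trade-off between the two settings is simple arithmetic: a batch of max.poll.records records must be processed in less than max.poll.interval.ms. A rough sizing sketch follows; the function name, the per-record processing time, and the safety factor are illustrative assumptions, not values from the issue.

```python
def max_safe_poll_records(max_poll_interval_ms, per_record_ms, safety_factor=0.5):
    """Largest batch size whose processing time stays within a fraction
    (safety_factor) of max.poll.interval.ms."""
    if per_record_ms <= 0:
        raise ValueError("per_record_ms must be positive")
    budget_ms = max_poll_interval_ms * safety_factor
    return max(1, int(budget_ms // per_record_ms))


# 300 s poll interval (as in the issue), assumed worst case of 1 s per record
print(max_safe_poll_records(300_000, 1_000))
```

Under these assumed numbers, a batch of the default 500 records would take about 500 seconds, well over the 300-second limit, which is the situation the CommitFailedException message describes.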

Questions about Kafka itself should be directed to the wider Kafka community.


Jack Vanlightly

Chaos testing Apache Kafka with Blockade, Docker, Python and Bash

In my previous post I used Blockade, Python and some Bash scripts to test a RabbitMQ cluster under various failure conditions such as failed nodes, network partitions, packet loss and a slow network. The aim was to find out how and when a RabbitMQ cluster loses messages. In this post we’ll do exactly the same but with a Kafka cluster. We’ll use our knowledge of the inside workings of Kafka and Zookeeper to produce various failure modes that produce message loss. Please read my post on Kafka fault tolerance as this post assumes you understand the basics of the acknowledgements and replication protocol.

In each scenario we’ll publish messages to a topic called “test1” while introducing failures. At the end we see how many messages are lost. Because Kafka has more components and more configurations than RabbitMQ, we’ll do this in two parts. In Part 1 we’ll concentrate on node failures and network partitions. In Part 2 we’ll take some of the scenarios from Part 1 and add a flaky/slow network.

I have created a few helper bash scripts to perform tasks such as recreating the cluster between scenarios, creating topics etc. You’ll find the scripts in the Github repo.

When there is no Blockade cluster up and running I run the following commands:

After that, to recreate the cluster between each scenario I run:

Scenario 1 — Fire-and-forget with a failed node and partition leader fail-over

In this scenario we’ll send 100000 messages with acks=0, meaning that the client does not require any acknowledgements back. At about the 30000 mark, I kill the node that hosts the leader partition.

First we’ll create the topic:

Next we kick off the publishing of the messages. It sends 100000 messages to the topic “test1” at a rate of 10000 messages a second. It uses acks mode 0 (fire-and-forget).

At about the 30000 message mark we tell Blockade to kill kafka3 which hosts the partition leader.

Once publishing is complete we run a script to get the current high watermark of the topic. This will tell us how many messages have been persisted to the topic. The print-hw.sh script takes three arguments: the broker to run the command on, the internal port and the topic name.

We see that we lost 6764 messages that the producer had sent. This is due to a combination of a connection failure and a leader fail-over.

Scenario 2 — Acks=1 with a failed node and partition leader fail-over

The producer config request.required.acks=1, or its alias acks=1, tells the broker that the producer wants an acknowledgement once the leader partition has written the message to its local log. This scenario should still lose messages, but fewer than with acks=0: we should lose none to the connection failure, only to the leader fail-over.
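As a hedged illustration of how the three acknowledgement modes used in these scenarios might be expressed, the helper below builds a configuration dictionary in the style accepted by librdkafka-based clients such as confluent-kafka-python. The helper name and the bootstrap address are assumptions, and this is not the author's producer script.

```python
def producer_config(acks, bootstrap_servers="localhost:9092"):
    """Build a producer configuration for one of the tested acks modes.

    acks is 0 (fire-and-forget), 1 (leader only) or "all" (full ISR).
    Retries are disabled to mirror the tests, which do not resend failures.
    """
    acks = str(acks)
    if acks not in ("0", "1", "all"):
        raise ValueError("acks must be 0, 1 or 'all'")
    return {
        "bootstrap.servers": bootstrap_servers,  # hypothetical address
        "acks": acks,                            # alias of request.required.acks
        "retries": 0,                            # no resends, so no duplicates
    }
```

With the client library installed, the resulting dictionary could be passed to its producer constructor; only the acks value changes between scenarios.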

We recreate the cluster and topic for each scenario.

This time the leader is on broker 1.

Now we start sending the 100000 messages, with acks=1.

At roughly the 30000 message mark I killed broker 1 with the following command:

With acks=1 only 89356 messages are acknowledged, but hopefully all or nearly all will have survived the fail-over.

We see that all survived the fail-over.

Let’s rerun the scenario with ten producers, each sending 100000 messages at the same time. For this I run the python script from a bash script, running concurrently as background jobs.

I killed the leader midway and summed the success numbers to 537722.

So we see that this time we lost 5 acknowledged messages out of 1 million sent concurrently by 10 producers.

I reran the scenario again, with a variant of the script that only prints the final acknowledged total.

This time it lost 11 acknowledged writes. Not too bad for a leader fail-over with acks=1.

Scenario 3 — Acks=all with a failed node and partition leader fail-over (No message loss)

We’ll repeat the same concurrent ten producers sending 1 million messages over a period of a few seconds, killing the leader midway.

After about 10 seconds I executed the command to kill the leader like in previous scenarios. Then finally I retrieved the high watermark for the topic.

With acks=all we did not lose any messages, in fact, 7 messages for which we never got an ack also got persisted. So we see that acks=all helped guarantee no message loss during a leader fail-over.

Scenario 4 — Completely Isolate Leader from other Kafka nodes and Zookeeper with acks=1

Isolating a Kafka leader node should lead to greater message loss than a downed node as the leader does not realize it cannot talk to Zookeeper until after it has already acknowledged messages during a short period, a few seconds.

In this scenario I send 100000 messages at a rate of roughly 10000 per second, and at about the 30000 message mark I isolate the leader using Blockade.

Midway I isolated kafka2.

You can see that the time out messages appear after 60000 messages have been acknowledged. In the end it gets 66453 acknowledgements.

Below we see that the leader failed over to kafka3 and the high watermark is 34669 which means we lost 31784 acknowledged messages.
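The loss figure follows directly from comparing the acknowledged count with the high watermark. A small sketch of that bookkeeping is shown here, assuming a single-partition topic whose offsets start at 0 so that the high watermark equals the number of persisted messages; the function name is hypothetical.

```python
def loss_report(acknowledged, high_watermark):
    """Compare producer acknowledgements with what the topic holds.

    Returns a lower bound on lost acknowledged messages and, when the topic
    holds more than was acknowledged, the surplus persisted without an ack.
    """
    return {
        "lost_acknowledged": max(0, acknowledged - high_watermark),
        "persisted_without_ack": max(0, high_watermark - acknowledged),
    }


# Scenario 4 figures: 66453 acknowledgements, high watermark 34669
print(loss_report(66453, 34669))
```

The result is only a lower bound, because messages that were persisted without an acknowledgement can mask an equal number of acknowledged messages that were lost.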

Let’s dig a bit deeper and see what is going on. I have a slow-producer.py script that publishes one message per second and prints out more information. We’ll reset the cluster, run this new script and isolate the leader again.

The script does a few things:

Prints out the broker id of the leader at the start and when it changes

On receipt of an acknowledgement, prints out the value of the message and that message offset.

Detects when the message offset drops to a lower value and warns of message loss
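The offset-drop check from the last item can be sketched as below. This is not the author's slow-producer.py; the class and method names are hypothetical, and it only assumes that each acknowledgement reports the offset assigned to the message.

```python
class OffsetDropDetector:
    """Track acknowledged offsets and flag when they move backwards."""

    def __init__(self):
        self.last_offset = None
        self.warnings = []

    def on_ack(self, value, offset):
        """Record one acknowledgement; return True if it signals message loss."""
        dropped = self.last_offset is not None and offset <= self.last_offset
        if dropped:
            # Offsets offset..last_offset were acknowledged before and are now reused.
            overwritten = self.last_offset - offset + 1
            self.warnings.append(
                f"offset dropped from {self.last_offset} to {offset}: "
                f"{overwritten} acknowledged message(s) lost (value={value})"
            )
        self.last_offset = offset
        return dropped
```

A drop means the new leader's log was shorter than the old leader's, so every previously acknowledged offset from the new value upwards no longer refers to the original message.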

I isolated the leader at about the 13-14 message mark. Acknowledgements came in right up to message 28, at which point it seemingly hung for 60 seconds. After 60 seconds the client detected that the new leader was broker 2 and we got 60 seconds’ worth of message timeouts all at once. After the timeouts we started getting acknowledgements from offset 15.

If we look in the logs of the three brokers we see some interesting snippets.

kafka1 logs extract

[2018-09-15 19:20:31,244] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2018-09-15 19:20:37,884] WARN Client session timed out, have not heard from server in 7025ms for sessionid 0x165e83d8e870005 (org.apache.zookeeper.ClientCnxn)

[2018-09-15 19:20:37,884] INFO Client session timed out, have not heard from server in 7025ms for sessionid 0x165e83d8e870005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

[2018-09-15 19:20:39,034] INFO [Partition test1-0 broker=3] Shrinking ISR from 1,2,3 to 1 (kafka.cluster.Partition)

[2018-09-15 19:20:43,991] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2018-09-15 19:20:43,991] WARN Client session timed out, have not heard from server in 6004ms for sessionid 0x165e83d8e870005 (org.apache.zookeeper.ClientCnxn)

[2018-09-15 19:20:44,095] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)

This shows that broker 1 cannot connect to Zookeeper (for 7 seconds); shortly afterwards it tries to shrink the ISR to itself, though it cannot update Zookeeper with that information. So it declares that it will wait until it is connected to Zookeeper again.

kafka2 logs extract

[2018-09-15 19:20:26,181] INFO [Partition test1-0 broker=2] test1-0 starts at Leader Epoch 1 from offset 15. Previous Leader Epoch was: 0 (kafka.cluster.Partition)

Shows it taking on leadership of partition 0, with offset 15. Note that this happens 15 seconds before kafka1 tries to shrink the ISR, thinking that it is still leader.

kafka3 logs extract

[2018-09-15 19:20:26,245] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to 15 has no effect as the largest offset in the log is 14 (kafka.log.Log)

Shows kafka3 truncating its log to the new leader’s high watermark (though no data is truncated as it is not ahead of the new leader).

So we see that a completely isolated node is worse than a node failure with acks=1, as it takes a while for the broker to detect that it has lost its Zookeeper connection. While sending 100000 messages we lost 31784 acknowledged messages! Those were the messages sent during this brief 15 second window where kafka1 was still accepting writes even though a new leader had been elected.

Scenario 5 — Completely Isolate Leader from other Kafka nodes and Zookeeper with acks=all (no message loss)

I will repeat the exact same steps as scenario 4 except we’ll set acks=all.

In theory, once the leader is isolated we should not get any acknowledgements because no followers in the ISR are reachable. Once the 10 second limit is reached the leader will shrink the ISR to itself but it can’t update Zookeeper with the value and will stop accepting more writes. Meanwhile the leader should fail-over to a follower and the client should detect that change within 60 seconds and start sending messages to the new leader instead.
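The reasoning above can be captured in a toy model of the leader's acknowledgement decision. It is a deliberate simplification under stated assumptions: it ignores min.insync.replicas, timeouts, and the ISR shrink itself, and the function name is hypothetical.

```python
def leader_can_ack(acks, leader, isr, has_message):
    """Toy model of when a partition leader acknowledges a write.

    acks        -- 1 or "all" (acks=0 never waits for an acknowledgement)
    leader      -- broker id of the leader
    isr         -- broker ids the leader believes are in the in-sync replica set
    has_message -- broker ids that have written the message to their log
    """
    if acks == 1:
        return leader in has_message
    if acks == "all":
        return set(isr) <= set(has_message)
    raise ValueError("acks must be 1 or 'all' in this model")


# Isolated leader 2: followers 1 and 3 are still in its ISR but unreachable.
print(leader_can_ack(1, 2, {1, 2, 3}, {2}))
print(leader_can_ack("all", 2, {1, 2, 3}, {2}))
```

In this model the isolated leader keeps acknowledging acks=1 writes that the rest of the cluster never sees, while acks=all writes stay unacknowledged as long as the unreachable followers remain in its ISR.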

Again at about the 30000 message mark I isolate the leader.

This time soon after isolating the leader we get three “Broker: request timed out” errors due to the fact that the followers are out of contact. Then we get the long period with nothing, followed by thousands of “Local: Message timed out” errors.

In the end we get 37640 acknowledged messages.

We see that the new leader is kafka1 and that we did not lose a message. In fact, there is a message in the topic for which we never received an ack. Acks=all is required to avoid data loss in leader fail-overs whether they are due to a network partition or failed node.

Let’s run it again with the ten concurrent producers that will attempt to send 1 million messages in total. I ran the script and after 10 seconds I started the network partition. In the end you see that still no acknowledged messages were lost, though the number of acknowledged messages was very low.

Remember that we are not retrying failed messages, which is what you would do in production. In our case we don’t want duplicates interfering with the numbers.

Scenario 6 — Leader Isolated from Zookeeper only with Acks=1

In this scenario we will isolate the leader from Zookeeper but not from the other Kafka nodes. Once the broker that hosts the leader cannot talk to Zookeeper, it will be marked as dead and the leadership will fail-over to a follower. For a while the original leader will accept writes even though a follower has already been elected as the new leader. Any messages acknowledged during this short window will be lost.

This should be similar to scenario 4 with full isolation of the leader. They have many similarities. At some point the followers will stop sending fetch requests to the leader and the leader will try to shrink the ISR to itself. The difference is that the reason they stop sending fetch requests is that leadership failed-over to another node.

The leader is kafka2 and the controller node is kafka1.

Messages start getting sent.

Midway I isolated kafka2 from Zookeeper only.

The final count showed that of the 89027 acknowledged messages, only 45518 made it to the topic, meaning we lost 43509.

Again we can combine the slow-producer.py and the docker logs to get confirmation of what is happening.

Kafka3 Logs Extract

[2018-09-16 19:20:31,877] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2018-09-16 19:20:37,884] WARN Client session timed out, have not heard from server in 7327ms for sessionid 0x165e8aa663e0005 (org.apache.zookeeper.ClientCnxn)

[2018-09-16 19:20:37,884] INFO Client session timed out, have not heard from server in 7327ms for sessionid 0x165e8aa663e0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

[2018-09-16 19:20:39,034] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3 (kafka.cluster.Partition)

[2018-09-16 19:20:39,609] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)

[2018-09-16 19:20:43,991] WARN Client session timed out, have not heard from server in 6005ms for sessionid 0x165e8aa663e0005 (org.apache.zookeeper.ClientCnxn)

[2018-09-16 19:20:43,991] INFO Client session timed out, have not heard from server in 6005ms for sessionid 0x165e8aa663e0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)

[2018-09-16 19:20:44,095] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)

We see that kafka3, the original leader, loses its connection to Zookeeper. Soon after, it tries to shrink the ISR to itself but cannot, and declares that it will wait until it is connected.

kafka2 log extract

[2018-09-16 19:20:26,154] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions test1-0 (kafka.server.ReplicaFetcherManager)

[2018-09-16 19:20:26,185] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([test1-0, initOffset 20 to broker BrokerEndPoint(1,kafka1,19092)] ) (kafka.server.ReplicaFetcherManager)

[2018-09-16 19:20:26,186] INFO [ReplicaAlterLogDirsManager on broker 2] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)

[2018-09-16 19:20:26,192] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)

[2018-09-16 19:20:26,199] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)

[2018-09-16 19:20:26,209] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1307994263, epoch=10152) to node 3: java.nio.channels.ClosedSelectorException. (org.apache.kafka.clients.FetchSessionHandler)

[2018-09-16 19:20:26,210] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)

[2018-09-16 19:20:26,211] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)

[2018-09-16 19:20:26,245] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to 20 has no effect as the largest offset in the log is 19 (kafka.log.Log)

We see that kafka2 removes the existing fetcher and creates a new one, for the new leader, starting at offset 20.

kafka1 Log Extract

[2018-09-16 19:20:26,024] INFO [Controller Newly added brokers: , deleted brokers: 3, all live brokers: 1,2 (kafka.controller.KafkaController)

[2018-09-16 19:20:26,025] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)

[2018-09-16 19:20:26,029] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)

[2018-09-16 19:20:26,031] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)

[2018-09-16 19:20:26,069] INFO [Controller Broker failure callback for 3 (kafka.controller.KafkaController)

[2018-09-16 19:20:26,074] INFO [Controller Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController)

[2018-09-16 19:20:26,148] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka2:19093 (id: 2 rack: null) for sending state change requests (kafka.controller.RequestSendThread)

[2018-09-16 19:20:26,171] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka1:19092 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread)

[2018-09-16 19:20:26,180] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test1-0 (kafka.server.ReplicaFetcherManager)

[2018-09-16 19:20:26,181] INFO [Partition test1-0 broker=1] test1-0 starts at Leader Epoch 1 from offset 20. Previous Leader Epoch was: 0 (kafka.cluster.Partition)

We see that kafka1 sees that kafka3 is gone and that it stops its fetcher. The partition on kafka1 gets elected leader.

Note that the fail-over occurred at 19:20:26 and that kafka3 still thought it was the leader at 19:20:39; it stopped accepting messages at 19:20:40, a whole 14 seconds after the fail-over.
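Gaps like this can be measured straight from the broker log timestamps. The sketch below assumes each line starts with a bracketed timestamp in the format seen in the extracts; the helper names are hypothetical, and the two lines are shortened copies of the kafka1 and kafka3 entries shown earlier.

```python
from datetime import datetime


def parse_log_time(line):
    """Parse the leading "[YYYY-MM-DD HH:MM:SS,mmm]" timestamp of a broker log line."""
    stamp = line[1:line.index("]")]
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S,%f")


def seconds_between(earlier_line, later_line):
    """Elapsed seconds between the timestamps of two log lines."""
    return (parse_log_time(later_line) - parse_log_time(earlier_line)).total_seconds()


election = "[2018-09-16 19:20:26,181] INFO [Partition test1-0 broker=1] test1-0 starts at Leader Epoch 1 from offset 20."
shrink = "[2018-09-16 19:20:39,034] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3"

print(seconds_between(election, shrink))
```

This measures the time from the new leader's election to the old leader's ISR shrink attempt, which is slightly shorter than the window up to the point where the old leader stopped accepting writes.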

One last interesting thing is that the controller node could have stopped kafka3 from accepting writes if it had sent a request to it to stop being leader. In the logs we see the following line straight after the fail-over:

[2018-09-17 19:20:26,323] WARN [Channel manager on controller 1]: Not sending request (type=StopReplicaRequest, controllerId=1, controllerEpoch=1, deletePartitions=false, partitions=test1-0) to broker 3, since it is offline. (kafka.controller.ControllerChannelManager)

It didn’t send the request as it believed the node was offline because it had expired in Zookeeper.

The conclusion is that acks=1 loses loads of messages even when a kafka node is only isolated from Zookeeper.

Scenario 7 — Leader Isolated from Zookeeper only with Acks=all (no message loss)

We’ll repeat the same actions as in scenario 6 except we’ll set acks to all.

Just like in scenario 5, I don’t expect to see message loss. The reason is that the leader will be isolated from Zookeeper, so when the fail-over occurs the controller won’t bother to tell the original leader to stop being leader. Thinking it’s still leader, it will see the followers not sending fetch requests and try to remove them from the ISR, but it won’t be able to. So while the followers are still in the ISR, we won’t get an ack because they will never confirm receipt, and once the attempt to shrink the ISR has occurred, no more writes will be accepted.

The fail-over will occur and the client will detect the changes within a minute and start publishing to the new leader.


We built a Kafka cluster with 5 brokers, but one of the brokers suddenly stopped while running. This has happened twice on the same broker. Here is the log; is this a bug in Kafka?

[2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
at scala.Option.foreach(Option.scala:257)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

Comments

@gennevawang

Affects Version(s): <2.2.6.RELEASE>

We are looking for a mechanism for long polling (i.e., we need the consumer to poll only at 30-minute intervals, not as soon as a message arrives). I came across #819 and tried the sample code provided, which works fine. Our platform requires us to use higher versions of spring-boot-starter-parent (2.1.4) and spring-kafka (2.2.6.RELEASE). Testing the same code with these Spring versions, I get “Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 0: org.apache.kafka.common.errors.DisconnectException”.

I have tested with different values of FETCH_MIN_BYTES_CONFIG and FETCH_MAX_WAIT_MS_CONFIG; it appears that if the configured wait is greater than 30 seconds, the DisconnectException is shown.

Please find my test program and configuration at https://github.com/gennevawang/test

Thank you!

@garyrussell

First, as explained in the template, GitHub issues are NOT for asking questions.

🙅 «Please DO NOT Raise an Issue» Cases

  • Question
    STOP!! Please ask questions about how to use something, or to understand why something isn’t
    working as you expect it to, on Stack Overflow using the spring-kafka tag.

Second, this has nothing to do with Spring; you should point such questions to the wider Kafka community.

That said, you also need to increase the request.timeout.ms to avoid that error.

This works fine for me…

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.fetch.max.wait.ms=60000
spring.kafka.consumer.properties.request.timeout.ms=70000
spring.kafka.consumer.properties.fetch.min.bytes=999999999
spring.kafka.listener.poll-timeout=60000

logging.level.org.springframework.kafka=debug
2019-07-27 09:20:24.223  INFO 32482 --- [  kgh1180-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : kgh1180: partitions assigned: [kgh1180-0]
2019-07-27 09:21:24.177 DEBUG 32482 --- [  kgh1180-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-07-27 09:21:24.177 DEBUG 32482 --- [  kgh1180-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2019-07-27 09:22:24.179 DEBUG 32482 --- [  kgh1180-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2019-07-27 09:22:24.179 DEBUG 32482 --- [  kgh1180-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
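The relationship between the two timeouts in the properties above can be checked mechanically: a fetch that the broker may hold for fetch.max.wait.ms needs a request.timeout.ms that is larger. The following is a sketch with a hypothetical function name; the fallback values of 500 ms and 30000 ms are assumed client defaults and should be verified against the documentation for your Kafka client version.

```python
def fetch_timeout_problems(props):
    """Return human-readable problems with a consumer's fetch-related timeouts.

    props maps plain Kafka consumer property names to values.
    """
    fetch_wait_ms = int(props.get("fetch.max.wait.ms", 500))           # assumed default
    request_timeout_ms = int(props.get("request.timeout.ms", 30000))   # assumed default
    problems = []
    if request_timeout_ms <= fetch_wait_ms:
        problems.append(
            f"request.timeout.ms ({request_timeout_ms}) should exceed "
            f"fetch.max.wait.ms ({fetch_wait_ms})"
        )
    return problems


# Values from the working configuration above
print(fetch_timeout_problems({"fetch.max.wait.ms": 60000, "request.timeout.ms": 70000}))
```

If the assumed 30-second default for request.timeout.ms holds, it would be consistent with the report that the error appears once the configured wait exceeds 30 seconds.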

However, I don’t think kafka was designed for this kind of activity.

Why don’t you simply start/stop the container every 30 minutes?

@garyrussell

Closing due to inactivity.


#apache-kafka #apache-kafka-streams

Question:

I intermittently see problems in my logs.

The heartbeat thread seems to be constantly struggling and getting

 Error sending fetch request
org.apache.kafka.common.errors.DisconnectException

Group coordinator xxx is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
 

I have already made heartbeat.interval.ms a bit larger, but it still happens.

  1. I would like to understand what this can lead to in a kafka-streams application configured for static membership. In particular, can it lead to a rebalance?
  2. I have also increased request.timeout.ms. Should I also increase delivery.timeout.ms, and if so, why?
  3. I am trying to understand how everything fits together. That is, what happens if the heartbeat thread keeps retrying and then reaches delivery.timeout.ms? Is that what causes "is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true"? What governs the subsequent retry after that? Indeed, the consumer does not fail; it carries on as soon as it discovers the coordinator.

I am a bit confused about how this can happen, so I am looking for a way to explain it and deal with it.

Any ideas on how to help with this?

 09:48:12.357 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 70000 total records, ran 0 punctuators, and committed 3 total tasks since the last update
09:48:21.125 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1490129196, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
09:48:21.125 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1666554310, epoch=INITIAL) to node 4:
org.apache.kafka.common.errors.DisconnectException: null
09:48:21.225 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=522549112, epoch=INITIAL) to node 5:
org.apache.kafka.common.errors.DisconnectException: null
09:48:42.711 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-2, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=810037183, epoch=INITIAL) to node 2:
org.apache.kafka.common.errors.DisconnectException: null
09:49:23.172 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-2, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=810037183, epoch=INITIAL) to node 2:
org.apache.kafka.common.errors.DisconnectException: null
09:50:17.295 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 42290 total records, ran 0 punctuators, and committed 3 total tasks since the last update
09:52:28.299 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 60000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:54:35.743 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 60000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:55:07.269 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Group coordinator sdc-oxygen-dev-cp-kafka-4.sdc-oxygen-dev-cp-kafka-headless.sdc-oxygen-dev:9092 (id: 2147483643 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
09:55:07.371 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Discovered group coordinator sdc-oxygen-dev-cp-kafka-4.sdc-oxygen-dev-cp-kafka-headless.sdc-oxygen-dev:9092 (id: 2147483643 rack: null)
09:55:07.473 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Discovered group coordinator sdc-oxygen-dev-cp-kafka-4.sdc-oxygen-dev-cp-kafka-headless.sdc-oxygen-dev:9092 (id: 2147483643 rack: null)
09:55:50.644 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1] Processed 30000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:56:20.888 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1490129196, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
09:56:20.888 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1666554310, epoch=INITIAL) to node 4:
org.apache.kafka.common.errors.DisconnectException: null
09:56:20.989 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=522549112, epoch=INITIAL) to node 5:
org.apache.kafka.common.errors.DisconnectException: null
09:56:47.135 [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-2] Processed 70000 total records, ran 0 punctuators, and committed 1 total tasks since the last update
09:57:04.873 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1490129196, epoch=INITIAL) to node 0:
org.apache.kafka.common.errors.DisconnectException: null
09:57:04.873 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=1666554310, epoch=INITIAL) to node 4:
org.apache.kafka.common.errors.DisconnectException: null
09:57:04.973 [kafka-coordinator-heartbeat-thread | _entellect-cbe-builder-resnet-0] INFO  o.a.k.clients.FetchSessionHandler - [Consumer instanceId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-1, clientId=sdc-oxygen-dev-entellect-cbe-builder-resnet-22-StreamThread-1-consumer, groupId=_entellect-cbe-builder-resnet-0] Error sending fetch request (sessionId=522549112, epoch=INITIAL) to node 5:
org.apache.kafka.common.errors.DisconnectException: null
 

Answer 1:

I have already made heartbeat.interval.ms a bit larger, but it still happens

That is not an effective way to avoid the exception. The heartbeat is a failure-detection mechanism: it exists to detect abnormal situations. If you set a large interval, an abnormal state is detected later, and vice versa. In short, making it larger or smaller makes no difference to whether the exception occurs.
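To make that trade-off concrete, here is a minimal sketch of how many heartbeats fit into one session window. It relies on the documented guideline that heartbeat.interval.ms must be lower than session.timeout.ms and is typically set no higher than one third of it. The function name heartbeat_budget and the sample values are illustrative assumptions, not values taken from the question.

```python
def heartbeat_budget(heartbeat_interval_ms, session_timeout_ms):
    """Return (heartbeats per session window, whether the 1/3 guideline holds)."""
    if heartbeat_interval_ms >= session_timeout_ms:
        raise ValueError("heartbeat.interval.ms must be lower than session.timeout.ms")
    attempts = session_timeout_ms // heartbeat_interval_ms
    within_guideline = heartbeat_interval_ms * 3 <= session_timeout_ms
    return attempts, within_guideline


# Illustrative values only.
print(heartbeat_budget(3_000, 45_000))   # (15, True)
print(heartbeat_budget(20_000, 45_000))  # (2, False)
```

A larger interval only reduces the number of heartbeats the coordinator can miss before the session expires; it does not prevent the underlying disconnect.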

I have also increased request.timeout.ms. Should I also increase delivery.timeout.ms, and if so, why?

I believe the two are independent of each other. For example, if a request is sent to an endpoint and gets no response, that does not mean the endpoint is down. This does happen in practice, when the endpoint is overloaded and cannot respond quickly. As a result, request.timeout.ms should be set to a reasonable value, whereas delivery.timeout.ms should be set to a large number. As the official documentation says about delivery.timeout.ms:

it sets an upper bound on the total time between sending a record and receiving an acknowledgement from the broker. By default, the delivery timeout is set to 2 minutes.

As I already mentioned.
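One relationship between the two settings is worth checking. The producer documentation states that delivery.timeout.ms should be greater than or equal to the sum of request.timeout.ms and linger.ms, so raising request.timeout.ms far enough can force delivery.timeout.ms up as well. Note that delivery.timeout.ms is a producer setting; to my knowledge it does not govern consumer heartbeats. The sketch below uses hypothetical function names and illustrative values.

```python
def min_delivery_timeout_ms(request_timeout_ms, linger_ms):
    """Smallest delivery.timeout.ms consistent with the documented constraint."""
    return request_timeout_ms + linger_ms


def delivery_timeout_is_consistent(delivery_timeout_ms, request_timeout_ms, linger_ms):
    """True when delivery.timeout.ms >= request.timeout.ms + linger.ms."""
    return delivery_timeout_ms >= min_delivery_timeout_ms(request_timeout_ms, linger_ms)


# Illustrative values: a 2-minute delivery timeout with two request timeouts.
print(delivery_timeout_is_consistent(120_000, 30_000, 0))   # True
print(delivery_timeout_is_consistent(120_000, 180_000, 0))  # False
```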

I am trying to understand how everything fits together. That is, what happens if the heartbeat thread keeps retrying and then reaches delivery.timeout.ms? Is that what causes "is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true"

Yes. After a timeout of either kind, request or delivery, the heartbeat thread signals to the main node that the group is "suspected of failure".

What governs the subsequent retry after that

Administration, management, and inspection of topics, brokers, access control lists, and other Kafka objects

Hi all, I have a Kafka cluster hosted via AWS MSK with:

  • 2 brokers, b-1 and b-2.

  • 2 topics, both with PartitionCount: 1, ReplicationFactor: 2, min.insync.replicas=1

I got this error when my producer performed a poll around that time:

2021-02-20 07:59:08,174 - ERROR - Failed to deliver message due to error: KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}

Here is what was collected by my console logs:

%6|1613807298.974|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/2: Disconnected (after 3829996ms in state UP)
%3|1613807299.011|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/2: Connect to ipv4#172.31.18.172:9096 failed: Connection refused (after 36ms in state CONNECT)
%3|1613807299.128|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/2: Connect to ipv4#172.31.18.172:9096 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
%4|1613807907.225|REQTMOUT|rdkafka#producer-2| [thrd:sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/2: Timed out 0 in-flight, 0 retry-queued, 1 out-queue, 1 partially-sent requests
%3|1613807907.225|FAIL|rdkafka#producer-2| [thrd:sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/2: 1 request(s) timed out: disconnect (after 343439ms in state UP)
%5|1613807938.942|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: Timed out ProduceRequest in flight (after 60767ms, timeout #0)
%5|1613807938.943|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: Timed out ProduceRequest in flight (after 60459ms, timeout #1)
%5|1613807938.943|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: Timed out ProduceRequest in flight (after 60342ms, timeout #2)
%5|1613807938.943|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: Timed out ProduceRequest in flight (after 60305ms, timeout #3)
%5|1613807938.943|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: Timed out ProduceRequest in flight (after 60293ms, timeout #4)
%4|1613807938.943|REQTMOUT|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: Timed out 6 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1613807938.943|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.am]: sasl_ssl://b-1.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/1: 6 request(s) timed out: disconnect (after 4468987ms in state UP)
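The client lines above carry epoch timestamps while the broker log uses wall-clock times, which makes the two hard to line up by eye. Here is a minimal sketch that parses one client line and converts its timestamp to UTC. The regular expression is inferred from the lines shown here (level, epoch seconds with milliseconds, facility, client name, message) and is an assumption about the format; the function name parse_rdkafka_line is hypothetical. Whether the broker log is in UTC is also an assumption to verify.

```python
import re
from datetime import datetime, timedelta, timezone

# Pattern inferred from the sample lines, e.g.
# %6|1613807298.974|FAIL|rdkafka#producer-2| [thrd:...]: message
LINE_RE = re.compile(r"^%(\d)\|(\d+)\.(\d{3})\|([A-Z_]+)\|([^|]+)\|\s*(.*)$")


def parse_rdkafka_line(line):
    """Parse one client log line into a dict, or return None if it does not match."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    level, secs, millis, facility, client, message = match.groups()
    # Build the timestamp from integers to avoid float rounding of milliseconds.
    when = datetime.fromtimestamp(int(secs), tz=timezone.utc) + timedelta(
        milliseconds=int(millis)
    )
    return {
        "level": int(level),
        "time": when,
        "facility": facility,
        "client": client,
        "message": message,
    }


sample = (
    "%6|1613807298.974|FAIL|rdkafka#producer-2| "
    "[thrd:sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.am]: "
    "sasl_ssl://b-2.xxxx.kafka.ap-northeast-1.amazonaws.com:9096/2: "
    "Disconnected (after 3829996ms in state UP)"
)
parsed = parse_rdkafka_line(sample)
print(parsed["time"].isoformat(), parsed["facility"], parsed["client"])
```

For the first line this gives 07:48:18 UTC on 2021-02-20, a few minutes before the first ZooKeeper session timeout warnings in the broker log.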

Broker b-2 logs have this:

[2021-02-20 07:57:24,781] WARN Client session timed out, have not heard from server in 15103ms for sessionid 0x2000190b5d40001 (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:24,782] WARN Client session timed out, have not heard from server in 12701ms for sessionid 0x2000190b5d40000 (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:24,931] INFO Client session timed out, have not heard from server in 12701ms for sessionid 0x2000190b5d40000, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:24,932] INFO Client session timed out, have not heard from server in 15103ms for sessionid 0x2000190b5d40001, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:32,884] INFO Opening socket connection to server INTERNAL_ZK_DNS/INTERNAL_IP. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:32,910] INFO Opening socket connection to server INTERNAL_ZK_DNS/INTERNAL_IP. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:33,032] INFO Socket connection established to INTERNAL_ZK_DNS/INTERNAL_IP, initiating session (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:57:33,032] INFO Socket connection established to INTERNAL_ZK_DNS/INTERNAL_IP, initiating session (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:59:35,428] INFO EventThread shut down for session: 0x2000190b5d40000 (org.apache.zookeeper.ClientCnxn)
[2021-02-20 07:59:38,119] INFO Kafka version: 2.2.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-20 07:59:38,329] INFO Kafka commitId: unknown (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-20 07:59:49,490] INFO ConsumerConfig values:
...
[2021-02-20 08:01:54,727] INFO [ZooKeeperClient] Session expired. (kafka.zookeeper.ZooKeeperClient)
[2021-02-20 08:02:06,174] INFO [ZooKeeperClient] Session expired. (kafka.zookeeper.ZooKeeperClient)
[2021-02-20 08:02:06,247] INFO [Log partition=v1.dev.snapshot-0, dir=LOG_DIR] Found deletable segments with base offsets [183472,183504] due to retention time 300000ms breach (kafka.log.Log)
[2021-02-20 08:02:06,278] INFO [Log partition=v1.dev.snapshot-0, dir=LOG_DIR] Scheduling log segment [baseOffset 183472, size 1049368838] for deletion. (kafka.log.Log)
[2021-02-20 08:02:10,753] INFO [Log partition=v1.dev.snapshot-0, dir=LOG_DIR] Scheduling log segment [baseOffset 183504, size 1048047093] for deletion. (kafka.log.Log)
[2021-02-20 08:02:16,373] INFO [Log partition=v1.dev.snapshot-0, dir=LOG_DIR] Incrementing log start offset to 183536 (kafka.log.Log)
[2021-02-20 08:03:11,908] INFO [Log partition=v1.dev.snapshot-0, dir=LOG_DIR] Deleting segment 183472 (kafka.log.Log)
[2021-02-20 08:03:16,417] INFO Deleted log LOG_DIR/v1.dev.snapshot-0/00000000000000183472.log.deleted. (kafka.log.LogSegment)
[2021-02-20 08:03:16,697] INFO [Log partition=v1.dev.snapshot-0, dir=LOG_DIR] Deleting segment 183504 (kafka.log.Log)
[2021-02-20 08:03:57,318] INFO Deleted offset index LOG_DIR/v1.dev.snapshot-0/00000000000000183472.index.deleted. (kafka.log.LogSegment)
[2021-02-20 08:03:57,331] INFO Deleted time index LOG_DIR/v1.dev.snapshot-0/00000000000000183472.timeindex.deleted. (kafka.log.LogSegment)
[2021-02-20 08:03:57,337] INFO Deleted log LOG_DIR/v1.dev.snapshot-0/00000000000000183504.log.deleted. (kafka.log.LogSegment)
[2021-02-20 08:06:01,625] INFO Deleted offset index LOG_DIR/v1.dev.snapshot-0/00000000000000183504.index.deleted. (kafka.log.LogSegment)
[2021-02-20 08:06:02,424] INFO Deleted time index LOG_DIR/v1.dev.snapshot-0/00000000000000183504.timeindex.deleted. (kafka.log.LogSegment)
[2021-02-20 08:09:12,344] INFO [AdminClient clientId=adminclient-9] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2021-02-20 08:19:24,294] WARN Attempting to send response via channel for which there is no open connection, connection id INTERNAL_IP-INTERNAL_IP-0 (kafka.network.Processor)
[2021-02-20 08:17:19,563] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 1106713 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-02-20 08:14:08,232] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=1935723386, epoch=3316) to node 1: java.io.IOException: Connection to 1 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2021-02-20 08:14:01,296] INFO [ZooKeeperClient] Initializing a new session to INTERNAL_ZK_DNS,INTERNAL_ZK_DNS,INTERNAL_ZK_DNS,. (kafka.zookeeper.ZooKeeperClient)
[2021-02-20 08:23:41,318] INFO Initiating client connection, connectString=INTERNAL_ZK_DNS,INTERNAL_ZK_DNS,INTERNAL_ZK_DNS, sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@1dde4cb2 (org.apache.zookeeper.ZooKeeper)
[2021-02-20 08:10:01,804] INFO Kafka version: 2.2.1 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-20 08:23:41,318] INFO Kafka commitId: unknown (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-20 08:09:14,150] INFO [Partition __consumer_offsets-34 broker=2] Shrinking ISR from 1,2 to 2. Leader: (highWatermark: 88518, endOffset: 88519). Out of sync replicas: (brokerId: 1, endOffset: 88518). (kafka.cluster.Partition)
[2021-02-20 08:10:01,695] INFO [ZooKeeperClient] Initializing a new session to INTERNAL_ZK_DNS,INTERNAL_ZK_DNS,INTERNAL_ZK_DNS,. (kafka.zookeeper.ZooKeeperClient)
[2021-02-20 08:23:41,319] INFO Initiating client connection, connectString=INTERNAL_ZK_DNS,INTERNAL_ZK_DNS,INTERNAL_ZK_DNS, sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@56ace400 (org.apache.zookeeper.ZooKeeper)
[2021-02-20 08:23:41,567] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 250 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-02-20 08:23:41,680] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2021-02-20 08:23:47,842] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1935723386, epoch=3316)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
at scala.Option.foreach(Option.scala:274)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2021-02-20 08:23:47,899] WARN [AdminClient clientId=adminclient-9] Connection to node -1 (INTERNAL_BROKER_DNS/INTERNAL_IP) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)
[2021-02-20 08:23:47,899] INFO [AdminClient clientId=adminclient-9] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
[2021-02-20 08:23:47,899] WARN [AdminClient clientId=adminclient-9] Connection to node -1 (INTERNAL_BROKER_DNS/INTERNAL_IP) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)
[2021-02-20 08:23:47,899] INFO [AdminClient clientId=adminclient-9] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2021-02-20 08:23:47,944] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.zk.KafkaZkClient)
[2021-02-20 08:23:47,984] INFO Opening socket connection to server INTERNAL_ZK_DNS/INTERNAL_IP. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-02-20 08:23:47,985] INFO Socket connection established to INTERNAL_ZK_DNS/INTERNAL_IP, initiating session (org.apache.zookeeper.ClientCnxn)
[2021-02-20 08:23:47,989] INFO Session establishment complete on server INTERNAL_ZK_DNS/INTERNAL_IP, sessionid = 0x100000927df003f, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-02-20 08:23:47,995] INFO Opening socket connection to server INTERNAL_ZK_DNS/INTERNAL_IP. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2021-02-20 08:23:47,998] INFO Socket connection established to INTERNAL_ZK_DNS/INTERNAL_IP, initiating session (org.apache.zookeeper.ClientCnxn)
[2021-02-20 08:23:48,017] INFO Session establishment complete on server INTERNAL_ZK_DNS/INTERNAL_IP, sessionid = 0x2000190b5d40002, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2021-02-20 08:23:48,049] INFO Stat of the created znode at /brokers/ids/2 is: 4294970647,4294970647,1613809428018,1613809428018,1,0,0,144116909113344002,589,0,4294970647 (kafka.zk.KafkaZkClient)
[2021-02-20 08:23:48,049] INFO Registered broker 2 at path /brokers/ids/2 with addresses: ArrayBuffer(EndPoint(b-2.xxxx.kafka.ap-northeast-1.amazonaws.com,9096,ListenerName(CLIENT_SASL_SCRAM),SASL_SSL), EndPoint(INTERNAL_BROKER_DNS,9093,ListenerName(REPLICATION),PLAINTEXT), EndPoint(INTERNAL_BROKER_DNS,9095,ListenerName(REPLICATION_SECURE),SSL)), czxid (broker epoch): 4294970647 (kafka.zk.KafkaZkClient)
[2021-02-20 08:23:48,111] INFO [Partition __consumer_offsets-34 broker=2] Cached zkVersion [24] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2021-02-20 08:23:48,112] INFO [Partition v1.dev.snapshot-0 broker=2] Shrinking ISR from 1,2 to 2. Leader: (highWatermark: 183544, endOffset: 183545). Out of sync replicas: (brokerId: 1, endOffset: 183544). (kafka.cluster.Partition)
[2021-02-20 08:23:48,126] INFO [Partition v1.dev.snapshot-0 broker=2] Cached zkVersion [43] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

My understanding here is that:

  1. b-2 went down, i.e. it was unable to connect to ZooKeeper.

  2. Messages were produced to b-1 successfully during this time.

  3. b-1 was also trying to forward messages to b-2 during this downtime, because the replication factor is set to 2.

  4. All these forwarded messages (ProduceRequests) timed out after 600s.

My questions:

  1. Is my understanding correct, and how can I prevent this from happening again?

  2. If I had 3 brokers here, would b-1 have tried to connect to b-3 right away rather than waiting for b-2? Is that a good workaround? (Assuming topic replication factor = 2 everywhere)

I keep getting FETCH_SESSION_ID_NOT_FOUND. I am not sure why this is happening. Can someone please explain what the problem is here and how it will affect consumers and brokers?

Kafka server log:

INFO [2019-10-18 12:09:00,709] [ReplicaFetcherThread-1-8][] org.apache.kafka.clients.FetchSessionHandler - [ReplicaFetcher replicaId=6, leaderId=8, fetcherId=1] Node 8 was unable to process the fetch request with (sessionId=258818904, epoch=2233): FETCH_SESSION_ID_NOT_FOUND.
 INFO [2019-10-18 12:09:01,078] [ReplicaFetcherThread-44-10][] org.apache.kafka.clients.FetchSessionHandler - [ReplicaFetcher replicaId=6, leaderId=10, fetcherId=44] Node 10 was unable to process the fetch request with (sessionId=518415741, epoch=4416): FETCH_SESSION_ID_NOT_FOUND.
 INFO [2019-10-18 12:09:01,890] [ReplicaFetcherThread-32-9][] org.apache.kafka.clients.FetchSessionHandler - [ReplicaFetcher replicaId=6, leaderId=9, fetcherId=32] Node 9 was unable to process the fetch request with (sessionId=418200413, epoch=3634): FETCH_SESSION_ID_NOT_FOUND.

Kafka Consumer Log:

12:29:58,936 INFO  [FetchSessionHandler:383] [Consumer clientId=bannerGroupMap#87e2af7cf742#test, groupId=bannerGroupMap#87e2af7cf742#test] Node 8 was unable to process the fetch request with (sessionId=1368981303, epoch=60): FETCH_SESSION_ID_NOT_FOUND.
12:29:58,937 INFO  [FetchSessionHandler:383] [Consumer clientId=bannerGroupMap#87e2af7cf742#test, groupId=bannerGroupMap#87e2af7cf742#test] Node 3 was unable to process the fetch request with (sessionId=1521862194, epoch=59): FETCH_SESSION_ID_NOT_FOUND.
12:29:59,939 INFO  [FetchSessionHandler:383] [Consumer clientId=zoneGroupMap#87e2af7cf742#test, groupId=zoneGroupMap#87e2af7cf742#test] Node 7 was unable to process the fetch request with (sessionId=868804875, epoch=58): FETCH_SESSION_ID_NOT_FOUND.
12:30:06,952 INFO  [FetchSessionHandler:383] [Consumer clientId=creativeMap#87e2af7cf742#test, groupId=creativeMap#87e2af7cf742#test] Node 3 was unable to process the fetch request with (sessionId=1135396084, epoch=58): FETCH_SESSION_ID_NOT_FOUND.
12:30:12,965 INFO  [FetchSessionHandler:383] [Consumer clientId=creativeMap#87e2af7cf742#test, groupId=creativeMap#87e2af7cf742#test] Node 6 was unable to process the fetch request with (sessionId=1346340004, epoch=56): FETCH_SESSION_ID_NOT_FOUND.

Cluster details:

Brokers: 13 (each broker: 14 cores and 36 GB memory)
Kafka cluster version: 2.0.0
Kafka Java client version: 2.0.0
Number of topics: ~15
Number of consumers: 7K (all independent; each consumer is manually assigned all partitions of a single topic and consumes only from that topic)
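One possible explanation, offered as a hypothesis rather than a diagnosis, is fetch-session cache eviction. Each broker keeps a bounded cache of incremental fetch sessions, sized by the broker setting max.incremental.fetch.session.cache.slots (1000 by default, to my knowledge). When a session is evicted, the next incremental fetch for it gets FETCH_SESSION_ID_NOT_FOUND and the client falls back to a full fetch. The sketch below estimates the average number of consumer sessions per broker. The parameter brokers_per_consumer is hypothetical, since the question does not say how many brokers each topic's partitions span.

```python
# Assumed broker default for max.incremental.fetch.session.cache.slots.
ASSUMED_CACHE_SLOTS = 1_000


def expected_sessions_per_broker(consumers, brokers, brokers_per_consumer):
    """Average consumer fetch sessions per broker.

    Assumes each consumer holds one session on every broker it fetches from
    and that consumers are spread evenly over the brokers.
    """
    return consumers * brokers_per_consumer / brokers


def cache_is_oversubscribed(consumers, brokers, brokers_per_consumer,
                            cache_slots=ASSUMED_CACHE_SLOTS):
    """True when the estimated session count exceeds the cache size."""
    sessions = expected_sessions_per_broker(consumers, brokers, brokers_per_consumer)
    return sessions > cache_slots


# 7K consumers and 13 brokers come from the cluster details above;
# the spread of each topic over brokers is a guess.
for spread in (1, 2, 13):
    print(spread, expected_sessions_per_broker(7_000, 13, spread),
          cache_is_oversubscribed(7_000, 13, spread))
```

Under these assumptions the estimate exceeds 1000 sessions per broker as soon as each consumer fetches from two or more brokers, which would be consistent with frequent evictions on both the consumer and the replica-fetcher side.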
