Error while fetching metadata with correlation id


I am trying to use Kafka.
All configurations are done properly, but when I try to produce a message from the console I keep getting the following error:

WARN Error while fetching metadata with correlation id 39 : 
     {4-3-16-topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Kafka version: 2.11-0.9.0.0 (Kafka 0.9.0.0 built for Scala 2.11)

asked Mar 4, 2016 at 5:32 by Vishesh (edited by Ofek Hod)

It could be related to the advertised.host.name setting in your server.properties.

What could happen is that your producer tries to find out which broker is the leader for a given partition, looks up that broker's advertised.host.name and advertised.port, and tries to connect.
If these settings are not configured correctly, it may then conclude that the leader is unavailable.
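A quick way to check this scenario is to test, from the client machine, whether the advertised host and port accept a TCP connection at all. The sketch below is plain JDK Java with no Kafka client. The class name, the `host:port` argument format and the 3-second timeout are illustrative assumptions. A successful connection only shows that something is listening at that address. It does not prove that the listener is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Kafka): probes the host:port a broker advertises,
// i.e. the address a client is told to connect to after the metadata request.
final class AdvertisedAddressCheck {

    /** Returns true if a TCP connection to host:port is accepted within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // An unresolvable host name makes connect() throw UnknownHostException,
            // which is an IOException, so it is reported as "not reachable".
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed argument format: the advertised value as "host:port", e.g. "localhost:9092".
        String target = args.length > 0 ? args[0] : "localhost:9092";
        int colon = target.lastIndexOf(':');
        String host = target.substring(0, colon);
        int port = Integer.parseInt(target.substring(colon + 1));
        boolean ok = canConnect(host, port, 3_000);
        System.out.println(target + (ok ? " is reachable" : " is NOT reachable from this machine"));
    }
}
```

On Java 11 or later it can be run directly as a single source file, for example `java AdvertisedAddressCheck.java myhost:9092`.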

answered Mar 15, 2016 at 9:33 by Alexey Raga

I tried all the recommendations listed here. What worked for me was to go to server.properties and add:

port = 9092
advertised.host.name = localhost 

Leave listeners and advertised.listeners commented out.

answered Nov 22, 2016 at 0:58 by Vikas Deolaliker (edited by jrbedard)

I have been seeing this same issue for the last two weeks while working with Kafka and have been reading this Stack Overflow post ever since.

After two weeks of analysis I have deduced that, in my case, this happens when trying to produce messages to a topic that doesn’t exist.

The outcome in my case is that Kafka sends an error message back but, at the same time, creates the topic that did not exist before. So if I produce a message to that topic again after this event, the error no longer appears, because the topic has been created.

PLEASE NOTE: my particular Kafka installation may have been configured to create topics automatically when they do not exist (the broker setting auto.create.topics.enable). That would explain why I see the issue only once for every topic after resetting the topics. Your configuration might be different, and in that case you would keep receiving the same error over and over.

answered Sep 5, 2018 at 15:51 by Luca Tampellini (edited by Kirby)

What solved it for me is to set listeners like so:

advertised.listeners = PLAINTEXT://my.public.ip:9092
listeners = PLAINTEXT://0.0.0.0:9092

This makes the Kafka broker listen on all interfaces (0.0.0.0), while clients are told to connect to the public IP given in advertised.listeners.
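`0.0.0.0` only makes sense as a bind address. Clients need a concrete address, which is why advertised.listeners carries the public IP. If you are unsure which of the machine's own addresses could be advertised, the JDK-only sketch below lists its non-loopback IPv4 addresses.

It cannot tell which of those addresses your clients can actually reach. On a cloud VM behind NAT, for example, the public IP is often not bound to any local interface. Treat the output only as candidates. The class name and port 9092 are assumptions.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper (not part of Kafka): lists local IPv4 addresses that could be
// advertised instead of the 0.0.0.0 bind address.
final class ListenerAddressCandidates {

    /** Keeps IPv4 addresses that are neither loopback nor the wildcard 0.0.0.0. */
    static List<String> filterCandidates(List<InetAddress> addresses) {
        List<String> result = new ArrayList<>();
        for (InetAddress address : addresses) {
            if (address instanceof Inet4Address
                    && !address.isLoopbackAddress()
                    && !address.isAnyLocalAddress()) {
                result.add(address.getHostAddress());
            }
        }
        return result;
    }

    /** Collects the addresses bound to all network interfaces that are up. */
    static List<InetAddress> localAddresses() throws SocketException {
        List<InetAddress> all = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return all; // some JDK versions return null when no interface is found
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            if (nic.isUp()) {
                all.addAll(Collections.list(nic.getInetAddresses()));
            }
        }
        return all;
    }

    public static void main(String[] args) throws SocketException {
        for (String ip : filterCandidates(localAddresses())) {
            System.out.println("candidate: advertised.listeners=PLAINTEXT://" + ip + ":9092");
        }
    }
}
```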

answered Oct 12, 2017 at 9:58 by Pinelopi Kouleri

I had Kafka running as a Docker container, and similar messages were flooding the log.
KAFKA_ADVERTISED_HOST_NAME was set to ‘kafka’.

In my case, the reason for the error was a missing /etc/hosts record for ‘kafka’ in the ‘kafka’ container itself.
For example, running ping kafka inside the ‘kafka’ container would fail with ping: bad address 'kafka'.

In Docker terms, this problem is solved by specifying a hostname for the container.

Options to achieve it:

  • docker run --hostname ...
  • docker run -it --add-host ...
  • hostname in docker-compose
  • hostname in AWS EC2 Task Definition
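The `ping kafka` test can also be done from the JVM's side. That is closer to what the broker and Java clients do when they resolve the advertised name. The sketch below is JDK-only and uses a hypothetical class name.

Run it inside the container with the value of KAFKA_ADVERTISED_HOST_NAME as the argument. It defaults to `kafka`, as in this answer.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper (not part of Kafka): the JVM-side equivalent of "ping kafka",
// i.e. does the advertised host name resolve from where this runs?
final class HostnameResolutionCheck {

    /** Returns the resolved IP address, or null if the name cannot be resolved. */
    static String resolve(String hostname) {
        try {
            return InetAddress.getByName(hostname).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the value of KAFKA_ADVERTISED_HOST_NAME; "kafka" matches the answer above.
        String name = args.length > 0 ? args[0] : "kafka";
        String ip = resolve(name);
        if (ip != null) {
            System.out.println(name + " resolves to " + ip);
        } else {
            System.out.println(name + " does not resolve here; set the container hostname or add a hosts entry");
        }
    }
}
```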

answered Aug 3, 2016 at 11:39 by Vlad.Bachurin

I’m using kafka_2.12-0.10.2.1:

vi config/server.properties

Add the line below:

listeners=PLAINTEXT://localhost:9092

There is no need to change advertised.listeners, because it picks up the value from the standard listeners property. From the comment in server.properties:

Hostname and port the broker will advertise to producers and consumers. If not set, it uses the value for "listeners" if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName().

Stop the Kafka broker:

bin/kafka-server-stop.sh

Restart the broker:

bin/kafka-server-start.sh -daemon config/server.properties

Now you should not see any issues.
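The precedence described in that server.properties comment can be written as a small model. This is a simplified sketch, not Kafka's implementation. The class name is hypothetical, and the last-resort `PLAINTEXT://<canonical host name>:9092` value assumes the default protocol and port.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

// Simplified model (not Kafka's code) of which address the broker advertises:
// advertised.listeners, else listeners, else the canonical host name.
final class AdvertisedListenerFallback {

    static String effectiveAdvertisedListeners(Properties serverProps) throws UnknownHostException {
        String advertised = serverProps.getProperty("advertised.listeners");
        if (advertised != null && !advertised.trim().isEmpty()) {
            return advertised.trim();
        }
        String listeners = serverProps.getProperty("listeners");
        if (listeners != null && !listeners.trim().isEmpty()) {
            return listeners.trim();
        }
        // Last resort from the server.properties comment; protocol and port are assumptions.
        return "PLAINTEXT://" + InetAddress.getLocalHost().getCanonicalHostName() + ":9092";
    }

    public static void main(String[] args) throws UnknownHostException {
        Properties props = new Properties();
        props.setProperty("listeners", "PLAINTEXT://localhost:9092");
        System.out.println(effectiveAdvertisedListeners(props)); // PLAINTEXT://localhost:9092
    }
}
```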

answered Jun 22, 2017 at 6:25 by Dean Jain (edited by Bonifacio2)

We tend to get this message when we try to subscribe to a topic that has not been created yet. We generally rely on topics being created a priori in our deployed environments. However, we have component tests that run against a dockerized Kafka instance, which starts clean every time.

In that case, we use AdminUtils in our test setup to check whether the topic exists and create it if not. See this other Stack Overflow question for more about setting up AdminUtils.

answered Aug 1, 2016 at 20:11 by Ryan McKay (edited by Community)

Another possible cause of this warning (in 0.10.2.1) is polling a topic that has just been created. The leader for the topic-partition is not yet available because leader election is still in progress.

Waiting a second between topic creation and polling is a workaround.
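Instead of a fixed one-second sleep, a test setup can poll until the partition has a leader. The JDK-only sketch below shows only the polling pattern, with the readiness check passed in as a `BooleanSupplier`.

Wiring that check to a real leader lookup is left as an assumption. One option is to describe the topic with the Kafka admin client and check that every partition reports a leader. The timeout and backoff values are arbitrary.

```java
import java.util.function.BooleanSupplier;

// Generic polling sketch (not a Kafka API): retry a readiness check with a fixed
// backoff until it succeeds or the timeout expires.
final class WaitForLeader {

    static boolean waitUntil(BooleanSupplier ready, long timeoutMs, long backoffMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the leader never became available in time
            }
            Thread.sleep(backoffMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] checks = {0};
        // Stand-in for a real check such as "every partition of the new topic has a leader";
        // here it simply succeeds on the third attempt.
        boolean ready = waitUntil(() -> ++checks[0] >= 3, 5_000, 200);
        System.out.println("ready=" + ready + " after " + checks[0] + " checks");
    }
}
```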

answered Jun 7, 2017 at 17:06 by Benoit Delbosc

For anyone trying to run Kafka on Kubernetes and running into this error, this is what finally solved it for me.

You have to either:

  1. Add hostname to the pod spec, so that Kafka can find itself,

or

  2. If using hostPort, set hostNetwork: true and dnsPolicy: ClusterFirstWithHostNet.

The reason is that Kafka needs to talk to itself, and it uses the ‘advertised’ listener/hostname to find itself rather than localhost.
Even if you have a Service that points the advertised host name at the pod, it is not visible from within the pod. I do not really know why that is the case, but at least there is a workaround.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-cluster1
  namespace: default
  labels:
    app: zookeeper-cluster1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-cluster1
  template:
    metadata:
      labels:
        name: zookeeper-cluster1
        app: zookeeper-cluster1
    spec:
      hostname: zookeeper-cluster1
      containers:
      - name: zookeeper-cluster1
        image: wurstmeister/zookeeper:latest
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 2181
        - containerPort: 2888
        - containerPort: 3888

---

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster1
  namespace: default
  labels:
    app: zookeeper-cluster1
spec:
  type: NodePort
  selector:
    app: zookeeper-cluster1
  ports:
  - name: zookeeper-cluster1
    protocol: TCP
    port: 2181
    targetPort: 2181
  - name: zookeeper-follower-cluster1
    protocol: TCP
    port: 2888
    targetPort: 2888
  - name: zookeeper-leader-cluster1
    protocol: TCP
    port: 3888
    targetPort: 3888

---

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-cluster
  namespace: default
  labels:
    app: kafka-cluster
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-cluster
  template:
    metadata:
      labels:
        name: kafka-cluster
        app: kafka-cluster
    spec:
      hostname: kafka-cluster
      containers:
      - name: kafka-cluster
        image: wurstmeister/kafka:latest
        imagePullPolicy: IfNotPresent
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: PLAINTEXT://kafka-cluster:9092
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-cluster1:2181
        ports:
        - containerPort: 9092

---

apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  namespace: default
  labels:
    app: kafka-cluster
spec:
  type: NodePort
  selector:
    app: kafka-cluster
  ports:
  - name: kafka-cluster
    protocol: TCP
    port: 9092
    targetPort: 9092

answered Jul 15, 2017 at 13:22 by Chris

Adding this since it may help others. A common problem is a misconfigured advertised.host.name. With Docker and docker-compose, setting KAFKA_ADVERTISED_HOST_NAME to the name of the service won’t work unless you set the hostname as well. docker-compose.yml example:

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    hostname: kafka
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Without hostname: kafka, the configuration above can produce LEADER_NOT_AVAILABLE when trying to connect.
You can find an example of a working docker-compose configuration here.

answered May 25, 2018 at 9:04 by Paizo

In my case, it was working fine at home, but it failed at the office the moment I connected to the office network.

So I modified config/server.properties, changing
listeners=PLAINTEXT://:9092 to listeners=PLAINTEXT://localhost:9092

In my case, I was getting the error while describing the consumer group.

answered Sep 21, 2018 at 22:11 by Yoga Gowda

If you are running Kafka on your local machine, try updating $KAFKA_DIR/config/server.properties with the line below and then restarting Kafka:
listeners=PLAINTEXT://localhost:9092

answered Nov 30, 2018 at 1:03 by MrKulli (edited by Community)

I am using docker-compose to build the Kafka container from the wurstmeister/kafka image. Adding the KAFKA_ADVERTISED_PORT: 9092 property to my docker-compose file solved this error for me.

answered Jun 14, 2017 at 10:18 by Priyanka (edited by Derlin)

Since I wanted my Kafka broker to accept remote producers and consumers, I didn’t want advertised.listeners to be commented out. In my case (running Kafka on Kubernetes), I found out that my Kafka Service was not assigned any cluster IP. After removing the line clusterIP: None from services.yml, Kubernetes assigns an internal IP to the Kafka Service. This resolved both the LEADER_NOT_AVAILABLE error and remote connections from Kafka producers/consumers.

answered Jan 16, 2018 at 17:56 by Anum Sheraz

When the LEADER_NOT_AVAILABLE error is thrown, just restart the Kafka broker:

bin/kafka-server-stop.sh

followed by

bin/kafka-server-start.sh config/server.properties

(Note: ZooKeeper must already be running at this point; otherwise it won’t work.)

answered Feb 21, 2018 at 8:31 by Dan (edited by sniperd)

If you get repeated error messages like this:

Error while fetching metadata with correlation id 3991 : {your.topic=LEADER_NOT_AVAILABLE}

Or

Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)

Then you need to configure the listener settings like this in Kafka's server.properties:

 listeners=PLAINTEXT://your.server.ip:9092

This solution was tried on Apache Kafka 2.5.0 and Confluent Platform 5.4.1.

answered Jul 22, 2020 at 12:36 by Arsalan Siddiqui

I added the line below to config/server.properties, and it resolved an issue similar to the one above. Hope this helps. The setting is well documented in the server.properties file, so read and understand it before you modify it.
advertised.listeners=PLAINTEXT://<your_kafka_server_ip>:9092

answered May 23, 2018 at 15:27 by ravibeli

In my case, I hadn’t specified a broker id for the Kafka instance.
It would sometimes get a new id from ZooKeeper when it restarted in the Docker environment.
If your broker id is greater than 1000, it was auto-generated (generated ids start above reserved.broker.max.id, which is 1000 by default). In that case, just specify the environment variable KAFKA_BROKER_ID.

Use this to see brokers, topics and partitions:

brew install kafkacat
kafkacat -b [kafka_ip]:[kafka_port] -L

answered Sep 15, 2019 at 9:55 by Anderson

I was also getting the same error message:

WARN Error while fetching metadata with correlation id 39 :
{4-3-16-topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Resolution steps:

  • Go to C:\Windows\System32\drivers\etc\hosts
  • If the following line is not there, add it to the end of the hosts file:
    127.0.0.1       localhost
  • Go to C:\<Kafka_Config_Path>\server.properties and, at the end of the file, add:
    advertised.listeners = PLAINTEXT://localhost:9092
    listeners = PLAINTEXT://0.0.0.0:9092
  • Restart the Kafka server

answered Aug 15, 2021 at 14:56 by rahulnikhare (edited by Kirby)

This is for everyone struggling with the Kafka SSL setup and seeing this LEADER_NOT_AVAILABLE error. One of the things that might be broken is the keystore and truststore.

The keystore needs the server's private key plus the signed server certificate. The client truststore needs the intermediate CA certificate so that the client can authenticate the Kafka server. If you use SSL for inter-broker communication, you also need this truststore set in the server.properties of the brokers so they can authenticate each other.

I was mistakenly missing that last piece, and it cost me many painful hours figuring out what this LEADER_NOT_AVAILABLE error meant. Hopefully this can help somebody.
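You can confirm that the stores contain what this answer describes by inspecting them with `java.security.KeyStore`. The keystore should hold the server's private key and signed certificate, and the truststore should hold the CA certificate. The sketch below is a hypothetical helper, not part of Kafka. It only counts entries and does not validate the certificate chain. `keytool -list -v -keystore <file>` shows the same information from the command line.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.util.Collections;

// Hypothetical helper (not part of Kafka): inspects a keystore or truststore file.
final class TruststoreCheck {

    /** Number of trusted certificate entries (what a truststore should contain). */
    static int trustedCertificateCount(KeyStore store) throws KeyStoreException {
        int count = 0;
        for (String alias : Collections.list(store.aliases())) {
            if (store.isCertificateEntry(alias)) {
                count++;
            }
        }
        return count;
    }

    /** True if the store holds at least one private key entry (what a broker keystore needs). */
    static boolean hasPrivateKey(KeyStore store) throws KeyStoreException {
        for (String alias : Collections.list(store.aliases())) {
            if (store.entryInstanceOf(alias, KeyStore.PrivateKeyEntry.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Assumed arguments: <file> <password> [type], e.g. the values of
        // ssl.truststore.location, ssl.truststore.password and ssl.truststore.type.
        KeyStore store = KeyStore.getInstance(args.length > 2 ? args[2] : "JKS");
        try (InputStream in = new FileInputStream(args[0])) {
            store.load(in, args[1].toCharArray());
        }
        System.out.println("trusted certificates: " + trustedCertificateCount(store)
                + ", private key present: " + hasPrivateKey(store));
    }
}
```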

answered Oct 27, 2017 at 8:45 by vojtmen

The issue was resolved after adding the listener setting to the server.properties file in the config directory:
listeners=PLAINTEXT://localhost:9092
(or use your server's host name instead of localhost). Restart Kafka after this change. Version used: 2.11

answered Jul 11, 2018 at 14:10 by Jitray

The advertised listeners, as mentioned in the answers above, could be one of the reasons. Other possible reasons are:

  1. The topic might not have been created. You can check this using bin/kafka-topics --list --zookeeper <zookeeper_ip>:<zookeeper_port>
  2. Check the bootstrap servers you have given the producer to fetch the metadata. A bootstrap server may not have the latest metadata about the topic (for example, when it has lost its ZooKeeper claim), so you should list more than one bootstrap server.

Also, make sure the advertised listener is set to IP:9092 instead of localhost:9092. The latter means that the broker is accessible only from localhost.

When I encountered the error, I remember using PLAINTEXT://<ip>:<PORT> in the list of bootstrap servers (or broker list), and, strangely, it worked.

bin/kafka-console-producer --topic sample --broker-list PLAINTEXT://<IP>:<PORT>
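Listing more than one bootstrap server is recommended above. The JDK-only sketch below normalizes a comma-separated bootstrap list into `host:port` entries. It also strips an optional `PROTOCOL://` prefix, as in the command above. That is an illustrative choice of this sketch, not a description of how the Kafka tools parse --broker-list. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Kafka's parser): normalizes a comma-separated bootstrap list.
final class BootstrapServers {

    /** Splits "a:9092,b:9093" into host:port entries, dropping an optional "PROTOCOL://" prefix. */
    static List<String> parse(String bootstrap) {
        List<String> servers = new ArrayList<>();
        for (String raw : bootstrap.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            int scheme = entry.indexOf("://");
            if (scheme >= 0) {
                entry = entry.substring(scheme + 3);
            }
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + raw.trim());
            }
            // Integer.parseInt rejects non-numeric ports with NumberFormatException.
            int port = Integer.parseInt(entry.substring(colon + 1));
            servers.add(entry.substring(0, colon) + ":" + port);
        }
        return servers;
    }

    public static void main(String[] args) {
        String list = args.length > 0 ? args[0] : "PLAINTEXT://10.0.0.1:9092,10.0.0.2:9092";
        System.out.println("bootstrap.servers=" + String.join(",", parse(list)));
    }
}
```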

answered Jun 9, 2019 at 7:00 by JavaTechnical

Try this:
listeners=PLAINTEXT://localhost:9092
It should help.

Many thanks

answered Dec 16, 2020 at 1:08 by saurabhshcs

For me, it happened due to a port misconfiguration:
Docker port: 9093
Kafka command port: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TopicName
I fixed my configuration so the ports match, and now everything is OK.

answered Apr 26, 2018 at 8:44 by guillaume verneret

For me, the cause was using a separately installed ZooKeeper that was not part of the Kafka package. That ZooKeeper was already installed on the machine for other purposes. Apparently Kafka does not work with just any ZooKeeper. Switching to the ZooKeeper that came with Kafka solved it for me. To avoid conflicts with the existing ZooKeeper, I had to modify my configuration so that this ZooKeeper listens on a different port:

[root@host /opt/kafka/config]# grep 2182 *
server.properties:zookeeper.connect=localhost:2182
zookeeper.properties:clientPort=2182
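A mismatch like this is easy to check mechanically. The port in zookeeper.connect (server.properties) must match clientPort (zookeeper.properties). The sketch below compares the two with `java.util.Properties`. The class name is hypothetical, and it assumes a setup like this one, where Kafka talks to a single local ZooKeeper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): checks that zookeeper.connect in server.properties
// uses the clientPort that zookeeper.properties actually listens on.
final class ZookeeperPortCheck {

    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** True if some host:port in zookeeper.connect uses ZooKeeper's clientPort. */
    static boolean portsMatch(Properties server, Properties zookeeper) {
        String clientPort = zookeeper.getProperty("clientPort", "").trim();
        if (clientPort.isEmpty()) {
            return false;
        }
        String connect = server.getProperty("zookeeper.connect", "").trim();
        // zookeeper.connect may end with a chroot path, e.g. "localhost:2182/kafka".
        int chroot = connect.indexOf('/');
        if (chroot >= 0) {
            connect = connect.substring(0, chroot);
        }
        for (String hostPort : connect.split(",")) {
            String entry = hostPort.trim();
            int colon = entry.lastIndexOf(':');
            if (colon >= 0 && entry.substring(colon + 1).equals(clientPort)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Inline copies of the two settings from the grep output above; for real files use
        // props.load(java.nio.file.Files.newBufferedReader(java.nio.file.Paths.get(...))).
        Properties server = load("zookeeper.connect=localhost:2182\n");
        Properties zookeeper = load("clientPort=2182\n");
        System.out.println("ports match: " + portsMatch(server, zookeeper));
    }
}
```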

answered Jun 4, 2019 at 15:01 by Onnonymous

I know this was posted a long time ago, but I would like to share how I solved it.
I was on my office laptop, which had a VPN and a proxy configured, so I checked the environment variable NO_PROXY:

> echo %NO_PROXY%

It returned an empty value, so I set NO_PROXY to 127.0.0.1 and localhost:

> set NO_PROXY=127.0.0.1,localhost

If you want to append to the existing values instead:

> set NO_PROXY=%NO_PROXY%,127.0.0.1,localhost

After this, I restarted ZooKeeper and Kafka, and it worked like a charm.

answered Dec 15, 2019 at 5:28 by Abhishek D K

Environment: macOS 11.0.1
Docker: Docker version 20.10.0, build 7287ab3
Jar version:

    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.15.1</version>
        <scope>test</scope>
    </dependency>

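Code:

import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.support.TestPropertySourceUtils;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class IntegratedEnvironment implements ApplicationContextInitializer<ConfigurableApplicationContext> {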

    private static final Logger logger = LoggerFactory.getLogger(IntegratedEnvironment.class);

    static final KafkaContainer kaf = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.1"));

    @PreDestroy
    public void stop() {
        kaf.stop();
    }
    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        kaf.withEnv("KAFKA_ADVERTISED_HOST_NAME","localhost");
        kaf.start();
        TestPropertySourceUtils.addInlinedPropertiesToEnvironment(applicationContext, "spring.kafka.producer.bootstrap-servers=" + kaf.getBootstrapServers());
        TestPropertySourceUtils.addInlinedPropertiesToEnvironment(applicationContext, "spring.kafka.consumer.bootstrap-servers=" + kaf.getBootstrapServers());
    }
}

error message:

2020-12-30 22:31:41.997  INFO 8127 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-12-30 22:31:41.997  INFO 8127 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-12-30 22:31:41.998  INFO 8127 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1609338701997
2020-12-30 22:31:42.155  WARN 8127 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-localGroup-1, groupId=localGroup] Error while fetching metadata with correlation id 2 : {payment=LEADER_NOT_AVAILABLE}
2020-12-30 22:31:42.155  WARN 8127 --- [ook-Pro.local-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=Kens-MacBook-Pro.local-1] Error while fetching metadata with correlation id 1 : {payment=LEADER_NOT_AVAILABLE}
2020-12-30 22:31:42.157  INFO 8127 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-localGroup-1, groupId=localGroup] Cluster ID: nkMWp3e5RLK61Bi-g254xw

Posted by FatDBA on August 14, 2021

Hi Guys,

Recently I was working on a replication project where we used Kafka to move data from source to target. I tried to create a test topic using the Kafka console producer and was immediately kicked out with an error that says “INVALID_REPLICATION_FACTOR”. We were doing this on a test VM with a single CPU and limited system resources.

[root@cantowintert bin]#
[root@cantowintert bin]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>hello prashant
[2021-07-26 08:18:30,051] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 40 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,154] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 41 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,260] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 42 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,367] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 43 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,471] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 44 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,576] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 45 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,681] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 46 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,788] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 47 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:30,896] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 48 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:31,000] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 49 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:31,103] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 50 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:18:31,221] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 51 : {first_topic=INVALID_REPLICATION_FACTOR} (org.apache.kafka.clients.NetworkClient)
^Corg.apache.kafka.common.KafkaException: Producer closed while send in progress
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:909)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
        at kafka.tools.ConsoleProducer$.send(ConsoleProducer.scala:71)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:53)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.KafkaException: Requested metadata update after close
        at org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:126)
        at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1047)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
        ... 4 more
[root@cantowintert bin]#

Let’s check what is captured in the Kafka server startup logs. There we found the hint: the replication factor (RF) is greater than the number of available brokers.

org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
[2021-07-26 08:24:45,723] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 (cantowintert.bcdomain/192.168.20.129:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:25:06,830] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 (cantowintert.bcdomain/192.168.20.129:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-07-26 08:25:27,950] WARN [Controller id=0, targetBrokerId=0] Connection to node 0 (cantowintert.bcdomain/192.168.20.129:9092) could not be established. Broker may not be available. 

The solution is to uncomment this line, change it as shown below, restart, and try the topic all over again.

listeners=PLAINTEXT://:9092

changed this to

listeners=PLAINTEXT://127.0.0.1:9092

Hope it helps.
Prashant Dixit

This entry was posted on August 14, 2021 at 8:39 AM and is filed under Uncategorized.
Tagged: kafka, replication.

If you are running Kafka on a public cloud like AWS or Azure, or on Docker, you are more likely to experience the error below.

WARN Error while fetching metadata with correlation id 39 : {sales-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

As a producer or consumer in a Kafka setup, the broker hostname you pass is used to fetch metadata about the other brokers in the cluster. That metadata includes the brokers acting as leaders for the partitions you are trying to write to or read from.

Let’s say your brokers are set up in AWS EC2. AWS has a multi-layer networking setup, so every EC2 host has a public DNS name (ec2-x-y-z-zz.us-west-2.compute.amazonaws.com) and a private DNS name (ip-a-b-c-d.us-west-2.compute.internal). If your producer or consumer is outside the Kafka cluster's network, it can reach the brokers only through their public DNS names, not their private ones. Private DNS names can only be used when your producer or consumer is on the same network as your Kafka brokers.

Let’s assume your client (producer or consumer) is outside the network of your Kafka cluster, so it can reach the brokers only through their public DNS names. The broker, however, will return the private DNS names of the brokers hosting the leader partitions, not the public ones. Since your client is not on the same network as your Kafka cluster, it cannot resolve the private DNS names, which leads to the LEADER_NOT_AVAILABLE error.

Solution to LEADER_NOT_AVAILABLE error

You might have seen suggestions to update the host name mappings in the /etc/hosts file. That is more of a hack than a real fix.

A Kafka broker can be configured with multiple listeners. A listener is simply a combination of host name (or IP), port, and protocol.

The properties below go in the server.properties file on each Kafka broker. Note that advertised.listeners is the important property that will help you solve the issue.

  • listeners – comma-separated host names with ports on which the Kafka broker listens
  • advertised.listeners – comma-separated host names with ports that are passed back to the clients. Make sure to include only host names that can be resolved on the client (producer or consumer) side, e.g. the public DNS name.
  • listener.security.protocol.map – maps each listener name to the security protocol it uses
  • inter.broker.listener.name – the listener used for internal traffic between brokers. Its host name does not need to be resolvable by clients, but it must be resolvable by all brokers in the cluster.
listeners: LISTENER_PUBLIC://kafka0:29092,LISTENER_INTERNAL://localhost:9092
advertised.listeners: LISTENER_PUBLIC://kafka0:29092,LISTENER_INTERNAL://localhost:9092
listener.security.protocol.map: LISTENER_PUBLIC:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT
inter.broker.listener.name: LISTENER_PUBLIC
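These four settings reference each other by listener name, so a typo in one of them is easy to miss. The JDK-only sketch below is a hypothetical helper, not Kafka's own validation. It checks that every listener has a protocol mapping, that every advertised listener name also appears in listeners, and that inter.broker.listener.name refers to a defined listener. It ignores the default mappings Kafka applies when a listener name is itself a protocol name such as PLAINTEXT.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical consistency check (not Kafka's own validation) for multi-listener settings.
final class ListenerConfigCheck {

    /** Extracts listener names, e.g. "A://h:1,B://h:2" -> [A, B]. */
    static Set<String> listenerNames(String listeners) {
        Set<String> names = new LinkedHashSet<>();
        for (String entry : listeners.split(",")) {
            int sep = entry.indexOf("://");
            if (sep > 0) {
                names.add(entry.substring(0, sep).trim());
            }
        }
        return names;
    }

    /** Parses "A:PLAINTEXT,B:SSL" into a listener-name -> protocol map. */
    static Map<String, String> protocolMap(String mapValue) {
        Map<String, String> result = new HashMap<>();
        for (String entry : mapValue.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length == 2) {
                result.put(parts[0].trim(), parts[1].trim());
            }
        }
        return result;
    }

    /** Returns human-readable problems; an empty list means no inconsistency was found. */
    static List<String> check(String listeners, String advertised, String mapValue, String interBroker) {
        List<String> problems = new ArrayList<>();
        Set<String> names = listenerNames(listeners);
        Map<String, String> protocols = protocolMap(mapValue);
        for (String name : names) {
            if (!protocols.containsKey(name)) {
                problems.add("no security protocol mapped for listener " + name);
            }
        }
        for (String name : listenerNames(advertised)) {
            if (!names.contains(name)) {
                problems.add("advertised listener " + name + " is not defined in listeners");
            }
        }
        if (!names.contains(interBroker)) {
            problems.add("inter.broker.listener.name " + interBroker + " is not defined in listeners");
        }
        return problems;
    }

    public static void main(String[] args) {
        // The four values from the server.properties example above.
        List<String> problems = check(
                "LISTENER_PUBLIC://kafka0:29092,LISTENER_INTERNAL://localhost:9092",
                "LISTENER_PUBLIC://kafka0:29092,LISTENER_INTERNAL://localhost:9092",
                "LISTENER_PUBLIC:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT",
                "LISTENER_PUBLIC");
        System.out.println(problems.isEmpty() ? "no inconsistencies found" : String.join("\n", problems));
    }
}
```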

Docker setup

Use the properties below if you are running Kafka in Docker:

KAFKA_LISTENERS: LISTENER_PUBLIC://kafka0:29092,LISTENER_INTERNAL://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_PUBLIC://kafka0:29092,LISTENER_INTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_PUBLIC:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_PUBLIC

This article is a summary of common errors in the use of Kafka. I hope it will help you.

1. UnknownTopicOrPartitionException

org.apache.kafka.common.errors.UnknownTopicOrPartitionException:
This server does not host this topic-partition

Error content: partition data is not available

Cause analysis: the producer sends a message to a topic that does not exist. Check whether the topic exists, or enable the auto.create.topics.enable parameter on the broker.

2. LEADER_NOT_AVAILABLE

WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Error content: the leader is unavailable

Cause analysis: topics are being deleted, or a leader election is in progress. Use the kafka-topics script to check the leader information.

Then check whether the broker is alive, and try restarting it.

3. NotLeaderForPartitionException

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition

Error content: the broker is no longer the leader of the corresponding partition

Cause analysis: the leader changed, i.e. leadership of the partition switched from one broker to another. Analyze what caused the leader to switch.

4. TimeoutException

org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-0: 30040 ms has passed

Error content: request timeout

Cause analysis: check whether the network between the client and the broker is reachable. If it is, consider increasing the value of request.timeout.ms.
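Some producer clients have delivery.timeout.ms, added by KIP-91 in Kafka 2.1. On those clients, raising request.timeout.ms alone may not be enough. The producer documentation states that delivery.timeout.ms should be greater than or equal to the sum of request.timeout.ms and linger.ms.

The sketch below checks that relationship. The class name is hypothetical. The fallback values used for unset keys are assumed defaults that you should verify for your client version.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): checks the documented relationship
// delivery.timeout.ms >= linger.ms + request.timeout.ms before raising request.timeout.ms.
final class ProducerTimeoutCheck {

    static long getLong(Properties props, String key, long fallback) {
        String value = props.getProperty(key);
        return value == null ? fallback : Long.parseLong(value.trim());
    }

    static boolean isConsistent(Properties producerProps) {
        // Fallbacks are assumed client defaults; check the documentation of your client version.
        long lingerMs = getLong(producerProps, "linger.ms", 0L);
        long requestTimeoutMs = getLong(producerProps, "request.timeout.ms", 30_000L);
        long deliveryTimeoutMs = getLong(producerProps, "delivery.timeout.ms", 120_000L);
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "60000");  // raised after a TimeoutException
        props.setProperty("delivery.timeout.ms", "50000"); // now smaller than the sum
        System.out.println("consistent: " + isConsistent(props));
    }
}
```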

5. RecordTooLargeException

WARN async.DefaultEventHandler: Produce request with correlation id 92548048 failed due to [TopicName,1]: org.apache.kafka.common.errors.RecordTooLargeException

Error content: the message is too large

Cause analysis: the message is larger than the producer or broker will accept. You can increase request.timeout.ms and reduce batch.size. The size limits themselves are max.request.size on the producer and message.max.bytes on the broker.

6. Closing socket connection

Closing socket connection to /127.0.0.1. (kafka.network.Processor)

Error content: connection closed

Cause analysis: a client version mismatch. If the Java API producer uses a newer version and you verify with an older consumer client, it will constantly report errors such as:

Unrecognized client message.

7. ConcurrentModificationException

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

Error content: thread unsafe

Cause analysis: KafkaConsumer is not thread-safe; access each consumer instance from a single thread only.

8. NetworkException

[kafka-producer-network-thread | producer-1] o.apache.kafka.common.network.Selector : [Producer clientId=producer-1] Connection with / disconnected

Error content: network exception

Cause analysis: the network connection is interrupted. Check the network condition of the broker

9. ILLEGAL_GENERATION

ILLEGAL_GENERATION occurred while committing offsets for group

Error content: invalid "generation"

Cause analysis: the consumer missed a rebalance because it spent too long processing data.

Reduce max.poll.records, increase max.poll.interval.ms, or speed up message processing.
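Whether a batch fits into max.poll.interval.ms is simple arithmetic: max.poll.records multiplied by the processing time per record. The sketch below is a back-of-the-envelope helper, not a Kafka API. The per-record cost and the 0.5 safety factor are assumptions. It shows both the check and a suggested max.poll.records, and it applies to clients that have max.poll.interval.ms, which was introduced in Kafka 0.10.1.

```java
// Back-of-the-envelope sketch (not a Kafka API): will one full poll() batch be processed
// before max.poll.interval.ms expires and the consumer is removed from the group?
final class PollBudget {

    /** Worst-case time to process a full batch, in milliseconds. */
    static long batchProcessingMs(int maxPollRecords, long avgRecordMs) {
        return (long) maxPollRecords * avgRecordMs;
    }

    static boolean fitsInPollInterval(int maxPollRecords, long avgRecordMs, long maxPollIntervalMs) {
        return batchProcessingMs(maxPollRecords, avgRecordMs) < maxPollIntervalMs;
    }

    /** Largest max.poll.records that uses at most safetyFactor (e.g. 0.5) of the interval. */
    static int suggestedMaxPollRecords(long avgRecordMs, long maxPollIntervalMs, double safetyFactor) {
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        return (int) Math.max(1, budgetMs / Math.max(1, avgRecordMs));
    }

    public static void main(String[] args) {
        // 500 and 300000 are the documented defaults for max.poll.records and
        // max.poll.interval.ms; 1000 ms per record is an assumed processing cost.
        System.out.println("fits: " + fitsInPollInterval(500, 1_000, 300_000));
        System.out.println("suggested max.poll.records: " + suggestedMaxPollRecords(1_000, 300_000, 0.5));
    }
}
```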

10. Startup failure: invalid advertised.listeners configuration

java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
    at scala.Predef$.require(Predef.scala:277)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

Solution: modify server.properties (a properties file has no inline comments, so keep the note on its own line):

# {ip} can be an intranet IP, an extranet IP, 127.0.0.1 or a domain name
advertised.listeners=PLAINTEXT://{ip}:9092

Analysis:

server.properties contains two listener settings. listeners: the IP and port the Kafka service binds to. It can be an intranet IP or 0.0.0.0 (not an Internet IP); by default the host is the one returned by java.net.InetAddress.getCanonicalHostName(). advertised.listeners: the address that producers and consumers connect to. Kafka registers this address in ZooKeeper, so it must be a valid IP address or domain name other than 0.0.0.0; by default it is the same as listeners.
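To catch this before the broker refuses to start, a value of advertised.listeners can be checked with the same rule: split it on commas, take the host between "://" and the last ":", and reject 0.0.0.0. The hypothetical ListenerCheck below is a sketch of that rule only; as its own choice it also rejects an empty host, to force an explicit address, and the broker applies more validation than this (for example on listener names and ports).

```java
/** Hypothetical pre-flight check for advertised.listeners. */
final class ListenerCheck {
    /** Returns null if every entry has a routable host, else a message for the first bad entry. */
    static String validateAdvertised(String advertisedListeners) {
        for (String raw : advertisedListeners.split(",")) {
            String entry = raw.trim();               // e.g. PLAINTEXT://192.168.1.10:9092
            int sep = entry.indexOf("://");
            if (sep < 0) {
                return "missing '://' in " + entry;
            }
            String hostPort = entry.substring(sep + 3);
            int colon = hostPort.lastIndexOf(':');   // last ':' so "[::1]:9092" also works
            if (colon < 0) {
                return "missing port in " + entry;
            }
            String host = hostPort.substring(0, colon);
            if (host.isEmpty() || host.equals("0.0.0.0")) {
                return "non-routable or missing host in " + entry;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(validateAdvertised("PLAINTEXT://192.168.1.10:9092")); // null
        System.out.println(validateAdvertised("PLAINTEXT://0.0.0.0:9092"));
    }
}
```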

11. Startup error: Unrecognized VM option 'PrintGCDateStamps'

[0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

Solution: switch to JDK 1.8.x, or upgrade Kafka to 1.0.x or later.

Analysis:

This only happens with JDK 1.9 (Java 9) or later combined with a Kafka version earlier than 1.0.x.
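To see in advance which side of this boundary a machine is on, a small check of the running JDK is enough. The hypothetical GcFlagCheck below reads the standard java.specification.version property ("1.8" on JDK 8, then "9", "11" and so on) and reports whether the legacy -XX:+PrintGCDateStamps flag that the pre-1.0 start scripts pass can still be used. It is only a sketch of the rule described above.

```java
/** Hypothetical check: does this JDK still accept -XX:+PrintGCDateStamps? */
final class GcFlagCheck {
    /** "1.8" -> 8, "9" -> 9, "11" -> 11. */
    static int majorVersion(String specVersion) {
        return specVersion.startsWith("1.")
                ? Integer.parseInt(specVersion.substring(2))
                : Integer.parseInt(specVersion);
    }

    /** The legacy GC logging flags were removed in JDK 9 in favour of -Xlog:gc. */
    static boolean supportsPrintGCDateStamps(int major) {
        return major <= 8;
    }

    public static void main(String[] args) {
        int major = majorVersion(System.getProperty("java.specification.version"));
        System.out.println("JDK " + major + ", PrintGCDateStamps supported: "
                + supportsPrintGCDateStamps(major));
    }
}
```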

12. The producer fails to send messages or the consumer cannot consume (Kafka 1.0.1)

# (Java) org.apache.kafka warning
Connection to node 0 could not be established. Broker may not be available.


# (Node.js) kafka-node exception (after executing producer.send)
{ TimeoutError: Request timed out after 30000ms
    at new TimeoutError (D:projectnodekafka-testsrcnode_moduleskafka-nodeliberrorsTimeoutError.js:6:9)
    at Timeout.setTimeout [as _onTimeout] (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:737:14)
    at ontimeout (timers.js:466:11)
    at tryOnTimeout (timers.js:304:5)
    at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' }

Solution: check the advertised.listeners configuration (with multiple brokers, check the broker whose node ID appears in the Java warning) and verify that the current network can reach that address, for example with telnet.
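The telnet test can also be done from Java, which is handy on hosts where telnet is not installed. The hypothetical PortCheck below only tries to open a TCP connection to the host and port taken from advertised.listeners; success shows the address is reachable from this machine, not that the broker behind it is healthy. The address in main is an assumption.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical reachability check, roughly what "telnet host port" tells you. */
final class PortCheck {
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;  // TCP handshake succeeded
        } catch (IOException e) {
            return false; // refused, timed out, or the host name did not resolve
        }
    }

    public static void main(String[] args) {
        // Use the host:port from the broker's advertised.listeners.
        System.out.println(canConnect("localhost", 9092, 3_000));
    }
}
```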

13. num.partitions is too small, causing an error (Kafka 1.0.1)

# (Java) org.apache.kafka exception (after executing producer.send)
Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at com.wenshao.dal.TestProducer.main(TestProducer.java:36)


# (Node.js) kafka-node exception (after executing producer.send)
{ BrokerNotAvailableError: Could not find the leader
    at new BrokerNotAvailableError (D:projectnodekafka-testsrcnode_moduleskafka-nodeliberrorsBrokerNotAvailableError.js:11:9)
    at refreshMetadata.error (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:831:16)
    at D:projectnodekafka-testsrcnode_moduleskafka-nodelibclient.js:514:9
    at KafkaClient.wrappedFn (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:379:14)
    at KafkaClient.Client.handleReceivedData (D:projectnodekafka-testsrcnode_moduleskafka-nodelibclient.js:770:60)
    at Socket.<anonymous> (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:618:10)
    at Socket.emit (events.js:159:13)
    at addChunk (_stream_readable.js:265:12)
    at readableAddChunk (_stream_readable.js:252:11)
    at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' }

Solution: increase num.partitions. num.partitions is the default number of partitions for newly created topics and only takes effect for topics created after the change, so choose a reasonable value when planning the project. You can also increase the partition count of an existing topic from the command line:
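The Java exception above is a plain range check: a record sent to an explicit partition must target a partition number in [0, partition count). A hypothetical helper that performs the same check on the client side before calling producer.send could look like this; the partition count would come from topic metadata, for example KafkaProducer.partitionsFor(topic).size().

```java
/** Hypothetical client-side check mirroring "Invalid partition given with record". */
final class PartitionCheck {
    static boolean isValidPartition(int partition, int partitionCount) {
        return partition >= 0 && partition < partitionCount;
    }

    public static void main(String[] args) {
        System.out.println(isValidPartition(1, 1)); // false: the topic only has partition 0
        System.out.println(isValidPartition(1, 2)); // true after --alter --partitions 2
    }
}
```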

./bin/kafka-topics.sh --zookeeper  localhost:2181 --alter --partitions 2 --topic  foo

14. Kafka topic operation

  • Add: create a new Kafka topic «mobilePhone» with one partition and one replica, registered in ZooKeeper. The ZooKeeper address can be master:2181,slave1:2181,slave3:2181/kafka, or simply localhost:2181/kafka

cd /usr/kafka/bin
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave3:2181/kafka --replication-factor 1 --partitions 1 --topic mobilePhone

When creating it, the following error is reported: Error while executing topic command : replication factor: 1 larger than available brokers: 0

Solution: most likely the ZooKeeper path configured in server.properties does not match the one used in the command. The --zookeeper value must include the chroot path, otherwise this error is reported. For example, if the configuration file has zookeeper.connect=master:2181,slave1:2181,slave3:2181/kafka but /kafka is left out of the command, i.e. it is written as

--zookeeper master:2181,slave1:2181,slave3:2181

then the error above is reported. Therefore, make sure the ZooKeeper path is the same in both places.

When the topic is created successfully, Created topic «mobilePhone» is printed, as shown in the figure above.

Note: the replication factor cannot be greater than the number of brokers.

  • Query: query the information of the topic mobilePhone

cd /usr/kafka/bin
./kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone

To query all topics and their information, run the same command without the --topic parameter.

  • Modify: modify a topic parameter, for example change mobilePhone to 5 partitions; --alter changes the number of partitions (it can only be increased)

cd /usr/kafka/bin
./kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave3:2181/kafka  --partitions 5 --topic mobilePhone

Note: the following error occurred just after entering the command:

Error while executing topic command : Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181
[2018-11-29 16:14:02,100] ERROR java.lang.IllegalArgumentException: Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181
    at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:124)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

The error occurs because the chroot path was left out of the ZooKeeper address when running the command: it was written as just host names and port numbers instead of hostname:port/kafka, so ZooKeeper cannot find the topic's path.

  • Delete: delete some unnecessary topics

cd /usr/kafka/bin
./kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave3:2181/kafka  --topic mobilePhone

When you run the delete topic command, you are told that the topic cannot be deleted, because by default Kafka does not allow topic deletion (delete.topic.enable is false in server.properties). Set delete.topic.enable=true in the server.properties of every node, as shown in the figure below; after that, topics can be deleted normally.

However, this deletion only marks the topic as deleted; it is not really removed. When a topic with the same name is created again, an error is still reported saying the topic already exists. To delete the topic completely, go to ZooKeeper's bin directory, run ./zkCli.sh to open the ZooKeeper command line, and delete these three paths:

rmr /kafka/brokers/topics/mobilePhone
rmr /kafka/admin/delete_topics/mobilePhone
rmr /kafka/config/topics/mobilePhone

After this, the topic is completely deleted. If recreating a topic with the same name still reports that it already exists, restart the Kafka service.
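Both «replication factor: 1 larger than available brokers: 0» and «Topic mobilePhone does not exist on ZK path» above come from a --zookeeper value whose chroot differs from zookeeper.connect in server.properties. The hypothetical ZkConnectCheck below sketches a comparison of the two strings, plus the replication-factor rule from the note under «Add»; it does not talk to ZooKeeper.

```java
/** Hypothetical sanity checks for kafka-topics.sh arguments. */
final class ZkConnectCheck {
    /** Returns the chroot suffix of a ZooKeeper connect string, or "" if there is none. */
    static String chroot(String zkConnect) {
        int slash = zkConnect.indexOf('/');
        return slash < 0 ? "" : zkConnect.substring(slash);
    }

    /** The --zookeeper argument must use the same chroot as zookeeper.connect. */
    static boolean sameChroot(String serverConnect, String commandConnect) {
        return chroot(serverConnect).equals(chroot(commandConnect));
    }

    /** The replication factor cannot exceed the number of live brokers. */
    static boolean replicationFactorOk(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }

    public static void main(String[] args) {
        String server = "master:2181,slave1:2181,slave3:2181/kafka";
        String command = "master:2181,slave1:2181,slave3:2181";
        System.out.println(sameChroot(server, command)); // false: "/kafka" was left out
        System.out.println(replicationFactorOk(1, 0));   // false: no brokers visible
    }
}
```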

15. Kafka producer operation

Before running the producer and consumer commands, create a topic named newPhone in the same way as above and change its partition count to 2. The ZooKeeper address used here is localhost:2181/kafka; this works the same as above, but on the current machine. This approach is recommended because it is concise and saves space.

  • Create producer

cd /usr/kafka/bin
./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone

--broker-list: the Kafka broker addresses (separate multiple addresses with commas). The default port is 9092; to use a different port, change it in the server.properties file under config, as shown in the following figure:

--topic newPhone: the producer is bound to this topic and produces data to it. When the command runs correctly, you get a prompt like the one in the figure below and can start entering data.

16. Kafka consumer operation

  • Create consumer

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone [--from-beginning]

--bootstrap-server: the Kafka service address; --topic newPhone: bind to this topic and start consuming (fetching) data from it; [--from-beginning]: read the data from the beginning of the topic instead of only what is produced after the consumer connects.

During the whole process, we first use the producer to produce several messages:

Then, in the SSH tool (SecureCRT here, which is very easy to use), clone a session and run ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone. As shown in the figure below, without the --from-beginning option the consumer cannot read the four messages the producer wrote to the topic earlier.

After adding the --from-beginning option, the consumer consumes the data in the topic from the start, as shown in the figure below:

Tip: if the following error occurs when you type a message into the producer:

[root@master bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone
>hisdhodsa
[2018-11-29 17:28:16,926] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {newPhone=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-11-29 17:28:17,080] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {newPhone=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Solution: port 9092 is not open. In server.properties, uncomment listeners=PLAINTEXT://:9092, as shown in the following figure.
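Whether listeners is really active can be checked by loading server.properties with java.util.Properties, which reads the same key=value format: lines starting with # are comments, so a commented-out listeners line simply yields no value. ListenersConfigured is a hypothetical helper; in practice you would pass it a reader for the broker's config/server.properties.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical check: is listeners set (not commented out) in server.properties? */
final class ListenersConfigured {
    /** Returns the listeners value, or null if the line is missing or commented out. */
    static String activeListeners(Reader serverProperties) throws IOException {
        Properties props = new Properties();
        props.load(serverProperties); // '#' and '!' lines are treated as comments
        return props.getProperty("listeners");
    }

    public static void main(String[] args) throws IOException {
        String commented = "broker.id=0\n#listeners=PLAINTEXT://:9092\n";
        String enabled = "broker.id=0\nlisteners=PLAINTEXT://:9092\n";
        System.out.println(activeListeners(new StringReader(commented))); // null
        System.out.println(activeListeners(new StringReader(enabled)));   // PLAINTEXT://:9092
    }
}
```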

17. Kafka startup errors

First error

[2017-02-17 17:25:29,224] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

kafka.common.KafkaException: Failed to acquire lock on file .lock in /var/log/kafka-logs. A Kafka instance in another process or thread is using this directory.
    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:100)
    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:97)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.log.LogManager.lockLogDirs(LogManager.scala:97)
    at kafka.log.LogManager.<init>(LogManager.scala:59)
    at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:183)
    at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:100)
    at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:49)

Solution: the broker could not acquire the lock on /var/log/kafka-logs/.lock because another Kafka process is already using that directory. Find it with ps -ef | grep kafka and kill the process that is using the directory.
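Before killing anything, it can help to confirm that the directory really is locked. The broker holds an exclusive file lock on the .lock file in each log directory; the hypothetical LogDirLockCheck below tries to take the same kind of lock with java.nio and releases it at once. Run it as a separate, short-lived process and not while a broker is starting, since it briefly holds the lock itself; the default directory in main is an assumption.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/** Hypothetical check: is <logDir>/.lock held by another process? */
final class LogDirLockCheck {
    /**
     * Returns true if the lock is already held. Creates .lock if it is missing.
     * On some platforms closing any channel on a file releases all of this JVM's
     * locks on it, so do not call this from inside the broker's JVM.
     */
    static boolean isLockedByOther(Path logDir) throws IOException {
        Path lockFile = logDir.resolve(".lock");
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                return true;  // another process holds the lock
            }
            lock.release();   // we got it, so nobody else was holding it
            return false;
        } catch (OverlappingFileLockException e) {
            return true;      // held through another channel in this same JVM
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/var/log/kafka-logs");
        System.out.println(dir + " locked: " + isLockedByOther(dir));
    }
}
```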

Second error: no permission on the index files

Change the owner of the files to the correct user and group.

The files are under /var/log/kafka-logs/; for example, __consumer_offsets-29 is the directory of one partition of the consumer offsets topic.

Third error (producing and consuming): JAAS connection problem

There is a problem with the configuration in kafka_client_jaas.conf.

Environment 16: kafka_client_jaas.conf under /opt/dataload/filesource_wangjuan/conf

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/home/client/keytabs/client.keytab"
    serviceName="kafka"
    principal="client/dcp@DCP.COM";
};

18. Kafka producer errors

First error: the producer fails to send messages to a topic:

[2017-03-09 09:16:00,982] [ERROR] [startJob_Worker-10] [DCPKafkaProducer.java line:62] produceR towards topicdf02211 Exception in sending message

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)

Cause: the configuration in kafka_client_jaas.conf is wrong; the keyTab path is incorrect.

Second error (producing and consuming): Failed to construct kafka producer

Key error message: Failed to construct kafka producer

Solution: a configuration file problem. The serviceName in the KafkaClient section must be kafka; it had been set to zookeeper. Fix it and restart.

The configuration file is as follows:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    serviceName=kafka
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/dcp16@DCP.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName=kafka
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/dcp16@DCP.COM";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    serviceName=zookeeper
    keyTab="/etc/security/keytabs/kafka.service.keytab"
    principal="kafka/dcp16@DCP.COM";
};
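The conflict in the trace that follows arises when the serviceName in the JAAS KafkaClient section and the client's sasl.kerberos.service.name are both set but differ. A hedged sketch of the client side: the hypothetical KerberosClientConfig builds the relevant properties and compares the two names before a producer is constructed. The broker address and JAAS path are taken from the examples in this section; java.security.auth.login.config is the standard JVM system property for pointing at a JAAS file.

```java
import java.util.Properties;

/** Hypothetical helper for Kerberos (GSSAPI) client settings. */
final class KerberosClientConfig {
    static Properties saslClientProps(String serviceName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "DCP16:9092");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", serviceName);
        return props;
    }

    /** A serviceName in the JAAS KafkaClient section, if present, must equal sasl.kerberos.service.name. */
    static boolean serviceNamesAgree(String jaasServiceName, Properties props) {
        return jaasServiceName == null
                || jaasServiceName.equals(props.getProperty("sasl.kerberos.service.name"));
    }

    public static void main(String[] args) {
        // Same effect as starting the JVM with -Djava.security.auth.login.config=...
        System.setProperty("java.security.auth.login.config",
                "/opt/dataload/filesource_wangjuan/conf/kafka_client_jaas.conf");
        Properties props = saslClientProps("kafka");
        System.out.println(serviceNamesAgree("kafka", props));     // true
        System.out.println(serviceNamesAgree("zookeeper", props)); // false: the conflict in the trace
    }
}
```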

Problem Description:

[kafka@DCP16 bin]$ ./kafka-console-producer   --broker-list DCP16:9092 --topic topicin050511  --producer.config ../etc/kafka/producer.properties

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
    at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
    at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
    at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:277)
    ... 4 more
Caused by: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:305)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
    ... 7 more

[kafka@DCP16 bin]$ ./kafka-console-producer   --broker-list DCP16:9092 --topic topicin050511  --producer.config ../etc/kafka/producer.properties

Error when consuming: ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)

[root@DCP16 bin]# ./kafka-console-consumer --zookeeper dcp18:2181,dcp16:2181,dcp19:2181/kafkakerberos --from-beginning --topic topicout050511 --new-consumer --consumer.config ../etc/kafka/consumer.properties --bootstrap-server DCP16:9092

[2017-05-07 22:24:37,479] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
    at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:53)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:64)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
    ... 6 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user
    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)

Follow-up problem:

Producing messages then fails with:

[2017-05-07 23:17:16,240] ERROR Error when sending message to topic topicin050511 with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Change KafkaClient to the following configuration:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true;
};

19. Kafka consumer errors

First error:

replication factor: 1 larger than available brokers: 0

Error reported:

Error while executing topic command : replication factor: 1 larger than available brokers: 0

Solution:

Restart the broker daemon from /confluent-3.0.0/bin:

./kafka-server-stop

./kafka-server-start  -daemon   ../etc/kafka/server.properties

Then restart ZooKeeper and connect to it with: sh zkCli.sh -server ai186

The zkCli.sh script is located at /usr/hdp/2.4.2.0-258/zookeeper/bin/zkCli.sh.

If the error persists, check the following setting in the configuration file:

zookeeper.connect=dcp18:2181/kafkakerberos

Here /kafkakerberos is the ZooKeeper chroot path.

Second error: TOPIC_AUTHORIZATION_FAILED

./bin/kafka-console-consumer --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --from-beginning --topic wangjuan_topic1 --new-consumer --consumer.config ./etc/kafka/consumer.properties --bootstrap-server DCP187:9092

[2017-03-02 13:44:38,398] WARN The configuration zookeeper.connection.timeout.ms = 6000 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)

[2017-03-02 13:44:38,575] WARN Error while fetching metadata with correlation id 1 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2017-03-02 13:44:38,677] WARN Error while fetching metadata with correlation id 2 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2017-03-02 13:44:38,780] WARN Error while fetching metadata with correlation id 3 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

Solution: the U in User in the following setting must be uppercase:

super.users=User:kafka

Alternatively, the IP address in advertised.listeners in server.properties may be wrong, or the client code may be using a wrong IP address.
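super.users is a semicolon-separated list of principals such as User:kafka;User:admin, and the User prefix is matched exactly, so user:kafka grants nothing. The hypothetical SuperUsersCheck below is a sketch that flags such entries and tests whether a given short name is listed.

```java
/** Hypothetical checks for the super.users broker setting. */
final class SuperUsersCheck {
    /** Every entry must look like "User:<name>" with an uppercase U. */
    static boolean allEntriesWellFormed(String superUsers) {
        for (String entry : superUsers.split(";")) {
            String e = entry.trim();
            if (!e.startsWith("User:") || e.length() == "User:".length()) {
                return false;
            }
        }
        return true;
    }

    static boolean isSuperUser(String superUsers, String shortName) {
        for (String entry : superUsers.split(";")) {
            if (entry.trim().equals("User:" + shortName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(allEntriesWellFormed("User:kafka")); // true
        System.out.println(allEntriesWellFormed("user:kafka")); // false
        System.out.println(isSuperUser("User:kafka;User:client", "client")); // true
    }
}
```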

Possible solutions to the third error:

If consumption fails, check Kafka's startup log for errors; in one case the log files belonged to the wrong group (it should be hadoop).

Alternatively, check whether the ZooKeeper chroot suffix that Kafka uses (in zookeeper.connect) has been changed. If it has, the topic must be created again.

Third error: consumer errors in the Tomcat log:

[2017-04-01 06:37:21,823] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group

[2017-04-01 06:37:21,825] [WARN] [Thread-5] [ConsumerCoordinator.java line:476] Auto offset commit failed for group test-consumer-group: Commit offsets failed with retriable exception. You should retry committing offsets.

The fix was to change the heartbeat timeout in the consumer code deployed in Tomcat, in the following class:

./webapps/web/WEB-INF/classes/com/ai/bdx/dcp/hadoop/service/impl/DCPKafkaConsumer.class

After restart, the log shows:

[2017-04-01 10:14:56,167] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group

[2017-04-01 10:14:56,286] [INFO] [Thread-5] [AbstractCoordinator.java line:505] Discovered coordinator DCP187:9092 (id: 2147483647 rack: null) for group test-consumer-group.
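The exact change to that class is not listed, so the following is only an illustration. The client settings involved are session.timeout.ms and heartbeat.interval.ms, and the Kafka documentation recommends keeping heartbeat.interval.ms no higher than one third of session.timeout.ms. HeartbeatConfig is hypothetical and its values are assumptions; the broker and group names come from the log above.

```java
import java.util.Properties;

/** Hypothetical consumer settings for the group coordinator heartbeat. */
final class HeartbeatConfig {
    static Properties consumerProps(long sessionTimeoutMs, long heartbeatIntervalMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "DCP187:9092");   // broker from the log above
        props.put("group.id", "test-consumer-group");    // group from the log above
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(heartbeatIntervalMs));
        return props;
    }

    /** heartbeat.interval.ms should be no more than 1/3 of session.timeout.ms. */
    static boolean heartbeatRatioOk(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatRatioOk(consumerProps(30_000, 10_000))); // true
        System.out.println(heartbeatRatioOk(consumerProps(30_000, 15_000))); // false
    }
}
```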

20. Kafka: error creating a topic

Error when creating topic:

[2017-04-10 10:32:23,776] WARN SASL configuration failed: javax.security.auth.login.LoginException: Checksum failed Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

Exception in thread "main" org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
    at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:946)
    at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:923)
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1230)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:75)
    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:57)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

Problem location: the JAAS file is misconfigured.

Solution: super.users in server.properties must match the principal of the keytab used in the JAAS file:

server.properties: super.users=User:client

kafka_server_jaas.conf is changed to:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    serviceName=kafka
    keyTab="/data/data1/confluent-3.0.0/kafka.keytab"
    principal="kafka@DCP.COM";
};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/home/client/client.keytab"
    principal="client/DCP187@DCP.COM";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    serviceName=zookeeper
    keyTab="/home/client/client.keytab"
    principal="client/DCP187@DCP.COM";
};
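With Kafka's default principal-to-local mapping, a Kerberos principal in the default realm is reduced to its first component, so client/DCP187@DCP.COM is authorized as User:client; that is why super.users=User:client matches the keytab above. The hypothetical PrincipalShortName below is a simplified sketch of that mapping: it assumes the default realm and ignores any custom sasl.kerberos.principal.to.local.rules.

```java
/** Simplified, hypothetical version of the default Kerberos short-name mapping. */
final class PrincipalShortName {
    /** "client/DCP187@DCP.COM" -> "client"; "kafka@DCP.COM" -> "kafka". */
    static String shortName(String kerberosPrincipal) {
        String withoutRealm = kerberosPrincipal.split("@", 2)[0]; // drop "@REALM"
        return withoutRealm.split("/", 2)[0];                     // drop "/host"
    }

    static String superUsersEntryFor(String kerberosPrincipal) {
        return "User:" + shortName(kerberosPrincipal);
    }

    public static void main(String[] args) {
        System.out.println(superUsersEntryFor("client/DCP187@DCP.COM")); // User:client
    }
}
```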

21. DataCaptain -> Kafka component error

Error message:

Not has broker can connection metadataBrokerList

java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
    at scala.Predef$.require(Predef.scala:277)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

Solution: modify server.properties (a properties file has no inline comments, so keep the note on its own line):

# {ip} can be an intranet IP, an extranet IP, 127.0.0.1 or a domain name
advertised.listeners=PLAINTEXT://{ip}:9092

Analysis:
server.properties contains two listener settings.

listeners: the IP and port the Kafka service binds to. It can be an intranet IP or 0.0.0.0 (not an Internet IP); by default the host is the one returned by java.net.InetAddress.getCanonicalHostName().

advertised.listeners: the address that producers and consumers connect to. Kafka registers this address in ZooKeeper, so it must be a valid IP address or domain name other than 0.0.0.0; by default it is the same as listeners.

22. An error «TimeoutException(Java)» or «run out of brokers(Go)» or «Authentication failed for user(Python)» is reported

First, make sure the server addresses (bootstrap servers) are configured correctly, then rule out network problems with ping and telnet. If the network is working, note that Kafka on the cloud authenticates the client when the connection is established. There are two authentication methods (sasl_mechanism):

  • ONS: Java only; you configure your own AccessKey and SecretKey.

  • PLAIN: available in all languages; you configure the AccessKey and the last 10 characters of the SecretKey.

If authentication fails, Kafka on the cloud will cut off the connection.
In addition, please carefully refer to the readme of each demo to configure it correctly.
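For the PLAIN method, the Java client can carry the credentials in the sasl.jaas.config property (Kafka clients 0.10.2 and later; older clients need a JAAS file instead). The hypothetical PlainSaslConfig below derives the password from the last 10 characters of the SecretKey, as described above. The SASL_SSL protocol, the endpoint and the keys are placeholders; check the readme of the relevant demo for the exact values your service expects.

```java
import java.util.Properties;

/** Hypothetical client settings for SASL PLAIN authentication. */
final class PlainSaslConfig {
    static Properties plainProps(String bootstrapServers, String accessKey, String secretKey) {
        if (secretKey.length() < 10) {
            throw new IllegalArgumentException("SecretKey shorter than 10 characters");
        }
        // Password = last 10 characters of the SecretKey.
        String password = secretKey.substring(secretKey.length() - 10);
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL"); // assumption: check your service's demo
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + accessKey + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = plainProps("your-endpoint:9093", "yourAccessKey", "yourSecretKeyValue1234");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```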

23. The error «leader is not available» or «leader is in election» is reported

First, check whether the Topic has been created; Secondly, check whether the Topic type is «Kafka message».

24. An error such as «TOPIC_AUTHORIZATION_FAILED» or «Topic or group not authorized» is reported

This error usually indicates a permission problem: your AccessKey does not have permission to access the corresponding Topic or Consumer ID (also known as group or consumer group).

The permission rules for Topic and Consumer ID are as follows:

  • A Topic must be created by the master account; it can then be used by the master account itself or by a sub-account that the master account has authorized.

  • A Consumer ID belongs only to its creator; a Consumer ID created by the master account cannot be used by a sub-account, and vice versa.

Note: check carefully which account the AccessKey and SecretKey come from, to avoid using the wrong one.

25. The Java client (including the Spring framework) reports the error «Failed to send SSL close message»

This error is usually followed by «connection reset by peer» or «broken pipe». It occurs mainly because the server sits in a VIP network environment that actively closes idle connections. When it happens, retry the send. The Java client has a built-in retry mechanism, which can be configured by following the Producer best practices; for clients in other languages, see their documentation. You can hide this error message by raising the log level; with log4j, for example, add the following configuration line:

log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR

26. The Spring Cloud Stream consumer reports «ArrayIndexOutOfBoundsException»

This error occurs because Spring Cloud parses the message content in its own format. If you use Spring Cloud for both sending and consuming, there is no problem; this is also the recommended usage.

If you send in another way, for example with Kafka's native Java client, you need to set headerMode to raw when consuming with Spring Cloud, which disables parsing of the message content. See the Spring Cloud official website for details.

27. Error «No worthy mechs found»

This error is reported by the C++ client or by clients that wrap it. It indicates that a system library is missing: Cyrus SASL PLAIN. On yum-managed systems, install it with: yum install cyrus-sasl{,-plain}

28. What are CID, Consumer ID, Consumer Group and Group ID

These names refer to the same concept: Kafka's Consumer Group. CID is short for Consumer ID, which is equivalent to Group ID (short for Consumer Group ID, identifying a specific consumer group). Each consumer group can contain multiple consumer instances that consume the subscribed topics with load balancing. The relationship between CID and topic is therefore: each CID can subscribe to multiple topics, and each topic can be subscribed to by multiple CIDs; CIDs are independent of each other.

Assuming that CID 1 subscribes to Topic 1, each message of Topic 1 will be sent to an instance of CID 1, and only one instance will be sent. If CID 2 also subscribes to Topic 1, each message of Topic 1 will also be sent to an instance in CID 2, and only one instance will be sent.

[backcolor=transparent] Note: the Producer ID seen in the console is a concept in Aliware MQ and will not be used by Kafka. It will be improved and optimized later.
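The CID/topic relationship described above can be illustrated with a simplified model. It is an illustration only, not Kafka's actual code: real Kafka assigns partitions to instances through a configurable assignor, and here we assume partition p goes to instance p mod N, which is what a round-robin assignment gives for a single topic. The group names CID1 and CID2 are hypothetical.

```shell
# Simplified model of consumer-group delivery (illustration only).
# Every group independently receives every partition of the topic; inside a
# group, partition p is owned by instance p mod N (round-robin assumption).
# Usage: route PARTITION GROUP:INSTANCE_COUNT...
# Prints the single instance in each group that gets a message from PARTITION.
route() {
  partition="$1"; shift
  for spec in "$@"; do
    group="${spec%%:*}"
    count="${spec#*:}"
    echo "$group -> instance $((partition % count))"
  done
}

route 3 CID1:2 CID2:3
# CID1 -> instance 1
# CID2 -> instance 0
```

Each group gets its own copy of the message, and within each group exactly one instance receives it.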

29. How to view consumption progress

To view the consumption progress of a specific consumer, follow these steps:

  • In the left navigation pane of the ONS console, click Publish and Subscribe Management > Subscription Management.

  • Enter a topic or Consumer ID in the search box and click Search to find the Consumer ID whose consumption progress you want to view.

  • After finding the Consumer ID, click Consumer Status in the operation column, then check the total backlog in the pop-up page.

Total backlog = total number of messages - number of messages consumed
Note: consumer status may currently be shown as offline; this will be fixed in the future. Apart from the total backlog, the other information is for reference only.
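For open-source Kafka, the backlog formula above is usually computed per partition as log-end offset minus committed offset, then summed. A minimal sketch, assuming you already have those two numbers per partition (for example from the CURRENT-OFFSET and LOG-END-OFFSET columns of `kafka-consumer-groups.sh --describe`); the offsets in the example are made up.

```shell
# Total backlog = sum over partitions of (log-end offset - committed offset).
# Reads one "LOG_END_OFFSET COMMITTED_OFFSET" pair per partition on stdin.
total_lag() {
  awk '{ lag += $1 - $2 } END { print lag + 0 }'
}

# Three partitions with hypothetical offsets: 200 + 0 + 60
printf '1200 1000\n500 500\n80 20\n' | total_lag   # prints 260
```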

30. What to do if messages accumulate

Message accumulation is usually caused by slow consumption or blocked consumer threads. Print the stacks of the consumer threads and check what they are executing.

Note: for Java processes, use jstack.
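A single thread dump can be misleading, so a common practice is to take several a few seconds apart and look for threads stuck in the same frame. A minimal sketch, assuming `jstack` from the JDK is on the PATH and you know the consumer's PID; the output file names and default count/interval are illustrative.

```shell
# Take COUNT thread dumps of a Java process, INTERVAL seconds apart.
# Usage: dump_threads PID [COUNT] [INTERVAL_SECONDS]
# Set JSTACK to override the jstack binary (e.g. a full path into the JDK).
dump_threads() {
  pid="$1"; count="${2:-3}"; interval="${3:-5}"
  i=1
  while [ "$i" -le "$count" ]; do
    "${JSTACK:-jstack}" "$pid" > "jstack.$pid.$i.txt"
    # Sleep between dumps, but not after the last one
    [ "$i" -lt "$count" ] && sleep "$interval"
    i=$((i + 1))
  done
}

# Example: dump_threads 12345 3 5
```

Threads that show the same stack (for example blocked on a lock or a slow downstream call) in every dump are the likely cause of the accumulation.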

31. java.lang.OutOfMemoryError: Map failed

When this OOM occurs, the Kafka process crashes immediately, so the only remedy is to restart the broker node. To make the broker start faster, increase num.recovery.threads.per.data.dir, for example to «num.recovery.threads.per.data.dir=30». This parameter sets the number of threads per data directory used for log recovery at startup and for flushing logs at shutdown, so a larger value speeds up both. Our server has 32 cores, so we set it to 30. Raising it as far as your CPU allows helps the broker rejoin the ISR list sooner.
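Applying the setting above means editing `server.properties` before the restart. A minimal sketch, assuming a Linux host with GNU sed (`sed -i`) and a `key=value` properties file like the one shipped with Kafka; the helper name `set_prop` is hypothetical.

```shell
# Set (or add) a key=value pair in a Kafka properties file.
# Usage: set_prop FILE KEY VALUE   (requires GNU sed for -i)
set_prop() {
  file="$1"; key="$2"; value="$3"
  # Escape dots so they match literally in the regexes below
  esc_key=$(printf '%s' "$key" | sed 's/\./\\./g')
  if grep -q "^${esc_key}=" "$file"; then
    # Replace the existing line in place
    sed -i "s|^${esc_key}=.*|${key}=${value}|" "$file"
  else
    # Key not present yet: append it
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}

# Example: set_prop config/server.properties num.recovery.threads.per.data.dir 30
```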

As noted above, restoring service comes first. Next, we analyzed why this happened. We had allocated 20G of memory to the Kafka cluster, as shown in the following figure.

After reviewing nearly two weeks of monitoring charts, we found that available memory kept decreasing, and we initially suspected a memory leak.

This was only a suspicion, because the JVM had not been monitored before the error. Having learned our lesson, we promptly set up monitoring of Kafka's JVM with Zabbix.

We then adjusted the following parameter and observed the system for a while.

sysctl vm.max_map_count=262144   # maximum number of memory map areas a process may have

Memory map areas are created as a side effect of calling malloc, directly by mmap and mprotect, and when shared libraries are loaded. Although most programs need fewer than 1,000 map areas, certain programs, particularly malloc debuggers, may use many, for example one or two per allocation. The default value is 65530.
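To see how close a broker is to this limit, compare the number of lines in `/proc/<pid>/maps` (one line per map area) with `vm.max_map_count`. A minimal sketch, assuming Linux and a single broker on the host (so `pgrep -f kafka.Kafka`, the broker's main class, returns one PID); the `sysctl.d` file name is illustrative.

```shell
# Print "USED/LIMIT" for a process's memory map areas (Linux only).
# Usage: map_usage PID
map_usage() {
  used=$(wc -l < "/proc/$1/maps" | tr -d ' ')
  limit=$(cat /proc/sys/vm/max_map_count)
  echo "$used/$limit"
}

# Example: map_usage "$(pgrep -f kafka.Kafka)"
#
# Raise the limit at runtime (as root) and persist it across reboots:
#   sysctl -w vm.max_map_count=262144
#   echo 'vm.max_map_count=262144' > /etc/sysctl.d/99-kafka.conf
```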

Solution:

First: Kafka's heap should not exceed 6G. Kafka does not use much heap memory (it relies mainly on the operating system's page cache), but the default of 1G is too small; 6G is the recommended setting. Our server has 32G of memory, of which 22G had been allocated to Kafka's heap. Both Kafka: The Definitive Guide and Apache Kafka实战 (Apache Kafka in Practice) recommend a Kafka heap of 5G or 6G, so in the end I set Kafka's heap to 6G.
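With the stock start script, the heap is set through the `KAFKA_HEAP_OPTS` environment variable. A minimal sketch, assuming the broker is started with `bin/kafka-server-start.sh` from the Kafka distribution and the 6G figure from the paragraph above.

```shell
# kafka-server-start.sh applies its default (-Xmx1G -Xms1G) only when
# KAFKA_HEAP_OPTS is unset, so exporting it overrides the heap size.
# Setting -Xms equal to -Xmx avoids heap resizing at runtime.
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

# Then start the broker as usual, for example:
#   bin/kafka-server-start.sh -daemon config/server.properties
```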

Second: adjust vm.max_map_count, the parameter mentioned above. In production, if a single broker hosts too many topics, you may hit the serious error «java.lang.OutOfMemoryError: Map failed», which crashes the Kafka node. This happens because a large number of topics (and therefore partitions and log segments) consumes many operating-system memory maps. In this case, raise vm.max_map_count with the command «sysctl vm.max_map_count=N». The default value is 65530; for production, consider a larger value such as 262144 or higher.
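A rough way to size the limit: each log segment memory-maps its offset index and its time index, so a broker needs about two map areas per segment, plus ordinary mappings such as shared libraries. A back-of-the-envelope sketch, assuming you know the number of partition replicas on the broker and their average segment count (the 2000 and 50 below are hypothetical); newer broker versions may map some indexes lazily, so treat the result as an upper bound.

```shell
# Rough upper bound on index memory maps for one broker:
# partition replicas * average segments per partition * 2 (offset + time index).
# Usage: estimate_maps PARTITIONS_ON_BROKER AVG_SEGMENTS_PER_PARTITION
estimate_maps() {
  echo $(( $1 * $2 * 2 ))
}

estimate_maps 2000 50   # prints 200000, already close to a 262144 limit
```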

I'm trying to deploy Kafka in Docker using Docker Compose.
I'm using a MacBook Pro M1 with an Ubuntu 20.04 virtual machine.
I based the setup on this guide and its accompanying source code (I disabled authentication and swapped the images, because the images in the article are built for x86 machines).
ZooKeeper runs in its own container, and Kafka in another.
Everything starts, but when I try to run the console consumer with the following command

/home/ubuntu/kafka/bin/kafka-console-consumer.sh --topic new_topic --from-beginning --bootstrap-server 192.168.0.106:9092

the following error is printed many times:

[2021-12-26 16:36:00,362] WARN [Consumer clientId=consumer-console-consumer-32734-1, groupId=console-consumer-32734] Error while fetching metadata with correlation id 3 : {new_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

To create a producer, I use the following command:

/home/ubuntu/kafka/bin/kafka-console-producer.sh --topic new_topic --bootstrap-server localhost:9092

The producer is created, but when I try to send a message, the following error is printed to the console many times:

[2021-12-26 16:09:53,385] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {new_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

I use the following docker-compose.yml (all images are from Docker Hub):

version: '2'
services:
  zookeeper:
    image: zookeeper
    expose:
      - "2181"
  kafka:
    image: fogsyio/kafka:2.2.0
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
#      KAFKA_ADVERTISED_PORT: 9093
#      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.0.106:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
    depends_on:
      - zookeeper

I commented out the lines I had added while trying to fix the problem; they did not help. I have come across posts saying this happens because the topic does not exist when Kafka starts, and that the message goes through when it is sent again, but that does not happen here, even after restarting the producer/consumer.

What else can I try? I wanted to ask the author of the original post in the comments, but my account only lets me comment on recent posts :(
