Error initializing kafka store while initializing schema registry


Hello,

I have configured a Confluent cluster in Docker containers consisting of 3 ZooKeepers, 3 Kafka brokers, and 3 Schema Registries. Sometimes on deployment all 3 Schema Registries fail to start. The problem appears very rarely, and only when deploying the cluster (docker stack, using a yml file). When it does appear, the fix is to redeploy the cluster and everything works again. I should note that we use Confluent Platform 5.1.3. In the log below you can see the several errors that appear.
Here is the log I get from one of the Schema Registries (all 3 of them have the same log):

wait-for-it: waiting for kafka1:9092 without a timeout,
wait-for-it: kafka1:9092 is available after 0 seconds,
wait-for-it: waiting for kafka2:9092 without a timeout,
wait-for-it: kafka2:9092 is available after 0 seconds,
wait-for-it: waiting for kafka3:9092 without a timeout,
wait-for-it: kafka3:9092 is available after 0 seconds,
[main] INFO org.apache.kafka.clients.admin.AdminClientConfig — AdminClientConfig values: ,
bootstrap.servers = [kafka1:9092],
client.dns.lookup = default,
client.id = ,
connections.max.idle.ms = 300000,
metadata.max.age.ms = 300000,
metric.reporters = [],
metrics.num.samples = 2,
metrics.recording.level = INFO,
metrics.sample.window.ms = 30000,
receive.buffer.bytes = 65536,
reconnect.backoff.max.ms = 1000,
reconnect.backoff.ms = 50,
request.timeout.ms = 120000,
retries = 5,
retry.backoff.ms = 100,
sasl.client.callback.handler.class = null,
sasl.jaas.config = null,
sasl.kerberos.kinit.cmd = /usr/bin/kinit,
sasl.kerberos.min.time.before.relogin = 60000,
sasl.kerberos.service.name = null,
sasl.kerberos.ticket.renew.jitter = 0.05,
sasl.kerberos.ticket.renew.window.factor = 0.8,
sasl.login.callback.handler.class = null,
sasl.login.class = null,
sasl.login.refresh.buffer.seconds = 300,
sasl.login.refresh.min.period.seconds = 60,
sasl.login.refresh.window.factor = 0.8,
sasl.login.refresh.window.jitter = 0.05,
sasl.mechanism = GSSAPI,
security.protocol = PLAINTEXT,
send.buffer.bytes = 131072,
ssl.cipher.suites = null,
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1],
ssl.endpoint.identification.algorithm = https,
ssl.key.password = null,
ssl.keymanager.algorithm = SunX509,
ssl.keystore.location = null,
ssl.keystore.password = null,
ssl.keystore.type = JKS,
ssl.protocol = TLS,
ssl.provider = null,
ssl.secure.random.implementation = null,
ssl.trustmanager.algorithm = PKIX,
ssl.truststore.location = null,
ssl.truststore.password = null,
ssl.truststore.type = JKS,
,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka version : 2.1.1-cp3,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka commitId : c119bfeabe023775,
[main] INFO org.apache.kafka.clients.admin.AdminClientConfig — AdminClientConfig values: ,
bootstrap.servers = [kafka2:9092],
client.dns.lookup = default,
client.id = ,
connections.max.idle.ms = 300000,
metadata.max.age.ms = 300000,
metric.reporters = [],
metrics.num.samples = 2,
metrics.recording.level = INFO,
metrics.sample.window.ms = 30000,
receive.buffer.bytes = 65536,
reconnect.backoff.max.ms = 1000,
reconnect.backoff.ms = 50,
request.timeout.ms = 120000,
retries = 5,
retry.backoff.ms = 100,
sasl.client.callback.handler.class = null,
sasl.jaas.config = null,
sasl.kerberos.kinit.cmd = /usr/bin/kinit,
sasl.kerberos.min.time.before.relogin = 60000,
sasl.kerberos.service.name = null,
sasl.kerberos.ticket.renew.jitter = 0.05,
sasl.kerberos.ticket.renew.window.factor = 0.8,
sasl.login.callback.handler.class = null,
sasl.login.class = null,
sasl.login.refresh.buffer.seconds = 300,
sasl.login.refresh.min.period.seconds = 60,
sasl.login.refresh.window.factor = 0.8,
sasl.login.refresh.window.jitter = 0.05,
sasl.mechanism = GSSAPI,
security.protocol = PLAINTEXT,
send.buffer.bytes = 131072,
ssl.cipher.suites = null,
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1],
ssl.endpoint.identification.algorithm = https,
ssl.key.password = null,
ssl.keymanager.algorithm = SunX509,
ssl.keystore.location = null,
ssl.keystore.password = null,
ssl.keystore.type = JKS,
ssl.protocol = TLS,
ssl.provider = null,
ssl.secure.random.implementation = null,
ssl.trustmanager.algorithm = PKIX,
ssl.truststore.location = null,
ssl.truststore.password = null,
ssl.truststore.type = JKS,
,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka version : 2.1.1-cp3,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka commitId : c119bfeabe023775,
[main] INFO org.apache.kafka.clients.admin.AdminClientConfig — AdminClientConfig values: ,
bootstrap.servers = [kafka3:9092],
client.dns.lookup = default,
client.id = ,
connections.max.idle.ms = 300000,
metadata.max.age.ms = 300000,
metric.reporters = [],
metrics.num.samples = 2,
metrics.recording.level = INFO,
metrics.sample.window.ms = 30000,
receive.buffer.bytes = 65536,
reconnect.backoff.max.ms = 1000,
reconnect.backoff.ms = 50,
request.timeout.ms = 120000,
retries = 5,
retry.backoff.ms = 100,
sasl.client.callback.handler.class = null,
sasl.jaas.config = null,
sasl.kerberos.kinit.cmd = /usr/bin/kinit,
sasl.kerberos.min.time.before.relogin = 60000,
sasl.kerberos.service.name = null,
sasl.kerberos.ticket.renew.jitter = 0.05,
sasl.kerberos.ticket.renew.window.factor = 0.8,
sasl.login.callback.handler.class = null,
sasl.login.class = null,
sasl.login.refresh.buffer.seconds = 300,
sasl.login.refresh.min.period.seconds = 60,
sasl.login.refresh.window.factor = 0.8,
sasl.login.refresh.window.jitter = 0.05,
sasl.mechanism = GSSAPI,
security.protocol = PLAINTEXT,
send.buffer.bytes = 131072,
ssl.cipher.suites = null,
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1],
ssl.endpoint.identification.algorithm = https,
ssl.key.password = null,
ssl.keymanager.algorithm = SunX509,
ssl.keystore.location = null,
ssl.keystore.password = null,
ssl.keystore.type = JKS,
ssl.protocol = TLS,
ssl.provider = null,
ssl.secure.random.implementation = null,
ssl.trustmanager.algorithm = PKIX,
ssl.truststore.location = null,
ssl.truststore.password = null,
ssl.truststore.type = JKS,
,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka version : 2.1.1-cp3,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka commitId : c119bfeabe023775,
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager — [AdminClient clientId=adminclient-1] Metadata update failed,
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.,
[main] ERROR io.confluent.admin.utils.ClusterStatus — Error while getting broker list.,
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.,
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45),
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32),
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89),
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262),
at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:149),
at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:150),
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.,
[main] INFO io.confluent.admin.utils.ClusterStatus — Expected 1 brokers but found only 0. Trying to query Kafka for metadata again …,
[main] ERROR io.confluent.admin.utils.ClusterStatus — Expected 1 brokers but found only 0. Brokers found [].,
OK. All Kafkas are ready,
===> ENV Variables …,
ALLOW_UNSIGNED=false,
COMPONENT=schema-registry,
CONFLUENT_DEB_VERSION=1,
CONFLUENT_MAJOR_VERSION=5,
CONFLUENT_MINOR_VERSION=1,
CONFLUENT_MVN_LABEL=,
CONFLUENT_PATCH_VERSION=3,
CONFLUENT_PLATFORM_LABEL=,
CONFLUENT_VERSION=5.1.3,
CUB_CLASSPATH=/etc/confluent/docker/docker-utils.jar,
HOME=/root,
HOSTNAME=704d321e9067,
KAFKA_VERSION=2.1.1cp3,
LANG=C.UTF-8,
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin,
PWD=/,
PYTHON_PIP_VERSION=8.1.2,
PYTHON_VERSION=2.7.9-1,
SCALA_VERSION=2.11,
SCHEMA_REGISTRY_AVRO_COMPATIBILITY_LEVEL=none,
SCHEMA_REGISTRY_HOST_NAME=schema-registry2,
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092,PLAINTEXT://kafka3:9092,
SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,
SCHEMA_REGISTRY_ZK_NAMESPACE=sr,
SHLVL=2,
ZULU_OPENJDK_VERSION=8=8.30.0.1,
_=/usr/bin/env,
uid=0(root) gid=0(root) groups=0(root),
===> Configuring …,
===> Running preflight checks … ,
===> Check if Kafka is healthy …,
[main] INFO org.apache.kafka.clients.admin.AdminClientConfig — AdminClientConfig values: ,
bootstrap.servers = [PLAINTEXT://kafka1:9092, PLAINTEXT://kafka2:9092, PLAINTEXT://kafka3:9092],
client.dns.lookup = default,
client.id = ,
connections.max.idle.ms = 300000,
metadata.max.age.ms = 300000,
metric.reporters = [],
metrics.num.samples = 2,
metrics.recording.level = INFO,
metrics.sample.window.ms = 30000,
receive.buffer.bytes = 65536,
reconnect.backoff.max.ms = 1000,
reconnect.backoff.ms = 50,
request.timeout.ms = 120000,
retries = 5,
retry.backoff.ms = 100,
sasl.client.callback.handler.class = null,
sasl.jaas.config = null,
sasl.kerberos.kinit.cmd = /usr/bin/kinit,
sasl.kerberos.min.time.before.relogin = 60000,
sasl.kerberos.service.name = null,
sasl.kerberos.ticket.renew.jitter = 0.05,
sasl.kerberos.ticket.renew.window.factor = 0.8,
sasl.login.callback.handler.class = null,
sasl.login.class = null,
sasl.login.refresh.buffer.seconds = 300,
sasl.login.refresh.min.period.seconds = 60,
sasl.login.refresh.window.factor = 0.8,
sasl.login.refresh.window.jitter = 0.05,
sasl.mechanism = GSSAPI,
security.protocol = PLAINTEXT,
send.buffer.bytes = 131072,
ssl.cipher.suites = null,
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1],
ssl.endpoint.identification.algorithm = https,
ssl.key.password = null,
ssl.keymanager.algorithm = SunX509,
ssl.keystore.location = null,
ssl.keystore.password = null,
ssl.keystore.type = JKS,
ssl.protocol = TLS,
ssl.provider = null,
ssl.secure.random.implementation = null,
ssl.trustmanager.algorithm = PKIX,
ssl.truststore.location = null,
ssl.truststore.password = null,
ssl.truststore.type = JKS,
,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka version : 2.1.1-cp3,
[main] INFO org.apache.kafka.common.utils.AppInfoParser — Kafka commitId : c119bfeabe023775,
===> Launching … ,
===> Launching schema-registry … ,
[2019-12-05 07:29:35,054] INFO SchemaRegistryConfig values: ,
resource.extension.class = [],
metric.reporters = [],
kafkastore.sasl.kerberos.kinit.cmd = /usr/bin/kinit,
response.mediatype.default = application/vnd.schemaregistry.v1+json,
resource.extension.classes = [],
kafkastore.ssl.trustmanager.algorithm = PKIX,
inter.instance.protocol = http,
authentication.realm = ,
ssl.keystore.type = JKS,
kafkastore.topic = _schemas,
metrics.jmx.prefix = kafka.schema.registry,
kafkastore.ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1,
kafkastore.topic.replication.factor = 3,
ssl.truststore.password = [hidden],
kafkastore.timeout.ms = 500,
host.name = schema-registry2,
kafkastore.bootstrap.servers = [PLAINTEXT://kafka1:9092, PLAINTEXT://kafka2:9092, PLAINTEXT://kafka3:9092],
schema.registry.zk.namespace = schema_registry,
kafkastore.sasl.kerberos.ticket.renew.window.factor = 0.8,
kafkastore.sasl.kerberos.service.name = ,
schema.registry.resource.extension.class = [],
ssl.endpoint.identification.algorithm = ,
compression.enable = true,
kafkastore.ssl.truststore.type = JKS,
avro.compatibility.level = none,
kafkastore.ssl.protocol = TLS,
kafkastore.ssl.provider = ,
kafkastore.ssl.truststore.location = ,
response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json],
kafkastore.ssl.keystore.type = JKS,
authentication.skip.paths = [],
ssl.truststore.type = JKS,
websocket.servlet.initializor.classes = [],
kafkastore.ssl.truststore.password = [hidden],
access.control.allow.origin = ,
ssl.truststore.location = ,
ssl.keystore.password = [hidden],
port = 8081,
access.control.allow.headers = ,
kafkastore.ssl.keystore.location = ,
metrics.tag.map = {},
master.eligibility = true,
ssl.client.auth = false,
kafkastore.ssl.keystore.password = [hidden],
rest.servlet.initializor.classes = [],
websocket.path.prefix = /ws,
kafkastore.security.protocol = PLAINTEXT,
ssl.trustmanager.algorithm = ,
authentication.method = NONE,
request.logger.name = io.confluent.rest-utils.requests,
ssl.key.password = [hidden],
kafkastore.zk.session.timeout.ms = 30000,
kafkastore.sasl.mechanism = GSSAPI,
kafkastore.sasl.kerberos.ticket.renew.jitter = 0.05,
kafkastore.ssl.key.password = [hidden],
zookeeper.set.acl = false,
schema.registry.inter.instance.protocol = ,
authentication.roles = [*],
metrics.num.samples = 2,
ssl.protocol = TLS,
schema.registry.group.id = schema-registry,
kafkastore.ssl.keymanager.algorithm = SunX509,
kafkastore.connection.url = ,
debug = false,
listeners = [http://0.0.0.0:8081],
kafkastore.group.id = ,
ssl.provider = ,
ssl.enabled.protocols = [],
shutdown.graceful.ms = 1000,
ssl.keystore.location = ,
ssl.cipher.suites = [],
kafkastore.ssl.endpoint.identification.algorithm = ,
kafkastore.ssl.cipher.suites = ,
access.control.allow.methods = ,
kafkastore.sasl.kerberos.min.time.before.relogin = 60000,
ssl.keymanager.algorithm = ,
metrics.sample.window.ms = 30000,
kafkastore.init.timeout.ms = 60000,
(io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig),
[2019-12-05 07:29:35,100] INFO Logging initialized @353ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log),
[2019-12-05 07:29:35,373] INFO Initializing KafkaStore with broker endpoints: PLAINTEXT://kafka1:9092,PLAINTEXT://kafka2:9092,PLAINTEXT://kafka3:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore),
[2019-12-05 07:29:35,388] INFO AdminClientConfig values: ,
bootstrap.servers = [PLAINTEXT://kafka1:9092, PLAINTEXT://kafka2:9092, PLAINTEXT://kafka3:9092],
client.dns.lookup = default,
client.id = ,
connections.max.idle.ms = 300000,
metadata.max.age.ms = 300000,
metric.reporters = [],
metrics.num.samples = 2,
metrics.recording.level = INFO,
metrics.sample.window.ms = 30000,
receive.buffer.bytes = 65536,
reconnect.backoff.max.ms = 1000,
reconnect.backoff.ms = 50,
request.timeout.ms = 120000,
retries = 5,
retry.backoff.ms = 100,
sasl.client.callback.handler.class = null,
sasl.jaas.config = null,
sasl.kerberos.kinit.cmd = /usr/bin/kinit,
sasl.kerberos.min.time.before.relogin = 60000,
sasl.kerberos.service.name = null,
sasl.kerberos.ticket.renew.jitter = 0.05,
sasl.kerberos.ticket.renew.window.factor = 0.8,
sasl.login.callback.handler.class = null,
sasl.login.class = null,
sasl.login.refresh.buffer.seconds = 300,
sasl.login.refresh.min.period.seconds = 60,
sasl.login.refresh.window.factor = 0.8,
sasl.login.refresh.window.jitter = 0.05,
sasl.mechanism = GSSAPI,
security.protocol = PLAINTEXT,
send.buffer.bytes = 131072,
ssl.cipher.suites = null,
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1],
ssl.endpoint.identification.algorithm = https,
ssl.key.password = null,
ssl.keymanager.algorithm = SunX509,
ssl.keystore.location = null,
ssl.keystore.password = null,
ssl.keystore.type = JKS,
ssl.protocol = TLS,
ssl.provider = null,
ssl.secure.random.implementation = null,
ssl.trustmanager.algorithm = PKIX,
ssl.truststore.location = null,
ssl.truststore.password = null,
ssl.truststore.type = JKS,
(org.apache.kafka.clients.admin.AdminClientConfig),
[2019-12-05 07:29:35,451] INFO Kafka version : 2.1.1-cp3 (org.apache.kafka.common.utils.AppInfoParser),
[2019-12-05 07:29:35,451] INFO Kafka commitId : 74487c928927908f (org.apache.kafka.common.utils.AppInfoParser),
[2019-12-05 07:34:35,640] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication),
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry,
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:212),
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:62),
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:73),
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:40),
at io.confluent.rest.Application.createServer(Application.java:201),
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:42),
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration,
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:172),
at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:114),
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210),
… 5 more,
Caused by: java.util.concurrent.TimeoutException,
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108),
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274),
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:165),
… 7 more,
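
From the stack trace, the step that gives up is KafkaStore.createOrVerifySchemaTopic: the registry uses an AdminClient to look up the _schemas topic and create it if it is missing, and the store init is bounded by kafkastore.init.timeout.ms (60000 in the config above). If it helps anyone reproduce the race outside the container, here is a rough standalone check along the same lines (my own sketch, not the registry's actual code, assuming kafka-clients 2.x; the class name and the 1-partition assumption are mine, the replication factor 3 matches kafkastore.topic.replication.factor above):

// Sketch only: the same kind of AdminClient lookup/create that the registry performs,
// run standalone against the same brokers as in the stack file.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class SchemasTopicCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // Times out much like the registry does if no broker is reachable yet.
      Set<String> topics = admin.listTopics().names().get(60, TimeUnit.SECONDS);
      if (topics.contains("_schemas")) {
        System.out.println("_schemas already exists");
      } else {
        NewTopic schemas = new NewTopic("_schemas", 1, (short) 3); // 1 partition is an assumption
        admin.createTopics(Collections.singleton(schemas)).all().get(60, TimeUnit.SECONDS);
        System.out.println("_schemas created");
      }
    }
  }
}

If this also hangs right after the stack comes up, the registries are most likely just racing the brokers at startup, which would explain why a redeploy fixes it.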

I recently upgraded Confluent to 4.1, but there are some problems with the Schema Registry. On confluent start, the Schema Registry (and, consequently, the KSQL server) fails to start.

Here is the error I get in the Schema Registry logs:

[2018-04-20 11:27:38,426] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:65)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:203)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:63)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:41)
        at io.confluent.rest.Application.createServer(Application.java:165)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: io.confluent.kafka.schemaregistry.storage.exceptions.StoreException: Failed to write Noop record to kafka store.
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:139)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:201)
        ... 4 more
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreException: Failed to write Noop record to kafka store.
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.getLatestOffset(KafkaStore.java:423)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.waitUntilKafkaReaderReachesLastOffset(KafkaStore.java:276)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:137)
        ... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.getLatestOffset(KafkaStore.java:418)
        ... 7 more
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
[2018-04-20 11:27:38,430] INFO Shutting down schema registry (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:726)
[2018-04-20 11:27:38,430] INFO [kafka-store-reader-thread-_schemas]: Shutting down (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-04-20 11:27:38,431] INFO [kafka-store-reader-thread-_schemas]: Stopped (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-04-20 11:27:38,440] INFO [kafka-store-reader-thread-_schemas]: Shutdown completed (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-04-20 11:27:38,446] INFO KafkaStoreReaderThread shutdown complete. (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:227)

I have no idea why this error is reported, and the error messages are not very meaningful to me.

After the failure, confluent start schema-registry and confluent start ksql-server bring both services up, but when starting KSQL I get the following warning:

**************** WARNING ******************
Remote server address may not be valid:
Error issuing GET to KSQL server
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.
*******************************************

When I try to run a command (for example, show tables;), the following error is returned:

ksql> show tables;
Error issuing POST to KSQL server
Caused by: java.net.ConnectException: Connection refused (Connection refused)
Caused by: Could not connect to the server.

EDIT: I fixed this by destroying the current run (confluent destroy), but it would be interesting if someone could explain this problem.

1 answer

Best answer

Judging by the information you posted, it seems you may have had some zombie processes or bad data, though I can't be sure.

The Schema Registry was complaining that it could not write a message to Kafka, because the Kafka broker was complaining that it did not own the topic partition the Schema Registry was writing to. This could have been caused by a previous Kafka broker (from an old installation) still running.

Did you run confluent stop before upgrading?

Using confluent destroy to flatten/reset the installation is always a good option if you don't care about your data. Checking for stray processes (or using the old "reboot the machine" trick) can also be a good place to start when things aren't working the way you expect.
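
If it happens again, one way to see whether the broker you are writing to actually leads the _schemas partition is to describe the topic with an AdminClient. This is only a diagnostic sketch of mine (the bootstrap server and class name are placeholders), not something the registry itself does:

// Sketch: print the current leader, replicas and ISR of each _schemas partition.
// If the leader reported here is not the broker being written to, a producer can hit
// NotLeaderForPartitionException like the one in the stack trace above.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Collections;
import java.util.Properties;

public class SchemasLeaderCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    try (AdminClient admin = AdminClient.create(props)) {
      TopicDescription desc = admin.describeTopics(Collections.singleton("_schemas"))
          .all().get().get("_schemas");
      desc.partitions().forEach(p ->
          System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
              p.partition(), p.leader(), p.replicas(), p.isr()));
    }
  }
}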

Glad it's all sorted out now :D

Andy


Andrew Coates
20 Apr 2018 at 13:09

Hi all,

I’m surely doing something wrong here, but I can’t seem to get the schema-registry to start with our non-Confluent packaged Kafka.  I’m using a 3 node Kafka cluster running Kafka 0.8.1.1, installed via our custom .deb packaging[1].  This Kafka cluster uses a chrooted zookeeper path.

My schema-registry.properties file has this:

  port=8081

  kafkastore.connection.url=localhost:2181/kafka/analytics-kafka

  kafkastore.topic=_schemas

  debug=false

Zookeeper is running on localhost:2181 as well as on 2 other nodes, and the Kafka cluster is configured to use all 3 zookeepers.  I have also tried including all zookeepers in kafkastore.connection.url with the same effect.

When I run schema-registry-start, I can see the schema registry create the _schemas topic, as well as produce a null (empty?) message to it.  It then says:

[2015-04-01 19:11:32,228] INFO Initialized the consumer offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:87)

[2015-04-01 19:11:37,138] INFO [kafka-store-reader-thread-_schemas], Starting  (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:68)

[2015-04-01 19:11:37,389] INFO Wait to catch up until the offset of the last message at 2 (io.confluent.kafka.schemaregistry.storage.KafkaStore:221)

It then waits for 60 seconds, and then prints out:

[2015-04-01 19:12:37,391] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:57)

io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry

at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:164)

at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:55)

at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)

at io.confluent.rest.Application.createServer(Application.java:104)

at io.confluent.kafka.schemaregistry.rest.Main.main(Main.java:42)

Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException: KafkaStoreReaderThread failed to reach target offset within the timeout interval. targetOffset: 2, offsetReached: 1, timeout(ms): 60000

at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:151)

at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:162)

… 4 more

Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException: KafkaStoreReaderThread failed to reach target offset within the timeout interval. targetOffset: 2, offsetReached: 1, timeout(ms): 60000

at io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread.waitUntilOffset(KafkaStoreReaderThread.java:229)

at io.confluent.kafka.schemaregistry.storage.KafkaStore.waitUntilKafkaReaderReachesLastOffset(KafkaStore.java:222)

at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:149)

… 5 more 

The Kafka broker that the schema-registry connected to has this in the logs:

[2015-04-01 19:12:37,719] 925129 [kafka-processor-9092-0] ERROR kafka.network.Processor  — Closing socket for /10.68.16.118 because of error

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

at sun.nio.ch.IOUtil.read(IOUtil.java:197)

at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)

at kafka.utils.Utils$.read(Utils.scala:375)

at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:347)

at kafka.network.Processor.run(SocketServer.scala:245)

at java.lang.Thread.run(Thread.java:745)

Does anyone have any obvious tips for me?  Will the schema-registry work with Kafka 0.8.1.1?  The kafka-rest-proxy works just fine with this setup.

Thanks!

-Ao
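
A note on diagnosing this: the timeout means the reader thread never saw the second record in _schemas (offsetReached: 1 vs targetOffset: 2). Against a newer broker and client you could compare the partition's beginning and end offsets with a plain consumer, roughly as in the sketch below. This is an assumption on my part about how to inspect it, it will not speak the 0.8.1.1 protocol, so it only illustrates the idea:

// Sketch (modern kafka-clients, not usable against 0.8.1.1): report how far the
// _schemas log goes, to compare with the offset the registry's reader thread reached.
// Partition 0 is assumed, since the registry's topic normally has a single partition.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class SchemasOffsets {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("_schemas", 0);
      Map<TopicPartition, Long> begin = consumer.beginningOffsets(Collections.singleton(tp));
      Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
      System.out.println("beginning=" + begin.get(tp) + " end=" + end.get(tp));
    }
  }
}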


/** * Copyright 2014 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.confluent.kafka.schemaregistry.storage; import io.confluent.common.metrics.JmxReporter; import io.confluent.common.metrics.MetricConfig; import io.confluent.common.metrics.MetricName; import io.confluent.common.metrics.Metrics; import io.confluent.common.metrics.MetricsReporter; import io.confluent.common.metrics.Sensor; import io.confluent.common.metrics.stats.Gauge; import io.confluent.common.utils.SystemTime; import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.avro.AvroUtils; import io.confluent.kafka.schemaregistry.client.rest.RestService; import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList; import io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException; import io.confluent.kafka.schemaregistry.exceptions.InvalidSchemaException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException; import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException; import io.confluent.kafka.schemaregistry.exceptions.UnknownMasterException; import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig; import io.confluent.kafka.schemaregistry.rest.VersionId; import io.confluent.kafka.schemaregistry.rest.exceptions.Errors; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException; import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; import io.confluent.kafka.schemaregistry.zookeeper.SchemaRegistryIdentity; import io.confluent.kafka.schemaregistry.zookeeper.ZookeeperMasterElector; import io.confluent.rest.Application; import io.confluent.rest.RestConfig; import io.confluent.rest.exceptions.RestException; import kafka.utils.ZkUtils; import org.apache.kafka.common.config.ConfigException; import scala.Tuple2; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.avro.reflect.Nullable; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.security.JaasUtils; import org.apache.zookeeper.data.Stat; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Vector; import java.util.concurrent.TimeUnit; public class KafkaSchemaRegistry implements SchemaRegistry { /** * Schema versions under a particular subject are indexed from MIN_VERSION. */ public static final int MIN_VERSION = 1; public static final int MAX_VERSION = Integer.MAX_VALUE; public static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE = 20; public static final String ZOOKEEPER_SCHEMA_ID_COUNTER = "/schema_id_counter"; private static final int ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_WRITE_RETRY_BACKOFF_MS = 50; private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class); final Map<Integer, SchemaKey> guidToSchemaKey; final Map<MD5, SchemaIdAndSubjects> schemaHashToGuid; private final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore; private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer; private final SchemaRegistryIdentity myIdentity; private final Object masterLock = new Object(); private final AvroCompatibilityLevel defaultCompatibilityLevel; private final String schemaRegistryZkNamespace; private final String kafkaClusterZkUrl; private final int zkSessionTimeoutMs; private final int kafkaStoreTimeoutMs; private final boolean isEligibleForMasterElector; private String schemaRegistryZkUrl; private ZkUtils zkUtils; private SchemaRegistryIdentity masterIdentity; private RestService masterRestService; private ZookeeperMasterElector masterElector = null; private Metrics metrics; private Sensor masterNodeSensor; private boolean zkAclsEnabled; // Hand out this id during the next schema registration. Indexed from 1. private int nextAvailableSchemaId; // Tracks the upper bound of the current id batch (inclusive). When nextAvailableSchemaId goes // above this value, it's time to allocate a new batch of ids private int idBatchInclusiveUpperBound; // Track the largest id in the kafka store so far (-1 indicates none in the store) // This is automatically updated by the KafkaStoreReaderThread every time a new Schema is added // Used to ensure that any newly allocated batch of ids does not overlap // with any id in the kafkastore. Primarily for bootstrapping the SchemaRegistry when // data is already in the kafkastore. 
private int maxIdInKafkaStore = -1; public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer) throws SchemaRegistryException { String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG); int port = getPortForIdentity(config.getInt(SchemaRegistryConfig.PORT_CONFIG), config.getList(RestConfig.LISTENERS_CONFIG)); this.schemaRegistryZkNamespace = config.getString(SchemaRegistryConfig.SCHEMAREGISTRY_ZK_NAMESPACE); this.isEligibleForMasterElector = config.getBoolean(SchemaRegistryConfig.MASTER_ELIGIBILITY); this.myIdentity = new SchemaRegistryIdentity(host, port, isEligibleForMasterElector); this.kafkaClusterZkUrl = config.getString(SchemaRegistryConfig.KAFKASTORE_CONNECTION_URL_CONFIG); this.zkSessionTimeoutMs = config.getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG); this.kafkaStoreTimeoutMs = config.getInt(SchemaRegistryConfig.KAFKASTORE_TIMEOUT_CONFIG); this.serializer = serializer; this.defaultCompatibilityLevel = config.compatibilityType(); this.guidToSchemaKey = new HashMap<Integer, SchemaKey>(); this.schemaHashToGuid = new HashMap<MD5, SchemaIdAndSubjects>(); Store store = new InMemoryStore<SchemaRegistryKey, SchemaRegistryValue>(); kafkaStore = new KafkaStore<SchemaRegistryKey, SchemaRegistryValue>( config, new KafkaStoreMessageHandler(this, store), this.serializer, store, new NoopKey()); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); String jmxPrefix = "kafka.schema.registry"; reporters.add(new JmxReporter(jmxPrefix)); this.metrics = new Metrics(metricConfig, reporters, new SystemTime()); this.masterNodeSensor = metrics.sensor("master-slave-role"); MetricName m = new MetricName("master-slave-role", "master-slave-role", "1.0 indicates the node is the active master in the cluster and is the" + " node where all register schema and config update requests are " + "served."); this.masterNodeSensor.add(m, new Gauge()); this.zkAclsEnabled = checkZkAclConfig(config); } /** * Checks if the user has configured ZooKeeper ACLs or not. Throws an exception if the ZooKeeper * client is set to create znodes with an ACL, yet the JAAS config is not present. Otherwise, * returns whether or not the user has enabled ZooKeeper ACLs. */ public static boolean checkZkAclConfig(SchemaRegistryConfig config) { if (config.getBoolean(SchemaRegistryConfig.ZOOKEEPER_SET_ACL_CONFIG) && !JaasUtils .isZkSecurityEnabled()) { throw new ConfigException( SchemaRegistryConfig.ZOOKEEPER_SET_ACL_CONFIG + " is set to true but ZooKeeper's " + "JAAS SASL configuration is not configured."); } return config.getBoolean(SchemaRegistryConfig.ZOOKEEPER_SET_ACL_CONFIG); } /** * A Schema Registry instance's identity is in part the port it listens on. Currently the port can * either be configured via the deprecated `port` configuration, or via the `listeners` * configuration. * * <p>This method uses `Application.parseListeners()` from `rest-utils` to get a list of * listeners, and returns the port of the first listener to be used for the instance's identity. * * <p></p>In theory, any port from any listener would be sufficient. Choosing the first, instead * of say the last, is arbitrary. 
*/ // TODO: once RestConfig.PORT_CONFIG is deprecated, remove the port parameter. static int getPortForIdentity(int port, List<String> configuredListeners) { List<URI> listeners = Application.parseListeners(configuredListeners, port, Arrays.asList("http", "https"), "http"); return listeners.get(0).getPort(); } @Override public void init() throws SchemaRegistryInitializationException { try { kafkaStore.init(); } catch (StoreInitializationException e) { throw new SchemaRegistryInitializationException( "Error initializing kafka store while initializing schema registry", e); } try { createZkNamespace(); masterElector = new ZookeeperMasterElector(zkUtils, myIdentity, this, isEligibleForMasterElector); } catch (SchemaRegistryStoreException e) { throw new SchemaRegistryInitializationException( "Error electing master while initializing schema registry", e); } catch (SchemaRegistryTimeoutException e) { throw new SchemaRegistryInitializationException(e); } } private void createZkNamespace() { int kafkaNamespaceIndex = kafkaClusterZkUrl.indexOf("/"); String zkConnForNamespaceCreation = kafkaNamespaceIndex > 0 ? kafkaClusterZkUrl.substring(0, kafkaNamespaceIndex) : kafkaClusterZkUrl; String schemaRegistryNamespace = "/" + schemaRegistryZkNamespace; schemaRegistryZkUrl = zkConnForNamespaceCreation + schemaRegistryNamespace; ZkUtils zkUtilsForNamespaceCreation = ZkUtils.apply( zkConnForNamespaceCreation, zkSessionTimeoutMs, zkSessionTimeoutMs, zkAclsEnabled); // create the zookeeper namespace using cluster.name if it doesn't already exist zkUtilsForNamespaceCreation.makeSurePersistentPathExists( schemaRegistryNamespace, zkUtilsForNamespaceCreation.defaultAcls(schemaRegistryNamespace)); log.info("Created schema registry namespace " + zkConnForNamespaceCreation + schemaRegistryNamespace); zkUtilsForNamespaceCreation.close(); this.zkUtils = ZkUtils.apply( schemaRegistryZkUrl, zkSessionTimeoutMs, zkSessionTimeoutMs, zkAclsEnabled); } public boolean isMaster() { synchronized (masterLock) { if (masterIdentity != null && masterIdentity.equals(myIdentity)) { return true; } else { return false; } } } /** * 'Inform' this SchemaRegistry instance which SchemaRegistry is the current master. * If this instance is set as the new master, ensure it is up-to-date with data in * the kafka store, and tell Zookeeper to allocate the next batch of schema IDs. * * @param newMaster Identity of the current master. null means no master is alive. */ public void setMaster(@Nullable SchemaRegistryIdentity newMaster) throws SchemaRegistryTimeoutException, SchemaRegistryStoreException { log.debug("Setting the master to " + newMaster); // Only schema registry instances eligible for master can be set to master if (newMaster != null && !newMaster.getMasterEligibility()) { throw new IllegalStateException( "Tried to set an ineligible node to master: " + newMaster); } synchronized (masterLock) { SchemaRegistryIdentity previousMaster = masterIdentity; masterIdentity = newMaster; if (masterIdentity == null) { masterRestService = null; } else { masterRestService = new RestService(String.format("http://%s:%d", masterIdentity.getHost(), masterIdentity.getPort())); } if (masterIdentity != null && !masterIdentity.equals(previousMaster) && isMaster()) { nextAvailableSchemaId = nextSchemaIdCounterBatch(); idBatchInclusiveUpperBound = getInclusiveUpperBound(nextAvailableSchemaId); // The new master may not know the exact last offset in the Kafka log. So, mark the // last offset invalid here and let the logic in register() deal with it later. 
kafkaStore.markLastWrittenOffsetInvalid(); } masterNodeSensor.record(isMaster() ? 1.0 : 0.0); } } /** * Return json data encoding basic information about this SchemaRegistry instance, such as * host, port, etc. */ public SchemaRegistryIdentity myIdentity() { return myIdentity; } /** * Return the identity of the SchemaRegistry that this instance thinks is current master. * Any request that requires writing new data gets forwarded to the master. */ public SchemaRegistryIdentity masterIdentity() { synchronized (masterLock) { return masterIdentity; } } @Override public int register(String subject, Schema schema) throws SchemaRegistryException { try { // Ensure cache is up-to-date before any potential writes kafkaStore.waitUntilKafkaReaderReachesLastOffset(kafkaStoreTimeoutMs); // see if the schema to be registered already exists MD5 md5 = MD5.ofString(schema.getSchema()); int schemaId = -1; if (this.schemaHashToGuid.containsKey(md5)) { SchemaIdAndSubjects schemaIdAndSubjects = this.schemaHashToGuid.get(md5); if (schemaIdAndSubjects.hasSubject(subject) && !isSubjectVersionDeleted(subject, schemaIdAndSubjects.getVersion(subject))) { // return only if the schema was previously registered under the input subject return schemaIdAndSubjects.getSchemaId(); } else { // need to register schema under the input subject schemaId = schemaIdAndSubjects.getSchemaId(); } } // determine the latest version of the schema in the subject Iterator<Schema> allVersions = getAllVersions(subject, true); Iterator<Schema> undeletedVersions = getAllVersions(subject, false); List<String> undeletedSchemasList = new ArrayList<>(); Schema latestSchema = null; int newVersion = MIN_VERSION; while (allVersions.hasNext()) { newVersion = allVersions.next().getVersion() + 1; } while (undeletedVersions.hasNext()) { latestSchema = undeletedVersions.next(); undeletedSchemasList.add(latestSchema.getSchema()); } AvroSchema avroSchema = canonicalizeSchema(schema); // assign a guid and put the schema in the kafka store if (latestSchema == null || isCompatible(subject, avroSchema.canonicalString, undeletedSchemasList)) { schema.setVersion(newVersion); if (schemaId >= 0) { schema.setId(schemaId); } else { schema.setId(nextAvailableSchemaId); nextAvailableSchemaId++; } if (reachedEndOfIdBatch()) { idBatchInclusiveUpperBound = getInclusiveUpperBound(nextSchemaIdCounterBatch()); } SchemaValue schemaValue = new SchemaValue(schema); kafkaStore.put(new SchemaKey(subject, newVersion), schemaValue); return schema.getId(); } else { throw new IncompatibleSchemaException( "New schema is incompatible with an earlier schema."); } } catch (StoreTimeoutException te) { throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te); } catch (StoreException e) { throw new SchemaRegistryStoreException("Error while registering the schema in the" + " backend Kafka store", e); } } public int registerOrForward(String subject, Schema schema, Map<String, String> headerProperties) throws SchemaRegistryException { Schema existingSchema = lookUpSchemaUnderSubject(subject, schema, false); if (existingSchema != null) { return existingSchema.getId(); } synchronized (masterLock) { if (isMaster()) { return register(subject, schema); } else { // forward registering request to the master if (masterIdentity != null) { return forwardRegisterRequestToMaster(subject, schema.getSchema(), headerProperties); } else { throw new UnknownMasterException("Register schema request failed since master is " + "unknown"); } } } } @Override public void 
deleteSchemaVersion(String subject, Schema schema) throws SchemaRegistryException { try { // Ensure cache is up-to-date before any potential writes kafkaStore.waitUntilKafkaReaderReachesLastOffset(kafkaStoreTimeoutMs); SchemaValue schemaValue = new SchemaValue(schema); schemaValue.setDeleted(true); kafkaStore.put(new SchemaKey(subject, schema.getVersion()), schemaValue); if (!getAllVersions(subject, false).hasNext() && getCompatibilityLevel(subject) != null) { deleteSubjectCompatibility(subject); } } catch (StoreTimeoutException te) { throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te); } catch (StoreException e) { throw new SchemaRegistryStoreException("Error while deleting the schema in the" + " backend Kafka store", e); } } public void deleteSchemaVersionOrForward(String subject, Schema schema) throws SchemaRegistryException { synchronized (masterLock) { if (isMaster()) { deleteSchemaVersion(subject, schema); } else { // forward registering request to the master if (masterIdentity != null) { forwardDeleteSchemaVersionRequestToMaster(subject, schema.getVersion()); } else { throw new UnknownMasterException("Register schema request failed since master is " + "unknown"); } } } } @Override public List<Integer> deleteSubject(String subject) throws SchemaRegistryException { // Ensure cache is up-to-date before any potential writes try { kafkaStore.waitUntilKafkaReaderReachesLastOffset(kafkaStoreTimeoutMs); List<Integer> deletedVersions = new ArrayList<>(); int deleteWatermarkVersion = 0; Iterator<Schema> schemasToBeDeleted = getAllVersions(subject, false); while (schemasToBeDeleted.hasNext()) { deleteWatermarkVersion = schemasToBeDeleted.next().getVersion(); deletedVersions.add(deleteWatermarkVersion); } DeleteSubjectKey key = new DeleteSubjectKey(subject); DeleteSubjectValue value = new DeleteSubjectValue(subject, deleteWatermarkVersion); kafkaStore.put(key, value); if (getCompatibilityLevel(subject) != null) { deleteSubjectCompatibility(subject); } return deletedVersions; } catch (StoreTimeoutException te) { throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te); } catch (StoreException e) { throw new SchemaRegistryStoreException("Error while deleting the subject in the" + " backend Kafka store", e); } } public List<Integer> deleteSubjectOrForward(String subject) throws SchemaRegistryException { synchronized (masterLock) { if (isMaster()) { return deleteSubject(subject); } else { // forward registering request to the master if (masterIdentity != null) { return forwardDeleteSubjectRequestToMaster(subject); } else { throw new UnknownMasterException("Register schema request failed since master is " + "unknown"); } } } } /** * Checks if given schema was ever registered under a subject. If found, it returns the version of * the schema under the subject. 
If not, returns -1 */ public Schema lookUpSchemaUnderSubject(String subject, Schema schema, boolean lookupDeletedSchema) throws SchemaRegistryException { canonicalizeSchema(schema); // see if the schema to be registered already exists MD5 md5 = MD5.ofString(schema.getSchema()); if (this.schemaHashToGuid.containsKey(md5)) { SchemaIdAndSubjects schemaIdAndSubjects = this.schemaHashToGuid.get(md5); if (schemaIdAndSubjects.hasSubject(subject) && (lookupDeletedSchema || !isSubjectVersionDeleted(subject, schemaIdAndSubjects .getVersion(subject)))) { Schema matchingSchema = new Schema(subject, schemaIdAndSubjects.getVersion(subject), schemaIdAndSubjects.getSchemaId(), schema.getSchema()); return matchingSchema; } else { // this schema was never registered under the input subject return null; } } else { // this schema was never registered in the registry under any subject return null; } } /** * Allocate and lock the next batch of schema ids. Signal a global lock over the next batch by * writing the inclusive upper bound of the batch to ZooKeeper. I.e. the value stored in * ZOOKEEPER_SCHEMA_ID_COUNTER in ZooKeeper indicates the current max allocated id for assignment. * * <p>When a schema registry server is initialized, kafka may have preexisting persistent * schema -> id assignments, and zookeeper may have preexisting counter data. * Therefore, when allocating the next batch of ids, it's necessary to ensure the entire new batch * is greater than the greatest id in kafka and also greater than the previously recorded batch * in zookeeper. * * <p>Return the first available id in the newly allocated batch of ids. */ private Integer nextSchemaIdCounterBatch() throws SchemaRegistryStoreException { int nextIdBatchLowerBound = 1; while (true) { if (!zkUtils.zkClient().exists(ZOOKEEPER_SCHEMA_ID_COUNTER)) { // create ZOOKEEPER_SCHEMA_ID_COUNTER if it already doesn't exist try { nextIdBatchLowerBound = getNextBatchLowerBoundFromKafkaStore(); int nextIdBatchUpperBound = getInclusiveUpperBound(nextIdBatchLowerBound); zkUtils.createPersistentPath(ZOOKEEPER_SCHEMA_ID_COUNTER, String.valueOf(nextIdBatchUpperBound), zkUtils.defaultAcls(ZOOKEEPER_SCHEMA_ID_COUNTER)); return nextIdBatchLowerBound; } catch (ZkNodeExistsException ignore) { // A zombie master may have created this zk node after the initial existence check // Ignore and try again } } else { // ZOOKEEPER_SCHEMA_ID_COUNTER exists // read the latest counter value final Tuple2<String, Stat> counterValue = zkUtils.readData(ZOOKEEPER_SCHEMA_ID_COUNTER); final String counterData = counterValue._1(); final Stat counterStat = counterValue._2(); if (counterData == null) { throw new SchemaRegistryStoreException( "Failed to read schema id counter " + ZOOKEEPER_SCHEMA_ID_COUNTER + " from zookeeper"); } // Compute the lower bound of next id batch based on zk data and kafkastore data int zkIdCounterValue = Integer.valueOf(counterData); int zkNextIdBatchLowerBound = zkIdCounterValue + 1; if (zkIdCounterValue % ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE != 0) { // ZooKeeper id counter should be an integer multiple of id batch size in normal // operation; handle corrupted/stale id counter data gracefully by bumping // up to the next id batch // fixedZkIdCounterValue is the smallest multiple of // ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE greater than the bad zkIdCounterValue int fixedZkIdCounterValue = ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE * (1 + zkIdCounterValue / ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE); zkNextIdBatchLowerBound = fixedZkIdCounterValue + 1; log.warn( 
"Zookeeper schema id counter is not an integer multiple of id batch size." + " Zookeeper may have stale id counter data.n" + "zk id counter: " + zkIdCounterValue + "n" + "id batch size: " + ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE); } nextIdBatchLowerBound = Math.max(zkNextIdBatchLowerBound, getNextBatchLowerBoundFromKafkaStore()); String nextIdBatchUpperBound = String.valueOf(getInclusiveUpperBound(nextIdBatchLowerBound)); // conditionally update the zookeeper path with the upper bound of the new id batch. // newSchemaIdCounterDataVersion < 0 indicates a failed conditional update. // Most probable cause is the existence of another master which tries to do the same // counter batch allocation at the same time. If this happens, re-read the value and // continue until one master is determined to be the zombie master. // NOTE: The handling of multiple masters is still a TODO int newSchemaIdCounterDataVersion = (Integer) zkUtils.conditionalUpdatePersistentPath( ZOOKEEPER_SCHEMA_ID_COUNTER, nextIdBatchUpperBound, counterStat.getVersion(), null)._2(); if (newSchemaIdCounterDataVersion >= 0) { break; } } try { // Wait a bit and attempt id batch allocation again Thread.sleep(ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_WRITE_RETRY_BACKOFF_MS); } catch (InterruptedException ignored) { // ignored } } return nextIdBatchLowerBound; } private int forwardRegisterRequestToMaster(String subject, String schemaString, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException { UrlList baseUrl = masterRestService.getBaseUrls(); RegisterSchemaRequest registerSchemaRequest = new RegisterSchemaRequest(); registerSchemaRequest.setSchema(schemaString); log.debug(String.format("Forwarding registering schema request %s to %s", registerSchemaRequest, baseUrl)); try { int id = masterRestService.registerSchema(headerProperties, registerSchemaRequest, subject); return id; } catch (IOException e) { throw new SchemaRegistryRequestForwardingException( String.format("Unexpected error while forwarding the registering schema request %s to %s", registerSchemaRequest, baseUrl), e); } catch (RestClientException e) { throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e); } } private void forwardUpdateCompatibilityLevelRequestToMaster( String subject, AvroCompatibilityLevel compatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException { UrlList baseUrl = masterRestService.getBaseUrls(); ConfigUpdateRequest configUpdateRequest = new ConfigUpdateRequest(); configUpdateRequest.setCompatibilityLevel(compatibilityLevel.name); log.debug(String.format("Forwarding update config request %s to %s", configUpdateRequest, baseUrl)); try { masterRestService.updateConfig(headerProperties, configUpdateRequest, subject); } catch (IOException e) { throw new SchemaRegistryRequestForwardingException( String.format("Unexpected error while forwarding the update config request %s to %s", configUpdateRequest, baseUrl), e); } catch (RestClientException e) { throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e); } } private void forwardDeleteSchemaVersionRequestToMaster(String subject, Integer version) throws SchemaRegistryRequestForwardingException { UrlList baseUrl = masterRestService.getBaseUrls(); log.debug(String.format("Forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, baseUrl)); try { masterRestService.deleteSchemaVersion(subject, String.valueOf(version)); } catch (IOException e) { throw new 
SchemaRegistryRequestForwardingException( String.format( "Unexpected error while forwarding deleteSchemaVersion schema version " + "request %s-%s to %s", subject, version, baseUrl), e); } catch (RestClientException e) { throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e); } } private List<Integer> forwardDeleteSubjectRequestToMaster(String subject) throws SchemaRegistryRequestForwardingException { UrlList baseUrl = masterRestService.getBaseUrls(); log.debug(String.format("Forwarding delete subject request for %s to %s", subject, baseUrl)); try { return masterRestService.deleteSubject(subject); } catch (IOException e) { throw new SchemaRegistryRequestForwardingException( String.format( "Unexpected error while forwarding delete subject " + "request %s to %s", subject, baseUrl), e); } catch (RestClientException e) { throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e); } } private AvroSchema canonicalizeSchema(Schema schema) throws InvalidSchemaException { AvroSchema avroSchema = AvroUtils.parseSchema(schema.getSchema()); if (avroSchema == null) { throw new InvalidSchemaException("Invalid schema " + schema.toString()); } schema.setSchema(avroSchema.canonicalString); return avroSchema; } public Schema validateAndGetSchema(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException { Schema schema = this.get(subject, versionId.getVersionId(), returnDeletedSchema); if (schema == null) { if (!this.listSubjects().contains(subject)) { throw Errors.subjectNotFoundException(); } else { throw Errors.versionNotFoundException(); } } return schema; } @Override public Schema get(String subject, int version, boolean returnDeletedSchema) throws SchemaRegistryException { VersionId versionId = new VersionId(version); if (versionId.isLatest()) { return getLatestVersion(subject); } else { SchemaKey key = new SchemaKey(subject, version); try { SchemaValue schemaValue = (SchemaValue) kafkaStore.get(key); Schema schema = null; if ((schemaValue != null && !schemaValue.isDeleted()) || returnDeletedSchema) { schema = getSchemaEntityFromSchemaValue(schemaValue); } return schema; } catch (StoreException e) { throw new SchemaRegistryStoreException( "Error while retrieving schema from the backend Kafka" + " store", e); } } } @Override public SchemaString get(int id) throws SchemaRegistryException { SchemaValue schema = null; try { SchemaKey subjectVersionKey = guidToSchemaKey.get(id); if (subjectVersionKey == null) { return null; } schema = (SchemaValue) kafkaStore.get(subjectVersionKey); } catch (StoreException e) { throw new SchemaRegistryStoreException( "Error while retrieving schema with id " + id + " from the backend Kafka" + " store", e); } SchemaString schemaString = new SchemaString(); schemaString.setSchemaString(schema.getSchema()); return schemaString; } @Override public Set<String> listSubjects() throws SchemaRegistryException { try { Iterator<SchemaRegistryKey> allKeys = kafkaStore.getAllKeys(); return extractUniqueSubjects(allKeys); } catch (StoreException e) { throw new SchemaRegistryStoreException( "Error from the backend Kafka store", e); } } private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys) throws StoreException { Set<String> subjects = new HashSet<String>(); while (allKeys.hasNext()) { SchemaRegistryKey k = allKeys.next(); if (k instanceof SchemaKey) { SchemaKey key = (SchemaKey) k; SchemaValue value = (SchemaValue) kafkaStore.get(key); if (value != null && !value.isDeleted()) { 
subjects.add(key.getSubject()); } } } return subjects; } @Override public Iterator<Schema> getAllVersions(String subject, boolean returnDeletedSchemas) throws SchemaRegistryException { try { SchemaKey key1 = new SchemaKey(subject, MIN_VERSION); SchemaKey key2 = new SchemaKey(subject, MAX_VERSION); Iterator<SchemaRegistryValue> allVersions = kafkaStore.getAll(key1, key2); return sortSchemasByVersion(allVersions, returnDeletedSchemas).iterator(); } catch (StoreException e) { throw new SchemaRegistryStoreException( "Error from the backend Kafka store", e); } } @Override public Schema getLatestVersion(String subject) throws SchemaRegistryException { try { SchemaKey key1 = new SchemaKey(subject, MIN_VERSION); SchemaKey key2 = new SchemaKey(subject, MAX_VERSION); Iterator<SchemaRegistryValue> allVersions = kafkaStore.getAll(key1, key2); Vector<Schema> sortedVersions = sortSchemasByVersion(allVersions, false); Schema latestSchema = null; if (sortedVersions.size() > 0) { latestSchema = sortedVersions.lastElement(); } return latestSchema; } catch (StoreException e) { throw new SchemaRegistryStoreException( "Error from the backend Kafka store", e); } } @Override public void close() { log.info("Shutting down schema registry"); kafkaStore.close(); if (masterElector != null) { masterElector.close(); } if (zkUtils != null) { zkUtils.close(); } } public void updateCompatibilityLevel(String subject, AvroCompatibilityLevel newCompatibilityLevel) throws SchemaRegistryStoreException, UnknownMasterException { ConfigKey configKey = new ConfigKey(subject); try { kafkaStore.put(configKey, new ConfigValue(newCompatibilityLevel)); log.debug("Wrote new compatibility level: " + newCompatibilityLevel.name + " to the" + " Kafka data store with key " + configKey.toString()); } catch (StoreException e) { throw new SchemaRegistryStoreException("Failed to write new config value to the store", e); } } public void updateConfigOrForward(String subject, AvroCompatibilityLevel newCompatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, UnknownMasterException { synchronized (masterLock) { if (isMaster()) { updateCompatibilityLevel(subject, newCompatibilityLevel); } else { // forward update config request to the master if (masterIdentity != null) { forwardUpdateCompatibilityLevelRequestToMaster(subject, newCompatibilityLevel, headerProperties); } else { throw new UnknownMasterException("Update config request failed since master is " + "unknown"); } } } } public AvroCompatibilityLevel getCompatibilityLevel(String subject) throws SchemaRegistryStoreException { ConfigKey subjectConfigKey = new ConfigKey(subject); ConfigValue config; try { config = (ConfigValue) kafkaStore.get(subjectConfigKey); if (config == null && subject == null) { // if top level config was never updated, send the configured value for this instance config = new ConfigValue(this.defaultCompatibilityLevel); } else if (config == null) { config = new ConfigValue(); } } catch (StoreException e) { throw new SchemaRegistryStoreException("Failed to read config from the kafka store", e); } return config.getCompatibilityLevel(); } @Override public boolean isCompatible(String subject, String newSchemaObj, String latestSchema) throws SchemaRegistryException { if (latestSchema == null) { throw new InvalidSchemaException( "Latest schema not provided"); } return isCompatible(subject, newSchemaObj, Collections.singletonList(latestSchema)); } /** * @param previousSchemas Full schema history in 
chronological order */ @Override public boolean isCompatible(String subject, String newSchemaObj, List<String> previousSchemas) throws SchemaRegistryException { if (previousSchemas == null || previousSchemas.isEmpty()) { throw new InvalidSchemaException( "Previous schema not provided"); } List<org.apache.avro.Schema> previousAvroSchemas = new ArrayList<>(previousSchemas.size()); for (String previousSchema : previousSchemas) { if (previousSchema == null) { throw new InvalidSchemaException( "Existing schema " + previousSchema + " is not a valid Avro schema"); } AvroSchema previousAvroSchema = AvroUtils.parseSchema(previousSchema); previousAvroSchemas.add(previousAvroSchema.schemaObj); } AvroCompatibilityLevel compatibility = getCompatibilityLevel(subject); if (compatibility == null) { compatibility = getCompatibilityLevel(null); } return compatibility.compatibilityChecker .isCompatible(AvroUtils.parseSchema(newSchemaObj).schemaObj, previousAvroSchemas); } private void deleteSubjectCompatibility(String subject) throws StoreException { ConfigKey configKey = new ConfigKey(subject); this.kafkaStore.delete(configKey); } /** * For testing. */ KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() { return this.kafkaStore; } private Vector<Schema> sortSchemasByVersion(Iterator<SchemaRegistryValue> schemas, boolean returnDeletedSchemas) { Vector<Schema> schemaVector = new Vector<Schema>(); while (schemas.hasNext()) { SchemaValue schemaValue = (SchemaValue) schemas.next(); if (returnDeletedSchemas || !schemaValue.isDeleted()) { schemaVector.add(getSchemaEntityFromSchemaValue(schemaValue)); } } Collections.sort(schemaVector); return schemaVector; } private Schema getSchemaEntityFromSchemaValue(SchemaValue schemaValue) { if (schemaValue == null) { return null; } return new Schema(schemaValue.getSubject(), schemaValue.getVersion(), schemaValue.getId(), schemaValue.getSchema()); } private boolean isSubjectVersionDeleted(String subject, int version) throws SchemaRegistryException { try { SchemaValue schemaValue = (SchemaValue) this.kafkaStore.get(new SchemaKey(subject, version)); return schemaValue.isDeleted(); } catch (StoreException e) { throw new SchemaRegistryStoreException( "Error while retrieving schema from the backend Kafka" + " store", e); } } int getMaxIdInKafkaStore() { return this.maxIdInKafkaStore; } /** * This should only be updated by the KafkastoreReaderThread. */ void setMaxIdInKafkaStore(int id) { this.maxIdInKafkaStore = id; } /** * If true, it's time to allocate a new batch of ids with a call to nextSchemaIdCounterBatch() */ private boolean reachedEndOfIdBatch() { return nextAvailableSchemaId > idBatchInclusiveUpperBound; } /** * Return a minimum lower bound on the next batch of ids based on ids currently in the * kafka store. */ private int getNextBatchLowerBoundFromKafkaStore() { if (this.getMaxIdInKafkaStore() <= 0) { return 1; } int nextBatchLowerBound = 1 + this.getMaxIdInKafkaStore() / ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE; return 1 + nextBatchLowerBound * ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE; } /** * E.g. if inclusiveLowerBound is 61, and BATCH_SIZE is 20, the inclusiveUpperBound should be 80. */ private int getInclusiveUpperBound(int inclusiveLowerBound) { return inclusiveLowerBound + ZOOKEEPER_SCHEMA_ID_COUNTER_BATCH_SIZE - 1; } }
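
To make the id-allocation arithmetic above concrete, here is a small, self-contained Java sketch (not part of the registry source) that mirrors getNextBatchLowerBoundFromKafkaStore() and getInclusiveUpperBound(). The batch size of 20 is taken from the Javadoc example above and is an assumption about the actual constant's value.

public class IdBatchDemo {
  // Assumed batch size, matching the "BATCH_SIZE is 20" example in the Javadoc above.
  private static final int BATCH_SIZE = 20;

  // Mirrors getNextBatchLowerBoundFromKafkaStore(): round the max id already in the
  // Kafka store up to the start of the next full batch.
  static int nextBatchLowerBound(int maxIdInKafkaStore) {
    if (maxIdInKafkaStore <= 0) {
      return 1;
    }
    int nextBatch = 1 + maxIdInKafkaStore / BATCH_SIZE;
    return 1 + nextBatch * BATCH_SIZE;
  }

  // Mirrors getInclusiveUpperBound(): a batch starting at lower covers BATCH_SIZE ids.
  static int inclusiveUpperBound(int inclusiveLowerBound) {
    return inclusiveLowerBound + BATCH_SIZE - 1;
  }

  public static void main(String[] args) {
    for (int maxId : new int[]{0, 25, 61}) {
      int lower = nextBatchLowerBound(maxId);
      // e.g. maxId 0 -> batch [1, 20], maxId 25 -> batch [41, 60], maxId 61 -> batch [81, 100]
      System.out.printf("maxIdInKafkaStore=%d -> id batch [%d, %d]%n",
          maxId, lower, inclusiveUpperBound(lower));
    }
  }
}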

I have AWS MSK set up, and I am trying to sink records from MSK into Elasticsearch. I am able to push data into MSK in JSON format, and I want to sink it into Elasticsearch. I was able to do all of the setup correctly. This is what I did on the EC2 instance:

wget /usr/local http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent

/usr/local/confluent/etc/kafka-connect-elasticsearch

After that I modified the kafka-connect-elasticsearch properties and set my Elasticsearch URL:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect

The producer sends messages in the format shown below:

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}

I am a bit confused about how Kafka Connect is supposed to start here. If it has to be started separately, how do I start it?
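
In a self-managed install like the one above, nothing starts the connector automatically: a Connect worker has to be launched and pointed at both a worker config and the connector properties file. Below is a rough sketch, assuming the stock Confluent 3.1.2 layout; the file names connect-json-standalone.properties and elasticsearch-sink.properties are illustrative, and since the messages are plain JSON the worker uses the JsonConverter with schemas disabled:

# /usr/local/confluent/etc/kafka/connect-json-standalone.properties (illustrative name)
bootstrap.servers=<MSK bootstrap broker string>
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

# start a standalone worker with the worker config and the Elasticsearch sink config
/usr/local/confluent/bin/connect-standalone \
    /usr/local/confluent/etc/kafka/connect-json-standalone.properties \
    /usr/local/confluent/etc/kafka-connect-elasticsearch/elasticsearch-sink.properties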

I also started the Schema Registry as shown below, which gave me an error.

/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties

When I do that, I get the error below:

[2019-12-29 13:49:17,861] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/

Please help.

As suggested in an answer, I updated the Kafka Connect version, but then I started getting the error below:

 ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
        at io.confluent.rest.Application.createServer(Application.java:201)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
        ... 5 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
        ... 7 more

In this article, we will explain how to resolve "SchemaRegistryTimeoutException: Timed out waiting for join group to complete" in a Kafka cluster.

Kafka is a distributed messaging system. While connecting the Kafka server and the Zookeeper server, I got the error below in a Confluent Kafka multi-node cluster running alongside a Big Data environment.

ERROR: SchemaRegistryTimeoutException in Kafka

Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:77)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException: Timed out waiting for join group to complete
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:310)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:75)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.configureBaseApplication(SchemaRegistryRestApplication.java:90)
at io.confluent.rest.Application.configureHandler(Application.java:217)
at io.confluent.rest.ApplicationServer.doStart(ApplicationServer.java:187)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
Caused by: io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException: Timed out waiting for join group to complete
at io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector.init(KafkaGroupMasterElector.java:207)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:305)

Solution 1:

First, check that all of the services in the Kafka cluster are up. In my case, Zookeeper was down.
Step 1: Check the Zookeeper log files; in my case the log showed "java.net.ConnectException: Connection refused". (A quick way to verify Zookeeper's health directly is sketched after these steps.)

Step 2: Restart the Zookeeper service in the cluster using one of the commands below:

zk restart

or

sudo /etc/init.d/zookeeper-server start

Step 3: After the Zookeeper service has been restarted, restart the Kafka service.

Note: If you still get the same error, try Solution 2 below.
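
Independently of the restart, Zookeeper's health can be verified directly from the shell with its four-letter-word commands; a quick check, assuming Zookeeper listens on the default client port 2181 and nc (netcat) is installed:

echo ruok | nc <zookeeper-host> 2181    # a healthy Zookeeper answers "imok"
echo stat | nc <zookeeper-host> 2181    # prints the server mode (leader/follower) and client connections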

Solution 2:

Step 1: First, delete all of the Zookeeper data and rebuild it again.

Step 2: After that, check the Zookeeper connection string (e.g. localhost:2181). If it points at localhost, change it to the fully qualified domain name (FQDN).

Step 3: Then set up the schema-registry connection as below:

listeners=https://FQDN:PORTNumber
kafkastore.connection.url=FQDN/IP:2181
kafkastore.topic=_schemas
debug=false
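
As a side note, newer Schema Registry releases can also be pointed directly at the Kafka brokers instead of Zookeeper, which avoids the kafkastore.connection.url indirection entirely; a minimal sketch, with placeholder listener and broker addresses:

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://broker1.fqdn:9092,PLAINTEXT://broker2.fqdn:9092
kafkastore.topic=_schemas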

Step 4: Restart the Zookeeper server and the Kafka server using the commands below (Zookeeper first, then Kafka):

nohup /usr/bin/zookeeper-server-start /etc/zookeeper/zookeeper_server.properties > /logdir/zookeeper.log 2>&1 &
nohup /usr/bin/kafka-server-start /etc/kafka/server.properties > /logdir/server.log 2>&1 &
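
Before starting Schema Registry again, it can also help to confirm that its storage topic can be described; a quick check, assuming the Confluent command-line tools are installed under /usr/bin as above, the default _schemas topic name, and FQDN standing in for the Zookeeper host:

/usr/bin/kafka-topics --describe --topic _schemas --zookeeper FQDN:2181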

Summary: The resolutions above give Kafka admins and developers a simple way to resolve this timeout exception in Confluent Kafka. We provided two solutions with straightforward steps for this type of issue. After updating the schema-registry configuration, restart both the Zookeeper and Kafka servers in the Confluent Kafka cluster. Kafka is widely used today for large-scale data streaming, with many companies running sizeable Kafka clusters in Big Data environments.
