Description
When subscribed to a set of topics and if the group authorization fails, the error is thrown. If I use the same kafka_consumer to unsubscribe and subscribe to the same set of topics with same conf, the error is not observed.
Rather we get an unauthorized error during the topic metadata refresh
How to reproduce
Did the below change with the rdkafka example code
diff --git a/examples/rdkafka_complex_consumer_example.cpp b/examples/rdkafka_complex_consumer_example.cpp
index 220f3d91..1dc309cb 100644
--- a/examples/rdkafka_complex_consumer_example.cpp
+++ b/examples/rdkafka_complex_consumer_example.cpp
@@ -466,6 +466,31 @@ int main (int argc, char **argv) {
delete msg;
}
+ /*
+ * Subscribe to topics
+ */
+ std::cerr << "UnSubscribingn";
+
+ err = consumer->unsubscribe();
+
+ /*
+ * Subscribe to topics
+ */
+ std::cerr << "Subscribing";
+
+ err = consumer->subscribe(topics);
+ run = 1;
+
+ /*
+ * Consume messages
+ */
+ while (run) {
+ RdKafka::Message *msg = consumer->consume(1000);
+ msg_consume(msg, NULL);
+ delete msg;
+ }
+
+
#ifndef _WIN32
alarm(10);
#endif
Output:
examples/rdkafka_complex_consumer_example_cpp -b kafka.bootstrap.servers=mwkafka-prod-01.nyc.firm.com:9092,mwkafka-prod-02.nyc.firm.com:9092,mwkafka-prod-01.dr.firm.com:9092,mwkafka-prod-02.dr.firm.com:9092,mwkafka-prod-01.tbd.firm.com:9092,mwkafka-prod-02.tbd.firm.com:9092 -g aaafps.new_cons_gp -X security.protocol=SASL_PLAINTEXT -X sasl.kerberos.service.name=kafka -X sasl.mechanism=GSSAPI -X sasl.kerberos.kinit.cmd='skinit --force' -X client.id=aaafps.cppcon aaafps.dhamodha.test-n-9
dhamodha acquired ticket: client dhamodha/hostbased@UNIX.firm.COM, server krbtgt/UNIX.firm.COM@UNIX.firm.COM
% Created consumer aaafps.cppcon#consumer-1
Consume failed: FindCoordinator response error: Group authorization failed.
UnSubscribing
Subscribingdhamodha acquired ticket: client dhamodha/hostbased@UNIX.DESfirmAW.COM, server krbtgt/UNIX.firm.COM@UNIX.firm.COM
dhamodha acquired ticket: client dhamodha/hostbased@UNIX.firm.COM, server krbtgt/UNIX.firm.COM@UNIX.firm.COM
dhamodha acquired ticket: client dhamodha/hostbased@UNIX.firm.COM, server krbtgt/UNIX.firm.COM@UNIX.firm.COM
dhamodha acquired ticket: client dhamodha/hostbased@UNIX.firm.COM, server krbtgt/UNIX.firm.COM@UNIX.firm.COM
Consume failed: Subscribed topic not available: aaafps.dhamodha.test-n-9: Broker: Topic authorization failed
% Consumed 0 messages (0 bytes)
- Tried with latest version and still the same behavior
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- librdkafka version (release number or git tag):
<REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
- Apache Kafka version:
<REPLACE with e.g., 0.10.2.3>
- librdkafka client configuration:
<REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
- Operating system:
<REPLACE with e.g., Centos 5 (x64)>
- Provide logs (with
debug=..
as necessary) from librdkafka - Provide broker log excerpts
- Critical issue
I have a Kafka consumer which had been working for months but now I receive the following:
Broker: Group authorization failed
What might have changed in the environment that would cause this error?
GroupId looks to be a required field for any Kafka consumer so I’m not getting what is happening.
I’m using Confluent.Kafka 1.3.0
OneCricketeer
170k18 gold badges124 silver badges232 bronze badges
asked Jan 7, 2021 at 14:36
2
I have the same issue and the comment made by Mário Meyrelles helped me.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "your-name-of-application-here");
This guy needed to have a starting prefix without which it would not permit.
answered Nov 19, 2021 at 8:36
1
I found my problem but it may not apply to others.
I first saw this error when running/debugging in VS.
After going around and around it turns out that in my environment, which is VS2017 on Win10, that Kafka under the covers uses Kerberos to get the credentials for the executing user and passes those credentials to validate access to Kafka. So in a Windows 10 environment that means Kerberos uses Active Directory and passes my AD id.
And no you can’t change and use an alternative like a Keytab file.
So bottom-line is that Kafka admins had made some changes to Windows AD groups which removed my NTID. The admins were trying to limit access by only system Ids. After discovering this I asked to be re-instated into the AD groups and all is well now.
answered Nov 19, 2021 at 14:28
user3297833user3297833
1112 silver badges9 bronze badges
Группа потребителей состоит из одного или нескольких потребителей. В принципе, у каждого потребителя должен быть groupId. Это можно указать при создании KafkaConsumer. Когда в группе потребителей только один потребитель, это можно рассматривать как одноранговую модель; когда есть несколько потребителей, ее можно рассматривать как модель публикации-подписки.
Для TopicPartition на стороне брокера раздел может использоваться только одним потребителем. Другими словами, предположим, что тема имеет 3 раздела (TopicA). В настоящее время в группе потребителей есть 4 потребителя, groupId которых является test. В это время каждый член группы подписан на TopicA. В настоящее время данные могут потреблять до 3 потребителей, поскольку существует только 3 тематических раздела.
Компонент координатора
Координатор представляет собой класс компонентов, который включает ConsumerCoordinator на стороне потребителя и GroupCoordinator на стороне брокера. На стороне брокера GroupCoordinator отвечает за: управление членами группы потребителей и отправку компенсаций. Отправленные потребителем смещения хранятся в Zookeeper в старой версии Kafka. В новой версии Kafka смещение потребления темы сохраняется в теме под названием __consumer_offsets. Это внутренняя тема Kafka, по умолчанию будет 50 разделов, и каждый раздел будет иметь по 3 копии.
Позже мы подробно объясним процесс взаимодействия ConsumerCoordinator на клиенте и Брокере. В основном это включает в себя то, как потребитель может присоединиться к группе и подачу компенсации.
ConsumerCoordinator основной процесс
Во-первых, когда мы вызываем KafkaConsumer.poll (), мы сначала вызываем ConsumerCoordinator.poll (), а затем мы также вызываем связанные операции отправки смещения. Для ConsumerCoordinator.poll (), который является записью на приведенном выше рисунке, давайте посмотрим на реализацию кода записи ConsumerCoordinator.poll ().
/**
* Когда потребитель присоединяется к группе, это гарантирует, что координатор группы известен, а потребитель уже присоединился к группе, что также используется для периодической подачи компенсации.
* Если время истечет, он вернется немедленно.
*/
public boolean poll(Timer timer) {
// Может обновлять информацию о метаданных подписки
maybeUpdateSubscriptionMetadata();
// для тестирования
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
/ ** Проверяем, нормально ли работает поток пульса, в случае сбоя потока пульса будет сгенерировано исключение; в противном случае обновите время вызова опроса * /
pollHeartbeat(timer.currentTimeMs());
// Координатор неизвестен, инициализируйте Consumer Coordinator
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}
// Определяем, нужно ли вам повторно присоединиться к группе. Если подписанный раздел меняется или назначенный раздел изменяется, требуется повторное присоединение
if (rejoinNeededOrPending()) {
// Поскольку существует состояние гонки между инициализированным обновлением метаданных и инициализированной перебалансировкой, необходимо убедиться, что обновление метаданных находится впереди. Это гарантирует, что перед присоединением к группе будет хотя бы один процесс сопоставления между темой подписки и темой брокера.
if (subscriptions.hasPatternSubscription()) {
// Для потребителей, подписанных с помощью сопоставления с образцом, когда тема создается, любой потребитель находит новую тему после обновления метаданных, это вызывает перебалансировку. Следовательно, в это время может быть большое количество операций по повторной балансировке, и следующая оценка времени отсрочки значительно снизит частоту повторной балансировки.
if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
this.metadata.requestUpdate();
}
if (!client.ensureFreshMetadata(timer)) {
return false;
}
maybeUpdateSubscriptionMetadata();
}
// Убедимся, что группа активна; присоединяемся к группе; присваиваем подписку разделу
if (!ensureActiveGroup(timer)) {
return false;
}
}
} else {
// For manually assigned partitions, if there are no ready nodes, await metadata.
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
// requests result in polls returning immediately, causing a tight loop of polls. Without
// the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
// When group management is used, metadata wait is already performed for this scenario as
// coordinator is unknown, hence this check is not required.
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
client.awaitMetadataUpdate(timer);
}
}
// При установке автоматической фиксации, когда время достигнуто, выполнить автоматическую фиксацию
maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
return true;
}
Кратко опишите вышеуказанный процесс:
1. Основная функция sureCoordinatorReady () — отправить запрос GroupCoordinator и установить соединение.
2. Определите, нужно ли вам присоединиться к группе.Если раздел темы, на который вы подписаны, изменится или к группе присоединятся новые потребители, вам необходимо повторно войти в группу. В это время JoinGroup и SyncGroup отправляются через sureActiveGroup (), и получается TopicPartition, присвоенный себе.
3. Проверьте, нормальный ли поток пульса. Поток пульса должен регулярно посылать биения в GroupCoordinator. Если согласованный порог превышен, Потребитель будет считаться покидающим группу, и будет запущена перебалансировка.
4. Если смещение отправляется автоматически, смещение будет отправлено при достижении порогового значения времени.
Подробные сведения о функциях sureCoordinatorReady () и sureActiveGroup () будут объяснены позже.
Статус группы потребителей
Статус группы потребителей включает в себя следующие
Поток между различными состояниями группы потребителей выглядит следующим образом
Ниже приводится краткое описание процесса потока состояний: Сначала группа потребителей находится в состоянии «Пусто». При включении Rebalance она будет переведена в состояние RreparingRebalance и будет ждать, пока участники присоединятся к группе. Позже, когда член присоединяется к группе, он переходит в состояние CompletingRebalance и будет ждать плана распределения. После завершения распределения он перейдет в стабильное состояние для завершения начисления и баланса.
Когда новый член присоединяется к группе или член выходит, статус группы потребителей изменяется со Стабильного на статус PreparingRebalance. В это время всем членам необходимо повторно присоединиться к группе. Когда все участники выйдут из группы, статус изменится на Пусто. Условием для Kafka регулярного автоматического удаления просроченных смещений является то, что группа должна находиться в Пустом состоянии. Если группа потребителей отключена на долгое время (более 7 дней), Kafka может удалить ее в это время.
Установите TCP-соединение с брокером
Установление TCP-соединения с брокером достигается с помощью метода sureCoordinatorReady (). Давайте рассмотрим конкретную реализацию этого метода ниже.
/**
* Убедитесь, что координатор готов принять запрос (он подключен и может отправить запрос)
*/
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
if (!coordinatorUnknown())
return true;
do {
// Получаем GroupCoordinator и устанавливаем соединение
final RequestFuture<Void> future = lookupCoordinator();
client.poll(future, timer);
if (!future.isDone()) {
// ran out of time
break;
}
// Если процесс приобретения не удался
if (future.failed()) {
if (future.isRetriable()) {
log.debug("Coordinator discovery failed, refreshing metadata");
client.awaitMetadataUpdate(timer);
} else
throw future.exception();
} else if (coordinator != null && client.isUnavailable(coordinator)) {
// Когда координатор найден, но соединение не удается, пометьте его как неработающий в данный момент и повторите попытку
markCoordinatorUnknown();
timer.sleep(rebalanceConfig.retryBackoffMs);
}
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
}
В приведенной выше записи метода основная логика установления TCP-соединения с брокером делегируется lookupCoordinator () для реализации.
/**
* Выберите самый простой узел и отправьте запрос groupCoordinator
*/
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// Выбираем узел с наименьшим количеством подключений (притворяемся самым простаивающим)
Node node = this.client.leastLoadedNode();
if (node == null) {
log.debug("No broker available to send FindCoordinator request");
return RequestFuture.noBrokersAvailable();
} else
// Отправляем запрос и обрабатываем ответ
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
return findCoordinatorFuture;
}
Затем отправьте детали запроса GroupCoordinator следующим образом:
/**
* Отправить запрос координатора
* Найдите координатора группы и отправьте запрос GroupMetadata Брокеру.
*/
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// Инициализируем запрос GroupMetadata
log.debug("Sending FindCoordinator request to broker {}", node);
FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData().setKeyType(CoordinatorType.GROUP.id()).setKey(this.rebalanceConfig.groupId));
// Результат ответа после отправки запроса делегируется Обработчику для выполнения
return client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler());
}
// Обработка ответа GroupCoordinator и обратный вызов
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
log.debug("Received FindCoordinator response {}", resp);
clearFindCoordinatorFuture();
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
/ ** Если GroupCoordinator получен правильно, установить соединение и обновить время биения * /
AbstractCoordinator.this.coordinator = new Node(coordinatorConnectionId, findCoordinatorResponse.data().host(), findCoordinatorResponse.data().port());
log.info("Discovered group coordinator {}", coordinator);
// Инициализируем tcp-соединение
client.tryConnect(coordinator);
// Обновить время биения
heartbeat.resetSessionTimeout();
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
clearFindCoordinatorFuture();
super.onFailure(e, future);
}
}
Отправить запрос на присоединение к группе (процесс ребалансировки)
Потребителей, впервые присоединяющихся к группе, также можно рассматривать как разновидность Ребалансировки, которая содержит два типа запросов: запросы JoinGroup и SyncGroup. Давайте сначала посмотрим на процесс двух запросов:
Когда член группы присоединяется к группе, он отправляет запрос JoinGroup координатору. Тема, на которую вы хотите подписаться, будет указана в запросе, чтобы координатор мог собрать информацию о подписке всех участников. После сбора информации о подписке, как правило, первый участник, который отправляет запрос JoinGroup, автоматически называется Лидером. Лидер и секционированная ведущая копия не являются концепцией. Лидер здесь является лидером группы потребителей, и он будет отвечать за формулировку конкретного плана распределения разделов. Давайте посмотрим на реализацию исходного кода:
/**
* Убедитесь, что группа активна, и присоединитесь к группе
* Отправьте запросы JoinGroup и SyncGroup в GroupCoordinator и получите назначенный раздел темы
*/
boolean ensureActiveGroup(final Timer timer) {
// Убедитесь, что GroupCoordinator подключен, чтобы предотвратить отключение ранее установленного подключения
if (!ensureCoordinatorReady(timer)) {
return false;
}
// Запускаем поток отправки пульса (не обязательно отправка пульса, пульс будет отправлен только после выполнения условий)
startHeartbeatThreadIfNeeded();
// Отправляем запрос JoinGroup и обрабатываем возвращенную информацию
return joinGroupIfNeeded(timer);
}
Отправка запроса JoinGroup реализована в joinGroupIfNeeded ():
/**
* Отправить запрос JoinGroup + SyncGroup
*/
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
if (!ensureCoordinatorReady(timer)) {
return false;
}
/ ** Триггер onJoinPrepare, включая приемник смещения и перебалансировку * /
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
/ ** Инициализируем запрос JoinGroup и отправляем запрос * /
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
// we ran out of time
return false;
}
/ ** На этом этапе SyncGroup преуспела во времени * /
if (future.succeeded()) {
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
// сбросить joinFuture в пустое
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
resetJoinGroupFuture();
final RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
return true;
}
Давайте посмотрим на реализацию initiateJoinGroup () ниже:
/**
* Отправить запрос JoinGroup и добавить слушателя
*/
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
/ ** Во время перебалансировки поток пульса останавливается * /
disableHeartbeatThread();
/ ** Отметить как ребалансировку * /
state = MemberState.REBALANCING;
joinFuture = sendJoinGroupRequest(); / ** Отправить запрос JoinGroup * /
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
/ ** Отметить Consumer как стабильную * /
state = MemberState.STABLE;
rejoinNeeded = false;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
/ ** Отметить потребителя как неподключенного * /
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}
Давайте посмотрим на метод отправки запроса sendJoinGroupRequest ():
/**
* Отправить запрос JoinGroup и вернуть результат распределения (реализовано в JoinGroupResponseHandler)
*/
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// Отправляем запрос JoinGroup
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler());
}
/**
* Обработчик для обработки ответа JoinGroup (синхронизация групповой информации)
*/
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinLatency.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
/ ** Если состояние Потребителя не перебалансируется в это время, это вызовет исключение * /
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName());
/ ** JoinGroup выполнена успешно, ниже требуется SyncGroup для получения назначенного раздела темы * /
if (joinResponse.isLeader()) {
// Лидер выполнит план распространения и отправит запрос SyncGroup
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
resetGeneration();
log.debug("Attempt to join group failed due to unknown member id.");
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + "to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, joinResponse.data().memberId(), null);
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(error);
} else {
// unexpected error, throw the exception
log.error("Attempt to join group failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
Давайте посмотрим на процесс отправки запроса SyncGroup:
/**
* Когда потребитель является подписчиком, отправьте SyncGroup, чтобы получить результат распределения
*/
private RequestFuture<ByteBuffer> onJoinFollower() {
// SyncGroup отправляет пустое сообщение
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList())
);
log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
}
/**
* Когда потребитель является лидером, все экземпляры в группе выделяются, а результат назначения отправляется в GroupCoordinator через запрос SyncGroup
*/
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
/ ** Выполнить операцию присвоения * /
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(assignment.getKey()).setAssignment(Utils.toArray(assignment.getValue())));
}
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList)
);
log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
/ ** Отправить запрос группы синхронизации * /
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
/**
* Отправьте запрос SyncGroup, чтобы получить информацию о распределении разделов
*/
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
}
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
} else {
// Флаг соединения установлен в true
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// группа перебалансирована, задача не удалась
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup failed: {}", error.message());
resetGeneration();
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
log.debug("SyncGroup failed: {}", error.message());
markCoordinatorUnknown();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
}
Отправить запрос на присоединение к группе (процесс ребалансировки)
Потребителей, впервые присоединяющихся к группе, также можно рассматривать как разновидность Ребалансировки, которая содержит два типа запросов: запросы JoinGroup и SyncGroup. Давайте сначала посмотрим на процесс двух запросов:
Когда член группы присоединяется к группе, он отправляет координатору запрос JoinGroup. Тема, на которую вы хотите подписаться, будет указана в запросе, чтобы координатор мог собрать информацию о подписке всех участников. После сбора информации о подписке, как правило, первый участник, который отправляет запрос JoinGroup, автоматически называется Лидером. Лидер и секционированная лидерная копия — это не одно и то же понятие. Лидер здесь является лидером группы потребителей, и он будет отвечать за формулировку конкретного плана распределения разделов. Давайте посмотрим на реализацию исходного кода:
/**
* Убедитесь, что группа активна, и присоединитесь к группе
* Отправьте запросы JoinGroup и SyncGroup в GroupCoordinator и получите назначенный раздел темы
*/
boolean ensureActiveGroup(final Timer timer) {
// Убедитесь, что GroupCoordinator подключен, чтобы предотвратить отключение ранее установленного подключения
if (!ensureCoordinatorReady(timer)) {
return false;
}
// Запускаем поток отправки пульса (не обязательно отправка пульса, пульс будет отправлен только после выполнения условий)
startHeartbeatThreadIfNeeded();
// Отправляем запрос JoinGroup и обрабатываем возвращенную информацию
return joinGroupIfNeeded(timer);
}
Отправка запроса JoinGroup реализована в joinGroupIfNeeded ():
/**
* Отправить запрос JoinGroup + SyncGroup
*/
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
if (!ensureCoordinatorReady(timer)) {
return false;
}
/ ** Триггер onJoinPrepare, включая приемник смещения и перебалансировку * /
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
/ ** Инициализируем запрос JoinGroup и отправляем запрос * /
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
// we ran out of time
return false;
}
/ ** На этом этапе SyncGroup преуспела во времени * /
if (future.succeeded()) {
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
// сбросить joinFuture в пустое
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
resetJoinGroupFuture();
final RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
return true;
}
Давайте посмотрим на реализацию initiateJoinGroup () ниже:
/**
* Отправить запрос JoinGroup и добавить слушателя
*/
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
/ ** Во время перебалансировки поток пульса останавливается * /
disableHeartbeatThread();
/ ** Отметить как ребалансировку * /
state = MemberState.REBALANCING;
joinFuture = sendJoinGroupRequest(); / ** Отправить запрос на присоединение к группе * /
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
/ ** Отметить Consumer как стабильную * /
state = MemberState.STABLE;
rejoinNeeded = false;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
/ ** Отметить потребителя как неподключенного * /
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}
Давайте посмотрим на метод отправки запроса sendJoinGroupRequest ():
/**
* Отправить запрос JoinGroup и вернуть результат распределения (реализовано в JoinGroupResponseHandler)
*/
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// Отправляем запрос JoinGroup
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler());
}
/**
* Обработчик для обработки ответа JoinGroup (синхронизация групповой информации)
*/
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinLatency.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
/ ** Если состояние Потребителя не перебалансируется в это время, это вызовет исключение * /
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName());
/ ** JoinGroup выполнена успешно, ниже требуется SyncGroup для получения назначенного раздела темы * /
if (joinResponse.isLeader()) {
// Лидер выполнит план распространения и отправит запрос SyncGroup
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
resetGeneration();
log.debug("Attempt to join group failed due to unknown member id.");
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + "to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, joinResponse.data().memberId(), null);
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(error);
} else {
// unexpected error, throw the exception
log.error("Attempt to join group failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
Давайте посмотрим на процесс отправки запроса SyncGroup:
/**
* Когда потребитель является подписчиком, отправьте SyncGroup, чтобы получить результат распределения
*/
private RequestFuture<ByteBuffer> onJoinFollower() {
// SyncGroup отправляет пустое сообщение
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList())
);
log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
}
/**
* Когда потребитель является лидером, все экземпляры в группе выделяются, а результат назначения отправляется в GroupCoordinator через запрос SyncGroup
*/
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
/ ** Выполнить операцию присвоения * /
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(assignment.getKey()).setAssignment(Utils.toArray(assignment.getValue())));
}
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList)
);
log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
/ ** Отправить запрос группы синхронизации * /
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
/**
* Отправьте запрос SyncGroup, чтобы получить информацию о распределении разделов
*/
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
}
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
} else {
// Флаг соединения установлен в true
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// группа перебалансирована, задача не удалась
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup failed: {}", error.message());
resetGeneration();
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
log.debug("SyncGroup failed: {}", error.message());
markCoordinatorUnknown();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
}
Отправить запрос на присоединение к группе (процесс ребалансировки)
Потребителей, впервые присоединяющихся к группе, также можно рассматривать как разновидность Ребалансировки, которая содержит два типа запросов: запросы JoinGroup и SyncGroup. Давайте сначала посмотрим на процесс двух запросов:
Когда член группы присоединяется к группе, он отправляет запрос JoinGroup координатору. Тема, на которую вы хотите подписаться, будет указана в запросе, чтобы координатор мог собрать информацию о подписке всех участников. После сбора информации о подписке, как правило, первый участник, который отправляет запрос JoinGroup, автоматически называется Лидером. Лидер и секционированная лидерная копия — это не одно и то же понятие. Лидер здесь является лидером группы потребителей, и он будет отвечать за формулировку конкретного плана распределения разделов. Давайте посмотрим на реализацию исходного кода:
/**
* Убедитесь, что группа активна, и присоединитесь к группе
* Отправьте запросы JoinGroup и SyncGroup в GroupCoordinator и получите назначенный раздел темы
*/
boolean ensureActiveGroup(final Timer timer) {
// Убедитесь, что GroupCoordinator подключен, чтобы предотвратить отключение ранее установленного подключения
if (!ensureCoordinatorReady(timer)) {
return false;
}
// Запускаем поток отправки пульса (не обязательно отправка пульса, пульс будет отправлен только после выполнения условий)
startHeartbeatThreadIfNeeded();
// Отправляем запрос JoinGroup и обрабатываем возвращенную информацию
return joinGroupIfNeeded(timer);
}
Отправка запроса JoinGroup реализована в joinGroupIfNeeded ():
/**
* Отправить запрос JoinGroup + SyncGroup
*/
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
if (!ensureCoordinatorReady(timer)) {
return false;
}
/ ** Триггер onJoinPrepare, включая приемник смещения и перебалансировку * /
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
/ ** Инициализируем запрос JoinGroup и отправляем запрос * /
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
if (!future.isDone()) {
// we ran out of time
return false;
}
/ ** На этом этапе SyncGroup преуспела во времени * /
if (future.succeeded()) {
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
// сбросить joinFuture в пустое
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
resetJoinGroupFuture();
final RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
return true;
}
Давайте посмотрим на реализацию initiateJoinGroup () ниже:
/**
* Отправить запрос JoinGroup и добавить слушателя
*/
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
/ ** Во время перебалансировки поток пульса останавливается * /
disableHeartbeatThread();
/ ** Отметить как ребалансировку * /
state = MemberState.REBALANCING;
joinFuture = sendJoinGroupRequest(); / ** Отправить запрос на присоединение к группе * /
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
/ ** Отметить Consumer как стабильную * /
state = MemberState.STABLE;
rejoinNeeded = false;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
/ ** Отметить потребителя как неподключенного * /
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}
Давайте посмотрим на метод отправки запроса sendJoinGroupRequest ():
/**
* Отправить запрос JoinGroup и вернуть результат распределения (реализовано в JoinGroupResponseHandler)
*/
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// Отправляем запрос JoinGroup
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata())
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs).compose(new JoinGroupResponseHandler());
}
/**
* Обработчик для обработки ответа JoinGroup (синхронизация групповой информации)
*/
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinLatency.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
/ ** Если состояние Потребителя не перебалансируется в это время, это вызовет исключение * /
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(), joinResponse.data().memberId(), joinResponse.data().protocolName());
/ ** JoinGroup выполнена успешно, ниже требуется SyncGroup для получения назначенного раздела темы * /
if (joinResponse.isLeader()) {
// Лидер выполнит план распространения и отправит запрос SyncGroup
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
resetGeneration();
log.debug("Attempt to join group failed due to unknown member id.");
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID
|| error == Errors.GROUP_AUTHORIZATION_FAILED
|| error == Errors.GROUP_MAX_SIZE_REACHED) {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else {
future.raise(error);
}
} else if (error == Errors.UNSUPPORTED_VERSION) {
log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" + "to see if the problem resolves");
future.raise(error);
} else if (error == Errors.MEMBER_ID_REQUIRED) {
// Broker requires a concrete member id to be allowed to join the group. Update member id
// and send another join group request in next cycle.
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, joinResponse.data().memberId(), null);
AbstractCoordinator.this.rejoinNeeded = true;
AbstractCoordinator.this.state = MemberState.UNJOINED;
}
future.raise(error);
} else {
// unexpected error, throw the exception
log.error("Attempt to join group failed due to unexpected error: {}", error.message());
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
Давайте посмотрим на процесс отправки запроса SyncGroup:
/**
* Когда потребитель является подписчиком, отправьте SyncGroup, чтобы получить результат распределения
*/
private RequestFuture<ByteBuffer> onJoinFollower() {
// SyncGroup отправляет пустое сообщение
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList())
);
log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
}
/**
* Когда потребитель является лидером, все экземпляры в группе выделяются, а результат назначения отправляется в GroupCoordinator через запрос SyncGroup
*/
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
/ ** Выполнить операцию присвоения * /
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(assignment.getKey()).setAssignment(Utils.toArray(assignment.getValue())));
}
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList)
);
log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
/ ** Отправить запрос группы синхронизации * /
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
/**
* Отправьте запрос SyncGroup, чтобы получить информацию о распределении разделов
*/
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
}
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
} else {
// Флаг соединения установлен в true
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// группа перебалансирована, задача не удалась
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
log.error("Received fatal exception: group.instance.id gets fenced");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup failed: {}", error.message());
resetGeneration();
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
log.debug("SyncGroup failed: {}", error.message());
markCoordinatorUnknown();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
Несколько сценариев ребалансировки потребителей
Давайте сначала посмотрим на несколько представлений, вызванных Ребалансировкой: 1. Количество участников группы изменилось, 2. Количество подписанных тем изменилось, 3. Количество подписанных тем изменилось. Для работающего приложения в трех приведенных выше сценариях первый сценарий, скорее всего, вызовет перебалансировку. Давайте посмотрим на различные сценарии ребалансировки:
Новые участники присоединяются к группе
Члены группы покидают группу
Участник группы свернулся и покинул группу
Члены группы должны отправить смещения во время перебалансировки.
The other day my team ran into below issue. I did debugging to help them resolve this. The error was straight forward, issue with Authorization ie ACL (Access Control List) are not correctly setup.
[shri@xxxxx config]# /opt/kafka_2.11-0.10.1.1/bin/kafka-console-consumer.sh –bootstrap-server xxxxx:9093 —topic topic1 –consumer.config /opt/kafka_2.11-0.10.1.1/config/consumer-ssl.properties –from-beginning
[2017-06-08 23:06:15,290] WARN Error while fetching metadata with correlation id 1 : {topic1=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-06-08 23:06:15,292] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: group1
The way to debug issue,
1. List the ACL for the topic ie topic1 and consumer group ie group1
Current ACLs for resource `Topic:topic1`:
User:CN=consumer.shri.com,OU=IT,O=XXX,L=Austin,ST=TX,C=US has Allow permission for operations: Describe from hosts: *
User:CN=producer.shri.com,OU=IT,O=XXX,L=Austin,ST=TX,C=US has Allow permission for operations: Describe from hosts: *
User:CN=consumer.shri.com,OU=IT,O=XXX,L=Austin,ST=TX,C=US has Allow permission for operations: Read from hosts: *
User:CN=producer.shri.com,OU=IT,O=XXX,L=Austin,ST=TX,C=US has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Group:group1`:
User:CN=consumer.shri.com,OU=IT,O=XXX,L=Dallas,ST=TX,C=US has Allow permission for operations: Read from hosts: *
2. List the certificate (keystore) being used by the consumer. As you can see there is minor difference in ACL setup for topic and group and cert being used by the consumer. ACL had location as Austin, while cert had location as Atlanta. We change cert to match what was in the ACL, that resolve the error.
The JKS subject is
[spatel@xxxxx config]# keytool -v -list -keystore test.jks
Certificate[1]:
Owner: CN=consumer.shri.com, OU=IT, O=XXX Inc., L=Atlanta, ST=TX, C=US
So its simple setup issue, make sure all the configuration and setup line up correctly.