JUST WRITE

Consumer야 살았니?! 죽었니?! - Kafka Consumer Heartbeats 본문

MLOps/Kafka

Consumer야 살았니?! 죽었니?! - Kafka Consumer Heartbeats

천재보단범재 2023. 11. 27. 21:46

Kafka Consumer HeartBeats

Consumer야 살았니?! 죽었니?!

Kafka Cluster를 운영하면서 다양한 상황에 직면하게 됩니다.

그중에서 가장 많은 경우가 Producer나 Consumer가 제대로 동작하지 않고 shutdown 되는 경우입니다.

이번에 다른 팀에서 Consumer를 처음 세팅하면서 조언을 구하는 경우가 있었습니다.

Consumer가 자꾸 shutdown된다는 거였습니다.

전달받은 log를 살펴보면


Heartbeat Thread closed.

위 문구를 발견할 수 있었습니다.

몇가지 Consumer 세팅값 변경을 권유해 주었습니다.

이후 저는 위 문구에 있는 Heartbeat Thread에 대해 궁금증이 생겼습니다.

 

Kafka Cluster에서 데이터(Lag)를 가져오려면 Consumer Client를 통해서 데이터를 가져옵니다.

Topic의 Partition당 하나의 Consumer가 Consuming으로 데이터를 가져옵니다.

Producer는 Kafka에게 Data를 보내고(publish)

Consumer는 Kafka에게서 Data를 가져옵니다.(subscribe)

이때 Consumer가 살았는지 죽었는지 Kafka에게 주기적으로 Heatbeat를 보내 체크합니다.

그 이유가 무엇인지 이번 포스팅에서 정리해보려고 합니다.

Consumer Group

Kafka는 Consumer Group를 통해 Consumer들을 관리합니다.

Consumer Group내 Consumer들이 Topic의 Partition을 맡아서 컨슈밍 합니다.

가장 이상적인 모습은 Topic의 Parition 개수와 Consumer Group내 Consumer의 수가 동일한 것입니다.

출처: https://www.instaclustr.com/blog/a-beginners-guide-to-kafka-consumers/

만약 Consumer Group의 Consumer의 수가 Partition 수가 많으면 노는 Consumer가 생기게 됩니다.

출처: https://www.instaclustr.com/blog/a-beginners-guide-to-kafka-consumers/

새로운 Consumer Group이 Topic을 컨슈밍 하면 Partition을 적절하게 나눠서 컨슈밍 합니다.

출처: https://www.instaclustr.com/blog/a-beginners-guide-to-kafka-consumers/

Consumer Group에 관련된 자세한 Logic은 다른 포스팅에서 정리하도록 하겠습니다.

여기서 Heartbeats를 보내는 이유를 짐작할 수 있습니다.

Consumer Group내 Consumer 1개가 dead 상태가 되면 Partition이 남게 됩니다.

그러면 다른 Consumer가 해당 Partition으로 컨슈밍 하도록 해야 합니다.

Kafka는 Heartbeat를 통해서 Consumer의 상태를 체크합니다.

Consumer Heatbeat

Kafka 초창기에는 Consumer가 실행되면 별도의 Process 없이 Kafka에게 Heartbeat를 보냈습니다.

출처 : https://chrzaszcz.dev/2019/06/kafka-heartbeat-thread/

Consumer가 poll이란 method를 호출하면 Heatbeat를 보냈습니다.

하지만 poll 호출 간격이 커지면 Heatbeat 보내는 간격도 커지게 되었습니다.

그래서 KIP-62에서 Heartbeat Thread를 소개합니다.

별도의 Heartbeat Thread를 통해서 일정 간격으로 Heartbeat를 보내게 됩니다.

출처 :https://chrzaszcz.dev/2019/06/kafka-heartbeat-thread/

해당 방식으로 poll 호출과 상관없이 Consumer는 Kafka에게 살아 있음을 전달할 수 있습니다.

Code 살펴보기

Kafka Consumer Client 코드를 살펴봄으로써 Heartbeat 과정을 살펴보려고 합니다.

ConsumerCoordinator에서 poll을 호출합니다.

중간에 pollHeartbeat를 호출하는 것을 알 수 있습니다.

    public boolean poll(Timer timer) {
        invokeCompletedOffsetCommitCallbacks();

        if (subscriptions.partitionsAutoAssigned()) {
            // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
            // group proactively due to application inactivity even if (say) the coordinator cannot be found.
            pollHeartbeat(timer.currentTimeMs());
            if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
                return false;
            }

            if (rejoinNeededOrPending()) {
                // due to a race condition between the initial metadata fetch and the initial rebalance,
                // we need to ensure that the metadata is fresh before joining initially. This ensures
                // that we have matched the pattern against the cluster's topics at least once before joining.
                if (subscriptions.hasPatternSubscription()) {
                    // For consumer group that uses pattern-based subscription, after a topic is created,
                    // any consumer that discovers the topic after metadata refresh can trigger rebalance
                    // across the entire consumer group. Multiple rebalances can be triggered after one topic
                    // creation if consumers refresh metadata at vastly different times. We can significantly
                    // reduce the number of rebalances caused by single topic creation by asking consumer to
                    // refresh metadata before re-joining the group as long as the refresh backoff time has
                    // passed.
                    if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
                        this.metadata.requestUpdate();
                    }

                    if (!client.ensureFreshMetadata(timer)) {
                        return false;
                    }
                }

                if (!ensureActiveGroup(timer)) {
                    return false;
                }
            }
        } else {
            // For manually assigned partitions, if there are no ready nodes, await metadata.
            // If connections to all nodes fail, wakeups triggered while attempting to send fetch
            // requests result in polls returning immediately, causing a tight loop of polls. Without
            // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
            // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
            // When group management is used, metadata wait is already performed for this scenario as
            // coordinator is unknown, hence this check is not required.
            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
                client.awaitMetadataUpdate(timer);
            }
        }

        maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
        return true;
    }

 

ConsumerCorrdinator는 AbstractCoordinator를 상속받아 정의되어 있습니다.

public final class ConsumerCoordinator extends AbstractCoordinator {
...

AbstractCoordinator를 살펴보면 heartbeatThread를 생성한 뒤 heartbeat를 poll 합니다.

그리고 AbstractCoordinator 생성자에서 HeartBeat 객체를 생성합니다.

public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
                        LogContext logContext,
                        ConsumerNetworkClient client,
                        Metrics metrics,
                        String metricGrpPrefix,
                        Time time,
                        Optional<ClientTelemetryReporter> clientTelemetryReporter) {
  Objects.requireNonNull(rebalanceConfig.groupId,
                      "Expected a non-null group id for coordinator construction");
  this.rebalanceConfig = rebalanceConfig;
  this.log = logContext.logger(this.getClass());
  this.client = client;
  this.time = time;
  this.retryBackoff = new ExponentialBackoff(
              rebalanceConfig.retryBackoffMs,
              CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
              rebalanceConfig.retryBackoffMaxMs,
              CommonClientConfigs.RETRY_BACKOFF_JITTER);
  this.heartbeat = new Heartbeat(rebalanceConfig, time);
  this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
  this.clientTelemetryReporter = clientTelemetryReporter;
}
...
...
...
protected synchronized void pollHeartbeat(long now) {
  if (heartbeatThread != null) {
    if (heartbeatThread.hasFailed()) {
    // set the heartbeat thread to null and raise an exception. If the user catches it,
    // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
      RuntimeException cause = heartbeatThread.failureCause();
      heartbeatThread = null;
      throw cause;
    }
    // Awake the heartbeat thread if needed
    if (heartbeat.shouldHeartbeat(now)) {
      notify();
    }
    heartbeat.poll(now);
  }
}

HeartBeat Thread는 아래와 같이 정의되어 있습니다.

먼저 아래 3가지 상태로 나눠지는 것을 알 수 있습니다.

  • enable
  • disable
  • close
 private class HeartbeatThread extends KafkaThread implements AutoCloseable {
        private boolean enabled = false;
        private boolean closed = false;
        private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null);

        private HeartbeatThread() {
            super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() ? "" : " | " + rebalanceConfig.groupId), true);
        }

        public void enable() {
            synchronized (AbstractCoordinator.this) {
                log.debug("Enabling heartbeat thread");
                this.enabled = true;
                heartbeat.resetTimeouts();
                AbstractCoordinator.this.notify();
            }
        }

        public void disable() {
            synchronized (AbstractCoordinator.this) {
                log.debug("Disabling heartbeat thread");
                this.enabled = false;
            }
        }

        public void close() {
            synchronized (AbstractCoordinator.this) {
                this.closed = true;
                AbstractCoordinator.this.notify();
            }
        }

        private boolean hasFailed() {
            return failed.get() != null;
        }

        private RuntimeException failureCause() {
            return failed.get();
        }

        @Override
        public void run() {
 ...
 ...
 ...

HeartBeat Thread가 HeartBeat를 보내기 전에 몇 가지 상태를 체크합니다.

  • coordinator 상태
  • session timeout
  • poll timeout
if (coordinatorUnknown()) {
	if (findCoordinatorFuture != null) {
		clearFindCoordinatorFuture();
    } else {
		lookupCoordinator();
    }
	AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
	markCoordinatorUnknown("session timed out without receiving a " + "heartbeat response");
} else if (heartbeat.pollTimeoutExpired(now)) {
	log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
			"was longer than the configured max.poll.interval.ms, which typically implies that " +
			"the poll loop is spending too much time processing messages. You can address this " +
			"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
			"returned in poll() with max.poll.records.");
	 maybeLeaveGroup("consumer poll timeout has expired.");
} else if (!heartbeat.shouldHeartbeat(now)) {
	AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);

이상이 없으면 그때 HeartBeat를 보냅니다(sendHeartbeatRequest())

} else {
	heartbeat.sentHeartbeat(now);
	final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
	heartbeatFuture.addListener(new RequestFutureListener<Void>() {
		@Override
		public void onSuccess(Void value) {
			synchronized (AbstractCoordinator.this) {
				heartbeat.receiveHeartbeat();
			}
		}

        @Override
		public void onFailure(RuntimeException e) {
			synchronized (AbstractCoordinator.this) {
				if (e instanceof RebalanceInProgressException) {
					// it is valid to continue heartbeating while the group is rebalancing. This
					// ensures that the coordinator keeps the member in the group for as long
					// as the duration of the rebalance timeout. If we stop sending heartbeats,
					// however, then the session timeout may expire before we can rejoin.
					heartbeat.receiveHeartbeat();
				} else if (e instanceof FencedInstanceIdException) {
					log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
					heartbeatThread.failed.set(e);
				} else {
					heartbeat.failHeartbeat();
					// wake up the thread if it's sleeping to reschedule the heartbeat
					AbstractCoordinator.this.notify();
                }
            }
        }
    });
}

HeartBeat를 보내기 전에 먼저 세팅값들을 현재 시간으로 update 합니다.

HeartBeat 객체에서 sentHeartbeat 메서드를 호출해서 해당 update를 처리합니다.

void sentHeartbeat(long now) {
  lastHeartbeatSend = now;
  heartbeatInFlight = true;
  update(now);
  heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);

  if (log.isTraceEnabled()) {
    log.trace("Sending heartbeat request with {}ms remaining on timer", heartbeatTimer.remainingMs());
  }
}
...
...
...
private void update(long now) {
  heartbeatTimer.update(now);
  sessionTimer.update(now);
  pollTimer.update(now);
}

세팅값을 변경한 후 Heartbeat를 보냅니다.

sendHeartbeatRequest 메소드를 호출해서 보냅니다.

 synchronized RequestFuture<Void> sendHeartbeatRequest() {
   log.debug("Sending Heartbeat request with generation {} and member id {} to coordinator {}",
           generation.generationId, generation.memberId, coordinator);
   HeartbeatRequest.Builder requestBuilder =
        new HeartbeatRequest.Builder(new HeartbeatRequestData()
                        .setGroupId(rebalanceConfig.groupId)
                        .setMemberId(this.generation.memberId)
                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                        .setGenerationId(this.generation.generationId));
    return client.send(coordinator, requestBuilder)
               .compose(new HeartbeatResponseHandler(generation));
 }

정리

Kafka에서는 Consumer가 컨슈밍을 할 때 Kafka Cluster에게 주기적으로 Heartbeat를 전송합니다.

효율적인 Heartbeat 프로세스를 통해 별도의 Thread를 두어서 전송을 합니다.

이번 포스팅에서는 Consumer Heartbeat가 어떻게 진행되는지 정리해 보았습니다.

다음에는 Consumer Group에 관련해서 좀 더 정리해 보겠습니다.

[참고사이트]

728x90
반응형
Comments