일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- kubectl
- Spring
- Packet
- EC2
- PostgreSQL
- Network
- tcp
- log
- kubernetes
- java
- Kafka
- OS
- jvm
- Python
- MAC address
- JavaScript
- helm
- Trino
- airflow
- CSV
- Operating System
- kubeadm
- zookeeper
- CVAT
- AWS
- docker
- grafana
- ip
- Vision
- aws s3
- Today
- Total
JUST WRITE
큰일 났다! Out of Sync다?!?! 본문
큰일 났다! Out of Sync다?!?!
Kafka Cluster를 운영하면 CMAK을 통해 Web에서 관리하는 경우가 많습니다.
저 역시 Kafka Cluster를 운영하면서 CMAK을 활용하고 있습니다.
CMAK에서 Under Replicated가 0이 아닌 50으로 나온 경우가 발생하였습니다.
Under Replicated가 0이 아니라는 것은 Topic의 Parition replicate가 설정한 대로 안된다는 것을 의미합니다.
Out of Sync가 발생한 상황입니다.
실제로 Kafka 로그가 아래와 같이 발생하고 있었습니다.
INFO [Partition __consumer_offsets-22 broker=3] Shrinking ISR from 1,3 to 3. Leader: (highWatermark: 358341750, endOffset: 358361171). Out of sync replicas: (brokerId: 1, endOffset: 358341750). (kafka.cluster.Partition)
INFO [Partition __consumer_offsets-22 broker=3] Cached zkVersion 4777 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
INFO [Partition __consumer_offsets-6 broker=3] Shrinking ISR from 1,3 to 3. Leader: (highWatermark: 21644, endOffset: 21645). Out of sync replicas: (brokerId: 1, endOffset: 21644). (kafka.cluster.Partition)
INFO [Partition __consumer_offsets-6 broker=3] Cached zkVersion 4672 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
INFO [Partition __consumer_offsets-10 broker=3] Shrinking ISR from 1,3 to 3. Leader: (highWatermark: 18012, endOffset: 18014). Out of sync replicas: (brokerId: 1, endOffset: 18012). (kafka.cluster.Partition)
이번 포스팅에서는 Out of Sync에 대해서 살펴보려고 합니다.
Out of Sync란
Kafka에서는 데이터를 Topic의 partition별로 저장합니다.
그리고 데이터를 안전하게 관리하기 위해 데이터를 복제를 합니다.
partition 단위로 저장된 데이터를 복제하는데, Broker별로 분산해서 복제를 해둡니다.
분산해서 복제해야 Broker가 shutdown되도 다른 Broker에 있는 데이터로 대체가능하기 때문입니다.
위 그림을 예를 두면 Broker 4개로 구성된 Kafka Cluster가 있습니다.
Topic replicate을 3으로 하면 위 그림처럼 partition-1의 데이터가 Broker 1,2,3에 3군데에 복제 저장됩니다.
Broker 1이 shutdown되도 2, 3에 데이터가 있기 때문에 안전합니다.
ISR
Kafka는 replication을 둬서 고가용성을 유지할 수 있습니다.
replication을 group으로 둬서 관리를 하는데 ISR(In-Sync-Replica)입니다.
위 그림에서 Topic의 partition-1이 Broker 1,2,3에 replicate 되어 있습니다.
partition-1의 ISR은 Broker 1,2,3입니다.
Kafka는 ISR를 확인하며 복제가 잘되고 있는 확인 합니다.
그럼 같은 데이터가 여러 곳에 있는데 어디 있는 것을 사용하면 될까요?
Kafka는 ISR 내에서 Leader, follower를 정해서 Leader를 통해서 데이터를 처리합니다.
Producer가 보낸 데이터를 Leader에서 가장 먼저 저장하고 이후 follwer에게 복제합니다.
만약 Leader인 Broker가 shutdown 되면 follwer 중에 Leader 선출하여 데이터를 처리합니다.
Kafka 데이터 고가용성을 위해서 Topic의 Replication 설정은 필수적입니다.
Out of Sync
그럼 Out of Sync는 뭘까요?
바로 follwer에게 데이터 복제가 안 되는 상황입니다.
Producer가 보낸 데이터는 먼저 leader에게 저장됩니다.
이후 follwer에게 복제되어야 하는데 복제가 안되는 것이 Out of Sync입니다.
복제가 안된 follwer는 Topic replicas에 속해있지만 ISR에는 빠지게 됩니다.
Out of Sync가 위에서 발생한 Under Replicated가 0이 아닌 상황입니다.
Out of Sync는 왜 생기나?!
Kafka는 어떤 기준으로 Out of Sync라고 판단할까요?
데이터가 크면 복제하는데 시간이 오래 걸립니다.
Leader와 follwer는 엄연히 다른 Broker이기 때문에 네트워크 이슈도 있습니다.
복제에는 다양한 이슈가 있기 때문에 Kafka는 복제되는 시간으로 판단합니다.
근데 처음에는 offset으로 판단하였습니다.
replica.lag.max.messages
leader와 follwer의 저장된 log의 offset 차이가 위 설정값보다 크면 Out of Sync로 판단하였습니다.
하지만 해당 설정값은 deprecated 되었습니다.
Kafka Document에서 확인할 수 있습니다.
replica.lag.time.max.ms
follwer가 leader에게 fetch request를 위 설정값 내에 보내지 않으면 ISR에서 빼버립니다.
기본값은 30000으로 30초입니다.
아래는 Kafka Replica.scala 코드 중에 있는 부분입니다.
def updateFetchState(
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
brokerEpoch: Long
): Unit = {
replicaState.updateAndGet { currentReplicaState =>
val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs)
} else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset) {
math.max(currentReplicaState.lastCaughtUpTimeMs, currentReplicaState.lastFetchTimeMs)
} else {
currentReplicaState.lastCaughtUpTimeMs
}
ReplicaState(
logStartOffset = followerStartOffset,
logEndOffsetMetadata = followerFetchOffsetMetadata,
lastFetchLeaderLogEndOffset = math.max(leaderEndOffset, currentReplicaState.lastFetchLeaderLogEndOffset),
lastFetchTimeMs = followerFetchTimeMs,
lastCaughtUpTimeMs = lastCaughtUpTime,
brokerEpoch = Option(brokerEpoch)
)
}
}
어떤 상황에서 발생할까?!
그럼 follwer가 leader에게 fetch request를 보내지 못하고 있을까요?
- 복제하는 데이터가 커서 시간이 걸려서 해당 시간 내 request를 못 보낸 경우
- Network 이슈로 데이터 저장이나 Requst가 오래 걸린 경우
- follower의 Disk 성능 이슈로 인한 데이터 복제가 오랜 걸린 경우
위 3가지 상황 말고도 여러 가지가 있을 수 있습니다.
정리
CMAK에서 Under Replicated가 0이 아닌 상황이 발생해서 Out of Sync에 대해서 정리해보았습니다.
Kafka에서 데이터 고가용성 유지는 중요하기 때문에 ISR은 이야기할 게 많은 주제입니다.
다음에 기회가 된다면 acks에 대해서도 정리해보겠습니다.
[참고사이트]
'MLOps > Kafka' 카테고리의 다른 글
1개로 부족해, 2개의 IP로 접근하기 (0) | 2023.09.05 |
---|---|
Doc에 있는 JVM 설정 파헤치기 (0) | 2023.08.21 |
Kafka야?!? 데이터 잘 처리하고 있니?! (0) | 2023.06.21 |
Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?! (0) | 2023.06.14 |
UI로 Kafka 관리하기 (0) | 2023.03.16 |