JUST WRITE

Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?! 본문

MLOps/Kafka

Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?!

천재보단범재 2023. 6. 14. 00:02

Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?!

Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?!

최근에 Kafka Cluster를 여러 개를 구축하는 업무를 맡아서 하고 있었습니다.

각 Cluster를 3개의 Server에 Kafka를 설치하여 구성하고 있었습니다.

1개의 Cluster 중 1개의 Server가 재시작하여서 Kafka도 재시작하는 경우가 있었습니다.

(포스팅 주제가 다르니 Server 재시작 원인은 다루지 않겠습니다.)

Kafka를 재시작하고 Log를 살펴보니 아래와 같은 경고메시지가 눈에 띄었습니다.

$ tail -5000f ./kafka/logs/server.log
...
...
[2023-06-09 00:00:47,698] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition ***_***_***-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)
[2023-06-09 00:00:47,698] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition ***_***_***-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)
[2023-06-09 00:00:47,698] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition ***_***_***-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)
[2023-06-09 00:00:30,337] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition ***_***_***-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)
[2023-06-09 00:00:30,338] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition ***_***_***-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)
[2023-06-09 00:00:30,338] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Received UNKNOWN_TOPIC_ID from the leader for partition ***_***_***-0. This error may be returned transiently when the partition is being created or deleted, but it is not expected to persist. (kafka.server.ReplicaFetcherThread)
...
...

(Topic명은 보안상 ***로 처리하였습니다.)

leader parition에서 모르는 topic id를 받았다는 경고 메시지였습니다.

경고메시지긴 하였지만 해당 메시지를 살펴보기로 하였습니다.

제가 구성하고 있는 Kafka Cluster는 Zookeeper를 구성하였습니다.

Zookeeper의 Znode에 Kafka 관리를 위한 정보들을 저장합니다.

Zookeeper와 Kafka를 살펴보며 해당 경고메시지를 파악해 보았습니다.

Kafka와 Zookeeper?!

먼저 Apache Zookeeper에 대해서 간단히 소개하고자 합니다.

Yahoo에서 개발하여 Hadoop의 서브 프로젝트로 시작한 오프 소스입니다.

분산 Cluster 환경에서 Big Data를 관리하기 위한 Orchestration System입니다.

Kafka뿐만 아니라 HBase, SOLR, Spark, Nifi 등의 분산 시스템에서 활용되고 있습니다.

주요 역할은 아래와 같습니다.

  • Naming Registry(ZNode)
  • leadership election
  • Service 설정값 분산
  • Service 동기화

특히 Kafka에서는 Zookeeper의 Znode를 잘 활용합니다.

ACL, version number 등 Kafka의 cache 정보들을 저장하고 관리해 줍니다.

좀 더 Kafka와 Zookeeper의 동작을 살펴보면,,,

Controller 선출

Kafka는 standalone도 가능하지만 보통 Cluster를 구성합니다.

Kafka Cluster의 모든 Broker에는 Controller가 Service로 동작합니다.

하지만 실제로 Active 하고 있는 것은 선출된 1개입니다.

Kafka Controller는 아래와 같이 중요한 역할을 수행합니다.

  • partition 관리
  • partition leader 관리
  • replica 관리
  • partition 할당
  • ISR(In Sync Replica) 관리

이렇게 중요한 역할을 하는 Controller 선출을 Zookeeper가 맡아서 합니다.

Zookeeper Cli로 현재 어떤 Broker에서 Controller가 Active인지 알 수 있습니다.

$ [zk: localhost:2181(CONNECTED) 1] get /controller
{"version":1,"brokerid":1,"timestamp":"1686641386427"}

Cluster 관리

Kafka Cluster를 다수의 Broker로 구성되어 있습니다.

각 Broker마다 Unique ID를 지정해 줍니다.

$ cat kafka/config/server.properties
...
...
# The id of the broker. This must be set to a unique integer for broker.
broker.id=1
...
...

Zookeeper는 Znode를 활용하여 Cluster의 member를 관리합니다.

Broker 1개가 shut down 되면 해당 Znode도 삭제됩니다.

Topic 설정 관리

Kafka Topic 설정은 전역으로 설정도 가능하며 Topic별로 설정도 가능합니다.

server.properties로 설정한 값이 전역으로 설정됩니다.

Topic별로 따로 설정을 하지 않으면 전역 설정값을 따릅니다.

Topic에 대한 설정값을 Zookeeper에서 관리합니다.

Zookeeper Cli로 각 Topic 설정값을 확인할 수도 있습니다.

# 전역 설정값 따르는 topic
[zk: localhost:2181(CONNECTED) 11] get /config/topics/test-topic-setting-not-change
{"version":1,"config":{}}

# retetion.ms를 따로 설정한 topic
[zk: localhost:2181(CONNECTED) 12] get /config/topics/test-topic-setting-change
{"version":1,"config":{"retention.ms":"1"}}

ACL 정보 저장

Kafka에서 ACLs(Access Control Lists)를 제공합니다.

ACLs를 통해 Authentication, Authorization을 진행합니다.

Zookeeper에서 해당 ACLs 정보를 저장합니다.

그럼 UNKNOW_TOPIC_ID 에러는?!

결론부터 말씀드리지만 경고메시지였고 Kafka와 Zookeeper에서 다시 sync를 맞춰줍니다.

다만 Topic ID가 어떻게 관리되는지만 짚고 넘어가겠습니다.

Zookeeper에서 확인

먼저 zookeeper cli로 topic의 id를 확인해 보겠습니다.

# IP(localhost), PORT(2181)인 zookeeper에 접근
$ zookeeper/bin/zkCli.sh

# 그 외의 zookeeper로 접근
$ zookeeper/bin/zkCli.sh -server localhost:12181

java/jdk-11.0.2/bin/java
Connecting to localhost:2181
2023-06-09 00:13:28,301 [myid:] - INFO  [main:Environment@98] - Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
2023-06-09 00:13:28,304 [myid:] - INFO  [main:Environment@98] - Client environment:host.name=seh-tr1
2023-06-09 00:13:28,304 [myid:] - INFO  [main:Environment@98] - Client environment:java.version=11.0.2
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:java.vendor=Oracle Corporation
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:java.home=/home/manager/java/jdk-11.0.2
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:java.class.path=/home/manager/kafka-zk/bin/../zookeeper-server/target/classes:/home/manager/kafka-zk/bin/../build/classes:/home/manager/kafka-zk/bin/../zookeeper-server/target/lib/*.jar:/home/manager/kafka-zk/bin/../build/lib/*.jar:/home/manager/kafka-zk/bin/../lib/zookeeper-prometheus-metrics-3.6.3.jar:/home/manager/kafka-zk/bin/../lib/zookeeper-jute-3.6.3.jar:/home/manager/kafka-zk/bin/../lib/zookeeper-3.6.3.jar:/home/manager/kafka-zk/bin/../lib/snappy-java-1.1.7.jar:/home/manager/kafka-zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/home/manager/kafka-zk/bin/../lib/slf4j-api-1.7.25.jar:/home/manager/kafka-zk/bin/../lib/simpleclient_servlet-0.6.0.jar:/home/manager/kafka-zk/bin/../lib/simpleclient_hotspot-0.6.0.jar:/home/manager/kafka-zk/bin/../lib/simpleclient_common-0.6.0.jar:/home/manager/kafka-zk/bin/../lib/simpleclient-0.6.0.jar:/home/manager/kafka-zk/bin/../lib/netty-transport-native-unix-common-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-transport-native-epoll-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-transport-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-resolver-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-handler-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-common-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-codec-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/netty-buffer-4.1.63.Final.jar:/home/manager/kafka-zk/bin/../lib/metrics-core-3.2.5.jar:/home/manager/kafka-zk/bin/../lib/log4j-1.2.17.jar:/home/manager/kafka-zk/bin/../lib/json-simple-1.1.1.jar:/home/manager/kafka-zk/bin/../lib/jline-2.14.6.jar:/home/manager/kafka-zk/bin/../lib/jetty-util-ajax-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/jetty-util-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/jetty-servlet-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/jetty-server-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/jetty-security-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/jetty-io-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/jetty-http-9.4.39.v20210325.jar:/home/manager/kafka-zk/bin/../lib/javax.servlet-api-3.1.0.jar:/home/manager/kafka-zk/bin/../lib/jackson-databind-2.10.5.1.jar:/home/manager/kafka-zk/bin/../lib/jackson-core-2.10.5.jar:/home/manager/kafka-zk/bin/../lib/jackson-annotations-2.10.5.jar:/home/manager/kafka-zk/bin/../lib/commons-cli-1.2.jar:/home/manager/kafka-zk/bin/../lib/audience-annotations-0.5.0.jar:/home/manager/kafka-zk/bin/../zookeeper-*.jar:/home/manager/kafka-zk/bin/../zookeeper-server/src/main/resources/lib/*.jar:/home/manager/kafka-zk/bin/../conf:
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:java.library.path=/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:java.io.tmpdir=/tmp
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:java.compiler=<NA>
2023-06-09 00:13:28,306 [myid:] - INFO  [main:Environment@98] - Client environment:os.name=Linux
2023-06-09 00:13:28,307 [myid:] - INFO  [main:Environment@98] - Client environment:os.arch=amd64
2023-06-09 00:13:28,307 [myid:] - INFO  [main:Environment@98] - Client environment:os.version=5.15.0-43-generic
2023-06-09 00:13:28,307 [myid:] - INFO  [main:Environment@98] - Client environment:user.name=manager
2023-06-09 00:13:28,307 [myid:] - INFO  [main:Environment@98] - Client environment:user.home=/home/manager
2023-06-09 00:13:28,307 [myid:] - INFO  [main:Environment@98] - Client environment:user.dir=/data/kafka-logs/SEH_IPMS_ROBOT-0
2023-06-09 00:13:28,307 [myid:] - INFO  [main:Environment@98] - Client environment:os.memory.free=247MB
2023-06-09 00:13:28,309 [myid:] - INFO  [main:Environment@98] - Client environment:os.memory.max=256MB
2023-06-09 00:13:28,309 [myid:] - INFO  [main:Environment@98] - Client environment:os.memory.total=256MB
2023-06-09 00:13:28,312 [myid:] - INFO  [main:ZooKeeper@1006] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@221af3c0
2023-06-09 00:13:28,315 [myid:] - INFO  [main:X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2023-06-09 00:13:28,323 [myid:] - INFO  [main:ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes
2023-06-09 00:13:28,329 [myid:] - INFO  [main:ClientCnxn@1736] - zookeeper.request.timeout value is 0. feature enabled=false
Welcome to ZooKeeper!
2023-06-09 00:13:28,346 [myid:localhost:12181] - INFO  [main-SendThread(localhost:12181):ClientCnxn$SendThread@1181] - Opening socket connection to server localhost/127.0.0.1:2181.
2023-06-09 00:13:28,347 [myid:localhost:12181] - INFO  [main-SendThread(localhost:12181):ClientCnxn$SendThread@1183] - SASL config status: Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2023-06-09 00:13:28,356 [myid:localhost:12181] - INFO  [main-SendThread(localhost:12181):ClientCnxn$SendThread@1013] - Socket connection established, initiating session, client: /127.0.0.1:41834, server: localhost/127.0.0.1:2181
2023-06-09 00:13:28,364 [myid:localhost:12181] - INFO  [main-SendThread(localhost:12181):ClientCnxn$SendThread@1448] - Session establishment complete on server localhost/127.0.0.1:12181, session id = 0x10022723a240004, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

Zookeeper에는 아래와 같은 정보들이 있습니다.

이 중 kafka-manager는 예전에 다룬 적 있는 CMAK의 정보들입니다.

 

UI로 Kafka 관리하기

CMAK 설치 Kafka Cluster를 구축하고 나서 Kafka Broker를 컨트롤하려면 기본적으로 command로 컨트롤합니다. Kafka Bin 폴더 아래에 다양한 script를 제공해 줍니다. $ ll ~/kafka/bin -rwxr-xr-x 1 kafka kafka 1.4K Dec 21 21:

developnote-blog.tistory.com

[zk: localhost:12181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, kafka-manager, latest_producer_id_block, log_dir_event_notification, zookeeper]

그중에서 Kafka Topic에 관련된 정보는 brokers에서 확인할 수 있습니다.

기본 Topic인 '__consumer_offesets'이 존재하고 나머지는 제가 생성한 Topic입니다.

보시면 test-topic1의 ID가 qlJqQvlAT7O5OgEylwJtuw인 것을 확인할 수 있습니다.

[zk: localhost:2181(CONNECTED) 30] ls /brokers/topics
[test-topic1, test-topic2, __consumer_offsets]

[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/test-topic1
[partitions]

[zk: localhost:2181(CONNECTED) 34] ls /brokers/topics/test-topic1/partitions
[0]

[zk: localhost:2181(CONNECTED) 35] ls /brokers/topics/test-topic1/partitions/0
[state]

[zk: localhost:2181(CONNECTED) 2] get /brokers/topics/test-topic1
{"partitions":{"0":[3,2]},"topic_id":"qlJqQvlAT7O5OgEylwJtuw","adding_replicas":{},"removing_replicas":{},"version":3}

[zk: localhost:12181(CONNECTED) 39] get /brokers/topics/test-topic1/partitions/0/state
{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr":[3,2]}

Kafka에서 확인

Kafka에서 Topic ID는 2군데에서 확인할 수 있습니다.

  • Kafka Topic Cli
  • Topic Log에서 metadata 파일

위 2군데에서 확인이 가능합니다.

manager@kafka-server1:~/kafka/bin$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic test-topic1 --describe
Topic: test-topic1  TopicId: qlJqQvlAT7O5OgEylwJtuw PartitionCount: 1       ReplicationFactor: 2    Configs: segment.bytes=1073741824,max.message.bytes=157286400
        Topic: test-topic1  Partition: 0    Leader: 3       Replicas: 3,2   Isr: 3,2

manager@kafka-server1:~/data/kafka-logs/test-topic1-0$ cat ~/data/kafka-logs/test-topic1-0/partition.metadata
version: 0
topic_id: qlJqQvlAT7O5OgEylwJtuw

정리

갑작스러운 Sever Shut down으로 Kafka Broker 1개가 중지되었습니다.

정확한 원인은 파악 못했지만 topic id의 sync가 맞지 않았던 거 같습니다.

UNKNOW_TOPIC_ID가 나오면 Zookeeper와 Kafka에서 해당 값들 sync가 맞는지 확인하는 것을 권장드립니다.

[참고사이트]

더보기

 

728x90
반응형

'MLOps > Kafka' 카테고리의 다른 글

큰일 났다! Out of Sync다?!?!  (0) 2023.07.27
Kafka야?!? 데이터 잘 처리하고 있니?!  (0) 2023.06.21
UI로 Kafka 관리하기  (0) 2023.03.16
Kafka Broker Log 관리  (0) 2023.01.10
Ansible를 통한 Kafka 설치  (0) 2022.12.18
Comments