일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- JavaScript
- aws s3
- jvm
- MAC address
- tcp
- grafana
- kubernetes
- AWS
- docker
- ip
- Packet
- Trino
- kubectl
- Python
- java
- airflow
- OS
- CVAT
- Spring
- Network
- EC2
- CSV
- Operating System
- Kafka
- log
- kubeadm
- PostgreSQL
- Vision
- helm
- zookeeper
- Today
- Total
JUST WRITE
Kafka야?!? 데이터 잘 처리하고 있니?! 본문
Kafka야?!! 데이터 잘 처리하고 있니?!
실시간 데이터 처리를 위해 Kafka를 운영하고 있습니다.
Kafka 모니터링을 위해 주로 Prometheus, Grafana를 통해서 합니다.
Kafka Exporter로 Metric을 수집, Prometheus로 보내고 Grafana에서 Dashboard를 구성해서 모니터링합니다.
보통 Topic별로 들어오는 데이터의 Bytes양, Topic의 Consumer의 Lag 등을 모니터링합니다.
이 방법 말고 Kafka가 데이터를 잘 처리하고 있는지 알 수 있는 방법이 이거밖에 없는지 궁금하였습니다.
그러다가 찾은 방법이 있었습니다.
Kafka 자체적으로 Performance를 테스트할 수 있는 방법이 있었습니다.
이번 포스팅에서 자체적으로 Kafka를 테스트를 정리해 보았습니다.
Kafka Produce Test
먼저 Kafka 프로듀싱으로 Traffic을 줘서 Kafka Broker의 성능을 테스트해 볼 수 있습니다.
Kafka script 중에서 kafka-producer-perf.test.sh로 가능합니다.
아래는 Kafka script 리스트와 kaka-producer-perf 안내문입니다.
$ ll kafka/bin
drwxr-xr-x 43 jeongsang-wan staff 1376 6 5 18:08 .
drwxr-xr-x 9 jeongsang-wan staff 288 6 5 18:08 ..
-rwxr-xr-x 1 jeongsang-wan staff 1423 6 5 18:03 connect-distributed.sh
-rwxr-xr-x 1 jeongsang-wan staff 1396 6 5 18:03 connect-mirror-maker.sh
-rwxr-xr-x 1 jeongsang-wan staff 1420 6 5 18:03 connect-standalone.sh
-rwxr-xr-x 1 jeongsang-wan staff 861 6 5 18:03 kafka-acls.sh
-rwxr-xr-x 1 jeongsang-wan staff 873 6 5 18:03 kafka-broker-api-versions.sh
-rwxr-xr-x 1 jeongsang-wan staff 871 6 5 18:03 kafka-cluster.sh
-rwxr-xr-x 1 jeongsang-wan staff 864 6 5 18:03 kafka-configs.sh
-rwxr-xr-x 1 jeongsang-wan staff 945 6 5 18:03 kafka-console-consumer.sh
-rwxr-xr-x 1 jeongsang-wan staff 944 6 5 18:03 kafka-console-producer.sh
-rwxr-xr-x 1 jeongsang-wan staff 871 6 5 18:03 kafka-consumer-groups.sh
-rwxr-xr-x 1 jeongsang-wan staff 959 6 5 18:03 kafka-consumer-perf-test.sh
-rwxr-xr-x 1 jeongsang-wan staff 882 6 5 18:03 kafka-delegation-tokens.sh
-rwxr-xr-x 1 jeongsang-wan staff 869 6 5 18:03 kafka-delete-records.sh
-rwxr-xr-x 1 jeongsang-wan staff 866 6 5 18:03 kafka-dump-log.sh
-rwxr-xr-x 1 jeongsang-wan staff 877 6 5 18:03 kafka-e2e-latency.sh
-rwxr-xr-x 1 jeongsang-wan staff 863 6 5 18:03 kafka-features.sh
-rwxr-xr-x 1 jeongsang-wan staff 865 6 5 18:03 kafka-get-offsets.sh
-rwxr-xr-x 1 jeongsang-wan staff 867 6 5 18:03 kafka-jmx.sh
-rwxr-xr-x 1 jeongsang-wan staff 870 6 5 18:03 kafka-leader-election.sh
-rwxr-xr-x 1 jeongsang-wan staff 863 6 5 18:03 kafka-log-dirs.sh
-rwxr-xr-x 1 jeongsang-wan staff 881 6 5 18:03 kafka-metadata-quorum.sh
-rwxr-xr-x 1 jeongsang-wan staff 873 6 5 18:03 kafka-metadata-shell.sh
-rwxr-xr-x 1 jeongsang-wan staff 862 6 5 18:03 kafka-mirror-maker.sh
-rwxr-xr-x 1 jeongsang-wan staff 959 6 5 18:03 kafka-producer-perf-test.sh
-rwxr-xr-x 1 jeongsang-wan staff 874 6 5 18:03 kafka-reassign-partitions.sh
-rwxr-xr-x 1 jeongsang-wan staff 874 6 5 18:03 kafka-replica-verification.sh
-rwxr-xr-x 1 jeongsang-wan staff 10884 6 5 18:03 kafka-run-class.sh
-rwxr-xr-x 1 jeongsang-wan staff 1376 6 5 18:03 kafka-server-start.sh
-rwxr-xr-x 1 jeongsang-wan staff 1361 6 5 18:03 kafka-server-stop.sh
-rwxr-xr-x 1 jeongsang-wan staff 860 6 5 18:03 kafka-storage.sh
-rwxr-xr-x 1 jeongsang-wan staff 956 6 5 18:03 kafka-streams-application-reset.sh
-rwxr-xr-x 1 jeongsang-wan staff 863 6 5 18:03 kafka-topics.sh
-rwxr-xr-x 1 jeongsang-wan staff 879 6 5 18:03 kafka-transactions.sh
-rwxr-xr-x 1 jeongsang-wan staff 958 6 5 18:03 kafka-verifiable-consumer.sh
-rwxr-xr-x 1 jeongsang-wan staff 958 6 5 18:03 kafka-verifiable-producer.sh
-rwxr-xr-x 1 jeongsang-wan staff 1714 6 5 18:03 trogdor.sh
drwxr-xr-x 35 jeongsang-wan staff 1120 6 5 18:03 windows
-rwxr-xr-x 1 jeongsang-wan staff 867 6 5 18:03 zookeeper-security-migration.sh
-rwxr-xr-x 1 jeongsang-wan staff 1393 6 5 18:03 zookeeper-server-start.sh
-rwxr-xr-x 1 jeongsang-wan staff 1366 6 5 18:03 zookeeper-server-stop.sh
-rwxr-xr-x 1 jeongsang-wan staff 1019 6 5 18:03 zookeeper-shell.sh
$ ./kafka-producer-perf-test.sh
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER]
--throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
[--producer.config CONFIG-FILE] [--print-metrics] [--transactional-id TRANSACTIONAL-ID]
[--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE |
--payload-file PAYLOAD-FILE)
This tool is used to verify the producer performance.
optional arguments:
-h, --help show this help message and exit
--topic TOPIC produce messages to this topic
--num-records NUM-RECORDS
number of messages to produce
--payload-delimiter PAYLOAD-DELIMITER
provides delimiter to be used when --payload-file is provided. Defaults to new line. Note that this
parameter will be ignored if --payload-file is not provided. (default: \n)
--throughput THROUGHPUT
throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable
throttling.
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
kafka producer related configuration properties like bootstrap.servers,client.id etc. These configs take
precedence over those passed via --producer.config.
--producer.config CONFIG-FILE
producer config properties file.
--print-metrics print out metrics at the end of the test. (default: false)
--transactional-id TRANSACTIONAL-ID
The transactionalId to use if transaction-duration-ms is > 0. Useful when testing the performance of
concurrent transactions. (default: performance-producer-default-transactional-id)
--transaction-duration-ms TRANSACTION-DURATION
The max age of each transaction. The commitTransaction will be called after this time has elapsed.
Transactions are only enabled if this value is positive. (default: 0)
either --record-size or --payload-file must be specified but not both.
--record-size RECORD-SIZE
message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.
--payload-file PAYLOAD-FILE
file to read the message payloads from. This works only for UTF-8 encoded text files. Payloads will be
read from this file and a payload will be randomly selected when sending messages. Note that you must
provide exactly one of --record-size or --payload-file.
주요 argument만 살펴보도록 하겠습니다.
Argument | 설명 |
topic | 프로듀싱할 topic명 |
num-record | 프로듀싱할 message 개수 |
record-size | message 사이즈 |
throughput | 메시지 처리량, -1이면 제한 두지 않고 처리 |
producer-props | producer 설정(ex. bootstrap-servers...) |
print-metrics | Test 종료 후 metric 출력 |
그럼 한번 shell script을 통해 프로듀싱으로 테스트를 진행해 보겠습니다.
$ ./kafka-producer-perf-test.sh ₩
--topic test_topic ₩
--num-records 1000 ₩
--throughput -1 ₩
--producer-props bootstrap.servers=kafka1:9092 max.request.size=10485760 ₩
--record-size 1048576 ₩
--print-metric
263 records sent, 52.5 records/sec (52.54 MB/sec), 405.0 ms avg latency, 577.0 ms max latency.
285 records sent, 56.9 records/sec (56.90 MB/sec), 551.1 ms avg latency, 582.0 ms max latency.
339 records sent, 67.7 records/sec (67.68 MB/sec), 457.4 ms avg latency, 541.0 ms max latency.
1000 records sent, 59.837243 records/sec (59.84 MB/sec), 470.39 ms avg latency, 582.00 ms max latency, 472 ms 50th, 572 ms 95th, 578 ms 99th, 582 ms 99.9th.
Metric Name Value
app-info:commit-id:{client-id=perf-producer-client} : 50029d3ed8ba576f
app-info:start-time-ms:{client-id=perf-producer-client} : 1687739710990
app-info:version:{client-id=perf-producer-client} : 3.2.3
...
...
...
print-metric 옵션을 넣으면 Metric 정보가 많이 나오는데 생략하였습니다.
1MB 사이즈를 가진 record를 보낼 때 대략 1초에 60개의 record를 보내는 것으로 결과가 나옵니다.
1000개의 99.9%면 999개까지 보낼 때 582ms정도 시간의 걸리는 것으로 나옵니다.
Kafka Consume Test
프로듀싱뿐만 아니라 컨슈밍도 테스트할 수 있는 shell script를 제공해 줍니다.
Kafka script 중에서 kafka-consumer-perf.test.sh로 가능합니다.
$ ./kafka-consumer-perf-test.sh --help
This tool helps in performance test for the full zookeeper consumer
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED unless --broker-list
connect to> (deprecated) is specified. The server
(s) to connect to.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server
instead; ignored if --bootstrap-
server is specified. The broker
list string in the form HOST1:PORT1,
HOST2:PORT2.
--consumer.config <String: config file> Consumer config properties file.
--date-format <String: date format> The date format to use for formatting
the time field. See java.text.
SimpleDateFormat for options.
(default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size> The amount of data to fetch in a
single request. (default: 1048576)
--from-latest If the consumer does not already have
an established offset to consume
from, start with the latest message
present in the log rather than the
earliest message.
--group <String: gid> The group id to consume on. (default:
perf-consumer-84403)
--help Print usage information.
--hide-header If set, skips printing the header for
the stats
--messages <Long: count> REQUIRED: The number of messages to
send or consume
--num-fetch-threads <Integer: count> DEPRECATED AND IGNORED: Number of
fetcher threads. (default: 1)
--print-metrics Print out the metrics.
--reporting-interval <Integer: Interval in milliseconds at which to
interval_ms> print progress info. (default: 5000)
--show-detailed-stats If set, stats are reported for each
reporting interval as configured by
reporting-interval
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 2097152)
--threads <Integer: count> DEPRECATED AND IGNORED: Number of
processing threads. (default: 10)
--timeout [Long: milliseconds] The maximum allowed time in
milliseconds between returned
records. (default: 10000)
--topic <String: topic> REQUIRED: The topic to consume from.
--version Display Kafka version.
혹시나 테스트할 때 주의할 점이 테스트할 Topic에 메시지가 남아있어야 합니다.
아니면 아래처럼 timeout으로 테스트가 불가능합니다.
$ ./kafka-consumer-perf-test.sh --bootstrap-server kafka1:9092 --topic test-topic --messages 1000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the --timeout option to increase the timeout.
2023-06-25 17:48:54:340, 2023-06-25 17:49:04:370, 0.0000, 0.0000, 0, 0.0000, 405, 9625, 0.0000, 0.0000
참고로 테스트하는 Topic은 아래 command에서 알 수 있듯이 partition 2, replication 2인 Topic입니다.
$ ./kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --describe
Topic: test-topic TopicId: rAHWx3dQT7au3kIW5WUSwA PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824,retention.ms=86400000,max.message.bytes=157286400
Topic: test-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test-topic Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
테스트를 진행해 보겠습니다.
1000개의 Message를 fetch 하는데 110.2293ms가 소요되는 것을 알 수 있습니다.
$ ./kafka-consumer-perf-test.sh --bootstrap-server kafka1:9092 --topic test-topic --messages 1000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-06-25 18:03:11:631, 2023-06-25 18:03:21:057, 1000.0000, 106.0895, 1000, 106.0895, 354, 9072, 110.2293, 110.2293
End-To-End Latency 테스트
End-To-End Latency는 Producer가 Kafka Broker에 Message를 보낼 때부터
Consumer가 Topic에서 Message를 가져오기까지 걸리는 시간입니다.
정확하게는 KafkaProducer.send() -> KafkaConsumer.poll()까지의 시간입니다.
Kafka를 실시간으로 처리용으로 사용 중이라면 필요한 테스트입니다.
End-To-End 단계는 아래 5가지 단계를 따릅니다.
- Produce : batch 사이즈만큼의 record 처리(process)
- Publish : Kafka Broker로 Producer가 Message(batch 사이즈의 records)를 send
- Commit : Message가 leader에 전달되고 follower까지 replicate
- Catch-up : offeset check
- Fetch : Broker에서 Message fetch
해당 테스트를 하려면 Producer와 Consumer를 만들어야 할까요?
Kafka에서는 End-To-End에 대해서도 테스트 Script를 제공해 줍니다.
EndToEndLatency Class를 제공하여 해당 Class를 이용하면 됩니다.
for (i <- 0 until numMessages) {
val message = randomBytesOfLen(random, messageLen)
val begin = System.nanoTime
//Send message (of random bytes) synchronously then immediately poll for it
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
val elapsed = System.nanoTime - begin
//Check we got results
if (!recordIter.hasNext) {
finalise()
throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
}
//Check result matches the original record
val sent = new String(message, StandardCharsets.UTF_8)
val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
if (!read.equals(sent)) {
finalise()
throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
}
//Check we only got the one message
if (recordIter.hasNext) {
val count = 1 + recordIter.asScala.size
throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
}
//Report progress
if (i % 1000 == 0)
println(i.toString + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
latencies(i) = elapsed / 1000 / 1000
}
테스트를 위해서는 Topic이 있어야 합니다.
partition 1개, replication 2개로 설정된 Topic을 준비하였습니다.
$ ./kafka-run-class.sh kafka.tools.EndToEndLatency --help
USAGE: java kafka.tools.EndToEndLatency$ broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file
$ ./kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --describe
Topic: test-topic TopicId: Dt6PMEU3TsO-oPVXrdc8Vg PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824,max.message.bytes=157286400
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
위 help command 보면 아시겠지만 필요한 값들을 넣어줍니다.
아래 argument들을 차례대로 입력해 줍니다.
- broker_list : broker 접속 정보(ex. kafka1:9092)
- topic : 테스트할 Topic명
- num_messages : 테스트할 Message 개수
- producer_acks : producer 메시지 send 확인 옵션
- 0 : 따로 체크하지 않음
- 1 : leader가 받았는지만 확인, follower는 확인 X
- all(-1) : leader, follower까지 확인
- message_size_bytes : Message 사이즈
이번 테스트에서는 producer_acks만 다르게 설정해서 테스트해 보았습니다.
첫 번째는 1로 주어 leader만 확인하고 두 번째는 all로 주어 follower까지 확인하도록 하였습니다.
replication이 2인 topic이니 leader 1, follower 1이라 시간이 더 걸리지 않을까 예상하였습니다.
$ ./kafka-run-class.sh kafka.tools.EndToEndLatency kafka1:9092 test-topic 10000 1 1024
0 56.484224000000005
1000 1.989923
2000 2.134469
3000 1.909721
4000 1.71727
5000 1.948955
6000 1.9003160000000001
7000 1.955016
8000 1.937003
9000 1.665537
Avg latency: 1.8971 ms
Percentiles: 50th = 1, 99th = 2, 99.9th = 8
$ ./kafka-run-class.sh kafka.tools.EndToEndLatency kafka1:9092 test-topic 10000 all 1024
0 61.636155
1000 2.419481
2000 2.3488270000000004
3000 2.5352449999999997
4000 2.240652
5000 2.3738319999999997
6000 2.284329
7000 2.043909
8000 2.273963
9000 2.296845
Avg latency: 2.3781 ms
Percentiles: 50th = 2, 99th = 4, 99.9th = 8
평균 latency만 비교해 보아도 차이가 나는 것을 알 수 있습니다.
acks | 1 | all |
Avg latency | 1.8971 ms | 2.3781 ms |
follower까지 replicate 되는 것을 기다리고 확인까지 되는 시간이 더 걸린 것으로 추정됩니다.
정리
지금까지 Kafka에서 제공하는 shell script로 자체 성능 테스트 하는 방법을 살펴보았습니다.
Kafka가 실시간 처리용 도로 많이 사용하기 때문에 성능, latency 체크가 중요합니다.
테스트를 통해서 Kafka의 설정값을 디테일하게 조정해 보며 환경에 맞는 설정값을 찾는데 도움이 될 거 같습니다.
[참고사이트]
'MLOps > Kafka' 카테고리의 다른 글
Doc에 있는 JVM 설정 파헤치기 (0) | 2023.08.21 |
---|---|
큰일 났다! Out of Sync다?!?! (0) | 2023.07.27 |
Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?! (0) | 2023.06.14 |
UI로 Kafka 관리하기 (0) | 2023.03.16 |
Kafka Broker Log 관리 (0) | 2023.01.10 |