JUST WRITE

Kafka야?!? 데이터 잘 처리하고 있니?! 본문

MLOps/Kafka

Kafka야?!? 데이터 잘 처리하고 있니?!

천재보단범재 2023. 6. 21. 23:34

Kafka Performance 테스트

Kafka야?!! 데이터 잘 처리하고 있니?!

실시간 데이터 처리를 위해 Kafka를 운영하고 있습니다.

Kafka 모니터링을 위해 주로 Prometheus, Grafana를 통해서 합니다.

Kafka Exporter로 Metric을 수집, Prometheus로 보내고 Grafana에서 Dashboard를 구성해서 모니터링합니다.

출처 : https://grafana.com/solutions/kafka/monitor/

 

보통 Topic별로 들어오는 데이터의 Bytes양, Topic의 Consumer의 Lag 등을 모니터링합니다.

이 방법 말고 Kafka가 데이터를 잘 처리하고 있는지 알 수 있는 방법이 이거밖에 없는지 궁금하였습니다.

그러다가 찾은 방법이 있었습니다.

Kafka 자체적으로 Performance를 테스트할 수 있는 방법이 있었습니다.

이번 포스팅에서 자체적으로 Kafka를 테스트를 정리해 보았습니다.

Kafka Produce Test

먼저 Kafka 프로듀싱으로 Traffic을 줘서 Kafka Broker의 성능을 테스트해 볼 수 있습니다.

Kafka script 중에서 kafka-producer-perf.test.sh로 가능합니다.

아래는 Kafka script 리스트와 kaka-producer-perf 안내문입니다.

$ ll kafka/bin
drwxr-xr-x  43 jeongsang-wan  staff   1376  6  5 18:08 .
drwxr-xr-x   9 jeongsang-wan  staff    288  6  5 18:08 ..
-rwxr-xr-x   1 jeongsang-wan  staff   1423  6  5 18:03 connect-distributed.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1396  6  5 18:03 connect-mirror-maker.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1420  6  5 18:03 connect-standalone.sh
-rwxr-xr-x   1 jeongsang-wan  staff    861  6  5 18:03 kafka-acls.sh
-rwxr-xr-x   1 jeongsang-wan  staff    873  6  5 18:03 kafka-broker-api-versions.sh
-rwxr-xr-x   1 jeongsang-wan  staff    871  6  5 18:03 kafka-cluster.sh
-rwxr-xr-x   1 jeongsang-wan  staff    864  6  5 18:03 kafka-configs.sh
-rwxr-xr-x   1 jeongsang-wan  staff    945  6  5 18:03 kafka-console-consumer.sh
-rwxr-xr-x   1 jeongsang-wan  staff    944  6  5 18:03 kafka-console-producer.sh
-rwxr-xr-x   1 jeongsang-wan  staff    871  6  5 18:03 kafka-consumer-groups.sh
-rwxr-xr-x   1 jeongsang-wan  staff    959  6  5 18:03 kafka-consumer-perf-test.sh
-rwxr-xr-x   1 jeongsang-wan  staff    882  6  5 18:03 kafka-delegation-tokens.sh
-rwxr-xr-x   1 jeongsang-wan  staff    869  6  5 18:03 kafka-delete-records.sh
-rwxr-xr-x   1 jeongsang-wan  staff    866  6  5 18:03 kafka-dump-log.sh
-rwxr-xr-x   1 jeongsang-wan  staff    877  6  5 18:03 kafka-e2e-latency.sh
-rwxr-xr-x   1 jeongsang-wan  staff    863  6  5 18:03 kafka-features.sh
-rwxr-xr-x   1 jeongsang-wan  staff    865  6  5 18:03 kafka-get-offsets.sh
-rwxr-xr-x   1 jeongsang-wan  staff    867  6  5 18:03 kafka-jmx.sh
-rwxr-xr-x   1 jeongsang-wan  staff    870  6  5 18:03 kafka-leader-election.sh
-rwxr-xr-x   1 jeongsang-wan  staff    863  6  5 18:03 kafka-log-dirs.sh
-rwxr-xr-x   1 jeongsang-wan  staff    881  6  5 18:03 kafka-metadata-quorum.sh
-rwxr-xr-x   1 jeongsang-wan  staff    873  6  5 18:03 kafka-metadata-shell.sh
-rwxr-xr-x   1 jeongsang-wan  staff    862  6  5 18:03 kafka-mirror-maker.sh
-rwxr-xr-x   1 jeongsang-wan  staff    959  6  5 18:03 kafka-producer-perf-test.sh
-rwxr-xr-x   1 jeongsang-wan  staff    874  6  5 18:03 kafka-reassign-partitions.sh
-rwxr-xr-x   1 jeongsang-wan  staff    874  6  5 18:03 kafka-replica-verification.sh
-rwxr-xr-x   1 jeongsang-wan  staff  10884  6  5 18:03 kafka-run-class.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1376  6  5 18:03 kafka-server-start.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1361  6  5 18:03 kafka-server-stop.sh
-rwxr-xr-x   1 jeongsang-wan  staff    860  6  5 18:03 kafka-storage.sh
-rwxr-xr-x   1 jeongsang-wan  staff    956  6  5 18:03 kafka-streams-application-reset.sh
-rwxr-xr-x   1 jeongsang-wan  staff    863  6  5 18:03 kafka-topics.sh
-rwxr-xr-x   1 jeongsang-wan  staff    879  6  5 18:03 kafka-transactions.sh
-rwxr-xr-x   1 jeongsang-wan  staff    958  6  5 18:03 kafka-verifiable-consumer.sh
-rwxr-xr-x   1 jeongsang-wan  staff    958  6  5 18:03 kafka-verifiable-producer.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1714  6  5 18:03 trogdor.sh
drwxr-xr-x  35 jeongsang-wan  staff   1120  6  5 18:03 windows
-rwxr-xr-x   1 jeongsang-wan  staff    867  6  5 18:03 zookeeper-security-migration.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1393  6  5 18:03 zookeeper-server-start.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1366  6  5 18:03 zookeeper-server-stop.sh
-rwxr-xr-x   1 jeongsang-wan  staff   1019  6  5 18:03 zookeeper-shell.sh
$ ./kafka-producer-perf-test.sh
usage: producer-performance [-h] --topic TOPIC --num-records NUM-RECORDS [--payload-delimiter PAYLOAD-DELIMITER]
                            --throughput THROUGHPUT [--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]]
                            [--producer.config CONFIG-FILE] [--print-metrics] [--transactional-id TRANSACTIONAL-ID]
                            [--transaction-duration-ms TRANSACTION-DURATION] (--record-size RECORD-SIZE |
                            --payload-file PAYLOAD-FILE)

This tool is used to verify the producer performance.

optional arguments:
  -h, --help             show this help message and exit
  --topic TOPIC          produce messages to this topic
  --num-records NUM-RECORDS
                         number of messages to produce
  --payload-delimiter PAYLOAD-DELIMITER
                         provides delimiter to be used  when  --payload-file  is  provided.  Defaults  to  new  line. Note that this
                         parameter will be ignored if --payload-file is not provided. (default: \n)
  --throughput THROUGHPUT
                         throttle maximum message throughput to *approximately* THROUGHPUT  messages/sec.  Set this to -1 to disable
                         throttling.
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...]
                         kafka producer related configuration properties  like  bootstrap.servers,client.id  etc. These configs take
                         precedence over those passed via --producer.config.
  --producer.config CONFIG-FILE
                         producer config properties file.
  --print-metrics        print out metrics at the end of the test. (default: false)
  --transactional-id TRANSACTIONAL-ID
                         The transactionalId to use if  transaction-duration-ms  is  >  0.  Useful  when  testing the performance of
                         concurrent transactions. (default: performance-producer-default-transactional-id)
  --transaction-duration-ms TRANSACTION-DURATION
                         The max age of  each  transaction.  The  commitTransaction  will  be  called  after  this time has elapsed.
                         Transactions are only enabled if this value is positive. (default: 0)

  either --record-size or --payload-file must be specified but not both.

  --record-size RECORD-SIZE
                         message size in bytes. Note that you must provide exactly one of --record-size or --payload-file.
  --payload-file PAYLOAD-FILE
                         file to read the message payloads from.  This  works  only  for  UTF-8 encoded text files. Payloads will be
                         read from this file and a payload  will  be  randomly  selected  when  sending messages. Note that you must
                         provide exactly one of --record-size or --payload-file.

주요 argument만 살펴보도록 하겠습니다.

Argument 설명
topic 프로듀싱할 topic명
num-record 프로듀싱할 message 개수
record-size message 사이즈
throughput 메시지 처리량, -1이면 제한 두지 않고 처리
producer-props producer 설정(ex. bootstrap-servers...)
print-metrics Test 종료 후 metric 출력

그럼 한번 shell script을 통해 프로듀싱으로 테스트를 진행해 보겠습니다.

$ ./kafka-producer-perf-test.sh ₩
--topic test_topic ₩ 
--num-records 1000 ₩
--throughput -1 ₩
--producer-props bootstrap.servers=kafka1:9092 max.request.size=10485760 ₩
--record-size 1048576 ₩
--print-metric

263 records sent, 52.5 records/sec (52.54 MB/sec), 405.0 ms avg latency, 577.0 ms max latency.
285 records sent, 56.9 records/sec (56.90 MB/sec), 551.1 ms avg latency, 582.0 ms max latency.
339 records sent, 67.7 records/sec (67.68 MB/sec), 457.4 ms avg latency, 541.0 ms max latency.
1000 records sent, 59.837243 records/sec (59.84 MB/sec), 470.39 ms avg latency, 582.00 ms max latency, 472 ms 50th, 572 ms 95th, 578 ms 99th, 582 ms 99.9th.

Metric Name                                                                                          Value
app-info:commit-id:{client-id=perf-producer-client}                                                : 50029d3ed8ba576f
app-info:start-time-ms:{client-id=perf-producer-client}                                            : 1687739710990
app-info:version:{client-id=perf-producer-client}                                                  : 3.2.3
...
...
...

print-metric 옵션을 넣으면 Metric 정보가 많이 나오는데 생략하였습니다.

1MB 사이즈를 가진 record를 보낼 때 대략 1초에 60개의 record를 보내는 것으로 결과가 나옵니다.

1000개의 99.9%면 999개까지 보낼 때 582ms정도 시간의 걸리는 것으로 나옵니다.

Kafka Consume Test

프로듀싱뿐만 아니라 컨슈밍도 테스트할 수 있는 shell script를 제공해 줍니다.

Kafka script 중에서 kafka-consumer-perf.test.sh로 가능합니다.

$ ./kafka-consumer-perf-test.sh --help
This tool helps in performance test for the full zookeeper consumer
Option                                   Description
------                                   -----------
--bootstrap-server <String: server to    REQUIRED unless --broker-list
  connect to>                              (deprecated) is specified. The server
                                           (s) to connect to.
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server
                                           instead; ignored if --bootstrap-
                                           server is specified.  The broker
                                           list string in the form HOST1:PORT1,
                                           HOST2:PORT2.
--consumer.config <String: config file>  Consumer config properties file.
--date-format <String: date format>      The date format to use for formatting
                                           the time field. See java.text.
                                           SimpleDateFormat for options.
                                           (default: yyyy-MM-dd HH:mm:ss:SSS)
--fetch-size <Integer: size>             The amount of data to fetch in a
                                           single request. (default: 1048576)
--from-latest                            If the consumer does not already have
                                           an established offset to consume
                                           from, start with the latest message
                                           present in the log rather than the
                                           earliest message.
--group <String: gid>                    The group id to consume on. (default:
                                           perf-consumer-84403)
--help                                   Print usage information.
--hide-header                            If set, skips printing the header for
                                           the stats
--messages <Long: count>                 REQUIRED: The number of messages to
                                           send or consume
--num-fetch-threads <Integer: count>     DEPRECATED AND IGNORED: Number of
                                           fetcher threads. (default: 1)
--print-metrics                          Print out the metrics.
--reporting-interval <Integer:           Interval in milliseconds at which to
  interval_ms>                             print progress info. (default: 5000)
--show-detailed-stats                    If set, stats are reported for each
                                           reporting interval as configured by
                                           reporting-interval
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.
                                           (default: 2097152)
--threads <Integer: count>               DEPRECATED AND IGNORED: Number of
                                           processing threads. (default: 10)
--timeout [Long: milliseconds]           The maximum allowed time in
                                           milliseconds between returned
                                           records. (default: 10000)
--topic <String: topic>                  REQUIRED: The topic to consume from.
--version                                Display Kafka version.

혹시나 테스트할 때 주의할 점이 테스트할 Topic에 메시지가 남아있어야 합니다.

아니면 아래처럼 timeout으로 테스트가 불가능합니다.

$ ./kafka-consumer-perf-test.sh --bootstrap-server kafka1:9092 --topic test-topic --messages 1000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
WARNING: Exiting before consuming the expected number of messages: timeout (10000 ms) exceeded. You can use the --timeout option to increase the timeout.
2023-06-25 17:48:54:340, 2023-06-25 17:49:04:370, 0.0000, 0.0000, 0, 0.0000, 405, 9625, 0.0000, 0.0000

참고로 테스트하는 Topic은 아래 command에서 알 수 있듯이 partition 2, replication 2인 Topic입니다.

$ ./kafka-topics.sh  --bootstrap-server kafka1:9092 --topic test-topic --describe
Topic: test-topic TopicId: rAHWx3dQT7au3kIW5WUSwA PartitionCount: 2       ReplicationFactor: 2    Configs: segment.bytes=1073741824,retention.ms=86400000,max.message.bytes=157286400
        Topic: test-topic Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: test-topic Partition: 1    Leader: 3       Replicas: 3,2   Isr: 3,2

테스트를 진행해 보겠습니다.

1000개의 Message를 fetch 하는데 110.2293ms가 소요되는 것을 알 수 있습니다.

$ ./kafka-consumer-perf-test.sh --bootstrap-server kafka1:9092 --topic test-topic --messages 1000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2023-06-25 18:03:11:631, 2023-06-25 18:03:21:057, 1000.0000, 106.0895, 1000, 106.0895, 354, 9072, 110.2293, 110.2293

End-To-End Latency 테스트

End-To-End LatencyProducer가 Kafka Broker에 Message를 보낼 때부터

Consumer가 Topic에서 Message를 가져오기까지 걸리는 시간입니다.

정확하게는 KafkaProducer.send() -> KafkaConsumer.poll()까지의 시간입니다.

Kafka를 실시간으로 처리용으로 사용 중이라면 필요한 테스트입니다.

End-To-End 단계는 아래 5가지 단계를 따릅니다.

  1. Produce : batch 사이즈만큼의 record 처리(process)
  2. Publish : Kafka Broker로 Producer가 Message(batch 사이즈의 records)를 send
  3. Commit : Message가 leader에 전달되고 follower까지 replicate
  4. Catch-up : offeset check
  5. Fetch : Broker에서 Message fetch

출처 :&nbsp;https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

해당 테스트를 하려면 Producer와 Consumer를 만들어야 할까요?

Kafka에서는 End-To-End에 대해서도 테스트 Script를 제공해 줍니다.

EndToEndLatency Class를 제공하여 해당 Class를 이용하면 됩니다.

    for (i <- 0 until numMessages) {
      val message = randomBytesOfLen(random, messageLen)
      val begin = System.nanoTime

      //Send message (of random bytes) synchronously then immediately poll for it
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
      val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator

      val elapsed = System.nanoTime - begin

      //Check we got results
      if (!recordIter.hasNext) {
        finalise()
        throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
      }

      //Check result matches the original record
      val sent = new String(message, StandardCharsets.UTF_8)
      val read = new String(recordIter.next().value(), StandardCharsets.UTF_8)
      if (!read.equals(sent)) {
        finalise()
        throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
      }

      //Check we only got the one message
      if (recordIter.hasNext) {
        val count = 1 + recordIter.asScala.size
        throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
      }

      //Report progress
      if (i % 1000 == 0)
        println(i.toString + "\t" + elapsed / 1000.0 / 1000.0)
      totalTime += elapsed
      latencies(i) = elapsed / 1000 / 1000
    }

테스트를 위해서는 Topic이 있어야 합니다.

partition 1개, replication 2개로 설정된 Topic을 준비하였습니다.

$ ./kafka-run-class.sh kafka.tools.EndToEndLatency --help
USAGE: java kafka.tools.EndToEndLatency$ broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file

$ ./kafka-topics.sh --bootstrap-server kafka1:9092 --topic test-topic --describe
Topic: test-topic TopicId: Dt6PMEU3TsO-oPVXrdc8Vg PartitionCount: 1       ReplicationFactor: 2    Configs: segment.bytes=1073741824,max.message.bytes=157286400
Topic: test-topic Partition: 0    Leader: 1       Replicas: 1,3   Isr: 1,3

위 help command 보면 아시겠지만 필요한 값들을 넣어줍니다.

아래 argument들을 차례대로 입력해 줍니다.

  • broker_list : broker 접속 정보(ex. kafka1:9092)
  • topic : 테스트할 Topic명
  • num_messages : 테스트할 Message 개수
  • producer_acks : producer 메시지 send 확인 옵션
    • 0 : 따로 체크하지 않음
    • 1 : leader가 받았는지만 확인, follower는 확인 X
    • all(-1) : leader, follower까지 확인
  • message_size_bytes : Message 사이즈

이번 테스트에서는 producer_acks만 다르게 설정해서 테스트해 보았습니다.

첫 번째는 1로 주어 leader만 확인하고 두 번째는 all로 주어 follower까지 확인하도록 하였습니다.

replication이 2인 topic이니 leader 1, follower 1이라 시간이 더 걸리지 않을까 예상하였습니다.

$ ./kafka-run-class.sh kafka.tools.EndToEndLatency kafka1:9092 test-topic 10000 1 1024
0       56.484224000000005
1000    1.989923
2000    2.134469
3000    1.909721
4000    1.71727
5000    1.948955
6000    1.9003160000000001
7000    1.955016
8000    1.937003
9000    1.665537
Avg latency: 1.8971 ms

Percentiles: 50th = 1, 99th = 2, 99.9th = 8

$ ./kafka-run-class.sh kafka.tools.EndToEndLatency kafka1:9092 test-topic 10000 all 1024
0       61.636155
1000    2.419481
2000    2.3488270000000004
3000    2.5352449999999997
4000    2.240652
5000    2.3738319999999997
6000    2.284329
7000    2.043909
8000    2.273963
9000    2.296845
Avg latency: 2.3781 ms

Percentiles: 50th = 2, 99th = 4, 99.9th = 8

평균 latency만 비교해 보아도 차이가 나는 것을 알 수 있습니다.

acks 1 all
Avg latency 1.8971 ms 2.3781 ms

follower까지 replicate 되는 것을 기다리고 확인까지 되는 시간이 더 걸린 것으로 추정됩니다.

정리

지금까지 Kafka에서 제공하는 shell script로 자체 성능 테스트 하는 방법을 살펴보았습니다.

Kafka가 실시간 처리용 도로 많이 사용하기 때문에 성능, latency 체크가 중요합니다.

테스트를 통해서 Kafka의 설정값을 디테일하게 조정해 보며 환경에 맞는 설정값을 찾는데 도움이 될 거 같습니다.

[참고사이트]

 

728x90
반응형

'MLOps > Kafka' 카테고리의 다른 글

Doc에 있는 JVM 설정 파헤치기  (0) 2023.08.21
큰일 났다! Out of Sync다?!?!  (0) 2023.07.27
Kafka 재시작 후 UNKNOWN_TOPIC_ID 에러?!  (0) 2023.06.14
UI로 Kafka 관리하기  (0) 2023.03.16
Kafka Broker Log 관리  (0) 2023.01.10
Comments