일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- zookeeper
- tcp
- AWS
- jvm
- ip
- OS
- helm
- java
- kubernetes
- Packet
- airflow
- CSV
- Kafka
- kubectl
- log
- grafana
- EC2
- Spring
- Vision
- kubeadm
- Operating System
- docker
- CVAT
- PostgreSQL
- MAC address
- Python
- aws s3
- Network
- Trino
- JavaScript
- Today
- Total
JUST WRITE
어디까지 줄일 수 있니?! - Kafka Compression 본문
어디까지 줄일 수 있니?!
Kafka Cluster를 운영 업무를 맡아서 하고 있습니다.
여러 Cluster를 운영하고 있는데, 그중에서 해외 사이트에 설치된 Kafka Cluster도 있습니다.
해외사이트의 Kafka에서 국내로 Consuming해서 데이터를 처리해야 될 상황이었습니다.
Network Bandwidth 때문에 아무리 Consuming 튜닝해도 데이터 전송에 제한이 있었습니다.
여러 가지 방안을 찾던 중 Message Compression을 좀 더 효과적으로 하는 방향을 고민해 보았습니다.
이번 포스팅에서는 위 이슈를 해결하기 위해 공부한 Message Compression을 정리해보려고 합니다.
Kafka Compression
Kafka는 실시간 데이터 처리하는 데 많이 사용되고 있습니다.
Producer -> Broker, Broker -> Consumer와 Message를 주고받습니다.
Message를 주고받을 때, 한 번 Network 통신을 할 때 효율적으로 보내는 게 좋습니다.
Kafka는 한 번 통신할 때 여러 개의 Record를 압축해서 보내는 방안을 생각하게 됩니다.
현재 버전(3.6.1)에서는 아래 4가지 압축 타입을 지원하고 있습니다.
- gzip
- snappy
- lz4
- zstd
압축은 Producer나 Broker에서 진행하게 됩니다.
Producer Compression
Producer에서 Message를 압축할 수 있습니다.
그러면 Producer와 Broker 간의 통신에서 Message를 효율적으로 보낼 수 있습니다.
대신 Producer에서 압축하는데 부하가 생기게 됩니다.
Producer는 Partition단위로 Message를 압축합니다.
Producer는 Topic의 Leader와 통신해서 Message를 보냅니다.
Partition이 고르지 않고 편향되게 Message가 분포된다면 비효율적일 수도 있습니다.
Producer에서 압축한 Message를 Broker에서 압축을 해제하고 Validation 체크를 합니다.(LogValidator)
Broker에서 설정한 압축타입이 Producer와 다르면 하게 됩니다.
Producer에서 아래 설정값들로 압축률을 결정하게 됩니다.
- compression.type
- batch.size
- linger.ms
compression.type은 어떤 압축 타입으로 압축할지 결정하는 설정입니다.
batch.size는 압축할 때 최대 어느 size까지 모아서 압축할지 결정하는 설정입니다.
linger.ms는 Message를 한 번 보내기 전에 어느 시간 동안 Message를 모아서 압축할지 결정하는 설정입니다.
batch.size와 linger.ms는 연관이 엄청 깊습니다.
하나라도 설정값에 충족하면 Producer는 해당 설정값만큼 Message를 모아 압축해서 Broker로 보냅니다.
batch.size가 16K이고 linger.ms가 0이면 모을 것도 없이 Message 하나만 압축해서 보냅니다.
적절한 설정으로 압축을 진행해야 합니다.
압축을 많이 해서 한 번에 보내면 Network 부하는 줄지만,
Broker나 Consumer에서 압축을 해제하는 부하도 생각해야 합니다.
compression.type 설정
Producer와 Topic에서 모두 compression.type을 설정할 수 있습니다.
Topic에서는 compression.type으로 producer로 설정값을 지정할 수 있습니다.
producer로 설정한 압축 타입을 따르는 설정값입니다.
통신 구간 별 압축 타입을 Producer, Broker, Consumer에서 아래 표로 정리하였습니다.
Producer - Topic(Broker) | Topic(Broker) - Consumer |
Producer에서 설정한 압축 타입으로 통신 | Topic에서 설정한 압축 타입으로 통신 |
Producer와 Topic의 압축 타입이 다르면 Broker에서 다시 압축을 하게 됩니다.
Producer에서 gzip으로 압축해서 보낸 다음 Broker에서 zstd로 압축을 할 수도 있습니다.
테스트를 직접 진행해 보았습니다.
zstd로 압축 타입을 설정한 Topic을 만들고 Producer로 gzip타입으로 해당 Topic으로 Message를 보냈습니다.
Partition에 저장된 log를 확인해 보면 zstd로 저장되어 있는 것을 확인할 수 있습니다.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic com-test1 --config compression.type=zstd --create
Created topic com-test1.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic com-test1 --describe
Topic: com-test1 TopicId: l8g9cV9CQLinbMHVn6DxSQ PartitionCount: 1 ReplicationFactor: 1 Configs: compression.type=zstd
Topic: com-test1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
$ ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic com-test1 --compression-codec gz
>123
>456
>789
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/com-test1-0/00000000000000000000.log --print-data-log
Log starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1703327082282 size: 83 magic: 2 compresscodec: zstd crc: 2847074886 isvalid: true
| offset: 0 CreateTime: 1703327082282 keySize: -1 valueSize: 3 sequence: 0 headerKeys: [] payload: 123
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 83 CreateTime: 1703327083767 size: 83 magic: 2 compresscodec: zstd crc: 1386318322 isvalid: true
| offset: 1 CreateTime: 1703327083767 keySize: -1 valueSize: 3 sequence: 1 headerKeys: [] payload: 456
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 2 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 166 CreateTime: 1703327085657 size: 83 magic: 2 compresscodec: zstd crc: 184232809 isvalid: true
| offset: 2 CreateTime: 1703327085657 keySize: -1 valueSize: 3 sequence: 2 headerKeys: [] payload: 789
가장 추천할 방식은 Producer에서 압축 타입을 정하고 Topic에서 producer로 설정을 하는 것입니다.
- Producer - Topic - Consumer 전 구간을 압축한 Message로 통신
- Broker 압축 부하 X
타입별 압축률
Kafka에서는 총 4가지 압축 타입을 제공합니다.
타입별로 장단점이 있습니다.
아래는 IBM Developer에서 정리해 준 타입별 장단점입니다.
Type | Compression Ratio | CPU Usage | Compression Speed | Network Bandwidth Usage |
gzip | Highest | Highest | Slowest | Lowest |
snappy | Medium | Moderate | Moderate | Medium |
lz4 | Low | Lowest | Fastest | Highest |
zstd | Medium | Moderate | Moderate | Medium |
gzip이 압축률은 좋지만 압축하는 시간과 Resource가 많이 듭니다.
lz4는 압축률은 가장 안 좋지만 시간과 Resource는 적게 듭니다.
저의 경우 Network Bandwidth 문제이기 때문에 gzip이 적합해 보입니다.
Topic을 만들어서 직접 테스트를 진행해보려 합니다.
압축 비교
25MB json 파일을 준비하였습니다.
-rw-rw-r-- 1 ubuntu ubuntu 25M Dec 23 10:33 large-file.json
gzip, lz4, zstd 3가지 압축 타입을 비교해 보겠습니다.
compression.type을 각기 다르게 설정해서 3개의 Topic을 만들었습니다.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic com-zstd --config compression.type=zstd --create
Created topic com-zstd.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic com-gzip --config compression.type=gzip --create
Created topic com-gzip.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic com-lz4 --config compression.type=lz4 --create
Created topic com-lz4.
3가지 Topic에 준비한 25MB 파일을 Producing 하겠습니다.
그전에 Topic에서 25MB Message를 받을 수 있도록 설정합니다.
$ ./kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name com-zstd --add-config max.message.bytes=26130080
Completed updating config for topic com-zstd.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic com-zstd --describe
Topic: com-zstd TopicId: f53lqI8ORjSy1DdWWrJXfw PartitionCount: 1 ReplicationFactor: 1 Configs: compression.type=zstd,max.message.bytes=26130080
Topic: com-zstd Partition: 0 Leader: 0 Replicas: 0 Isr: 0
각 Topic에 json Message를 보냅니다.
$ jq -rc . ~/large-file.json | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic com-zstd --producer-property max.request.size=26130080
$ jq -rc . ~/large-file.json | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic com-gzip --producer-property max.request.size=26130080
$ jq -rc . ~/large-file.json | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic com-lz4 --producer-property max.request.size=26130080
Topic별 Log 디렉터리 Size를 확인해 보았습니다.
$ ls -alh /tmp/kafka-logs/com-zstd-0/00000000000000000000.log
2.9M
$ ls -alh /tmp/kafka-logs/com-gzip-0/00000000000000000000.log
3.7M
$ ls -alh /tmp/kafka-logs/com-lz4-0/00000000000000000000.log
6.2M
압축 Type | 압축 후 Size | 비율 |
zstd | 2.9MB | 11.6% |
gzip | 3.7MB | 14.8% |
lz4 | 6.2MB | 24.8% |
테스트를 진행해 보니 gzip보다 zstd가 압축률이 좋다는 것을 알 수 있었습니다.
단순 sample용 json 데이터를 가지고 비교한 거라 참고만 하면 좋을 거 같습니다.
정리
이번 포스팅에서는 Kafka Compression을 정리해 보았습니다.
Kafka에서 Message을 효율적으로 보내기 위해 압축을 해서 보냅니다.
Producer와 Broker에서 모두 압축 가능하며 Producer에서부터 압축하는 것이 효율적으로 판단됩니다.
이번 포스팅에서는 json 데이터로 압축 타입별 압축비율을 비교해 보았습니다.
lz4 < gzip < zstd 순으로 압축률이 좋다는 결론을 내렸습니다.(단순 참고)
[참고사이트]
'MLOps > Kafka' 카테고리의 다른 글
데이터 언제까지 저장할꺼니?!?! - Kafka Log Retention (0) | 2024.01.06 |
---|---|
Consumer야 살았니?! 죽었니?! - Kafka Consumer Heartbeats (0) | 2023.11.27 |
1개로 부족해, 2개의 IP로 접근하기 (0) | 2023.09.05 |
Doc에 있는 JVM 설정 파헤치기 (0) | 2023.08.21 |
큰일 났다! Out of Sync다?!?! (0) | 2023.07.27 |