일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- airflow
- Trino
- kubeadm
- Packet
- java
- Operating System
- EC2
- kubectl
- zookeeper
- kubernetes
- Kafka
- ip
- grafana
- Network
- aws s3
- Spring
- JavaScript
- MAC address
- tcp
- AWS
- docker
- log
- Vision
- CVAT
- helm
- PostgreSQL
- jvm
- OS
- CSV
- Python
- Today
- Total
JUST WRITE
데이터 언제까지 저장할꺼니?!?! - Kafka Log Retention 본문
데이터 언제 저장할꺼니?!?!
Kafka는 분산 메시지 시스템이자 Message Queue입니다.
Kafka는 메시지를 받고 바로 지우지 않고 어느 정도 Broker에 저장합니다.
다양한 Consumer에서 메시지를 활용할 수도 있고
Network 문제 등 다양한 이유로 consuming에 시간이 걸리수도 있습니다.
Kafka에서는 데이터 저장을 어떤식으로 관리할까요?
이번 포스팅에서는 Kafka 데이터 저장 관련 설정값을 정리해보도록 하겠습니다.
Log Retention
Kafka에서는 Broker에서 메시지를 받고 저장하는데, 저장한 Message를 Log라고 합니다.
Broker나 Topic의 설정값에 따라서 Broker에서 Log를 어느 정도 저장할지 설정할 수 있습니다.
Retention 관련 설정값은 아래와 같습니다.
server.properties 파일에서 설정할 수 있습니다.
############################ Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
log.retention.{시간단위}
Kafka Broker에 Log를 얼마나 저장할지 가장 먼저 고려할 설정이 log.retention입니다.
log.retention은 시, 분, ms 단위로 설정이 가능합니다.
아래 설정값 순서대로 우선적으로 적용됩니다.
- log.retention.ms
- log.retention.minutes
- log.retention.hours
hours를 24로 설정해도 minutes를 5로 설정했으면 Log는 5분 동안만 저장됩니다.
-1로 설정하면 Log는 지워지지 않고 Broker log directory에 영원히 저장됩니다.
log.retention.bytes
첫 번째 옵션은 기간에 따라 Log를 얼마나 저장할지 설정하는 옵션입니다.
log.retention.bytes는 log를 어느 size만큼 저장할지 설정하는 옵션입니다.
기본값은 -1이며 size 상관없이 저장하겠다는 의미입니다.
보통 log 저장 정책을 기간 단위로 세우기 때문에 해당 옵션은 우선순위로 고려되지 않습니다.
log.segment.bytes
Broker의 Disk에 저장하게 됩니다.
Disk 저장할 때 파일 하나로 저장하지 않고 나눠서 저장을 합니다.
해당 옵션단위로 log를 나눠서 저장하게 됩니다.
테스트를 진행해 보았습니다.
log.segment.bytes를 1024 bytes(1KB)로 설정하고 테스트를 진행해 보았습니다.
$ segment 설정 변경
$ vi config/server.properties
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
log.segment.bytes=1024
# Test Topic 생성
$ bin/kafka-topics.sh --create --topic log-segment-test --bootstrap-server localhost:9092
Created topic log-segment-test.
# Test Topic에 Producing
$ bin/kafka-producer-perf-test.sh --num-records 10 --record-size 1024 --topic log-segment-test --producer-props bootstrap.servers=localhost:9092 max.request.size=2048 --throughput 10
org.apache.kafka.common.errors.RecordBatchTooLargeException: The request included message batch larger than the configured segment size on the server.
테스트 Topic에 메시지를 Producing 할 때 설정한 segment size보다 크면 에러가 발생하였습니다.
Kafka에서 설정한 Segement단위로 처리한다는 것을 확인할 수 있었습니다.
log.segment.bytes를 1MB로 늘려서 테스트를 진행하였습니다.
$ segment 설정 변경
$ vi config/server.properties
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
log.segment.bytes=1048576
# Test Topic 생성
$ bin/kafka-topics.sh --create --topic log-segment-test2 --bootstrap-server localhost:9092
Created topic log-segment-test2.
# Test Topic에 Producing
$ bin/kafka-producer-perf-test.sh --num-records 1000 --record-size 102400 --topic log-segment-test2 --producer-props bootstrap.servers=localhost:9092 max.request.size=204800 --throughput 10
$ ls -all /tmp/kafka-logs/log-segment-test2-0/
total 101832
drwxr-xr-x. 2 ec2-user ec2-user 8140 Jan 7 03:00 .
drwxr-xr-x. 4 ec2-user ec2-user 200 Jan 7 03:01 ..
-rw-r--r--. 1 ec2-user ec2-user 440 Jan 7 02:59 00000000000000000000.index
-rw-r--r--. 1 ec2-user ec2-user 1037722 Jan 7 02:59 00000000000000000000.log
-rw-r--r--. 1 ec2-user ec2-user 660 Jan 7 02:59 00000000000000000000.timeindex
-rw-r--r--. 1 ec2-user ec2-user 72 Jan 7 02:59 00000000000000000208.index
-rw-r--r--. 1 ec2-user ec2-user 1024720 Jan 7 02:59 00000000000000000208.log
-rw-r--r--. 1 ec2-user ec2-user 148 Jan 7 02:59 00000000000000000208.snapshot
-rw-r--r--. 1 ec2-user ec2-user 108 Jan 7 02:59 00000000000000000208.timeindex
-rw-r--r--. 1 ec2-user ec2-user 72 Jan 7 02:59 00000000000000000218.index
-rw-r--r--. 1 ec2-user ec2-user 1024720 Jan 7 02:59 00000000000000000218.log
segment가 정확하게 설정한 값대로 나눠지지 않지만 해당 segment 단위로 나눠지는 것을 확인할 수 있습니다.
log.retention.check.interval.ms
Broker는 일정한 주기로 Log를 체크해서 설정한 retention 기간이 지난 Log를 지웁니다.
해당 설정값을 통해서 주기적으로 Log를 체크하고 정리합니다.
Log Timestamp
Broker가 메시지를 받으면 Disk에 파일단위로 저장합니다.
Message는 다수의 record들이 모여서 만들어집니다.
record는 아래와 같은 구조로 되어 있습니다.
- Headers -> 필수가 아닌 선택적으로 필요한 메타데이터
- Key -> Parition 분배에 활용
- Value -> 실질적인 데이터
- Timestamp -> 특정 설정값에 따라 생성되며 사용자가 직접 임의로 넣을 수 있음.
Timestamp는 아래와 같이 확인할 수 있습니다.
UNIX Timestamp로 확인할 수 있습니다.
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/log-segment-test2-0/00000000000000001198.timeindex
Dumping /tmp/kafka-logs/log-segment-test2-0/00000000000000001198.timeindex
timestamp: 1704596442524 offset: 1199
Broker에서 주기적으로 정리할 때 해당 Timestamp를 기준으로 정리합니다.
Record의 Timestamp는 어떻게 생성되는지 확인해 보겠습니다.
log.message.timestamp.type
Record의 Timestamp를 어느 기준으로 생성할지를 설정하는 옵션입니다.
아래와 같이 2가지 옵션이 있습니다.
- CreateTime -> Record 생성 시간에 맞추어서 Timestamp 생성
- LogAppendTime -> Broker에 Log 형태로 저장되는 시간에 맞추어서 Timestamp override
Kafka는 메시지 플랫폼이기 때문에 다양한 시스템에서 메시지를 받습니다.
시스템과 Kafka가 다양한 이유로 메시지를 생성하고 프로듀싱하는데 시간이 걸릴 수 있습니다.
이러한 부분을 고려한다면 CreateTime이 아닌 LogAppendTime도 고려보아야 합니다.
Timestamp 테스트
그럼 한번 테스트를 진행해 보겠습니다.
log.message.timestamp.type에 따라 Broker Log의 Timestamp가 어떻게 되는지 확인해 보겠습니다.
Topic에 Producing을 할 때 임의로 Timestamp를 지정하도록 하겠습니다.
먼저 Type이 CreateTime일 때를 확인해 보겠습니다.
message.timestamp.type을 CreateTime인 테스트 Topic을 생성합니다.
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic message-type-create --config message.timestamp.type=CreateTime --create
Created topic message-type-create
테스트 Topic에 임의 날짜를 UNIX Timestamp로 넣어서 Producing 하였습니다.
from kafka import KafkaProducer
import time
import datetime
d = datetime.date(2023,1,25)
unixtime = int(time.mktime(d.timetuple()))
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic='message-type-create'
producer.send(topic, value=b'some_message_bytes', timestamp_ms=unixtime)
Producing 후 Broker의 Log timeindex를 확인해 보았습니다.
Producing 할 때 설정한 2023년 1월 25일로 확인이 되었습니다.
record가 생성한 시점에 설정한 timestamp로 되어 있는 것을 확인할 수 있습니다.
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/message-type-create-0/00000000000000000000.timeindex
Dumping /tmp/kafka-logs/message-type-create-0/00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :/tmp/kafka-logs/message-type-create-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1674604800
이제 message.timestamp.type을 LogAppendTime인 테스트 Topic을 생성합니다.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic message-type-append --config message.timestamp.type=LogAppendTime --create
Created topic message-type-append.
아까와 동일하게 2023년 1월 25일 Timestamp로 LogAppendTime으로 설정한 Topic에 Producing 하였습니다.
그리고 해당 Topic Log의 timeindex를 확인해 보았습니다.
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/message-type-append-0/00000000000000000000.timeindex
Dumping /tmp/kafka-logs/message-type-append-0/00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :/tmp/kafka-logs/message-type-append-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1704601234648
Record 생성 시 설정한 2023년 1월 25일이 아닌 현재 날짜 기준으로 되어 있습니다.
LogAppendTime을 하게 되면 Broker에 도착한 기준으로 override 되는 것을 확인할 수 있습니다.
정리
이번 포스팅을 통해서 Kafka에서 데이터 저장기한을 어떻게 두는지 설정값 위주로 살펴보았습니다.
기간과 size 기준으로 데이터를 얼마나 저장할지 설정할 수 있었습니다.
record에 timestamp를 기록하여 해당 timestamp을 기준으로 데이터를 저장합니다.
timestamp는 record를 생성할 때와 Broker에 Log로 저장될 때 2가지 방식으로 설정합니다.
시스템 상황에 맞게 해당 값을 고려해야 될 거 같습니다.
[참고사이트]
'MLOps > Kafka' 카테고리의 다른 글
어디까지 줄일 수 있니?! - Kafka Compression (0) | 2023.12.23 |
---|---|
Consumer야 살았니?! 죽었니?! - Kafka Consumer Heartbeats (0) | 2023.11.27 |
1개로 부족해, 2개의 IP로 접근하기 (0) | 2023.09.05 |
Doc에 있는 JVM 설정 파헤치기 (0) | 2023.08.21 |
큰일 났다! Out of Sync다?!?! (0) | 2023.07.27 |