Kafka Streams의 stateful 프로세싱은 다른 시간에 도착하는 관련 이벤트들을 그룹화하여 관리하고 저장할 수 있는 기능을 제공한다. State Store는 중간 상태를 로컬 또는 원격으로 저장하는 구조로, Kafka 체인지 로그 토픽을 기반으로 한 내결함성을 갖추고 있다. 이러한 구조 덕분에 Kafka Streams 애플리케이션은 인스턴스 간 데이터 일관성을 유지하면서도 효율적인 스케일 아웃을 달성할 수 있다. 이번 글에서는 Kafka Streams의 state store 분산 방식, 데이터 일관성 유지 방법, 그리고 로컬 및 원격 상태 조회 방식을 중점으로 다룬다.
1. State Store의 역할과 필요성
Kafka Streams에서 State Store는 스트림 처리 중 발생하는 중간 상태를 로컬에 저장한다. 예를 들어 사용자 행동 데이터를 처리할 때, 각 사용자의 총 구매 횟수를 집계한다고 가정해보자. 각 이벤트가 들어올 때마다 사용자의 기존 구매 횟수를 가져와 업데이트해야 하는데, 이 중간 데이터는 빠르게 접근할 수 있는 로컬 저장소에 저장하는 것이 효율적이다.
- 사용자의 구매 이벤트가 Kafka 토픽으로 수신될 때마다, Kafka Streams 애플리케이션은 구매 이벤트를 기반으로 사용자의 총 구매 횟수를 누적하여 업데이트한다.
- 각 Kafka Streams 인스턴스는 사용자 ID를 기준으로 파티셔닝된 데이터를 관리하며, 해당 파티션에 속한 사용자의 구매 횟수를 state store에 저장하여 빠르게 접근할 수 있다.
2. 스케일아웃 상황에서 State Store와 Changelog Topic의 동작 방식
Kafka Streams의 State Store와 Changelog Topic은 인스턴스 간 분산 처리와 데이터 일관성을 지원하기 위해 다음과 같은 방식으로 동작한다.
2-1. 파티셔닝과 할당
Kafka Streams는 데이터를 파티셔닝하여 관리하며, 각 인스턴스는 특정 파티션을 처리하는 방식으로 상태를 유지한다. 스케일아웃 시 인스턴스가 추가되면, 기존 인스턴스에 할당된 파티션을 새 인스턴스에 자동으로 재할당하여 데이터 처리가 더욱 분산된다. 각 인스턴스는 할당된 파티션에 대한 독립적인 State Store를 유지하게 된다.
2-2. State Store 데이터 복제와 일관성 유지
각 인스턴스의 State Store는 Changelog Topic에 변경 사항을 기록하여 다른 인스턴스에서도 해당 상태를 복구할 수 있도록 한다. 인스턴스가 확장되어 파티션이 새로운 인스턴스에 재배치될 때, 새로운 인스턴스는 Changelog Topic을 구독하여 필요한 데이터를 로드하고, 이후 업데이트 사항을 받아 데이터 일관성을 유지한다.
2-3. 스케일아웃 과정에서의 성능 최적화와 복구 지연 완화
스케일아웃 중 State Store의 데이터 크기가 클 경우 복구에 시간이 지연될 수 있다. Kafka Streams는 이를 완화하기 위해 상태 스냅샷(State Snapshot)과 상태 저장소 압축(State Store Compaction) 기법을 제공한다. 상태 스냅샷을 통해 특정 시점의 데이터를 저장하여 복구 시 전체 데이터를 다시 적용할 필요 없이 빠르게 복구할 수 있다. 압축 기법을 통해 불필요한 데이터를 정리하여 복구 시간을 단축할 수 있다.
2-4. 분산 환경에서의 데이터 일관성 보장
Kafka Streams는 분산 환경에서도 exactly-once 또는 at-least-once 처리 방식을 통해 데이터 일관성을 보장한다. State Store는 트랜잭션 방식으로 이벤트를 처리하며, 이벤트 처리 후 상태를 Changelog Topic에 커밋하여 장애 시 중복 처리를 방지한다. 이를 통해 스케일아웃 상황에서도 데이터 일관성을 유지할 수 있다.
예시 상황
쇼핑 애플리케이션에서 사용자 구매 이벤트를 처리하는 Kafka Streams 애플리케이션을 가정한다.
- 초기 인스턴스 구성
사용자의 구매 횟수를 집계하는 애플리케이션이 초기에는 두 개의 인스턴스를 사용한다. 각 인스턴스는 사용자 ID를 기준으로 파티셔닝된 데이터를 할당받아 각 사용자의 총 구매 횟수를 State Store에 기록한다. - 스케일아웃으로 인스턴스 추가
트래픽이 증가하면서 새로운 인스턴스를 추가하여 스케일아웃한다. Kafka Streams는 기존 두 인스턴스에 할당된 파티션 중 일부를 새 인스턴스에 자동으로 재할당한다. 새 인스턴스는 할당된 파티션의 Changelog Topic을 구독하여 해당 파티션의 모든 상태 데이터를 로드하고, 이후 들어오는 구매 이벤트를 처리한다.
3. State Store 데이터 일관성 유지 및 장애 복구
Kafka Streams 애플리케이션은 상태가 필요한 스트림 처리 작업을 수행할 때 State Store를 사용한다. 이때 State Store에 저장된 데이터는 로컬에 유지되기 때문에, 특정 인스턴스에 장애가 발생하면 해당 인스턴스의 상태 정보를 잃어버릴 위험이 있다. Changelog Topic은 이러한 문제를 해결하기 위해 상태 변경 사항을 Kafka의 특별한 토픽에 기록해 두며, 다른 인스턴스가 해당 파티션을 맡게 되었을 때 이 기록을 바탕으로 State Store를 복구할 수 있게 해준다.
- 웹사이트에서 발생하는 사용자 클릭 이벤트를 집계하는 Kafka Streams 애플리케이션을 가정한다. 각 인스턴스는 특정 파티션의 데이터를 처리하며, 클릭 수 집계를 state store에 기록한다.
- 인스턴스 장애 시, 다른 인스턴스가 해당 파티션을 할당받아 state store를 복구하는데, 이때 필요한 데이터는 Changelog Topic에서 불러와 복구한다. 이를 통해 중단 없이 데이터 일관성을 유지하면서 집계를 이어갈 수 있다.
Changelog Topic은 Kafka의 파티션 구조를 활용해 각 State Store의 변경 사항을 토픽 파티션에 저장한다. Kafka Streams 애플리케이션이 여러 파티션을 사용하는 경우, 각 파티션별로 Changelog Topic도 같은 수의 파티션을 가지게 된다. 이를 통해 Changelog Topic의 데이터는 애플리케이션 파티션과 동일한 구조로 분산 저장되어 있어 복구할 때 효율적이다.
- 데이터 일관성 유지: State Store는 기본적으로 RocksDB 같은 로컬 데이터베이스에 저장된다. State Store가 변경될 때마다 로그 커밋이 Changelog Topic으로 전송되어 데이터 일관성이 보장된다. Kafka Streams의 at-least-once 또는 exactly-once 처리 보장에 따라, 중복 데이터가 발생할 수 있는 상황에서도 Changelog Topic을 통해 마지막으로 커밋된 상태를 기준으로 일관성을 유지할 수 있다.
Kafka Streams의 인스턴스가 장애로 인해 중단될 때, Kafka는 해당 인스턴스가 맡고 있던 파티션을 다른 인스턴스에 할당한다. 이 새로운 인스턴스는 Changelog Topic에 저장된 데이터를 기반으로 State Store를 복구한다.
- 복구 과정
- 새 인스턴스가 할당된 파티션의 Changelog Topic을 구독하여 모든 변경 기록을 가져온다.
- 각 이벤트를 순차적으로 적용해 State Store를 다시 생성한다.
- 예를 들어, A 인스턴스가 클릭 수를 관리하던 도중 장애가 발생했다면, B 인스턴스가 해당 파티션을 할당받고, Changelog Topic에서 A 인스턴스의 상태 변경 사항을 받아 State Store를 복구해 클릭 수 집계를 이어간다.
4. Changelog Topic의 장점과 고려 사항
장점:
- 내결함성(Fault Tolerance): Changelog Topic은 State Store의 상태를 Kafka 클러스터에 백업하는 역할을 하기 때문에, 로컬 데이터 손실이 발생해도 안전하게 복구할 수 있다.
- 데이터 일관성: Kafka Streams는 장애 시 재처리를 통해 일관성을 유지하기 위해 Changelog Topic을 사용하며, 정확히 한 번(exactly-once) 또는 적어도 한 번(at-least-once) 처리 옵션을 설정할 수 있다.
고려 사항:
- 디스크 사용량: Changelog Topic은 모든 State Store 변경 사항을 저장하므로, 대규모 데이터를 다루는 애플리케이션에서는 저장 용량이 커질 수 있다. 필요 시 TTL(Time-to-Live) 설정을 통해 오래된 데이터를 삭제하거나, 압축을 통해 용량을 줄일 수 있다.
- 복구 지연 시간: 대량의 데이터를 가진 State Store를 복구할 때 지연이 발생할 수 있다. 복구 시간이 중요한 경우, State Store를 주기적으로 백업하거나 상태 스냅샷 기능을 사용해 복구 시간을 단축하는 것이 도움이 된다.
'Infra' 카테고리의 다른 글
[Terraform] AWS Public ECR 리소스 생성 Error : no such host (0) | 2023.09.07 |
---|---|
[MySQL] 바이너리 로그의 포맷은 왜 ROW를 권장하는가? (0) | 2023.09.04 |
[airflow] airflow-client-python 2.6.0 이하버전 airflow_client.client.exceptions.ApiTypeError: Invalid type for variable 'dag_run_timeout' (0) | 2023.05.16 |
[Nginx] Nginx 도입기(with SSL) (2) | 2022.10.03 |
[Husky] 커밋메시지 JIRA 티켓번호 자동화 (0) | 2022.08.22 |