소개

https://github.com/waterfogSW/kotlin-jdsl-graalvm-support

본 문서는 과거 Spring Boot 프로젝트에서 org.springframework.data.jpa.repository.JpaRepository 인터페이스와 com.linecorp.kotlinjdsl.support.spring.data.jpa.repository.KotlinJdslJpqlExecutor 인터페이스를 함께 상속받아 사용하는 리포지토리가 있을 경우, GraalVM 네이티브 이미지 빌드 시 발생했던 문제를 분석합니다. 또한 현재 프로젝트 구성(Spring Boot 3.4.4, Kotlin JDSL 3.5.5, GraalVM Native Build Tools 0.10.6, Kotlin 1.9.25 등)에서 해당 문제가 자연스럽게 해결된 배경을 기술 스택 전반의 개선 관점에서 심층적으로 살펴봅니다.

과거 문제의 핵심은 네이티브 이미지 실행 시 KotlinJdslJpqlExecutor가 제공하는 DSL 메소드를 찾을 수 없다는 NoSuchMethodError 또는 유사한 런타임 오류였습니다. 이는 주로 Spring Data JPA의 프록시 생성 방식과 GraalVM의 AOT(Ahead-of-Time) 컴파일 간의 상호작용 문제로 인해 발생했습니다.

과거 문제점 상세 분석

과거 Spring Boot 환경(주로 3.x 초기 버전)에서 발생했던 문제의 근본 원인은 다음과 같이 복합적으로 추정됩니다:

  1. Spring Data JPA의 CGLIB 기반 프록시 생성:
    • Spring Data JPA는 리포지토리 인터페이스에 대한 구현체를 런타임 시 동적으로 생성하기 위해 Spring AOP를 사용합니다.
    • JpaRepositoryKotlinJdslJpqlExecutor를 모두 상속하는 경우처럼, 여러 인터페이스를 구현하거나 클래스 기반 프록시가 필요할 때 CGLIB 라이브러리를 사용하여 프록시 객체를 생성할 수 있습니다. 이 프록시 객체는 런타임에 동적으로 바이트코드가 생성됩니다.
  2. GraalVM AOT 컴파일의 정적 분석 한계:
    • GraalVM 네이티브 이미지는 빌드 시점에 정적 분석(Static Analysis)을 통해 애플리케이션 코드를 분석하고, 런타임에 필요한 모든 클래스, 메소드, 리소스, 리플렉션 정보, 프록시 정보 등을 미리 결정합니다.
    • CGLIB와 같이 런타임에 동적으로 바이트코드를 생성하는 방식은 정적 분석만으로는 완벽하게 예측하기 어렵습니다. 특히 복잡한 상속 구조(여러 인터페이스를 구현하는 CGLIB 프록시) 생성 시 필요한 리플렉션, 리소스, 프록시 관련 정보를 빌드 시점에 정확히 파악하지 못했을 가능성이 높습니다.
  3. 메타데이터 누락:
    • 결과적으로, GraalVM AOT 컴파일러가 CGLIB로 생성된 프록시 객체에 KotlinJdslJpqlExecutor 인터페이스의 메소드가 포함된다는 사실을 인지하지 못하거나, 관련 리플렉션/프록시 메타데이터를 네이티브 이미지 구성(native-image.properties 또는 자동 생성되는 힌트)에 자동으로 포함시키지 못했을 가능성이 큽니다.
    • 이로 인해 네이티브 이미지 실행 시 해당 메소드를 찾지 못하는 NoSuchMethodError와 같은 런타임 오류가 발생했습니다.
    • reflect-config.json 등에 관련 클래스(KotlinJdslJpqlExecutor, 관련 구현체 등)를 수동으로 등록해도, 프록시 객체 자체의 동적 생성 특성 때문에 근본적인 해결이 어려웠을 수 있습니다.
  4. BeanPostProcessor와 AOT 호환성 문제:
    • 일부 커뮤니티 보고(예: Stack Overflow)에 따르면, Kotlin JDSL의 spring-data-jpa-support 모듈이 사용하는 KotlinJdslJpaRepositoryFactoryBeanPostProcessor와 같은 BeanPostProcessor가 Spring AOT 환경과 완전히 호환되지 않았을 가능성이 제기되었습니다. BeanPostProcessor는 빈 초기화 과정에 개입하여 빈을 동적으로 수정하거나 프록시를 적용하는데, AOT 환경에서는 이러한 동적 처리에 제약이 있을 수 있습니다. Spring AOT는 빌드 시점에 빈 정의를 확정하려고 시도하기 때문입니다.

현재 프로젝트에서의 해결 분석: 누적된 개선 효과

현재 프로젝트 구성에서는 과거 문제가 발생하지 않습니다. 이는 특정 하나의 수정 때문이라기보다는, 관련 기술 스택 전반에 걸쳐 이루어진 지속적인 개선 사항들의 누적된 효과 덕분으로 판단됩니다. 주요 개선 요인은 다음과 같습니다:

  1. Spring Framework (6.0.x -> 6.1.x+)의 AOT 엔진 성숙:
    • Spring Framework 6.0은 AOT 변환을 위한 포괄적인 기반을 마련하고 네이티브 이미지 지원을 일반 기능으로 승격시켰습니다. AOT 처리 지원을 위한 refreshForAotProcessing 메소드, AOP 프록시 및 구성 클래스에 대한 초기 프록시 클래스 결정 지원, JPA 관리 유형 사전 결정 지원 등이 도입되었습니다.
    • Spring Framework 6.1 이후 버전에서는 AOT 엔진의 메타데이터 자동 감지 및 생성 능력이 더욱 크게 향상되었습니다. 복잡한 프록시 객체(예: 여러 인터페이스를 구현하는 CGLIB 프록시) 생성 시 필요한 리플렉션, 리소스, 프록시 관련 정보를 더 정확하게 예측하고 빌드 힌트로 제공하는 능력이 정교해졌습니다.
    • RuntimeHintsRegistrar 인터페이스 도입으로 라이브러리(예: Spring Data JPA)가 AOT 엔진에게 필요한 런타임 정보(리플렉션, 프록시, 리소스 힌트 등)를 프로그래밍 방식으로 더 명확하게 전달할 수 있게 되었습니다.
    • 조건부 구성 처리, AOP 프록시 처리 등 네이티브 이미지 빌드를 위한 전반적인 지원이 강화되었으며, Java 17 베이스라인 및 가상 스레드 지원 등도 안정성 향상에 기여했을 수 있습니다.
  2. GraalVM Native Build Tools (NBT) 개선 (0.10.2 -> 0.10.6):
    • NBT 플러그인의 지속적인 개선을 통해 Spring Boot AOT 엔진과의 통합이 강화되고, 생성된 메타데이터를 더 잘 이해하고 활용하게 되었습니다.
    • 버전별 주요 개선 사항 요약:
      • 0.10.2: 메타데이터 복사 작업 기본 대상 디렉토리 업데이트 등.
      • 0.10.3: 메타데이터 리포지토리 버전 업데이트, 문서 개선.
      • 0.10.4: JUnit Platform @EnabledOnOs 문제 해결, 주요 JDK 버전 감지 개선.
      • 0.10.5: GraalVM 버전 검사 개선, Maven 종속성 업그레이드.
      • 0.10.6: SBOM(Software Bill of Materials) 관련 문제 수정(해결되지 않은 아티팩트 및 빈 "components"), reachability 메타데이터 리포지토리 버전 업데이트.
    • 이러한 개선은 메타데이터 처리 로직 개선, 의존성 분석 정확도 향상, 빌드 프로세스 안정성 증가 등으로 이어져 Kotlin/Spring Boot 프로젝트의 네이티브 빌드 성공률을 높였습니다.
  3. Kotlin 및 Kotlin JDSL 생태계 발전:
    • Kotlin (1.9.21 -> 1.9.25): Kotlin 컴파일러(K2 포함) 및 관련 Gradle 플러그인의 지속적인 개선이 이루어졌습니다. Kotlin/Native 메모리 관리자 성능 개선, Kotlin Multiplatform 지원 강화 등 생태계 전반의 안정성 향상 노력이 있었습니다. Kotlin 컴파일러 자체를 GraalVM 네이티브 이미지와 호환되도록 만드는 작업(KT-66666) 등 GraalVM 호환성 향상 노력도 간접적으로 긍정적인 영향을 미쳤을 수 있습니다.
  4. Spring Boot 자체 개선 (3.2.4 -> 3.4.4):
    • 앞서 언급된 Spring Framework 개선 사항 통합 외에도, Spring Boot 레벨에서 네이티브 이미지 관련 구성 및 지원이 꾸준히 업데이트되었습니다.
    • Spring Boot 3.2: Kotlin Gradle 플러그인 1.9.0 버그(AOT 처리 시 리소스 누락) 관련 주의 사항 언급 및 후속 버전에서의 해결. 기본 CNB(Cloud Native Buildpacks) 빌더 업그레이드.
    • Spring Boot 3.4: Netty 네이티브 이미지 관련 문제 해결을 위한 메타데이터 업그레이드 필요성 언급. Spring Security의 @AuthorizeReturnObject, @PreAuthorize/@PostAuthorize 콜백 등 네이티브 이미지 호환성 개선.
    • org.springframework.data.jpa.repository.aot 패키지의 존재는 Spring Data JPA 팀이 네이티브 이미지 환경에서의 리포지토리 동작 최적화(프록시 처리 포함)를 위해 지속적으로 노력하고 있음을 시사합니다.

커뮤니티 사례 및 관련 이슈

과거 유사한 문제가 커뮤니티에서도 보고 및 논의되었습니다:

  • Stack Overflow 질문: kotlin-jdsl-causes-native-image-build-failure 에서 KotlinJdslJpaRepositoryFactoryBeanPostProcessor가 문제 원인으로 지목되며 BeanPostProcessor와 Spring AOT 간의 비호환성 가능성을 시사했습니다.
  • GitHub 이슈:
    • Spring Framework #31618: "Kotlin DSL을 사용한 GraalVM 네이티브 오류"로 보고되었으며, 유사 문제 인지를 보여줍니다.
    • GraalVM #722: Kotlin 애플리케이션의 객체 직렬화 관련 네이티브 이미지 빌드 문제가 보고되어, Kotlin과 GraalVM 통합의 복잡성을 보여줍니다.

이러한 사례들은 문제 해결이 특정 라이브러리 하나의 수정보다는 기술 스택 전반의 개선을 통해 이루어졌을 가능성을 뒷받침합니다.

결론

과거 Spring Boot 3.x 초기 환경에서 발생했던 Kotlin JDSL과 Spring Data JPA 리포지토리를 함께 사용할 때의 GraalVM 네이티브 이미지 빌드 문제는, 특정 한두 가지 수정 때문이라기보다는 Spring Framework AOT 엔진의 성숙, GraalVM Native Build Tools의 발전, Kotlin 생태계의 개선, 그리고 Spring Boot 자체의 지속적인 네이티브 지원 강화라는 여러 요인이 복합적으로 작용한 결과로 해결된 것으로 보입니다.

특히 Spring Framework의 AOT 엔진이 CGLIB 기반의 복잡한 프록시 시나리오를 더 잘 이해하고 필요한 메타데이터를 자동으로 생성하는 능력이 크게 향상되었으며, 관련 빌드 도구 및 라이브러리들도 이에 발맞춰 개선되었습니다. 현재 기술 스택(Spring Boot 3.4.4, NBT 0.10.6, Kotlin 1.9.25, Kotlin JDSL 3.5.5 등)에서는 프레임워크와 빌드 도구가 이러한 복잡한 시나리오를 더 잘 이해하고 필요한 메타데이터를 자동으로 처리해주기 때문에, 개발자가 별도의 복잡한 수동 구성 없이도 성공적으로 네이티브 이미지를 빌드할 수 있게 되었습니다.

권장 사항 및 모범 사례

  • 최신 버전 유지: 네이티브 이미지 호환성 관련 최신 개선 사항과 버그 수정을 활용하기 위해 Spring Boot, Spring Framework, Kotlin, Kotlin JDSL, NBT 등 기술 스택의 버전을 가능한 최신 안정 버전으로 유지하는 것이 좋습니다.
  • 표준 도구 활용: Spring Boot Native Profile 및 GraalVM Native Build Tools Gradle/Maven 플러그인을 적극 활용하여 빌드 프로세스를 간소화하고 모범 사례를 따르는 것이 좋습니다.
  • Runtime Hints 활용 (필요시): 자동 메타데이터 감지가 어려운 특수한 경우나 타사 라이브러리 관련 문제가 발생하면, Spring Framework의 RuntimeHintsRegistrar 메커니즘을 사용하여 필요한 힌트(리플렉션, 프록시 등)를 직접 제공하는 것을 고려할 수 있습니다.
  • 공식 문서 참조: 각 기술의 공식 문서를 주기적으로 확인하여 네이티브 이미지 지원 현황, 알려진 제약 사항, 권장 구성 방법 등에 대한 최신 정보를 얻는 것이 중요합니다.

배경

Java 21에서 소개된 가상 스레드(Virtual Thread)는 수많은 동시 작업을 처리할 수 있는 경량화된 스레드로, 고성능 동시성 애플리케이션을 개발하기 위한 중요한 전환점이 되었습니다.
가상 스레드는 JDK 자체 스케줄러를 통해 플랫폼 스레드에 마운트되었다가 필요에 따라 해제되면서 효율적인 리소스 관리를 지원합니다.

그러나 Java 21에서는 Virtual Thread Pinning(핀) 문제가 성능 개선에 영향을 미치고 있습니다. 특정 상황에서 가상 스레드가 플랫폼 스레드를 고정되어 가상 스레드의 주된 이점인 플랫폼 스레드 마운트 언마운트를 제한하는 케이스가 종종 있습니다.

 

왜 Pinning이 발생하나?

가상 스레드는 기본적으로 논블로킹 작업을 수행할 때 플랫폼 스레드에서 해제되어야 하지만, 특정 동기화 및 블로킹 작업 시 플랫폼 스레드에 고정되는 문제가 있습니다.

 

주요 발생 원인은 다음과 같습니다

1. synchronized 메서드와 Pinning

synchronized는 JVM 모니터(Monitor)를 사용하여 스레드 간 상호 배제를 보장합니다.

모니터란?

Java에서 모든 객체는 고유한 모니터(Monitor)를 가지고 있습니다.
모니터는 Java에서 스레드 동기화를 구현하는 핵심 메커니즘으로, 동기화 블록이나 동기화 메서드를 사용할 때 자동으로 생성됩니다. 특정 객체를 기반으로 스레드 간 상호 배제(Mutual Exclusion)상태 동기화(Condition Synchronization)를 제공하여 공유 자원의 안전한 접근을 보장합니다.
  • 동기화 블록: synchronized 키워드로 정의되는 동기화 블록은 객체의 모니터를 획득(acquire)하고 해제(release)하여 특정 코드 블록이나 메서드에 단일 스레드만 접근할 수 있도록 보장합니다.
  • wait/notify 메서드: 스레드가 모니터를 사용해 다른 스레드와 상태를 동기화할 수 있도록 합니다. (wait()는 잠금 해제 후 대기, notify()는 대기 중인 스레드 깨우기)

JVM은 모니터를 플랫폼 스레드 기준으로 관리합니다. 가상 스레드가 synchronized 메서드에 진입하면, 모니터의 소유권은 가상 스레드가 아니라 가상 스레드의 캐리어 플랫폼 스레드에 할당됩니다. 이 상태에서 가상 스레드가 I/O 등의 블로킹 작업을 수행하면 플랫폼 스레드는 해제되지 않고 고정(Pinned)됩니다.

synchronized void fetchData() {
    byte[] data = new byte[1024];
    socket.getInputStream().read(data); // 블로킹 작업
}

위 코드에서 read 메서드가 데이터를 대기하며 블로킹되면, 가상 스레드는 플랫폼 스레드에 고정되어 다른 가상 스레드를 처리하지 못하는 상태가 됩니다.

 

라이브러리 Pinning 사례

 

Hibernate/JPA

Spring Data JPA 3.3.0 버전에서 PartTreeJpaQuery.QueryPreparer#createQuery() 메서드의 synchronized 블록으로 인해 virtual thread pinning이 발생한다는 이슈가 보고되었습니다. 이 문제를 해결하기 위해 synchronized 블록을 ReentrantLock으로 교체하는 것이 제안되었습니다.

https://github.com/spring-projects/spring-data-jpa/issues/3505

 

[Virtual Threads] Possible Thread Pinning in `PartTreeJpaQuery.QueryPreparer#createQuery()` · Issue #3505 · spring-projects/sp

Version: Spring Data JPA 3.3.0. The method uses a synchronized block, causing thread pinning: spring-data-jpa/spring-data-jpa/src/main/java/org/springframework/data/jpa/repository/query/PartTreeJpa...

github.com

 

HikariCP 

반면 HikariCP는 syncronized 블럭 사용으로 인한 pinning 문제를 해결하지 않기로 결정했습니다. 

Virtual Thread "pinning" 문제는 특정 조건에서 발생하는데, 이는 Virtual Thread가 synchronized 블록 내부에서 IO 작업 또는 블로킹 작업을 수행할 때 발생합니다. 하지만 HikariCP는 이러한 블로킹 작업을 synchronized 내부에서 수행하지 않습니다.

ReentrantLock을 사용하도록 변경하는 것은 Virtual Threads의 호환성을 위한 시도로 제안되었으나, HikariCP의 기존 synchronized 사용 방식에서 실질적인 성능 개선이나 문제 해결 효과가 거의 없을 가능성이 크며, ReentrantLock으로의 변경은 불필요한 오버헤드(추가 객체 생성 및 GC)를 초래할 수 있다고 Brett Wooldridge(HikariCP의 소유자)가 언급했습니다.

https://github.com/brettwooldridge/HikariCP/pull/2055

 

Add support for Virtual Threads by bdeneuter · Pull Request #2055 · brettwooldridge/HikariCP

It seems that synchronized will still pinn carrier threads in JDK 21 for the moment. This is the draft JEP for JDK21: https://openjdk.org/jeps/8303683 So I'm reoping the PR for using ReentrantL...

github.com

 

 

2. Object.wait()와 Pinning

Object.wait()는 동기화된 객체에서 대기 상태로 전환될 때 사용하는 메서드입니다.

작동 방식

  1. Object.wait()는 모니터를 소유한 상태에서 호출해야 합니다.
  2. 호출된 스레드는 대기 상태로 전환되며, 모니터를 해제.
  3. 다른 스레드가 Object.notify() 또는 Object.notifyAll()을 호출하면 대기 상태에서 깨어남.

Pinning이 발생하는 이유

  • wait 호출 중에도 가상 스레드는 플랫폼 스레드와 연결되어 있습니다.
  • 깨어난 후 다시 모니터를 재획득해야 하는데, 이 과정에서도 플랫폼 스레드가 고정됩니다.

 

Java 24 의 해결 방안 - JEP491

https://openjdk.org/jeps/491

Java 24의 JEP 491은 Virtual Thread Pinning 문제를 해결하기 위해 JVM 수준의 동기화 메커니즘을 대폭 개선했습니다. 이 개선은 가상 스레드가 synchronized 메서드, 블록, 또는 Object.wait() 호출 중에도 플랫폼 스레드에서 분리(Mount 해제)될 수 있도록 지원합니다. 이를 통해 가상 스레드가 블로킹 작업을 수행하는 동안 플랫폼 스레드가 유휴 상태로 고정되지 않게 되어 확장성이 크게 향상됩니다.

 

1. Pinning 문제의 핵심 원인

기존 JVM의 동작 방식

  • synchronized 키워드와 모니터(Monitor)
    • synchronized는 JVM 내부적으로 객체의 모니터를 활용하여 상호 배제를 보장합니다.
    • JVM은 특정 스레드(현재는 플랫폼 스레드)가 모니터를 소유하고 있음을 추적합니다.
    • 가상 스레드가 synchronized 메서드에 진입하면, JVM은 해당 가상 스레드의 캐리어 플랫폼 스레드를 모니터 소유자로 설정합니다.
    • 이 상태에서 가상 스레드가 블로킹 작업에 들어가더라도 플랫폼 스레드는 모니터와 연결된 상태로 고정됩니다.
  • Object.wait()
    • Object.wait()는 모니터를 소유한 상태에서 호출해야 합니다.
    • 호출된 스레드는 대기 상태로 전환되며 모니터를 일시적으로 해제합니다.
    • 그러나 JVM은 여전히 플랫폼 스레드를 대기 상태로 유지하므로, 플랫폼 스레드가 다른 작업에 재사용되지 못합니다.

 

2. JEP 491의 해결 방안

Java 24는 Pinning 문제를 해결하기 위해 다음과 같은 변경을 도입했습니다.

1) 모니터 소유권을 가상 스레드 기준으로 변경

  • 기존 JVM은 모니터 소유권을 플랫폼 스레드 기준으로 관리했지만, Java 24에서는 이를 가상 스레드 기준으로 관리합니다.
  • 가상 스레드가 synchronized 메서드에 진입하면
    • JVM은 해당 가상 스레드를 모니터 소유자로 설정합니다.
    • 플랫폼 스레드와는 독립적으로 모니터 소유권을 유지할 수 있습니다.
  • 이를 통해 가상 스레드가 블로킹 상태에 들어가더라도 플랫폼 스레드가 고정되지 않고 다른 가상 스레드에 재사용될 수 있습니다.

2) Object.wait()의 동작 개선

  • Object.wait() 호출 시
    • 가상 스레드는 플랫폼 스레드에서 Unmount됩니다.
    • 대기 상태가 끝나면 JVM 스케줄러는 가상 스레드를 새로운 플랫폼 스레드에 Mount하여 작업을 재개합니다.
  • 이를 통해 wait 대기 중에도 플랫폼 스레드가 유휴 상태로 고정되지 않습니다.

들어가며

스타크래프트2의 프로토스 종족에는 '시간 증폭'이라는 독특한 메커니즘이 있다. 연결체(Nexus)에서 프로브를 생산할 때 시간 증폭을 사용하면 일정 시간 동안 생산 속도가 빨라지는 기능이다. 이런 상태 관리와 비동기 처리가 필요한 시스템을 Kotlin의 Coroutine Flow를 활용해 구현해보자.

https://github.com/waterfogSW/starcraft-time-amplification

 

시스템 요구사항

  1. 연결체는 프로브를 생산할 수 있다
  2. 생산 큐는 최대 5개까지 가능
  3. 시간 증폭은 10초 동안 지속되며, 적용 시 생산 속도가 3배로 증가
  4. 생산 진행 상태를 실시간으로 관찰 가능
  5. 생산 중인 항목을 취소할 수 있음

 

연결체 프로브 생산 프로세스

 

1. 전통적인 방식 (Observer 패턴)

직관에 따라 Nexus(연결체) 를 구현한다면 Variable과 Observer 패턴을 사용하는 방식을 떠올릴 수 있다.

class Nexus {
    private var productionState: ProductionState = ProductionState.Idle
    private var probeCount: Int = 0
    private val observers = mutableListOf<ProductionObserver>()
    
    // 메모리 누수 위험이 있는 observer 등록/해제
    fun addObserver(observer: ProductionObserver) {
        observers.add(observer)
    }
    
    fun removeObserver(observer: ProductionObserver) {
        observers.remove(observer)
    }
    
    // 스레드 안전성을 위해 모든 메서드에 동기화 필요
    @Synchronized
    fun updateState(newState: ProductionState) {
        productionState = newState
        observers.forEach { it.onStateChanged(newState) }
    }
    
    // 여러 상태를 변경할 때 데드락 위험
    @Synchronized
    fun completeProduction() {
        productionState = ProductionState.Complete
        probeCount++
        observers.forEach { 
            it.onStateChanged(productionState)
            it.onProbeCountChanged(probeCount)
        }
    }
}

// Activity/Fragment에서 메모리 누수 발생 가능
class NexusActivity : Activity(), ProductionObserver {
    private val nexus = Nexus()
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        nexus.addObserver(this) // 등록은 했지만...
    }
    
    // onDestroy에서 removeObserver를 호출하지 않으면 메모리 누수
}

전통적인 Observer 패턴 기반의 상태 관리 방식은 근본적인 문제점을 가지고 있다. 개발자가 직접 Observer를 등록하고 해제하는 과정에서 메모리 누수가 발생하기 쉽다. Activity나 Fragment의 생명주기와 Observer의 생명주기를 수동으로 동기화해야 하는데, 이는 실수하기 쉬운 작업이다.

멀티스레드 환경에서의 안전성도 보장하기 어렵다. 상태 변경 메소드마다 @Synchronized 어노테이션을 붙여야 하며, 이는 성능 저하를 일으킨다. 여러 상태를 동시에 변경할 때는 데드락이 발생할 위험도 있다. 결과적으로 복잡한 보일러플레이트 코드가 생기고 유지보수가 어려워진다.

 

2. LiveData

그럼 LiveData는 어떤가?, LiveData는 이러한 문제를 상당 부분 해결한다. 생명주기를 자동으로 관리하여 메모리 누수를 방지하고, 메인 스레드 안전성을 보장한다. Observer 등록과 해제를 수동으로 관리할 필요가 없어졌다.

class Nexus {
    // 안드로이드 플랫폼 종속적
    private val _productionState = MutableLiveData<ProductionState>()
    val productionState: LiveData<ProductionState> = _productionState
    
    private val _probeCount = MutableLiveData<Int>()
    val probeCount: LiveData<Int> = _probeCount
    
    // 메인 스레드에서만 값 변경 가능
    fun updateState(newState: ProductionState) {
        _productionState.value = newState
    }
    
    // 빠른 연속 업데이트 처리 불가능
    fun startFastUpdates() {
        viewModelScope.launch {
            repeat(1000) { // 빠른 업데이트 발생
                _productionState.value = ProductionState.Producing(it / 1000f)
                delay(16) // 16ms마다 업데이트
                // 백프레셔 처리 메커니즘 부재로 프레임 드롭 발생
            }
        }
    }
}

그러나 LiveData는 안드로이드 플랫폼에 종속되어 있어 서버사이드에서는 사용할 수 없다. 역시 순수 Kotlin 모듈에서는 사용이 불가능하며, 이로 인해 멀티플랫폼 개발과 테스트가 제한된다.

더 큰 문제는 연속적인 데이터 업데이트 처리에 취약하다는 점이다. 센서 데이터와 같이 빠르게 들어오는 데이터 스트림을 처리할 때 데이터 손실이 발생하거나 UI 성능이 저하된다. 이는 LiveData가 백프레셔 처리 메커니즘을 가지고 있지 않기 때문이다.

 

3. Coroutine Flow

Flow는 이전 방식들의 문제를 해결하고 더 강력한 기능을 제공한다.

class Nexus {
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    
    // 플랫폼 독립적이며 상태 관리가 용이한 StateFlow
    private val _productionState = MutableStateFlow<ProductionState>(ProductionState.Idle)
    val productionState = _productionState.asStateFlow()
    
    private val _probeCount = MutableStateFlow(0)
    val probeCount = _probeCount.asStateFlow()
    
    // 백프레셔가 적용된 생산 프로세스
    fun startProduction() = flow {
        var progress = 0f
        while (progress < 1f) {
            progress += 0.1f
            emit(progress) // 백프레셔 자동 적용
            delay(16)
        }
    }.buffer(Channel.BUFFERED) // 생산자-소비자 분리
     .catch { e -> 
        _productionState.value = ProductionState.Idle
        emit(0f)
    }
    
    // 복잡한 상태 조합도 쉽게 처리
    val combinedState = combine(
        productionState,
        probeCount
    ) { state, count ->
        CombinedState(state, count)
    }.stateIn(scope, SharingStarted.Lazily, CombinedState())
    
    // 스레드 안전한 상태 업데이트
    fun updateState(newState: ProductionState) {
        _productionState.value = newState // 자동으로 스레드 안전
    }
    
    fun shutdown() {
        scope.cancel() // 리소스 정리도 간단
    }
}

// UI에서의 사용 (Compose)
@Composable
fun NexusScreen(nexus: Nexus) {
    val state by nexus.productionState.collectAsState()
    
    LaunchedEffect(Unit) {
        nexus.startProduction()
            .collect { progress ->
                // 백프레셔로 인해 UI는 버벅임 없이 부드럽게 업데이트
            }
    }
}

// 순수 Kotlin 모듈에서도 사용 가능
class PureKotlinModule {
    private val nexus = Nexus()
    
    fun processData() {
        // Flow는 플랫폼 독립적이므로 사용 가능
        nexus.productionState
            .map { ... }
            .filter { ... }
            .collect { ... }
    }
}

이처럼 Flow와 StateFlow는 이전 방식들의 한계를 모두 해결하고, 코루틴과의 자연스러운 통합, 백프레셔 지원, 플랫폼 독립성 등 다양한 이점을 제공한다. 특히 상태 관리에 있어서 스레드 안전성과 메모리 관리가 자동으로 이루어지며, 복잡한 비동기 처리도 간단하게 구현할 수 있다.

 

Flow와 StateFlow

Flow의 기본 개념

Flow는 코틀린에서 비동기적으로 계산될 수 있는 데이터 스트림을 나타내는 타입이다. Flow는 다음과 같은 특징을 가진다:

// Flow의 기본 구조
val flow = flow {
    for (i in 1..3) {
        delay(100) // 비동기 작업 시뮬레이션
        emit(i)    // 값 방출
    }
}

// Flow 수집
scope.launch {
    flow.collect { value ->
        println(value)
    }
}

Flow의 주요 특징

  1. Cold Stream: Flow는 collect를 호출할 때만 데이터를 방출한다
  2. 순차적 실행: 기본적으로 순차적으로 처리된다
  3. 취소 가능: 코루틴의 취소 메커니즘을 지원한다
  4. 백프레셔 지원: 데이터 생산과 소비 속도를 조절할 수 있다

 

StateFlow 이해하기

StateFlow는 Flow의 특별한 형태로, 항상 값을 가지고 있는 상태 홀더다. 우리의 연결체 구현에서 중요한 역할을 한다

// StateFlow 기본 예제
private val _state = MutableStateFlow(초기값)
val state: StateFlow<T> = _state.asStateFlow()

// 값 업데이트
_state.value = 새로운값
// 또는
_state.update { currentValue -> 
    // 새로운 값을 계산하고 반환
}

StateFlow의 특징

  1. Hot Stream: 수집하는 코루틴이 없어도 활성 상태를 유지
  2. 상태 보유: 항상 현재 값을 가짐
  3. 중복 제거: 동일한 값은 방출하지 않음
  4. 다중 구독자 지원: 여러 수집기가 동시에 값을 관찰할 수 있음

 

생산 프로세스, 시간 증폭 프로세스 구현

앞서 본 내용들을 바탕으로 연결체의 생산 프로세스와 시간 증폭 프로세스를 구현해 보자.

생산 프로세스

fun startProduction() {
    if (_productionQueue.value.size >= 5) return

    scope.launch {
        // 큐에 새 항목 추가
        _productionQueue.update { it + ProbeQueueItem() }

        // 생산이 진행중이지 않은 경우에만 시작
        if (_productionState.value is ProductionState.Idle) {
            startProductionProcess()
        }
    }
}

private fun startProductionProcess() {
    productionJob = scope.launch {
        while (_productionQueue.value.isNotEmpty()) {
            var accumulatedProgress = 0f
            var lastUpdateTime = System.currentTimeMillis()

            // 진행률 업데이트 루프
            while (accumulatedProgress < 1f) {
                val currentTime = System.currentTimeMillis()
                val deltaTime = currentTime - lastUpdateTime
                lastUpdateTime = currentTime

                // 시간 증폭 상태 확인
                val isCurrentlyBoosted = _chronoBoostState.value.isActive
                val progressIncrement = calculateProgress(deltaTime, isCurrentlyBoosted)
                
                accumulatedProgress = (accumulatedProgress + progressIncrement)
                    .coerceAtMost(1f)

                // 상태 업데이트
                updateProductionState(ProductionState.Producing(accumulatedProgress))
                updateQueueProgress(accumulatedProgress)
                
                delay(16) // ~60fps
            }

            // 생산 완료 처리
            completeProduction()
        }
    }
}

 

시간 증폭 프로세스

동일한 메커니즘으로 시간 증폭도 구현할 수 있다.

fun applyChronoBoost() {
    chronoBoostJob = scope.launch {
        updateChronoBoostState(
            ChronoBoostState(
                isActive = true,
                remainingTimeMillis = CHRONOBOOST_DURATION
            )
        )
        
        val startTime = System.currentTimeMillis()
        while (true) {
            val remainingTime = CHRONOBOOST_DURATION - (System.currentTimeMillis() - startTime)
            if (remainingTime <= 0) break
            
            updateChronoBoostState(
                _chronoBoostState.value.copy(remainingTimeMillis = remainingTime)
            )
            delay(100)
        }
    }
}

시간 증폭은 별도의 코루틴에서 관리되며, 상태를 통해 생산 속도에 영향을 준다. 생산 프로세스는 지속적으로 시간 증폭 상태를 확인하며 생산 속도에 반영해 결정하게 된다.

 

서버사이드에서의 응용

앞서 살펴본 Flow와 StateFlow를 활용한 생산 및 시간 증폭 프로세스 구현은 서버사이드 애플리케이션에서도 효과적으로 응용할 수 있다. 서버 환경에서는 다수의 클라이언트 요청을 효율적으로 처리하고, 실시간 상태 관리를 통해 시스템의 성능과 안정성을 유지하는 것이 중요하다. Kotlin의 Coroutine과 Flow는 이러한 요구사항을 충족시키는 강력한 도구를 제공한다.

실시간 게임 서버에서는 플레이어의 상태 업데이트, 게임 내 이벤트 처리, 자원 관리 등이 빈번하게 발생한다. Flow와 StateFlow를 활용하면 각 플레이어의 상태 변화를 비동기적으로 처리하고, 서버 자원의 효율적인 분배를 통해 높은 동시성을 유지할 수 있다. 예를 들어, 플레이어의 행동에 따른 자원 생산 속도를 조절하거나, 특정 이벤트 발생 시 일시적으로 생산 속도를 증가시키는 시간 증폭 메커니즘을 구현할 수 있다.

또한 실시간 데이터 스트리밍 이 필요한 금융 거래 시스템, IoT 데이터 수집, 실시간 분석 등 고속의 데이터 스트림을 처리해야 하는 서버 애플리케이션에서 Flow는 자연스러운 선택이 될 수 있다. 백프레셔 지원을 통해 데이터 생산자와 소비자 간의 속도 차이를 조절할 수 있으며, 상태 관리 기능을 통해 현재 데이터 처리 상태를 실시간으로 모니터링할 수 있다. 이는 데이터 손실을 방지하고 시스템의 안정성을 높이는 데 기여한다.

Spring Cloud OpenFeign이 feature complete 상태가 되었다. Spring Cloud OpenFeign 공식문서에서는 RestClient와 Http interface를 활용한 방식으로의 마이그레이션을 권장하고 있다. Spring 6에서 제공하는 HTTP Interface는 OpenFeign의 선언형 프로그래밍 방식을 대체할 수 있다. 이 글에서는 OpenFeign에서 HTTP Interface로 마이그레이션하는 방법을 설명한다.

 

의존성 구성

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.retry:spring-retry")
    implementation("org.springframework:spring-aspects")
}

먼저 필요한 의존성을 설정해야 한다. Spring Boot 3.x 이상을 사용하면 별도의 HTTP Interface 관련 의존성은 필요하지 않다. OpenFeign에서 사용했던 재시도 기능을 HttpExchange에서 재현하기 위해 spring-retry만 추가하면 된다

 

OpenFeign에서 HTTP Interface 코드 비교

인터페이스 정의 방식의 변화

기존 OpenFeign에서는 다음과 같이 클라이언트를 정의했다:

@FeignClient(name = "user-service", url = "${user.service.url}")
interface UserClient {
    @GetMapping("/users/{id}")
    fun getUser(@PathVariable id: Long): User
    
    @PostMapping("/users")
    fun createUser(@RequestBody user: User): User
}

HTTP Interface에서는 이렇게 변경된다:

@HttpExchange
interface UserClient {
    @GetExchange("/users/{id}")
    fun getUser(@PathVariable id: Long): User

    @PostExchange("/users")
    fun createUser(@RequestBody user: User): User
    
    @PutExchange("/users/{id}")
    fun updateUser(
        @PathVariable id: Long,
        @RequestBody user: User
    ): User

    @DeleteExchange("/users/{id}")
    fun deleteUser(@PathVariable id: Long)
}

주요 변경사항을 살펴보면:

  1. @FeignClient 어노테이션이 @HttpExchange로 변경됐다
  2. HTTP 메서드 어노테이션들이 각각 대응되는 Exchange 어노테이션으로 변경됐다
    • @GetMapping → @GetExchange
    • @PostMapping → @PostExchange
    • @PutMapping → @PutExchange
    • @DeleteMapping → @DeleteExchange
  3. 기본적인 요청/응답 구조는 유사하게 유지된다

이러한 변경은 Spring의 기본 기능을 활용하면서도, OpenFeign의 직관적인 인터페이스 스타일을 유지하고 있다.

 

재시도 전략

OpenFeign에서 제공하던 exponential backoff 기능은 Spring Retry를 통해 구현할 수 있다. 기존 OpenFeign이 별도의 의존성 없이 재시도 기능을 지원했던 반면 Http Interface를 사용할때는 Srping Retry가 필요하다.

@Retryable(
    include = [RuntimeException::class],
    maxAttempts = 3,
    backoff = Backoff(delay = 1000)
)
@GetExchange("/users/retry/{id}")
fun getUserWithRetry(
    @PathVariable id: Long,
    @RequestParam("failCount", required = false) failCount: Int?
): User

주요 설정:

  • @EnableRetry: 애플리케이션 레벨에서 재시도 기능 활성화, 
  • @Retryable: 메서드 레벨에서 재시도 정책 설정
  • include: 재시도할 예외 타입 지정
  • maxAttempts: 최대 재시도 횟수
  • backoff: 재시도 간격 설정

 

에러 처리

OpenFeign의 ErrorDecoder와 달리, RestClient는 defaultStatusHandler를 통해 HTTP 상태 코드별 에러 처리를 구현할 수 있다. 상태 코드에 따라 적절한 예외를 던지도록 설정이 가능하다.

@Configuration
class RestClientConfig {

    @Bean
    fun defaultRestClientBuilder(): RestClient.Builder {
        return RestClient
            .builder()
            .defaultStatusHandler(HttpStatusCode::isError) { _, response ->
                when (response.statusCode) {
                    HttpStatus.NOT_FOUND -> throw RestClientException("Resource not found")
                    HttpStatus.UNAUTHORIZED -> throw RestClientException("Unauthorized")
                    HttpStatus.BAD_REQUEST -> throw RestClientException("Invalid request")
                    else -> throw RestClientException("HTTP error: ${response.statusCode}")
                }
            }
    }

}

 

요청/응답 로깅

class LoggingInterceptor : ClientHttpRequestInterceptor {
    override fun intercept(
        request: HttpRequest,
        body: ByteArray,
        execution: ClientHttpRequestExecution
    ): ClientHttpResponse {
        logger.info("=== Request ===")
        logger.info("URI: {}", request.uri)
        logger.info("Method: {}", request.method)
        logger.info("Headers: {}", request.headers)

        val response = execution.execute(request, body)

        logger.info("=== Response ===")
        logger.info("Status: {}", response.statusCode)

        return response
    }
}

// 적용
.requestInterceptor(LoggingInterceptor())

OpenFeign의 Logger.Level 설정과 비교하여, RestClient는 더 유연한 로깅 방식을 제공한다

타임아웃 설정

@Configuration
class RestClientConfig(
    @Value("\${rest.client.base-url}")
    private val baseUrl: String
) {
    @Bean
    fun defaultRestClientBuilder(): RestClient.Builder {
        return RestClient
            .builder()
            .baseUrl(baseUrl)
            .defaultHeaders { headers ->
                headers.setBearerAuth(generateToken())
            }
            // 타임아웃 설정
            .requestFactory {
                HttpComponentsClientHttpRequestFactory().apply {
                    setConnectTimeout(Duration.ofSeconds(5))
                    setConnectionRequestTimeout(Duration.ofSeconds(5))
                }
            }
    }
}

OpenFeign의 타임아웃 설정과 달리, RestClient는 HttpComponentsClientHttpRequestFactory를 통해 더 섬세한 타임아웃 제어가 가능하다:

  1. setConnectTimeout: 서버와의 연결을 맺는데 걸리는 최대 시간
    • TCP 연결 수립 시간 제한
    • 네트워크 문제나 서버 응답 지연 시 빠른 실패 처리 가능
  2. setConnectionRequestTimeout: 커넥션 풀에서 커넥션을 가져오는데 걸리는 최대 시간
    • 커넥션 풀이 고갈되었을 때의 대기 시간 제한
    • 서비스 과부하 상황에서의 타임아웃 처리
  3. 선택적으로 ReadTimeout 설정도 가능하다
.requestFactory {
    HttpComponentsClientHttpRequestFactory().apply {
        setConnectTimeout(Duration.ofSeconds(5))
        setConnectionRequestTimeout(Duration.ofSeconds(5))
        setReadTimeout(Duration.ofSeconds(10))  // 데이터 읽기 타임아웃
    }
}

 

Conclusion

Spring Cloud OpenFeign에 비교해 HTTP Interface의 경우 다음과 같은 이점이 있다.

 

  • Spring 네이티브 통합
    • Spring 6의 기본 기능을 활용하여 더 나은 Spring 생태계와 통합된다
    • 별도의 외부 라이브러리 의존성이 감소한다
    • Spring의 지속적인 개선사항이 자동으로 적용된다
  • openFeign과 유사한 선언적 프로그래밍 방식을 유지한다
    • OpenFeign과 유사한 선언적 프로그래밍 방식을 유지한다
    • 기존 코드 구조를 크게 변경하지 않고도 전환할 수 있다
  • 안정성
    • Spring의 최신 HTTP 클라이언트 스택을 활용할 수 있다
    • Spring의 지속적인 개선사항이 자동으로 적용된다

 

예시에 사용한 코드는 다음 링크에 있다.
https://github.com/waterfogSW/HttpInterfaceExample

Kafka Streams의 stateful 프로세싱은 다른 시간에 도착하는 관련 이벤트들을 그룹화하여 관리하고 저장할 수 있는 기능을 제공한다. State Store는 중간 상태를 로컬 또는 원격으로 저장하는 구조로, Kafka 체인지 로그 토픽을 기반으로 한 내결함성을 갖추고 있다. 이러한 구조 덕분에 Kafka Streams 애플리케이션은 인스턴스 간 데이터 일관성을 유지하면서도 효율적인 스케일 아웃을 달성할 수 있다. 이번 글에서는 Kafka Streams의 state store 분산 방식, 데이터 일관성 유지 방법, 그리고 로컬 및 원격 상태 조회 방식을 중점으로 다룬다.

 

1. State Store의 역할과 필요성

Kafka Streams에서 State Store는 스트림 처리 중 발생하는 중간 상태를 로컬에 저장한다. 예를 들어 사용자 행동 데이터를 처리할 때, 각 사용자의 총 구매 횟수를 집계한다고 가정해보자. 각 이벤트가 들어올 때마다 사용자의 기존 구매 횟수를 가져와 업데이트해야 하는데, 이 중간 데이터는 빠르게 접근할 수 있는 로컬 저장소에 저장하는 것이 효율적이다.

예시 : 쇼핑 애플리케이션의 구매 집계 시스템

 

  • 사용자의 구매 이벤트가 Kafka 토픽으로 수신될 때마다, Kafka Streams 애플리케이션은 구매 이벤트를 기반으로 사용자의 총 구매 횟수를 누적하여 업데이트한다.
  • 각 Kafka Streams 인스턴스는 사용자 ID를 기준으로 파티셔닝된 데이터를 관리하며, 해당 파티션에 속한 사용자의 구매 횟수를 state store에 저장하여 빠르게 접근할 수 있다.

 

2. 스케일아웃 상황에서 State Store와 Changelog Topic의 동작 방식

Kafka Streams의 State Store와 Changelog Topic은 인스턴스 간 분산 처리와 데이터 일관성을 지원하기 위해 다음과 같은 방식으로 동작한다.

2-1. 파티셔닝과 할당

Kafka Streams는 데이터를 파티셔닝하여 관리하며, 각 인스턴스는 특정 파티션을 처리하는 방식으로 상태를 유지한다. 스케일아웃 시 인스턴스가 추가되면, 기존 인스턴스에 할당된 파티션을 새 인스턴스에 자동으로 재할당하여 데이터 처리가 더욱 분산된다. 각 인스턴스는 할당된 파티션에 대한 독립적인 State Store를 유지하게 된다.

2-2. State Store 데이터 복제와 일관성 유지

각 인스턴스의 State Store는 Changelog Topic에 변경 사항을 기록하여 다른 인스턴스에서도 해당 상태를 복구할 수 있도록 한다. 인스턴스가 확장되어 파티션이 새로운 인스턴스에 재배치될 때, 새로운 인스턴스는 Changelog Topic을 구독하여 필요한 데이터를 로드하고, 이후 업데이트 사항을 받아 데이터 일관성을 유지한다.

2-3. 스케일아웃 과정에서의 성능 최적화와 복구 지연 완화

스케일아웃 중 State Store의 데이터 크기가 클 경우 복구에 시간이 지연될 수 있다. Kafka Streams는 이를 완화하기 위해 상태 스냅샷(State Snapshot)상태 저장소 압축(State Store Compaction) 기법을 제공한다. 상태 스냅샷을 통해 특정 시점의 데이터를 저장하여 복구 시 전체 데이터를 다시 적용할 필요 없이 빠르게 복구할 수 있다. 압축 기법을 통해 불필요한 데이터를 정리하여 복구 시간을 단축할 수 있다.

2-4. 분산 환경에서의 데이터 일관성 보장

Kafka Streams는 분산 환경에서도 exactly-once 또는 at-least-once 처리 방식을 통해 데이터 일관성을 보장한다. State Store는 트랜잭션 방식으로 이벤트를 처리하며, 이벤트 처리 후 상태를 Changelog Topic에 커밋하여 장애 시 중복 처리를 방지한다. 이를 통해 스케일아웃 상황에서도 데이터 일관성을 유지할 수 있다.


예시 상황

쇼핑 애플리케이션에서 사용자 구매 이벤트를 처리하는 Kafka Streams 애플리케이션을 가정한다.

  1. 초기 인스턴스 구성
    사용자의 구매 횟수를 집계하는 애플리케이션이 초기에는 두 개의 인스턴스를 사용한다. 각 인스턴스는 사용자 ID를 기준으로 파티셔닝된 데이터를 할당받아 각 사용자의 총 구매 횟수를 State Store에 기록한다.
  2. 스케일아웃으로 인스턴스 추가
    트래픽이 증가하면서 새로운 인스턴스를 추가하여 스케일아웃한다. Kafka Streams는 기존 두 인스턴스에 할당된 파티션 중 일부를 새 인스턴스에 자동으로 재할당한다. 새 인스턴스는 할당된 파티션의 Changelog Topic을 구독하여 해당 파티션의 모든 상태 데이터를 로드하고, 이후 들어오는 구매 이벤트를 처리한다.

 

3. State Store 데이터 일관성 유지 및 장애 복구

Kafka Streams 애플리케이션은 상태가 필요한 스트림 처리 작업을 수행할 때 State Store를 사용한다. 이때 State Store에 저장된 데이터는 로컬에 유지되기 때문에, 특정 인스턴스에 장애가 발생하면 해당 인스턴스의 상태 정보를 잃어버릴 위험이 있다. Changelog Topic은 이러한 문제를 해결하기 위해 상태 변경 사항을 Kafka의 특별한 토픽에 기록해 두며, 다른 인스턴스가 해당 파티션을 맡게 되었을 때 이 기록을 바탕으로 State Store를 복구할 수 있게 해준다.

예시 :  사용자 클릭 집계에서 장애 상황 발생 시 데이터 복구

  • 웹사이트에서 발생하는 사용자 클릭 이벤트를 집계하는 Kafka Streams 애플리케이션을 가정한다. 각 인스턴스는 특정 파티션의 데이터를 처리하며, 클릭 수 집계를 state store에 기록한다.
  • 인스턴스 장애 시, 다른 인스턴스가 해당 파티션을 할당받아 state store를 복구하는데, 이때 필요한 데이터는 Changelog Topic에서 불러와 복구한다. 이를 통해 중단 없이 데이터 일관성을 유지하면서 집계를 이어갈 수 있다.

 

Changelog Topic은 Kafka의 파티션 구조를 활용해 각 State Store의 변경 사항을 토픽 파티션에 저장한다. Kafka Streams 애플리케이션이 여러 파티션을 사용하는 경우, 각 파티션별로 Changelog Topic도 같은 수의 파티션을 가지게 된다. 이를 통해 Changelog Topic의 데이터는 애플리케이션 파티션과 동일한 구조로 분산 저장되어 있어 복구할 때 효율적이다.

  • 데이터 일관성 유지: State Store는 기본적으로 RocksDB 같은 로컬 데이터베이스에 저장된다. State Store가 변경될 때마다 로그 커밋이 Changelog Topic으로 전송되어 데이터 일관성이 보장된다. Kafka Streams의 at-least-once 또는 exactly-once 처리 보장에 따라, 중복 데이터가 발생할 수 있는 상황에서도 Changelog Topic을 통해 마지막으로 커밋된 상태를 기준으로 일관성을 유지할 수 있다.

 

Kafka Streams의 인스턴스가 장애로 인해 중단될 때, Kafka는 해당 인스턴스가 맡고 있던 파티션을 다른 인스턴스에 할당한다. 이 새로운 인스턴스는 Changelog Topic에 저장된 데이터를 기반으로 State Store를 복구한다.

  • 복구 과정
    • 새 인스턴스가 할당된 파티션의 Changelog Topic을 구독하여 모든 변경 기록을 가져온다.
    • 각 이벤트를 순차적으로 적용해 State Store를 다시 생성한다.
  • 예를 들어, A 인스턴스가 클릭 수를 관리하던 도중 장애가 발생했다면, B 인스턴스가 해당 파티션을 할당받고, Changelog Topic에서 A 인스턴스의 상태 변경 사항을 받아 State Store를 복구해 클릭 수 집계를 이어간다.

 

4. Changelog Topic의 장점과 고려 사항

장점:

  • 내결함성(Fault Tolerance): Changelog Topic은 State Store의 상태를 Kafka 클러스터에 백업하는 역할을 하기 때문에, 로컬 데이터 손실이 발생해도 안전하게 복구할 수 있다.
  • 데이터 일관성: Kafka Streams는 장애 시 재처리를 통해 일관성을 유지하기 위해 Changelog Topic을 사용하며, 정확히 한 번(exactly-once) 또는 적어도 한 번(at-least-once) 처리 옵션을 설정할 수 있다.

고려 사항:

  • 디스크 사용량: Changelog Topic은 모든 State Store 변경 사항을 저장하므로, 대규모 데이터를 다루는 애플리케이션에서는 저장 용량이 커질 수 있다. 필요 시 TTL(Time-to-Live) 설정을 통해 오래된 데이터를 삭제하거나, 압축을 통해 용량을 줄일 수 있다.
  • 복구 지연 시간: 대량의 데이터를 가진 State Store를 복구할 때 지연이 발생할 수 있다. 복구 시간이 중요한 경우, State Store를 주기적으로 백업하거나 상태 스냅샷 기능을 사용해 복구 시간을 단축하는 것이 도움이 된다.

 

다음 코드의 실행 결과를 한번 예측해보자.

fun main() {
    val a: Int? = 128
    val b: Int? = 128
    println(a === b)

    val c: Int? = 1
    val d: Int? = 1
    println(c === d)
}

이 코드의 실행 결과는 false, true이다. 이러한 결과가 나오는 원인은 Kotlin(그리고 그 기반이 되는 Java)의 정수 객체 캐싱 메커니즘에 있다. 본 글에서는 이 현상의 원인과 그 배경에 있는 객체 캐싱 메커니즘에 대해 상세히 분석한다.

 

1. Java의 Autoboxing과 객체 캐싱

Java에서는 기본 타입(primitive type)과 그에 대응하는 래퍼 클래스(wrapper class)가 존재한다. 예를 들어, int에 대응하는 래퍼 클래스는 Integer이다. Java 5부터 도입된 Autoboxing 기능은 기본 타입과 래퍼 클래스 간의 자동 변환을 지원한다.

Integer a = 100; // Autoboxing: int -> Integer
int b = a; // Unboxing: Integer -> int

Java는 성능 최적화를 위해 특정 범위의 정수값에 대해 객체 캐싱을 수행한다. 기본적으로 -128부터 127까지의 정수값에 대해서는 미리 객체를 생성하여 캐시에 저장한다.

다음 Java 코드를 통해 이 동작을 확인할 수 있다:

public class IntegerCacheTest {
    public static void main(String[] args) {
        Integer a = 127;
        Integer b = 127;
        System.out.println(a == b); // true

        Integer c = 128;
        Integer d = 128;
        System.out.println(c == d); // false

        int e = 128;
        int f = 128;
        System.out.println(e == f); // true
    }
}

이 코드에서 a == b는 true를 반환하지만, c == d는 false를 반환한다. 이는 127이 캐시 범위 내에 있어 같은 객체를 참조하지만, 128은 캐시 범위를 벗어나 새로운 객체가 생성되기 때문이다. 반면 e == f는 기본 타입 int의 비교이므로 true를 반환한다.

 

2. Kotlin에서의 정수 비교

Kotlin은 Java의 이러한 특성을 기반으로 하면서도, 몇 가지 추가적인 특징을 제공한다. Kotlin에서 === 연산자는 참조 동등성을 비교한다. 즉, 두 객체가 메모리상에서 같은 객체인지를 확인한다.

앞서 제시한 Kotlin 코드의 결과를 분석하면 다음과 같다:

  1. 1은 -128에서 127 사이의 값이므로 캐시된 객체를 사용한다. 따라서 c와 d는 같은 객체를 참조하게 되어 true가 반환된다.
  2. 128은 캐시 범위를 벗어나는 값이므로 새로운 객체가 생성된다. 따라서 a와 b는 서로 다른 객체를 참조하게 되어 false가 반환된다.

 

3. Java의 래퍼 클래스와 캐싱

Java에서 이러한 캐싱 메커니즘은 여러 래퍼 클래스에 적용된다:

  • Boolean: true와 false
  • Byte: 모든 값 (-128 to 127)
  • Short: -128 to 127
  • Integer: -128 to 127 (기본값, 변경 가능)
  • Long: -128 to 127
  • Character: 0 to 127

다음 Java 코드를 통해 다양한 래퍼 클래스의 캐싱 동작을 확인할 수 있다:

public class WrapperCacheTest {
    public static void main(String[] args) {
        Boolean bool1 = true;
        Boolean bool2 = true;
        System.out.println("Boolean: " + (bool1 == bool2)); // true

        Byte byte1 = 127;
        Byte byte2 = 127;
        System.out.println("Byte: " + (byte1 == byte2)); // true

        Short short1 = 127;
        Short short2 = 127;
        System.out.println("Short: " + (short1 == short2)); // true

        Integer int1 = 127;
        Integer int2 = 127;
        System.out.println("Integer: " + (int1 == int2)); // true

        Long long1 = 127L;
        Long long2 = 127L;
        System.out.println("Long: " + (long1 == long2)); // true

        Character char1 = 127;
        Character char2 = 127;
        System.out.println("Character: " + (char1 == char2)); // true
    }
}

 

4. -XX:AutoBoxCacheMax 옵션

Java (그리고 Kotlin)에서는 -XX:AutoBoxCacheMax JVM 옵션을 통해 Integer 캐시의 최대값을 조정할 수 있다. 기본값은 127이지만, 이를 변경하여 더 큰 범위의 정수에 대해서도 캐싱을 적용할 수 있다.

java -XX:AutoBoxCacheMax=1000 YourProgram

이 옵션을 사용하면 지정된 값까지의 Integer 객체가 캐시되어, 해당 범위 내의 정수 비교 시 == 연산자 (Java) 또는 === 연산자 (Kotlin)가 true를 반환하게 된다.

다음은 이 옵션을 적용한 Java 프로그램의 예시이다:

public class AutoBoxCacheMaxTest {
    public static void main(String[] args) {
        Integer a = 1000;
        Integer b = 1000;
        System.out.println(a == b); // true (if -XX:AutoBoxCacheMax=1000 is set)
    }
}

 

5. Kotlin의 특징

Kotlin은 Java의 이러한 특성을 기반으로 하면서도, 몇 가지 추가적인 특징을 제공한다:

  1. == 연산자: Kotlin에서 ==는 구조적 동등성을 비교한다. 이는 내부적으로 .equals() 메소드를 호출하는 것과 동일하다.
  2. === 연산자: 참조 동등성을 비교한다. Java의 ==와 유사한 역할을 한다.

다음 Kotlin 코드를 통해 이러한 특징을 확인할 수 있다:

fun main() {
    val a: Int? = 128
    val b: Int? = 128
    println(a == b)  // true (구조적 동등성)
    println(a === b) // false (참조 동등성)

    val c: Int? = 127
    val d: Int? = 127
    println(c == d)  // true
    println(c === d) // true (캐시된 객체)
}

 

6. 주의사항

  1. 이 캐싱 메커니즘은 성능 최적화를 위한 기능이므로, 코드의 정확성을 이 동작에 의존해서는 안 된다.
  2. Java에서 값 비교를 위해서는 .equals() 메소드를 사용하는 것이 안전하다. Kotlin에서는 == 연산자를 사용하면 된다.
  3. 이 캐싱 메커니즘은 Integer/Int 외의 다른 숫자 타입(Byte, Short, Long 등)에도 적용되지만, 범위가 다를 수 있다.
  4. Float와 Double은 캐싱되지 않는다.

 

7. 결론

Java와 Kotlin에서의 정수 비교는 표면적으로는 단순해 보이지만, 내부적으로 복잡한 메커니즘이 작동하고 있다. 객체 캐싱은 성능 최적화를 위한 중요한 기능이지만, 개발자는 이에 의존하기보다는 항상 명확하고 안전한 비교 방법을 사용해야 한다.

이러한 내부 동작을 이해함으로써, 더 효율적이고 버그 없는 코드를 작성할 수 있다. 또한, 이는 Java와 Kotlin의 내부 동작 방식에 대한 깊이 있는 이해를 제공하여, 더 나은 프로그래밍 실력 향상에 기여할 수 있다.

마지막으로, 이러한 세부사항을 알고 있는 것은 중요하지만, 일반적인 애플리케이션 개발에서는 == (Kotlin) 또는 .equals() (Java)를 사용하여 값을 비교하는 것이 가장 안전하고 명확한 방법임을 인지해야 한다.

+ Recent posts