들어가며
스타크래프트2의 프로토스 종족에는 '시간 증폭'이라는 독특한 메커니즘이 있다. 연결체(Nexus)에서 프로브를 생산할 때 시간 증폭을 사용하면 일정 시간 동안 생산 속도가 빨라지는 기능이다. 이런 상태 관리와 비동기 처리가 필요한 시스템을 Kotlin의 Coroutine Flow를 활용해 구현해보자.
https://github.com/waterfogSW/starcraft-time-amplification
시스템 요구사항
- 연결체는 프로브를 생산할 수 있다
- 생산 큐는 최대 5개까지 가능
- 시간 증폭은 10초 동안 지속되며, 적용 시 생산 속도가 3배로 증가
- 생산 진행 상태를 실시간으로 관찰 가능
- 생산 중인 항목을 취소할 수 있음
연결체 프로브 생산 프로세스
1. 전통적인 방식 (Observer 패턴)
직관에 따라 Nexus(연결체) 를 구현한다면 Variable과 Observer 패턴을 사용하는 방식을 떠올릴 수 있다.
class Nexus {
private var productionState: ProductionState = ProductionState.Idle
private var probeCount: Int = 0
private val observers = mutableListOf<ProductionObserver>()
// 메모리 누수 위험이 있는 observer 등록/해제
fun addObserver(observer: ProductionObserver) {
observers.add(observer)
}
fun removeObserver(observer: ProductionObserver) {
observers.remove(observer)
}
// 스레드 안전성을 위해 모든 메서드에 동기화 필요
@Synchronized
fun updateState(newState: ProductionState) {
productionState = newState
observers.forEach { it.onStateChanged(newState) }
}
// 여러 상태를 변경할 때 데드락 위험
@Synchronized
fun completeProduction() {
productionState = ProductionState.Complete
probeCount++
observers.forEach {
it.onStateChanged(productionState)
it.onProbeCountChanged(probeCount)
}
}
}
// Activity/Fragment에서 메모리 누수 발생 가능
class NexusActivity : Activity(), ProductionObserver {
private val nexus = Nexus()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
nexus.addObserver(this) // 등록은 했지만...
}
// onDestroy에서 removeObserver를 호출하지 않으면 메모리 누수
}
전통적인 Observer 패턴 기반의 상태 관리 방식은 근본적인 문제점을 가지고 있다. 개발자가 직접 Observer를 등록하고 해제하는 과정에서 메모리 누수가 발생하기 쉽다. Activity나 Fragment의 생명주기와 Observer의 생명주기를 수동으로 동기화해야 하는데, 이는 실수하기 쉬운 작업이다.
멀티스레드 환경에서의 안전성도 보장하기 어렵다. 상태 변경 메소드마다 @Synchronized 어노테이션을 붙여야 하며, 이는 성능 저하를 일으킨다. 여러 상태를 동시에 변경할 때는 데드락이 발생할 위험도 있다. 결과적으로 복잡한 보일러플레이트 코드가 생기고 유지보수가 어려워진다.
2. LiveData
그럼 LiveData는 어떤가?, LiveData는 이러한 문제를 상당 부분 해결한다. 생명주기를 자동으로 관리하여 메모리 누수를 방지하고, 메인 스레드 안전성을 보장한다. Observer 등록과 해제를 수동으로 관리할 필요가 없어졌다.
class Nexus {
// 안드로이드 플랫폼 종속적
private val _productionState = MutableLiveData<ProductionState>()
val productionState: LiveData<ProductionState> = _productionState
private val _probeCount = MutableLiveData<Int>()
val probeCount: LiveData<Int> = _probeCount
// 메인 스레드에서만 값 변경 가능
fun updateState(newState: ProductionState) {
_productionState.value = newState
}
// 빠른 연속 업데이트 처리 불가능
fun startFastUpdates() {
viewModelScope.launch {
repeat(1000) { // 빠른 업데이트 발생
_productionState.value = ProductionState.Producing(it / 1000f)
delay(16) // 16ms마다 업데이트
// 백프레셔 처리 메커니즘 부재로 프레임 드롭 발생
}
}
}
}
그러나 LiveData는 안드로이드 플랫폼에 종속되어 있어 서버사이드에서는 사용할 수 없다. 역시 순수 Kotlin 모듈에서는 사용이 불가능하며, 이로 인해 멀티플랫폼 개발과 테스트가 제한된다.
더 큰 문제는 연속적인 데이터 업데이트 처리에 취약하다는 점이다. 센서 데이터와 같이 빠르게 들어오는 데이터 스트림을 처리할 때 데이터 손실이 발생하거나 UI 성능이 저하된다. 이는 LiveData가 백프레셔 처리 메커니즘을 가지고 있지 않기 때문이다.
3. Coroutine Flow
Flow는 이전 방식들의 문제를 해결하고 더 강력한 기능을 제공한다.
class Nexus {
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
// 플랫폼 독립적이며 상태 관리가 용이한 StateFlow
private val _productionState = MutableStateFlow<ProductionState>(ProductionState.Idle)
val productionState = _productionState.asStateFlow()
private val _probeCount = MutableStateFlow(0)
val probeCount = _probeCount.asStateFlow()
// 백프레셔가 적용된 생산 프로세스
fun startProduction() = flow {
var progress = 0f
while (progress < 1f) {
progress += 0.1f
emit(progress) // 백프레셔 자동 적용
delay(16)
}
}.buffer(Channel.BUFFERED) // 생산자-소비자 분리
.catch { e ->
_productionState.value = ProductionState.Idle
emit(0f)
}
// 복잡한 상태 조합도 쉽게 처리
val combinedState = combine(
productionState,
probeCount
) { state, count ->
CombinedState(state, count)
}.stateIn(scope, SharingStarted.Lazily, CombinedState())
// 스레드 안전한 상태 업데이트
fun updateState(newState: ProductionState) {
_productionState.value = newState // 자동으로 스레드 안전
}
fun shutdown() {
scope.cancel() // 리소스 정리도 간단
}
}
// UI에서의 사용 (Compose)
@Composable
fun NexusScreen(nexus: Nexus) {
val state by nexus.productionState.collectAsState()
LaunchedEffect(Unit) {
nexus.startProduction()
.collect { progress ->
// 백프레셔로 인해 UI는 버벅임 없이 부드럽게 업데이트
}
}
}
// 순수 Kotlin 모듈에서도 사용 가능
class PureKotlinModule {
private val nexus = Nexus()
fun processData() {
// Flow는 플랫폼 독립적이므로 사용 가능
nexus.productionState
.map { ... }
.filter { ... }
.collect { ... }
}
}
이처럼 Flow와 StateFlow는 이전 방식들의 한계를 모두 해결하고, 코루틴과의 자연스러운 통합, 백프레셔 지원, 플랫폼 독립성 등 다양한 이점을 제공한다. 특히 상태 관리에 있어서 스레드 안전성과 메모리 관리가 자동으로 이루어지며, 복잡한 비동기 처리도 간단하게 구현할 수 있다.
Flow와 StateFlow
Flow의 기본 개념
Flow는 코틀린에서 비동기적으로 계산될 수 있는 데이터 스트림을 나타내는 타입이다. Flow는 다음과 같은 특징을 가진다:
// Flow의 기본 구조
val flow = flow {
for (i in 1..3) {
delay(100) // 비동기 작업 시뮬레이션
emit(i) // 값 방출
}
}
// Flow 수집
scope.launch {
flow.collect { value ->
println(value)
}
}
Flow의 주요 특징
- Cold Stream: Flow는 collect를 호출할 때만 데이터를 방출한다
- 순차적 실행: 기본적으로 순차적으로 처리된다
- 취소 가능: 코루틴의 취소 메커니즘을 지원한다
- 백프레셔 지원: 데이터 생산과 소비 속도를 조절할 수 있다
StateFlow 이해하기
StateFlow는 Flow의 특별한 형태로, 항상 값을 가지고 있는 상태 홀더다. 우리의 연결체 구현에서 중요한 역할을 한다
// StateFlow 기본 예제
private val _state = MutableStateFlow(초기값)
val state: StateFlow<T> = _state.asStateFlow()
// 값 업데이트
_state.value = 새로운값
// 또는
_state.update { currentValue ->
// 새로운 값을 계산하고 반환
}
StateFlow의 특징
- Hot Stream: 수집하는 코루틴이 없어도 활성 상태를 유지
- 상태 보유: 항상 현재 값을 가짐
- 중복 제거: 동일한 값은 방출하지 않음
- 다중 구독자 지원: 여러 수집기가 동시에 값을 관찰할 수 있음
생산 프로세스, 시간 증폭 프로세스 구현
앞서 본 내용들을 바탕으로 연결체의 생산 프로세스와 시간 증폭 프로세스를 구현해 보자.
생산 프로세스
fun startProduction() {
if (_productionQueue.value.size >= 5) return
scope.launch {
// 큐에 새 항목 추가
_productionQueue.update { it + ProbeQueueItem() }
// 생산이 진행중이지 않은 경우에만 시작
if (_productionState.value is ProductionState.Idle) {
startProductionProcess()
}
}
}
private fun startProductionProcess() {
productionJob = scope.launch {
while (_productionQueue.value.isNotEmpty()) {
var accumulatedProgress = 0f
var lastUpdateTime = System.currentTimeMillis()
// 진행률 업데이트 루프
while (accumulatedProgress < 1f) {
val currentTime = System.currentTimeMillis()
val deltaTime = currentTime - lastUpdateTime
lastUpdateTime = currentTime
// 시간 증폭 상태 확인
val isCurrentlyBoosted = _chronoBoostState.value.isActive
val progressIncrement = calculateProgress(deltaTime, isCurrentlyBoosted)
accumulatedProgress = (accumulatedProgress + progressIncrement)
.coerceAtMost(1f)
// 상태 업데이트
updateProductionState(ProductionState.Producing(accumulatedProgress))
updateQueueProgress(accumulatedProgress)
delay(16) // ~60fps
}
// 생산 완료 처리
completeProduction()
}
}
}
시간 증폭 프로세스
동일한 메커니즘으로 시간 증폭도 구현할 수 있다.
fun applyChronoBoost() {
chronoBoostJob = scope.launch {
updateChronoBoostState(
ChronoBoostState(
isActive = true,
remainingTimeMillis = CHRONOBOOST_DURATION
)
)
val startTime = System.currentTimeMillis()
while (true) {
val remainingTime = CHRONOBOOST_DURATION - (System.currentTimeMillis() - startTime)
if (remainingTime <= 0) break
updateChronoBoostState(
_chronoBoostState.value.copy(remainingTimeMillis = remainingTime)
)
delay(100)
}
}
}
시간 증폭은 별도의 코루틴에서 관리되며, 상태를 통해 생산 속도에 영향을 준다. 생산 프로세스는 지속적으로 시간 증폭 상태를 확인하며 생산 속도에 반영해 결정하게 된다.
서버사이드에서의 응용
앞서 살펴본 Flow와 StateFlow를 활용한 생산 및 시간 증폭 프로세스 구현은 서버사이드 애플리케이션에서도 효과적으로 응용할 수 있다. 서버 환경에서는 다수의 클라이언트 요청을 효율적으로 처리하고, 실시간 상태 관리를 통해 시스템의 성능과 안정성을 유지하는 것이 중요하다. Kotlin의 Coroutine과 Flow는 이러한 요구사항을 충족시키는 강력한 도구를 제공한다.
실시간 게임 서버에서는 플레이어의 상태 업데이트, 게임 내 이벤트 처리, 자원 관리 등이 빈번하게 발생한다. Flow와 StateFlow를 활용하면 각 플레이어의 상태 변화를 비동기적으로 처리하고, 서버 자원의 효율적인 분배를 통해 높은 동시성을 유지할 수 있다. 예를 들어, 플레이어의 행동에 따른 자원 생산 속도를 조절하거나, 특정 이벤트 발생 시 일시적으로 생산 속도를 증가시키는 시간 증폭 메커니즘을 구현할 수 있다.
또한 실시간 데이터 스트리밍 이 필요한 금융 거래 시스템, IoT 데이터 수집, 실시간 분석 등 고속의 데이터 스트림을 처리해야 하는 서버 애플리케이션에서 Flow는 자연스러운 선택이 될 수 있다. 백프레셔 지원을 통해 데이터 생산자와 소비자 간의 속도 차이를 조절할 수 있으며, 상태 관리 기능을 통해 현재 데이터 처리 상태를 실시간으로 모니터링할 수 있다. 이는 데이터 손실을 방지하고 시스템의 안정성을 높이는 데 기여한다.
'Java & Kotlin' 카테고리의 다른 글
[Java/Kotlin] 정수 비교와 객체 캐싱: 1 === 1, 128 === 128의 결과는? (0) | 2024.09.20 |
---|---|
[Kotlin] Nullable Value Class 이슈 - JPA Entity (1) | 2024.02.06 |
[Kotlin] 제네릭스(Generics) - 불공변, 공변, 반공변 (1) | 2023.09.06 |
[Java] HotSpot VM의 Survivor영역은 왜 2개일까? (0) | 2023.08.28 |
[Kotlin] Kotlin DSL과 Kotest로 Java RestDocs 테스트 리팩토링 하기 (2) | 2023.01.15 |