Spring Cloud OpenFeign이 feature complete 상태가 되었다. Spring Cloud OpenFeign 공식문서에서는 RestClient와 Http interface를 활용한 방식으로의 마이그레이션을 권장하고 있다. Spring 6에서 제공하는 HTTP Interface는 OpenFeign의 선언형 프로그래밍 방식을 대체할 수 있다. 이 글에서는 OpenFeign에서 HTTP Interface로 마이그레이션하는 방법을 설명한다.

 

의존성 구성

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.retry:spring-retry")
    implementation("org.springframework:spring-aspects")
}

먼저 필요한 의존성을 설정해야 한다. Spring Boot 3.x 이상을 사용하면 별도의 HTTP Interface 관련 의존성은 필요하지 않다. OpenFeign에서 사용했던 재시도 기능을 HttpExchange에서 재현하기 위해 spring-retry만 추가하면 된다

 

OpenFeign에서 HTTP Interface 코드 비교

인터페이스 정의 방식의 변화

기존 OpenFeign에서는 다음과 같이 클라이언트를 정의했다:

@FeignClient(name = "user-service", url = "${user.service.url}")
interface UserClient {
    @GetMapping("/users/{id}")
    fun getUser(@PathVariable id: Long): User
    
    @PostMapping("/users")
    fun createUser(@RequestBody user: User): User
}

HTTP Interface에서는 이렇게 변경된다:

@HttpExchange
interface UserClient {
    @GetExchange("/users/{id}")
    fun getUser(@PathVariable id: Long): User

    @PostExchange("/users")
    fun createUser(@RequestBody user: User): User
    
    @PutExchange("/users/{id}")
    fun updateUser(
        @PathVariable id: Long,
        @RequestBody user: User
    ): User

    @DeleteExchange("/users/{id}")
    fun deleteUser(@PathVariable id: Long)
}

주요 변경사항을 살펴보면:

  1. @FeignClient 어노테이션이 @HttpExchange로 변경됐다
  2. HTTP 메서드 어노테이션들이 각각 대응되는 Exchange 어노테이션으로 변경됐다
    • @GetMapping → @GetExchange
    • @PostMapping → @PostExchange
    • @PutMapping → @PutExchange
    • @DeleteMapping → @DeleteExchange
  3. 기본적인 요청/응답 구조는 유사하게 유지된다

이러한 변경은 Spring의 기본 기능을 활용하면서도, OpenFeign의 직관적인 인터페이스 스타일을 유지하고 있다.

 

재시도 전략

OpenFeign에서 제공하던 exponential backoff 기능은 Spring Retry를 통해 구현할 수 있다. 기존 OpenFeign이 별도의 의존성 없이 재시도 기능을 지원했던 반면 Http Interface를 사용할때는 Srping Retry가 필요하다.

@Retryable(
    include = [RuntimeException::class],
    maxAttempts = 3,
    backoff = Backoff(delay = 1000)
)
@GetExchange("/users/retry/{id}")
fun getUserWithRetry(
    @PathVariable id: Long,
    @RequestParam("failCount", required = false) failCount: Int?
): User

주요 설정:

  • @EnableRetry: 애플리케이션 레벨에서 재시도 기능 활성화, 
  • @Retryable: 메서드 레벨에서 재시도 정책 설정
  • include: 재시도할 예외 타입 지정
  • maxAttempts: 최대 재시도 횟수
  • backoff: 재시도 간격 설정

 

에러 처리

OpenFeign의 ErrorDecoder와 달리, RestClient는 defaultStatusHandler를 통해 HTTP 상태 코드별 에러 처리를 구현할 수 있다. 상태 코드에 따라 적절한 예외를 던지도록 설정이 가능하다.

@Configuration
class RestClientConfig {

    @Bean
    fun defaultRestClientBuilder(): RestClient.Builder {
        return RestClient
            .builder()
            .defaultStatusHandler(HttpStatusCode::isError) { _, response ->
                when (response.statusCode) {
                    HttpStatus.NOT_FOUND -> throw RestClientException("Resource not found")
                    HttpStatus.UNAUTHORIZED -> throw RestClientException("Unauthorized")
                    HttpStatus.BAD_REQUEST -> throw RestClientException("Invalid request")
                    else -> throw RestClientException("HTTP error: ${response.statusCode}")
                }
            }
    }

}

 

요청/응답 로깅

class LoggingInterceptor : ClientHttpRequestInterceptor {
    override fun intercept(
        request: HttpRequest,
        body: ByteArray,
        execution: ClientHttpRequestExecution
    ): ClientHttpResponse {
        logger.info("=== Request ===")
        logger.info("URI: {}", request.uri)
        logger.info("Method: {}", request.method)
        logger.info("Headers: {}", request.headers)

        val response = execution.execute(request, body)

        logger.info("=== Response ===")
        logger.info("Status: {}", response.statusCode)

        return response
    }
}

// 적용
.requestInterceptor(LoggingInterceptor())

OpenFeign의 Logger.Level 설정과 비교하여, RestClient는 더 유연한 로깅 방식을 제공한다

타임아웃 설정

@Configuration
class RestClientConfig(
    @Value("\${rest.client.base-url}")
    private val baseUrl: String
) {
    @Bean
    fun defaultRestClientBuilder(): RestClient.Builder {
        return RestClient
            .builder()
            .baseUrl(baseUrl)
            .defaultHeaders { headers ->
                headers.setBearerAuth(generateToken())
            }
            // 타임아웃 설정
            .requestFactory {
                HttpComponentsClientHttpRequestFactory().apply {
                    setConnectTimeout(Duration.ofSeconds(5))
                    setConnectionRequestTimeout(Duration.ofSeconds(5))
                }
            }
    }
}

OpenFeign의 타임아웃 설정과 달리, RestClient는 HttpComponentsClientHttpRequestFactory를 통해 더 섬세한 타임아웃 제어가 가능하다:

  1. setConnectTimeout: 서버와의 연결을 맺는데 걸리는 최대 시간
    • TCP 연결 수립 시간 제한
    • 네트워크 문제나 서버 응답 지연 시 빠른 실패 처리 가능
  2. setConnectionRequestTimeout: 커넥션 풀에서 커넥션을 가져오는데 걸리는 최대 시간
    • 커넥션 풀이 고갈되었을 때의 대기 시간 제한
    • 서비스 과부하 상황에서의 타임아웃 처리
  3. 선택적으로 ReadTimeout 설정도 가능하다
.requestFactory {
    HttpComponentsClientHttpRequestFactory().apply {
        setConnectTimeout(Duration.ofSeconds(5))
        setConnectionRequestTimeout(Duration.ofSeconds(5))
        setReadTimeout(Duration.ofSeconds(10))  // 데이터 읽기 타임아웃
    }
}

 

Conclusion

Spring Cloud OpenFeign에 비교해 HTTP Interface의 경우 다음과 같은 이점이 있다.

 

  • Spring 네이티브 통합
    • Spring 6의 기본 기능을 활용하여 더 나은 Spring 생태계와 통합된다
    • 별도의 외부 라이브러리 의존성이 감소한다
    • Spring의 지속적인 개선사항이 자동으로 적용된다
  • openFeign과 유사한 선언적 프로그래밍 방식을 유지한다
    • OpenFeign과 유사한 선언적 프로그래밍 방식을 유지한다
    • 기존 코드 구조를 크게 변경하지 않고도 전환할 수 있다
  • 안정성
    • Spring의 최신 HTTP 클라이언트 스택을 활용할 수 있다
    • Spring의 지속적인 개선사항이 자동으로 적용된다

 

예시에 사용한 코드는 다음 링크에 있다.
https://github.com/waterfogSW/HttpInterfaceExample

분산 시스템의 안정성과 일관성을 보장하기 위해, 분산락(distributed lock)은 필수적인 메커니즘 중 하나입니다. Spring 프레임워크에서 분산락 구현은 주로 Aspect-Oriented Programming(AOP)를 통해 이루어집니다. 그러나 Spring AOP를 활용할 경우, 몇 가지 제약 사항과 단점이 존재합니다.

Spring AOP의 한계

  1. Pointcut 표현식 사용: Spring AOP는 pointcut 표현식을 통해 어드바이스(Advice) 적용 대상을 지정합니다. 이 표현식을 정확히 작성하는 것은 복잡하고, 오류가 발생하기 쉬운 작업입니다.
  2. 적용 여부 확인: Spring AOP를 적용한 후, 해당 AOP가 정상적으로 적용되었는지 런타임에서만 확인할 수 있습니다. 이는 개발 과정에서 시간을 소모하게 만듭니다.
  3. 내부 메서드 적용 불가: 클래스 내부에서 호출되는 private 메서드에는 Spring AOP가 적용되지 않습니다. 이는 내부 로직에 분산락을 적용하려 할 때 문제가 됩니다.
  4. SpEL 사용의 복잡성: 분산락의 키값을 지정하기 위해 Spring Expression Language(SpEL)를 사용하게 되는데, 이는 컴파일 타임에서는 오류를 확인할 수 없으며, 잘못된 값이 지정되면 런타임 예외를 발생시킵니다.

 

Kotlin Trailing Lambdas

이러한 Spring AOP의 한계를 극복하기 위해, Kotlin의 Trailing Lambdas를 활용한 방식을 도입할 수 있습니다. Kotlin에서 함수는 일급 객체이며, Trailing Lambdas를 사용하면, 함수를 더 직관적이고 유연하게 다룰 수 있습니다. 이를 통해 분산락 구현에 있어 AOP의 한계를 해결할 수 있습니다.

Kotlin에서 Trailing Lambdas는 함수형 프로그래밍의 강력한 특성 중 하나입니다. 이 개념을 이해하기 위해서는 먼저 Kotlin에서의 람다식과 고차 함수에 대한 이해가 필요합니다.

 

람다식(Lambda Expressions)

람다식은 간단히 말해 익명 함수입니다. 이는 함수를 간결하게 표현할 수 있게 해 주며, 다른 함수의 인자로 전달되거나 변수에 저장될 수 있습니다. Kotlin에서 람다식은 { }로 둘러싸여 표현됩니다. 예를 들어, 다음은 두 수의 합을 반환하는 람다식입니다:

val sum: (Int, Int) -> Int = { x, y -> x + y }

 

고차 함수(Higher-Order Functions)

고차 함수는 다른 함수를 인자로 받거나 함수를 결과로 반환하는 함수를 말합니다. Kotlin에서 함수는 일급 객체이므로, 변수에 할당될 수 있고 다른 함수의 인자나 반환 값으로 사용될 수 있습니다. 예를 들어, 다음 함수 calculate는 함수를 인자로 받고, 두 개의 정수와 함께 이 함수를 호출합니다:

fun calculate(x: Int, y: Int, operation: (Int, Int) -> Int): Int {
    return operation(x, y) 
}

여기서 operation 파라미터는 람다식을 받는 고차 함수의 예입니다.

 

Trailing Lambdas

Kotlin에서는 함수의 마지막 인자가 람다식인 경우, 람다식을 괄호 밖으로 빼내어 코드의 가독성을 높일 수 있습니다. 이를 Trailing Lambdas라고 합니다. 예를 들어, 위의 calculate 함수를 호출할 때, 다음과 같이 Trailing Lambdas를 사용할 수 있습니다:

val result = calculate(10, 20) { a, b -> a + b }

여기서 { a, b -> a + b }calculate 함수의 마지막 인자로 전달된 람다식입니다. 이 문법을 사용함으로써, 코드가 훨씬 자연스럽고 읽기 쉬워집니다.

 

Trailing Lambdas와 분산락

Trailing Lambdas의 이러한 특성을 분산락 구현에 적용하면, Spring AOP와 비슷하게, 비스니스 로직과 분산락 이라는 횡단 관심사를 분리할 수 있습니다. 특히, 분산락을 적용해야 하는 비즈니스 로직을 람다식으로 정의하고, 이를 고차 함수에 전달함으로써, 분산락 로직과 비즈니스 로직을 명확히 분리할 수 있습니다. 이는 코드의 가독성과 유지보수성을 크게 향상시킵니다.

Spring AOP를 활용한 분산락

@DistributedLock("UsePointDomainService.incrementByUserId:#{#userId}")  
override fun incrementByUserId(  
    userId: UUID,  
    amount: Long  
): UserSil {  
    return userPointRepository  
        .getByUserId(userId)  
        .increment(amount)  
        .also { userSilRepository.save(it) }  
}

 

Trailing Lambdas를 활용한 분산락

fun incrementByUserId(  
    userId: UUID,  
    amount: Long  
): UserSil = distributedLock("userPointDomainService:$userId") {  
    return@distributedLock userPointRepository  
        .getByUserId(userId)  
        .increment(amount)  
        .also { userSilRepository.save(it) }  
}

 

구현과정

우선 분산락 함수를 지원하기 위한 Aspect를 정의합니다.

@Component  
class DistributedLockAspect(  
    innerRedissonClient: RedissonClient,  
    innerDistributedLockTransactionProcessor: DistributedLockTransactionProcessor,  
) {  

    init {  
        redissonClient = innerRedissonClient  
        distributedLockTransactionProcessor = innerDistributedLockTransactionProcessor  
    }  

    companion object {  

        val logger = KotlinLogging.logger { }  

        lateinit var redissonClient: RedissonClient  
            private set  

        lateinit var distributedLockTransactionProcessor: DistributedLockTransactionProcessor  
            private set  

        const val REDISSON_LOCK_PREFIX = "LOCK:"  
    }  
}

Aspect는 스프링 빈으로 정의하여, RedissonClient, DistributedLockTransactionProcessor를 주입받아 분산락 함수에서 사용할 수 있도록 정적 멤버로 제공합니다.

RedissonClient의 경우 분산락획득을 위한 별도 인터페이스를 제공하기때문에 선택하였고, 분산락 모듈의 Config파일에서 빈으로 등록해 주었습니다.

@Configuration  
@ComponentScan(basePackages = ["com.studentcenter.support.lock"])  
class DistributedLockConfig (  
    private val redisProperties: RedisProperties  
){  

    @Bean  
    fun redissonClient(): RedissonClient {  
        val redisConfig = Config()  
        redisConfig  
            .useSingleServer()  
            .apply {  
                address = "redis://${redisProperties.host}:${redisProperties.port}"  
            }  
        return Redisson.create(redisConfig)  
    }  

}

락의 해제가 트랜잭션 커밋 이전에 이루어질 경우 동시성 문제가 발생할 수 있습니다. DistributedLockTransactionProcessor의 경우 락의 해제가 트랜잭션 커밋이후에 이루어지도록, 별도의 트랜잭션을 만들어 동작하게 만들었습니다.

단 이경우에는 자식 트랜잭션 커밋 이후 부모 트랜잭션에서 예외가 발생하면, 자식 트랜잭션이 롤백되지는 않기 때문에 유의해서 사용해야 합니다.

@Component  
class DistributedLockTransactionProcessor {  

    @Transactional(propagation = Propagation.REQUIRES_NEW)  
    fun <T> proceed(function: () -> T): T {  
        return function()  
    }  

}

이후 프로젝트 내에서 전역적으로 활용할 수 있도록 패키지레벨의 분산락 함수를 구현합니다.

/**
 * Distributed Lock
 * 적용 대상 함수는 별도의 트랜잭션으로 동작하며 커밋 이후 락을 해제한다.
 * @param key       락 식별자
 * @param waitDuration  락 대기 시간
 * @param leaseDuration 락 유지 시간
 * @param function  적용 대상 함수
 * @return          함수 실행 결과
 */
fun <T> distributedLock(
    key: String,
    waitDuration: Duration = 5.seconds,
    leaseDuration: Duration = 3.seconds,
    function: () -> T,
): T {
    val rLock: RLock = (DistributedLockAspect.REDISSON_LOCK_PREFIX + key)
        .let { DistributedLockAspect.redissonClient.getLock(it) }

    try {
        val available: Boolean = rLock.tryLock(
            waitDuration.inWholeSeconds,
            leaseDuration.inWholeSeconds,
            TimeUnit.SECONDS,
        )
        check(available) {
            throw IllegalStateException("Lock is not available")
        }

        return DistributedLockAspect.distributedLockTransactionProcessor.proceed(function)
    } finally {
        try {
            rLock.unlock()
        } catch (e: IllegalMonitorStateException) {
            DistributedLockAspect.logger.info {
                "Redisson Lock Already UnLock Key : $key"
            }
        }
    }

}

 

테스트

분산락 미적용

override fun incrementByUserId(  
    userId: UUID,  
    amount: Long  
): UserSil {  
    return userSilRepository  
        .getByUserId(userId)  
        .increment(amount)  
        .also { userSilRepository.save(it) }  
}
@DisplayName("UserSilDomainService 통합 테스트")  
class UserSilDomainServiceIntegrationTest(  
    private val userSilDomainService: UserSilDomainService,  
) : IntegrationTestDescribeSpec({  

    describe("유저 실 증가 동시성 테스트") {  
        context("분산락 적용 X") {  
            it("동시성 테스트") {  
                // arrange  
                val userId = UuidCreator.create()  
                userSilDomainService.create(userId)  

                val threadCount = 10  
                val incrementAmount = 10L  

                // act  
                runBlocking {  
                    repeat(threadCount) {  
                        launch(Dispatchers.Default) {  
                            userSilDomainService.incrementByUserId(userId, incrementAmount)  
                        }                    }  
                }  


                // assert  
                val userSil: UserSil = userSilDomainService.getByUserId(userId)  
                userSil.amount shouldBe incrementAmount * threadCount  
            }  
        }  
    }  

})

img2

락이 적용되어있지 않을때에는 동시성 문제가 발생해 포인트가 기댓값에 미치지 못해 테스트가 실패했습니다.

 

분산락 적용

override fun incrementByUserId(  
    userId: UUID,  
    amount: Long  
): UserSil = distributedLock("UseSilDomainService.incrementByUserId:$userId") {  
    return@distributedLock userSilRepository  
        .getByUserId(userId)  
        .increment(amount)  
        .also { userSilRepository.save(it) }  
}

img1

분산락을 적용했을때는 기댓값만큼 포인트가 증가해 테스트가 성공하는것을 확인할 수 있었습니다.

 

분산락이 적용된 함수의 단위테스트는 어떻게 처리해야하나?

이렇게 trailing lambdas문법을 통해 분산락을 적용하게 되면, DistributedLockAspect에 의존하고 있어 해당 컴포넌트가 스프링 빈으로 등록되지 않는 유닛테스트 환경에서는 에러가 발생하게 됩니다. 이때는 mockk라이브러리를 활용해 DistributedLock에 대한 static 모킹을 통해 해결할 수 있습니다.

    beforeTest {
        mockkStatic("com.studentcenter.weave.support.lock.DistributedLockKt")
        every {
            distributedLock<Any?>(any(), any(), any(), captureLambda())
        } answers {
            val lambda: () -> Any? = arg<(()-> Any?)>(3)
            lambda()
        }
    }

 

마치며

Kotlin의 Trailing Lambdas를 활용한 분산락 구현은 Spring AOP의 한계를 극복하고, 더 안정적이고 유지보수가 쉬운 코드를 작성할 수 있게 만들어 주었습니다. 또한 Kotlin의 풍부한 언어 기능을 활용하여 보다 Kotlin 스럽게 분산 시스템에서의 동시성 관리를 수행할 수 있었습니다.

Reference

선착순 쿠폰 시스템을 만드는 과정에서, Producer와 Consumer를 분리해 서버를 설계하면서 도메인을 중복으로 작성하게 되어 관리포인트가 늘어난다는 느낌을 받았습니다. 이를 해결하기 위해 헥사고날 아키텍처를 활용해 도메인 모듈을 따로 분리하고 하나의 도메인 의존성을 Producer모듈과 Consumer모듈에서 의존하게 하여 도메인 관리포인트를 하나로 줄이는 방식으로 설계해 보았습니다.

아래 내용을 주로 담고있습니다.

  • 헥사고날 아키텍처와 멀티모듈을 활용해 설계한 선착순 쿠폰 시스템의 구조를 설명합니다.
  • 멀티모듈, 헥사고날 아키텍처의 개념
  • 구현 방법
    • Component Scan의 패키지 범위설정
    • Component Scan의 LazyInit 옵션
    • application.yaml 파일의 include

전체 코드는 https://github.com/waterfogSW/coupon-service 에서 확인하실 수 있습니다.

헥사고날 아키텍처

헥사고날 아키텍처는 계층형 아키텍처의 대안으로 Alistair Cockburn에 의해 고안되었습니다. 기존 계층형 아키텍처의 경우 모든 계층이 영속성 계층을 토대로 만들어 지기 때문에 비즈니스 로직의 변경이 어렵고, 테스트 또한 영속성 컴포넌트에 의존성이 생기기 때문에 테스트의 복잡도의 높이는 등 여러 문제점이 존재합니다.

업무규칙은 사용자 인터페이스나 데이터베이스와 같은 저수준의 관심사로 인해 오염되어서는 안되며, 원래 그대로의 모습으로 남아 있어야 한다. 이상적으로는 업무 규칙을 표현하는 코드는 반드시 시스템의 심장부에 위치해야 하며, 덜 중요한 코드는 이 심장부에 플러그인 되어야 한다. 업무 규칙은 시스템에서 가장 독립적이며 가장 많이 재사용할 수 있는 코드여야 한다.
- 로버트 C. 마틴, 클린 아키텍처

 

헥사고날 아키텍처는 이러한 문제점을 의존역전을 통에 의존성이 도메인을 향하게 하면서 이러한 문제를 해결합니다. 애플리케이션의 핵심 로직을 외부 시스템으로 부터 격리시켜 외부 요소의 변화에 의해 핵심 로직이 영향을 받지 않도록 합니다. 이를 통해 핵심 로직을 테스트하기 쉽고, 변경하기 쉽게 만듭니다.

헥사고날 아키텍처는 포트(Port)와 어댑터(Adapter) 아키텍처라고도 불리며, 포트와 어댑터는 다음과 같은 특징을 가지고 있습니다.

포트 (Port)

포트는 애플리케이션의 핵심 로직과 외부 세계 사이의 인터페이스 역할을 합니다.

Primary/Driving Ports

외부 시스템(예: 사용자 인터페이스, 웹 요청 등)으로부터 애플리케이션의 핵심 로직으로의 데이터 흐름을 다룹니다. 외부 요소들은 외부 포트를 통해 애플리케이션의 핵심 기능을 활용하게 됩니다.

Secondary/Driven Ports

애플리케이션 핵심 로직으로부터 외부 시스템이나 인프라스트럭처(예: Database, Message Queue, 외부 API)로의 데이터 흐름을 다룹니다. 애플리케이션은 Secondary Port를 통해 외부 자원을 필요로 할 때 접근합니다.

어댑터 (Adapter)

어댑터는 포트와 외부 세계 사이에서 데이터 형식을 변환하고, 호출을 중개하는 역할을 합니다. 포트와 외부 시스템 간의 중간자로서, 서로 다른 시스템 간의 통신을 가능하게 합니다.

Primary/Driving Adapters

애플리케이션의 핵심 로직에 접근하는 외부 시스템(예: 웹 서버, GUI 클라이언트 등)을 다룹니다. 외부 요청을 애플리케이션의 포트에 맞는 형식으로 변환합니다.

Secondary/Driven Adapters

애플리케이션에서 필요한 외부 자원(예: 데이터베이스, 파일 시스템, 외부 API 등)을 다룹니다. 핵심 로직의 요청을 외부 자원에 맞는 형식으로 변환합니다.

쿠폰 시스템 멀티모듈 구성

 

앞서 설명한 헥사고날 아키텍처의 각 레이어를 차용하여 모듈을 설계하였습니다.

Domain Hexagon

도메인 모델을 정의하는 모듈이므로, Domain Hexagon으로 명명하였습니다.

애플리케이션의 핵심 로직을 담당하는 모듈로, 도메인을 정의하고 있습니다.

  • POJO로 구현되어 있습니다.
  • common 모듈내 라이브러리 외 의존성을 가지지 않습니다.

Use Case Hexagon

도메인에 대한 유스케이스를 정의하는 모듈이므로, UseCase Hexagon으로 명명하였습니다.

도메인에 대한 Use Case를 정의하는 모듈입니다.

  • 외부 시스템과의 통신을 위한 Port 인터페이스를 정의합니다.
  • Domain 외 Spring Boot, Common 모듈내 라이브러리 의존성을 가집니다.

Infrastructure Hexagon

외부 인프라에대한 의존성을 정의하는 모듈이므로, Infrastructure Hexagon으로 명명하였습니다.

  • 외부 인프라와의 통신을 위한 Secondary Adapter를 정의합니다.
    • Kafka Producer Adapter, Persistence Adapter, Redis Adapter 등
  • Domain, Use Case 외 Spring Boot, Common 모듈내 라이브러리 의존성을 가집니다.
  • 외부 인프라별로 Module을 분리해 관리합니다
    • coupon-infrastructure/kafka
    • coupon-infrastructure/persistence
    • coupon-infrastructure/redis
  • 각 모듈별로 config class를 정의하며, application-{module name}.yaml 파일을 통해 각 모듈별로 설정을 관리합니다.
    • application-kafka.yaml
    • application-persistence.yaml
    • application-redis.yaml

Bootstrap Hexagon

여러 의존성을 조합해 하나의 애플리케이션 서버를 구성하는 모듈이므로 Bootstrap Hexagon으로 명명하였습니다.

  • 외부 요청을 받아 Use Case를 실행하기 위한 Primary Adapter를 정의합니다.
    • RestController, Kafka Consumer 등
  • Domain, Use Case, Infrastructure 외 Spring Boot, Common 모듈내 라이브러리 의존성을 가집니다.
  • Spring Boot Application을 정의합니다.
  • UseCase Hexagon과 Infrastructure Hexagon을 의존합니다.

Infrastructure 모듈과 같이 애플리케이션이 제공할 각 서비스별로 Module을 분리해 제공하는 방법도 고려해보았지만, 애플리케이션 서버마다 제공하는 API가 서로 다른경우가 훨씬 많기 때문에 모듈분리의 효용성이 떨어진다고 판단하여 Bootstrap 모듈에 Primary Adapter를 정의하였습니다.

구현 과정

Component Scan에 Lazy 옵션을 적용하기

UseCase 모듈의 UseCaseConfig

@Configuration
@ComponentScan(basePackages = ["com.waterfogsw.coupon.usecase"], lazyInit = true)
class UseCaseConfig

UseCase 모듈은 API Server, Worker Server모두 공통적으로 의존하는 모듈입니다.

하지만, API서버의 경우 Persistence모듈 의존성이 존재하지 않아, 쿠폰을 생성하고 DB에 저장하는 CreateCouponUseCase를 컴포넌트 스캔으로 등록하면 에러가 발생합니다. 마찬가지로 Worker 서버의 경우 Redis 모듈 의존성이 존재하기 않기 때문에 발행된 쿠폰의 개수를 Redis에서 조회하고 발행 이벤트를 Kafka에 전달하는 IssueCouponUseCase 를 컴포넌트 스캔으로 등록하면 에러가 발생합니다.

이러한 문제를 해결하기 위해 Usecase 모듈의 ComponentScan에 lazyInit 속성을 true로두어, UseCase를 사용하는 시점에 Bean을 생성하도록 하였습니다. 이렇게 하면 API서버는 CreateCouponUseCase에 의존성을 갖는 클래스가 존재하지 않기때문에 CreateCouponUseCase를 빈으로 등록하지 않습니다. 마찬가지로 Worker 서버는 IssueCouponUseCase를 의존성을 갖는 클래스가 존재하지 않아 IssueCouponUseCase를 빈으로 등록하지 않습니다.

Component Scan을 각 모듈내에서 수행되도록 하기위해서 각 모듈의 ComponentScan어노테이션의 패키지 위치를 잘 지정해 주어야 합니다.

@SpringBootApplication어노테이션이 @ComponentScan어노테이션을 내장하고 있기 때문에 위치를 잘 지정해 두어야 합니다. 예를들어 @SpringBootApplication어노테이션이있는 Application 클래스를 com.waterfogsw.coupon 패키지에 위치시키면, 다른 모듈에 있는 클래스라 하더라도 com.waterfogsw.coupon.* 패키지 하위의 모든 클래스들은 컴포넌트 스캔의 대상이 되기 때문에 의도치 않은 빈이 등록될 수 있습니다.

모듈별 Config, application.yaml

@Configuration
@Import(
  value = [
    KafkaProducerConfig::class,
    RedisConfig::class,
    UseCaseConfig::class
  ]
)
class ApiConfig 
@Configuration
@Import(
    PersistenceConfig::class,
    UseCaseConfig::class,
)
class WorkerConfig

위와 같이 각 모듈별로 Config 클래스를 정의하고, BootStrap Hexagon에 위치하는 API, Worker모듈의 Config는 각 모듈이 의존하고있는 모듈의 Config를 Import하도록 구현하였습니다.

또한 application-{module name}.yaml 파일을 통해 각 모듈별로 설정을 관리하기 때문에 API 모듈과 Worker 모듈의 application.yaml 파일은 다음과 같이 각 하위 의존성의 application.yaml을 include하도록 구현하였습니다.

API 모듈의 application.yaml

server:
  port: 8080
  shutdown: graceful
spring:
  profiles:
    active: local
    include:
      - kafka
      - redis
      - usecase

Worker 모듈의 application.yaml

server:
  port: 8081
  shutdown: graceful
spring:
  profiles:
    active: local
    include:
      - persistence
      - usecase

 

DB 채번을 줄이기 위해 Ulid 사용하기

보통 JPA를 사용하면 Primary Key를 @GeneratedValue 어노테이션을 통해 자동으로 생성합니다. 이런 전략을 사용하면 데이터베이스에서 자동으로 채번을 해주기 때문에 개발자는 신경쓰지 않아도 됩니다. 하지만 이런 전략은 데이터 베이스에 대한 채번을 유발하며, 영속화 되기 전까진 id값을 null로 유지해야한다는, 다소 데이터 베이스에 의존적으로 코드를 작성하게 되는 단점이 있습니다.

이런 단점을 해결하기 위해 UUID를 사용하는 방법이 있습니다. UUID는 데이터베이스에 의존적이지 않고, 영속화 되기 전까지 id값을 null로 유지할 필요가 없습니다. 하지만 UUID는 생성 순서를 보장하지 않기 때문에 목록 조회 시 정렬기준으로 삼기에는 적합하지 않아 성능적인 이점을 가져갈 수 없습니다. (UUIDv6, v7의 경우 시간순 정렬이 가능합니다)

이때 ULID를 활용할 수 있습니다. ULID는 UUID와 호환성을 가지면서 시간순으로 정렬할 수 있는 특징을 가지고 있습니다. 물론 ULID도단점이 있습니다. UUID가 나노초까지 시간순을 보장해주는 반면 ULID는 밀리초까지만 시간순을 보장해줍니다. 이를 보완하기위해 ULID Creator 라이브러리는 Monotonic ULID를 제공합니다. Monotonic ULID는 동일한 밀리초가 있다면 다음에 생성되는 ULID의 밀리초를 1 증가시켜서 생성하여 앞서 말한 단점을 보완합니다.

DB에 Primary Key를 채번하지 않고 도메인에서 직접 생성해서 사용하는 이러한 방식이 도메인이 외부에 의존하지 않고 직접 식별자를 생성할 수 있어서 클린 아키텍처에서는 더 큰 장점으로 느껴졌습니다.

 

부록

카프카를 사용하는 이유

선착순 이벤트의 경우 수많은 유저가 동시에 요청을 보내게 됩니다. 이때 API Server 에서 DB에 쿠폰 row를 생성하는 작업을 직접 처리를 한다면 DB에 부하가 몰리게 되어 다른 요청을 처리할 수 없는 상태가 됩니다.

이러한 문제점을 해결하기 위해 API서버가 직접 DB에 생성을 요청하는 대신 쿠폰 생성 이벤트를 발행해 카프카에 전달하고 이벤트를 작업 서버가 전달받아 DB에 쿠폰 row를 생성하게 합니다.

API 서버는 DB에 직접 접근하지 않고 카프카에 이벤트를 발행하고 응답을 전달하는 역할만 하게 되어 API 서버의 부하를 줄일 수 있으며, 작업서버는 카프카에 발행된 이벤트를 순차적으로 처리하기 때문에 DB에 동시에 부하가 몰리는 것을 방지할 수 있습니다.

Reference

최근에 지인에게 테스트코드에서 Transactional 어노테이션을 붙이면 테스트가 성공하고, 어노테이션을 제거하면 테스트가 실패하는데 그 이유를 모르겠다는 질문을 받았다.

문제상황

당시 문제상황을 간단한 샘플코드로 재현해 보았다.

@Entity  
@Table(name = "orders")  
class Order(  
    id: Long? = null,  
) {  

    @Id  
    @GeneratedValue(strategy = GenerationType.IDENTITY)  
    var id: Long? = id  
        private set  

    var isParcelRegistered: OrderParcelStatus = OrderParcelStatus.PADDING  
        private set  


    fun registerParcel() {  
        isParcelRegistered = OrderParcelStatus.REGISTERED  
    }  

    fun registerParcelFailed() {  
        isParcelRegistered = OrderParcelStatus.REGISTER_FAILED  
    }  


}

@Service  
class OrderService(  
    private val orderRepository: OrderRepository,  
) {  

    @Transactional  
    fun applyParcelEvent(parcelEvent: ParcelEvent) {  
        when (parcelEvent) {  
            is ParcelEvent.Success -> {  
                val order: Order = orderRepository.findById(parcelEvent.orderId).get()  
                order.registerParcel()  
            }  

            is ParcelEvent.Failure -> {  
                val order: Order = orderRepository.findById(parcelEvent.orderId).get()  
                order.registerParcelFailed()  
            }  
        }  
    }  

}

@SpringBootTest  
class TestCode(  
    @Autowired  
    private val orderService: OrderService,  
    @Autowired  
    private val orderRepository: OrderRepository,  
) {  

    @Test  
    @DisplayName("택배 등록 실패 이벤트가 발생하면, 주문의 택배 등록 상태가 실패로 변경된다.")  
    fun checkOrderStatus() {  
        // given  
        val order = Order()  
        val savedOrder: Order = orderRepository.save(order)  
        val failedParcelEvent = ParcelEvent.Failure(savedOrder.id!!)  


        // when  
        orderService.applyParcelEvent(failedParcelEvent)  

        // then  
        Assertions.assertThat(savedOrder.isParcelRegistered).isEqualTo(OrderParcelStatus.REGISTER_FAILED)  
    }  
}

 

처음 Order객체를 생성하면 택배 등록 상태를 나타내는 OrderParcelStatus 의 값이 PADDING인채로 생성된다. 이때 OrderService에 외부에서 발생한 택배 등록 이벤트(PercelEvent)를 전달해 주면 이벤트의 성공 실패 종류에 따라 Order객체의 OrderParcelStatus의 값을 변경한다.

테스트 코드는 택배 등록 이벤트가 전달되면 Order 객체의 값을 변경하는지 검증한다. 우선 택배 등록 실패이벤트가 발생한것을 가정하고 이를 orderService에 전달한다. 그러면 기존에 저장되어있던 Order의 택배 등록 상태가 실패로 변경되기를 기대하고 테스트를 수행한다.

이때 테스트는 실패한다. 택배 등록 실패이벤트를 전달했음에도 불구하고, 택배의 등록상태가 변경되지 않는다. 왜 그런걸까?

savedOrder 객체는 영속성 컨텍스트에서 관리되지 않는다

@SpringBootTest  
class TestCode(  
    @Autowired  
    private val entityManager: EntityManager,  
    @Autowired  
    private val orderService: OrderService,  
    @Autowired  
    private val orderRepository: OrderRepository,  
) {  

    @Test  
    @DisplayName("택배 등록 실패 이벤트가 발생하면, 주문의 택배 등록 상태가 실패로 변경된다.")  
    fun checkOrderStatus() {  
        // given  
        val order = Order()  
        val savedOrder: Order = orderRepository.save(order)  
        val failedParcelEvent = ParcelEvent.Failure(savedOrder.id!!)  

        println(entityManager.contains(savedOrder)) // false

        // when  
        orderService.applyParcelEvent(failedParcelEvent)  

        // then  
        Assertions.assertThat(savedOrder.isParcelRegistered).isEqualTo(OrderParcelStatus.REGISTER_FAILED)  
    }  
}

테스트 코드에서 위와같이 entityManager를 통해 현재 영속성 컨텍스트에 savedOrder가 있는지 여부를 조회하면 false값이 나온다. 따라서 JPA의 영속성 컨텍스트에서 관리되지 않고있다는 점을 확인할 수 있다. 영속성 컨텍스트에서 관리되지 않으니, 당연히 savedOrder 객체는 더티체킹의 효과를 볼 수 없다.

영속성 컨텍스트는 트랜잭션내에서 관리된다. 때문에 orderRepository로 가져온 savedOrder객체는 트랜잭션 밖, 즉 영속성 컨텍스트 밖이기에 영속성 컨텍스트에 의해 관리되지 않는 객체이다.

때문에 이후 orderService.applyParcelEvent(...)를 통해 DB에 저장된 order의 OrderParcelStatus를 변경하더라도, savedOrder의 값은 변화가 없다.

영속성 컨텍스트는 트랜잭션 범위 내에서 관리된다


영속성 컨텍스트의 종류는 두가지가 있다.

  • Transaction-scoped persistence context
  • Extended-scoped persistence context

Transaction-scoped persistence context의 경우 트랜잭션 단위로 영속성 컨텍스트가 유지되는 반면, Extended-scoped persistence context의 경우 컨테이너가 관리하는 영속성 컨텍스트로 여러 트랜잭션에 걸쳐 사용될 수 있다.

확장된 퍼시스턴스 컨텍스트를 갖는 EntityManager는 트랜 잭션 스코프의 퍼시스턴스 컨텍스트에서 사용되는 EntityManager 처럼 멀티스레드에서 안전한 프록시 오브젝트가 아니라 멀티스레드에서 안전하지 않은 실제 EntityManager다.
- 토비의 스프링 3.1 Vol.2 289p

public @interface PersistenceContext {  

    ...

    /**  
     * (Optional) Specifies whether a transaction-scoped persistence context     * or an extended persistence context is to be used.  
     */     
    PersistenceContextType type() default PersistenceContextType.TRANSACTION;  
    ...
}

PersistenceContext의 어노테이션을 직접 확인해보면 Transaction-scoped persistence context를 기본값으로 사용한다. Transaction-scoped persistence context를 사용하면 여러 측면에서 다음과 같은 장점이 있다.

효율성
transaction-scoped 영속성 컨텍스트는 트랜잭션이 끝나면 자동으로 종료되므로, 불필요한 리소스 사용을 줄일 수 있다. 반면에 Extended 영속성 컨텍스트는 트랜잭션이 끝나도 종료되지 않고 계속 유지되므로, 리소스 사용이 더 많을 수 있다.

일관성
transaction-scoped 영속성 컨텍스트는 트랜잭션 범위 내에서 일관성을 보장한다. 즉, 트랜잭션 내에서 수행된 모든 데이터베이스 작업은 일관된 상태를 유지한다. 반면에 Extended 영속성 컨텍스트는 여러 트랜잭션에 걸쳐 사용될 수 있으므로, 일관성을 보장하기 어려울 수 있다.

간단함
transaction-scoped 영속성 컨텍스트는 트랜잭션을 시작하고 종료하는 것만으로 영속성 컨텍스트를 관리할 수 있다. 반면에 Extended 영속성 컨텍스트는 수동으로 관리해야 하므로, 사용하기 복잡할 수 있다.

실제 트랜잭션을 부여하는 JpaTransactionManager를 살펴보면 EntityManager를 생성해 트랜잭션 단위로 관리하는것을 볼 수 있다.

우선 PlatformTransactionManager의 기본적인 동작 일부를 구현하고 있는 AbstractPlatformTransactionManager를 살펴보면 startTransaction부분에서 TransactionManager의 doBegin메서드를 호출하고있는것을 볼 수 있다.

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,  
       boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {  

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);  
    DefaultTransactionStatus status = newTransactionStatus(  
          definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);  
    this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));  
    try {  
       doBegin(transaction, definition);  
    }  
    catch (RuntimeException | Error ex) {  
       this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));  
       throw ex;  
    }  
    prepareSynchronization(status, definition);  
    this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));  
    return status;  
}

doBegin메서드는 TransactionManager를 구현하고 있는 JpaTransactionManager에서 확인할 수 있는데, entityManager를 생성하고 있는것을 볼 수 있다.

@Override  
protected void doBegin(Object transaction, TransactionDefinition definition) {  
    JpaTransactionObject txObject = (JpaTransactionObject) transaction;  

    if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {  
       throw new IllegalTransactionStateException(  
             "Pre-bound JDBC Connection found! JpaTransactionManager does not support " +  
             "running within DataSourceTransactionManager if told to manage the DataSource itself. " +  
             "It is recommended to use a single JpaTransactionManager for all transactions " +  
             "on a single DataSource, no matter whether JPA or JDBC access.");  
    }  

    try {  
       if (!txObject.hasEntityManagerHolder() ||  
             txObject.getEntityManagerHolder().isSynchronizedWithTransaction()) {  
          EntityManager newEm = createEntityManagerForTransaction();  
          if (logger.isDebugEnabled()) {  
             logger.debug("Opened new EntityManager [" + newEm + "] for JPA transaction");  
          }  
          txObject.setEntityManagerHolder(new EntityManagerHolder(newEm), true);  
       }  

       EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
       ...

JpaTransactionManager의 doBegin 메서드는 새로운 트랜잭션을 시작할 때 호출된다. 이 메서드에서는 EntityManager를 생성하고 이를 JpaTransactionObject에 저장한다.

JpaTransactionObject는 현재 트랜잭션의 상태를 추적하는 데 사용되며, 트랜잭션 범위 내에서 사용되는 EntityManager를 보유하고 있다. 이렇게 하면 트랜잭션 범위 내에서 동일한 EntityManager 인스턴스가 사용될 수 있다. 이렇게 JpaTransactionManager는 트랜잭션 단위로 EntityManager를 관리한다.

하지만 항상 영속성 컨텍스트의 생존 범위가 무조건 트랜잭션 범위 내 인것은 아니다. open session in view를 사용하면 영속성 컨텍스트의 범위를 트랜잭션 범위 밖까지 확장할 수 있다.

Reference

한 테이블에 여러개의 데이터를 한번에 생성해야하는 API를 설계하면서, 성능을 개선하기 위해 각 데이터를 별개의 트랜잭션으로 나누어 DB에 병렬적으로 삽입을 요청하는 과정에서 데드락 이슈를 만나게 되었습니다.
이를 간단한 예시코드와 함께 해결해 나가는 과정을 다루어 보겠습니다.

초기 구현

요구 사항

  • 제품 배치 생성 API
  • 배치 내의 각 제품 생성 요청은 별개의 트랜잭션으로 처리되어야 한다.
  • 제품명의 중복은 허용되지 않는다.

테이블 설계

CREATE TABLE product  
(  
    id            BIGINT            NOT NULL AUTO_INCREMENT PRIMARY KEY,  
    name        VARCHAR(255)     NOT NULL,  
    description    TEXT            NOT NULL  
);  
CREATE UNIQUE INDEX Product_name_uindex ON product (name);

위와 같은 요구사항을 해결하기 위해 다음과 같이 구현을 진행했습니다.

배치 생성 UseCase

interface ProductBatchCreateUseCase {  

    fun invoke(commands: List<Command>): List<Result>  

    data class Command(  
        val name: String,  
        val description: String,  
    )  

    sealed class Result {  
        data class Success(val postId: PostId) : Result()  
        data class Failure(  
            val name: String,  
            val message: String  
        ) : Result()  
    }  
}

@Service  
class ProductBatchCreate(  
    private val productCreateUseCase: ProductCreateUseCase  
) : ProductBatchCreateUseCase {  

    override fun invoke(commands: List<ProductBatchCreateUseCase.Command>): List<ProductBatchCreateUseCase.Result> {  
        val results: List<ProductCreateUseCase.Result> = commands.map {  
            productCreateUseCase.invoke(  
                command = ProductCreateUseCase.Command(  
                    name = it.name,  
                    content = it.description  
                )  
            )  
        }  

        return results.map {  
            when (it) {  
                is ProductCreateUseCase.Result.Success -> mapToSuccess(it)  
                is ProductCreateUseCase.Result.Failure -> mapToFailure(it)  
            }  
        }  
    }  

    private fun mapToSuccess(result: ProductCreateUseCase.Result.Success): ProductBatchCreateUseCase.Result.Success {  
        return ProductBatchCreateUseCase.Result.Success(postId = result.id)  
    }  

    private fun mapToFailure(result: ProductCreateUseCase.Result.Failure): ProductBatchCreateUseCase.Result.Failure {  
        return ProductBatchCreateUseCase.Result.Failure(  
            name = result.title,  
            message = result.message,  
        )  
    }  
}

단건 생성 UseCase

interface ProductCreateUseCase {  

    fun invoke(command: Command): Result  

    data class Command(  
        val name: String,  
        val content: String,  
    )  

    sealed class Result {  
        data class Success(val id: PostId) : Result()  
        data class Failure(  
            val title: String,  
            val message: String  
        ) : Result()  
    }  
}

@Service  
class ProductCreate(  
    private val productRepository: ProductRepository  
) : ProductCreateUseCase {  

    @Transactional(propagation = Propagation.REQUIRES_NEW)  
    override fun invoke(command: ProductCreateUseCase.Command): ProductCreateUseCase.Result {  
        val product: Product = Product.create(  
            name = command.name,  
            content = command.content,  
        )  

        if (isDuplicateTitle(product.name)) {  
            return ProductCreateUseCase.Result.Failure(  
                title = product.name,  
                message = "중복된 상품 명입니다."  
            )  
        }  

        val savedProduct: Product = productRepository.save(product)  

        return ProductCreateUseCase.Result.Success(id = savedProduct.id)  
    }  

    private fun isDuplicateTitle(title: String): Boolean {  
        return productRepository.existsByName(title)  
    }  
}

단건 생성의 경우 별개의 트랜잭션으로 처리됨을 보장하고 명시하기 위해 Propagation을 REQUIRES_NEW로 두었습니다.

중복여부는 Duplicate Key 에러로도 확인할 수 있지만, DataIntegrityViolationException 안에 포함된 메시지를 파싱해 중복으로 인한 에러인지 혹은 다른 에러인지 판단해야하고 DB에 의존적이라는 문제가 있습니다.

때문에 제품의 중복 여부를 애플리케이션 레벨에서도 확인할 수 있어야 한다는 판단에 중복확인을 위한 validation 로직을 작성하게 되었습니다. 배치 생성이 정상적으로 이루어 지는지 통합 테스트를 통해 확인해 보았습니다.

@SpringBootTest  
@ContextConfiguration(classes = [IntegrationTestSetup::class])  
class ProductBatchCreateTest(  
    private val sut: ProductBatchCreateUseCase  
) : FunSpec({  

    test("제품 배치 생성") {  
        // given  
        val commands: List<ProductBatchCreateUseCase.Command> = (0 until 10).map {  
            ProductBatchCreateUseCase.Command(  
                name = "제품",  
                description = "제품 $it 설명"  
            )  
        }  

        // when  
        val results: List<ProductBatchCreateUseCase.Result> = sut.invoke(commands)  

        // then  
        results.filterIsInstance<ProductBatchCreateUseCase.Result.Success>().size shouldBe 10  
    }

    test("제품 배치 생성 시간 측정") {  
    // given  
    val commands: List<ProductBatchCreateUseCase.Command> = (0 until 1000).map {  
        ProductBatchCreateUseCase.Command(  
            name = "제품 $it",  
            description = "제품 $it 설명"  
        )  
    }  

    // when, then  
    measureTimeMillis { sut.invoke(commands) }  
        .also { time -> println("제품 배치 생성 시간: $time ms") }
    }
})
제품 배치 생성 시간: 6998 ms

통합테스트의 경우 TestContainer를 통해 운영 코드와 동일한 환경에서 테스트 했습니다. 제품 배치 생성의 경우 한개 생성 요청을 처리하면 그다음 생성 요청을 순차적으로 처리하는 방식으로 구현되어 있는데, 이러한 방식의 구현은 효율적이지 않습니다.

 

코루틴 병렬처리 적용

각 생성 요청은 하나의 트랜잭션으로 묶여있을 필요가 없기 때문에 병렬적으로 처리 가능합니다. 이를 위해 배치 생성 요청을 코루틴을 활용한 병렬 처리 방식으로 개선하고 생성 시간을 측정해 보았습니다.

    override suspend fun invoke(commands: List<ProductBatchCreateUseCase.Command>): List<ProductBatchCreateUseCase.Result> =  
    coroutineScope {  
        val deferredResults: List<Deferred<ProductCreateUseCase.Result>> = commands.map { command ->  
            async(Dispatchers.IO) {  
                productCreateUseCase.invoke(  
                    ProductCreateUseCase.Command(  
                        name = command.name,  
                        content = command.description  
                    )  
                )  
            }  
        }  

        deferredResults.awaitAll().map { result ->  
            when (result) {  
                is ProductCreateUseCase.Result.Success -> mapToSuccess(result)  
                is ProductCreateUseCase.Result.Failure -> mapToFailure(result)  
            }  
        }  
    }
제품 배치 생성 시간: 1593 ms

1000개의 데이터를 생성하는 테스트로 확인해본 결과 수행시간이 6998ms에서 1593ms으로 개선되었습니다. 오차를 감안하더라도 크게 개선된 수치입니다.

성능은 개선되었지만, 새로운 문제점이 발생했습니다. 만약 배치 생성 요청 내에서 중복된 제품명이 존재하는 경우 ProductCreate의 isDuplicateTitle 메서드가 제품명의 중복을 정상적으로 확인하지 못하고, productRepository.save(product)를 호출하게 됨으로써, DB의 DataIntegrityViolationException을 발생시키게 된다는 점입니다.

A 트랜잭션
select * from product where name = "중복이름"
insert into product (name, description) values ('중복이름', 'test');


B 트랜잭션
select * from product where name = "중복이름"
insert into product (name, description) values ('중복이름', 'test');

현재 MySQL의 트랜잭션 격리 수준은 기본값인 REPETABLE_READ격리 수준인데, 병렬적으로 수행되는 두 트랜잭션이 트랜잭션 수행전 스냅샷을 기준으로 select 쿼리를 수행하기 때문에 여러 트랜잭션이 중복된 name을 가지고 있더라도 select시에는 조회가 되지 않기 때문에 insert query는 수행되게 됩니다.

이러한 문제를 해결하기 위해 isDuplicateTitle메서드의 쿼리를 select .. for update를 사용해 쓰기잠금을 걸어 개선해 보려 시도해 보았습니다.

 

데드락

could not execute statement [Deadlock found when trying to get lock; try restarting transaction] [insert into product (description,name) values (?,?)]; SQL [insert into product (description,name) values (?,?)]

org.springframework.dao.CannotAcquireLockException: could not execute statement [Deadlock found when trying to get lock; try restarting transaction] [insert into product (description,name) values (?,?)]; SQL [insert into product (description,name) values (?,?)]

테스트를 수행해본 결과 위와 같은 오류의 데드락을 확인할 수 있었습니다. MySQL 콘솔에서는 SHOW ENGINE INNODB STATUS 명령어를 통해 최근에 발생한 데드락에 대한 정보를 확인할 수 있었습니다.

**트랜잭션 (1)**

(1) HOLDS THE LOCK(S):

RECORD LOCKS space id 2 page no 5 n bits 80 index Product_name_uindex of table `test`.`product` trx id 2984 lock_mode X locks gap before rec
Record lock, heap no 8 PHYSICAL RECORD: n_fields 2; compact format; info bits 0

(1) WAITING FOR THIS LOCK TO BE GRANTED:

RECORD LOCKS space id 2 page no 5 n bits 80 index Product_name_uindex of table `test`.`product` trx id 2984 lock_mode X locks gap before rec insert intention waiting_
Record lock, heap no 8 PHYSICAL RECORD: n_fields 2; compact format; info bits 0

**트랜잭션 (2)**

(2) HOLDS THE LOCK(S):

RECORD LOCKS space id 2 page no 5 n bits 80 index Product_name_uindex of table `test`.`product` trx id 2992 lock_mode X locks gap before rec
Record lock, heap no 8 PHYSICAL RECORD: n_fields 2; compact format; info bits 0_

(2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 2 page no 5 n bits 80 index Product_name_uindex of table `test`.`product` trx id 2992 lock_mode X locks gap before rec insert intention waiting

Record lock, heap no 8 PHYSICAL RECORD: n_fields 2; compact format; info bits 0_

 

발생한 로그를 분석해 보면 다음과 같습니다.

트랜잭션 1

  • 상태: 삽입 중, 락 대기 중
  • 행위: product 테이블에 insert 쿼리 실행
  • 락 정보:
    • 보유 중인 락: Product_name_uindex에 대한 X 락 및 갭 락(gap lock)
    • 대기 중인 락: 동일한 인덱스에 대한 X 갭 락 및 삽입 의도 락(insert intention lock)

트랜잭션 2

  • 상태: 삽입 중, 락 대기 중
  • 행위: product 테이블에 insert 쿼리 실행
  • 락 정보:
    • 보유 중인 락: Product_name_uindex에 대한 X 락 및 갭 락(gap lock)
    • 대기 중인 락: 동일한 인덱스에 대한 X 갭 락 및 삽입 의도 락(insert intention lock)

여기서 한가지 의문이 들 수 도 있는데, 두 트랜잭션이 보유중인 락이 베타적 락(lock_mode : X)이라는 점입니다. 일반적으로 베타적 락은 동시에 소유할수 없다고 알고 있는데, 로그엔 두 트랜잭션이 동일한 위치에 베타적 락을 소유하고 있는것으로 보입니다. 여기에 대한 답은 MySQL의 공식문서에서 확인해 볼 수 있습니다.

MySQL 공식문서 - Gap lock
Gap locks in InnoDB are “purely inhibitive”, which means that their only purpose is to prevent other transactions from inserting to the gap. Gap locks can co-exist. A gap lock taken by one transaction does not prevent another transaction from taking a gap lock on the same gap. There is no difference between shared and exclusive gap locks. They do not conflict with each other, and they perform the same function.

갭 락(gap lock)의 경우 여러 트랜잭션이 동일한 갭에 대해 갭락을 가질 수 있으며, 충돌하지 않는다고 설명하고 있습니다. 이러한 의문점이 해소가 된다면 위의 로그를 통해 데드락의 발생원인을 명확히 파악할 수 있습니다.

실제로 존재하지 않는 데이터에 대해 select * for update를 쿼리를 날려 갭락이 발생했으며, 갭락은 여러 트랜잭션에서 공존할 수 있기 때문에 두 트랜잭션이 동시에 획득한 상태가 됩니다.

이때 각 트랜잭션은 이후 삽입쿼리를 위해 삽입 의도 락(insert intention lock)을 획득하려 하는데 이는 갭락과 호환되지 않기 때문에 두 트랜잭션이 서로의 갭 락을 기다리게 되고, 트랜잭션이 끝나지 않으므로 gap lock을 획득하지 못한 상태가 유지되며 데드락이 발생하게 된 것 입니다.

단순히 Gap lock으로 인한 데드락을 없애기 위해서는 Repeatable Read격리수준을 사용해 Gap락을 명시적으로 사용하지 않도록 하면 됩니다. Repeatable Read 격리수준에서는 트랜잭션이 시작될 때 읽은 데이터가 트랜잭션이 종료될 때까지 변경되지 않음을 보장합니다.

이를 위해서는 다른 트랜잭션이 특정 간격에 데이터를 삽입 하지 않음이 보장되어야 하는데, MySQL에서는 이를 갭락으로 해결합니다.

때문에 Read Committed 격리수준을 사용하면 갭 락의 사용을 명시적으로 해제할 수 있습니다. 다만 binary log format을 row로 설정하는 등의 격리수준 하향에 따른 부수효과에 대한 대응도 염두에 두어야 합니다.

 

Synchronized 키워드 사용

하지만 select ... for update는 Read Committed 레벨에서 어떠한 잠금도 발생시키지 않기때문에, 여전히 중복된 값을 삽입하여 DataIntegrityViolationException을 발생시키게 됩니다. 또한 Repeatable Read 레벨에서는 앞서 보았던 바와 같이 Deadlock을 발생시켰습니다.

애플리케이션 레벨에서 완전히 로직을 제어하기 위해 validation로직을 구현한 것이니 분산락을 활용하거나, synchronized 키워드를 사용하는것이었는데, 현재는 단일 노드에서 발생하는 동시성 처리가 주된 관심사이기에 synchronized 키워드를 통해 애플리케이션 레벨의 락을 잡는것이 좋겠다는 생각도 들었습니다.

하지만 @Synchronized와 @Transactional을 같이 사용하는 경우 몇가지 잠재적인 문제가 발생합니다.

 

  • 트랜잭션 전파 문제:
    • @Synchronized는 메소드 진입 시점에 락을 획득하고 메소드 종료 시점에 락을 해제합니다.
    • @Transactional은 실제로는 프록시를 통해 동작하며, 메소드 호출 전에 트랜잭션을 시작하고 메소드 완료 후 커밋/롤백합니다.
    • 이 두 어노테이션의 순서와 동작 방식의 차이로 인해 의도한 대로 동작하지 않을 수 있습니다.
  • 동시성 제어 수준의 불일치:
    • @Synchronized는 JVM 레벨의 동시성을 제어합니다.
    • @Transactional은 데이터베이스 레벨의 트랜잭션을 제어합니다.
    • 두 레벨의 동시성 제어를 혼용하면 복잡성이 증가하고 예상치 못한 동작이 발생할 수 있습니다.

 

요구사항 재분석

사실 이쯤에서 처음부터 다시 생각해보면, 우리가 해결하고자 했던 핵심 요구사항을 되짚어볼 필요가 있습니다.

  1. 제품 배치 생성 API 구현
  2. 각 제품 생성은 별개의 트랜잭션으로 처리
  3. 제품명 중복 불가
  4. 성능 최적화 필요

우리는 성능 최적화를 위해 코루틴을 활용한 병렬 처리를 시도했고, 이 과정에서 데드락과 같은 동시성 문제에 직면했습니다. 이를 해결하기 위해 여러 방식을 시도했지만, 각각의 접근 방식은 한계점을 보였습니다:

Select For Update 시도

  • Repeatable Read에서 Gap Lock으로 인한 데드락 발생
  • Read Committed에서는 잠금이 제대로 동작하지 않음

격리 수준 조정 시도

  • Read Committed로 낮추면 동시성 문제 발생 가능
  • Binary log format 설정 변경 등 부가적인 설정 필요

Synchronized 키워드 시도

  • @Transactional과의 조합에서 예상치 못한 동작 가능성
  • JVM 레벨과 DB 레벨의 동시성 제어 불일치

 

이러한 시행착오를 거치면서, 결국 두 가지 현실적인 해결방안으로 좁혀볼 수 있습니다:

Batch Insert 방식

@Service
class ProductBatchCreate(
    private val productRepository: ProductRepository,
    private val jdbcTemplate: JdbcTemplate,
) : ProductBatchCreateUseCase {

    companion object {

        private const val BATCH_SIZE = 1000
        private const val INSERT_QUERY = """
            INSERT INTO product (name, description) 
            VALUES (?, ?)
        """
    }

    @Transactional
    override fun invoke(commands: List<ProductBatchCreateUseCase.Command>): List<ProductBatchCreateUseCase.Result> {
        if (commands.isEmpty()) return emptyList()

        val existingNames = findExistingProductNames(commands)
        val validProducts = filterValidProducts(commands, existingNames)

        // 배치 사이즈로 나누어 처리
        val batchResults = validProducts.chunked(BATCH_SIZE).flatMap { batch ->
            batchInsertProducts(batch)
        }

        // 실패 및 성공 결과 생성
        val failures = existingNames.map {
            ProductBatchCreateUseCase.Result.Failure(
                name = it,
                message = "중복된 상품 명입니다."
            )
        }

        val successes = batchResults.map {
            ProductBatchCreateUseCase.Result.Success(postId = it)
        }

        return failures + successes
    }

    private fun batchInsertProducts(products: List<Product>): List<Long> {
        if (products.isEmpty()) return emptyList()

        val keyHolder = GeneratedKeyHolder()

        jdbcTemplate.batchUpdate(
            PreparedStatementCreator { connection ->
                connection.prepareStatement(INSERT_QUERY, Statement.RETURN_GENERATED_KEYS)
            },
            object : BatchPreparedStatementSetter {
                override fun setValues(
                    ps: PreparedStatement,
                    i: Int
                ) {
                    val product = products[i]
                    ps.setString(1, product.name)
                    ps.setString(2, product.description)
                }

                override fun getBatchSize() = products.size
            },
            keyHolder
        )

        return keyHolder.keyList.map {
            (it["GENERATED_KEY"] as Number).toLong()
        }
    }

    private fun findExistingProductNames(commands: List<ProductBatchCreateUseCase.Command>): Set<String> {
        val names = commands.map { it.name }
        return productRepository.findAllByNameIn(names)
            .asSequence()
            .map { it.name }
            .toSet()
    }

    private fun filterValidProducts(
        commands: List<ProductBatchCreateUseCase.Command>,
        existingNames: Set<String>
    ): List<Product> {
        return commands.asSequence()
            .filterNot { existingNames.contains(it.name) }
            .map { command ->
                Product.create(
                    name = command.name,
                    description = command.description,
                )
            }
            .toList()
    }
}
제품 배치 생성 시간: 416 ms

 

 

장점:

  • 현저히 빠른 성능
  • 데드락 위험 없음
  • 중복 체크의 효율성
  • 단일 트랜잭션으로 일관성 보장

단점:

  • 개별 트랜잭션 요구사항 충족 못함
  • 대량 데이터의 경우 메모리 사용량 증가
  • 배치 처리를 위한 추가 로직 구현 및 DB Connection 설정 필요 (jpa saveAll 메서드는 batch insert가 아님!)

 

분산 락을 활용한 병렬 처리

// 트랜잭션 처리를 위한 별도 서비스
@Service
class ProductCreateTransactionService(
    private val productRepository: ProductRepository
) {
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    fun createProduct(command: Command): Result {
        if (productRepository.existsByName(command.name)) {
            return Result.Failure(command.name, "중복된 상품명입니다.")
        }
        
        val product = Product.create(command.name, command.content)
        val savedProduct = productRepository.save(product)
        return Result.Success(savedProduct.id)
    }
}

// 배치 처리 서비스
@Service
class ProductBatchCreate(
    private val productCreateTransactionService: ProductCreateTransactionService,
    private val redisLockRegistry: RedisLockRegistry
) : ProductBatchCreateUseCase {
    
    override suspend fun invoke(commands: List<Command>): List<Result> = coroutineScope {
        commands.map { command ->
            async(Dispatchers.IO) {
                val lock = redisLockRegistry.obtain("product:${command.name}")
                try {
                    if (lock.tryLock(1, TimeUnit.SECONDS)) {
                        productCreateTransactionService.createProduct(command) // 프록시를 통한 호출
                    } else {
                        Result.Failure(command.name, "락 획득 실패")
                    }
                } finally {
                    lock.unlock()
                }
            }
        }.awaitAll()
    }
}

장점:

  • 개별 트랜잭션 요구사항 충족
  • 확장성 있는 동시성 제어
  • 병렬 처리를 통한 성능 향상

단점:

  • 추가 인프라(Redis) 필요
  • 구현 복잡도 증가
  • 락 타임아웃 설정의 어려움

 

최종적으로, 시스템의 요구사항과 제약사항을 고려할 때 다음과 같은 선택이 가능합니다:

  1. 만약 개별 트랜잭션 요구사항이 절대적이라면:
    • 분산 락 방식을 선택
    • 적절한 타임아웃과 재시도 정책 수립
    • 모니터링 체계 구축
  2. 만약 개별 트랜잭션이 권장사항 수준이라면:
    • Batch Insert 방식을 선택
    • 배치 사이즈 최적화
    • 메모리 사용량 모니터링

 

이러한 고민을 통해, 우리는 단순히 기술적인 해결책을 찾는 것을 넘어서서, 비즈니스 요구사항과 시스템의 제약사항 사이에서 최적의 균형점을 찾아가는 과정을 경험할 수 있었습니다. 특히 동시성 처리에서는 완벽한 해결책보다는 상황에 맞는 적절한 타협점을 찾는 것이 중요하다는 점을 배울 수 있었습니다.

 

전체 코드

Reference

목록을 보여줄때 무한 스크롤 방식으로 보여주는것이 사용성 면에서 좋을 것 같아 모든 목록 조회는 무한 스크롤 방식을 사용하기로 결정했습니다. 무한 스크롤 방식을 구현하기 전에 Offset 페이징No offset 페이징 기법의 차이에 대해 간단히 알아보겠습니다.

📃 페이징 기법

Offset

Offset과 Limit을 활용한 페이징 기법의 경우 다음과 같은 형태로 쿼리가 생성됩니다.

SELECT ...
FROM ...
WHERE ...
ORDER BY id DESC
OFFSET {page_number}
LIMIT {page_size}

이경우 page_number에 해당하는 행만큼 데이터를 읽어들인 후 다시 page_size만큼의 행을 읽고 앞에 읽은 행을 삭제하는 과정을 거치게 되는데, 데이터가 많아 질수록 읽어야 할 데이터의 개수가 많아지게 되고 결국 데이터 베이스에 많은 부하를 주게 됩니다.

또한 새로운 페이지를 가져오는 중간에 새로운 행이 삽입되면 새로운 페이지에는 이전 페이지와 중복되는 행이 존재하게 됩니다.

image

첫 페이지에서는 10번 Row 까지 조회한 후 다음 페이지를 조회하기 전, 새로운 행이 삽입됩니다. offset을 기준으로 다음 페이지의 행을 조회하게 되기 때문에 10번 Row는 다음페이지에서도 조회되는 중복조회 현상이 발생할 수 있습니다.

No Offset

만약 Offset을 사용하지 않는다면, 마지막 조회한 행의 id값을 기준으로 읽지 않은 행을 page size만큼 조회하면 됩니다. 이를 No Offset방식이라고 하며 id라는 클러스터 인덱스를 사용하기 때문에 시작부분을 빠르게 찾아 조회가 가능합니다.

No offset 방식의 쿼리 예시

SELECT ...
  FROM ...
 WHERE ...
   AND id < ?last_seen_id
 ORDER BY id DESC
 FETCH FIRST 10 ROWS ONLY

페이지 수에 따른 응답시간 비교

8517F1DF-5539-4F22-9C06-B600D402DA96

Offset방식을 사용하였을때는 Page의 수가 늘어남에 따라 응답시간이 느려지는것을 볼 수 있지만, 인덱스(id)를 기준으로 정렬하고 조회하는경우 첫페이지를 읽는것과 동일한 속도로 응답하는것을 확인할 수 있습니다.

성능면에서는 이러한 장점이 있지만 순차적으로만 다음페이지를 조회할 수 있다는 단점 또한 존재합니다.

결론 : 무한 스크롤 방식에서는 페이지 버튼이 필요 없기 때문에 성능상 우수한 no offset방식으로 구현하는것이 적절해 보입니다.

💻 구현

BoardQueryRepository구현

@Repository
@RequiredArgsConstructor
public class BoardQueryRepository {

  private final JPAQueryFactory jpaQueryFactory;

  public List<Board> getSliceOfBoard(
      @Nullable
      Long id,
      int size,
      @Nullable
      String keyword
  ) {
    return jpaQueryFactory.selectFrom(board)
                          .where(ltBoardId(id), search(keyword))
                          .orderBy(board.id.desc())
                          .limit(size)
                          .fetch();
  }

  @SuppressWarnings("all")
  private BooleanExpression search(String keyword) {
    return containsTitle(keyword).or(containsDescription(keyword));
  }

  private BooleanExpression containsTitle(@Nullable String title) {
    return hasText(title) ? board.title.contains(title) : null;
  }

  private BooleanExpression containsDescription(@Nullable String description) {
    return hasText(description) ? board.description.contains(description) : null;
  }

  private BooleanExpression ltBoardId(@Nullable Long id) {
    return id == null ? null : board.id.lt(id);
  }

}

첫 요청

[
    {
        "id": 42,
        "title": "Lodge",
        "description": "Malawi matrix Cambridgeshire"
    },

    ...

    {
        "id": 33,
        "title": "Avon",
        "description": "synergies Money complexity generation"
    }
]

2번째 요청

[
    {
        "id": 32,
        "title": "holistic",
        "description": "analyzing Fresh Senior facilitate Practical"
    },

    ...

    {
        "id": 23,
        "title": "Bedfordshire",
        "description": "Jewelery Pants Liechtenstein Metal South"
    }
]

검색어를 포함한 요청

[
    {
        "id": 23,
        "title": "Bedfordshire",
        "description": "Jewelery Pants Liechtenstein Metal South"
    },
    {
        "id": 19,
        "title": "Oklahoma",
        "description": "primary Pants"
    },
    {
        "id": 13,
        "title": "Pants",
        "description": "B2C Harbors Kuwaiti"
    }
]

⌨️ 코드

Spring-Board/BoardQueryRepository.java at main · waterfogSW/Spring-Board · GitHub

☎️ Reference

+ Recent posts