이벤트 소싱

이벤트 소싱(Event Sourcing)은 도메인의 상태를 변경하는 모든 사건을 이벤트 스트림으로 저장하는 방식이다.

계좌 잔액 관리를 예로 들어보자.

전통적인 방식이라면 거래 내역을 따로 저장하고 해당 거래로 인해 발생한 잔액을 별도의 테이블에 저장할 것이다.

이벤트 소싱으로 계좌 잔액을 관리한다면 거래내역이 아니라 거래내역으로 인해 발생한 이벤트를 저장하게 된다. 현재 내 계좌에 남은 잔액은 수많은 거래(이벤트)들의 결과라고 볼 수 있다.

이 때 잔액을 계산하는 로직은 이벤트 스트림을 통해 계산할 수 있다.

Kafka를 이용한 이벤트 스트리밍

이벤트는 다양한 방법으로 처리할 수 있다.

만약 이벤트를 발동하는 JVM과 이벤트를 처리하는 JVM이 다른 분산 환경이라면 브로커등의 미들웨어를 도입하여 안정적이고 효율적인 구현이 가능하다. Kafka는 분산형 스트리밍 플랫폼으로 특히 대용량 이벤트 스트림을 처리하는데 적합하다.

Axon Framework의 이벤트 처리 방식

Axon Framework는 이벤트 소싱을 통한 CQRS(Command Query Responsibility Segregation)와 DDD(Domain Driven Design)을 지원하는 프레임워크이다.

Axon Framework는 이벤트를 처리하는 방식으로 크게 2가지 방식을 제공한다.

  1. Subscribing Event Processor - 순서가 보장되고 하나의 스레드에서 처리되어야 하는 작업에 적합하다. 연동된 미들웨어가 실패할 경우 이벤트 발행 자체가 실패하도록 강제한다. AMQP를 이용하는 경우 적절하다.
  2. Tracking Event Processor - 여러 스레드에서 각 프로세서가 작업을 분배하여 병렬 작업할 수 있도록 한다. Kafka가 실패해도 이벤트 발행 자체에는 영향을 주지 않는다. Kafka를 이용할 때 적합하다.

예제에는 Tracking Event Processor를 사용하였다.

예제

0. docker-compose.yml

docker-compose.yaml를 이용하여 Kafka와 Zookeeper를 설정한다.

1. Spring Boot Kafka 설정

axon:
  axonserver:
    enabled: false
  serializer:
    events: jackson
  kafka:
    fetcher:
      enabled: true
    clientid: kafka-axon-example
    producer:
      retries: 0
      bootstrap-servers: localhost:29092
    consumer:
      event-processor-mode: tracking
      bootstrap-servers: localhost:29092
    properties:
      security.protocol: PLAINTEXT

2. Axon 설정

기본적으로 JPA를 Event Store로 사용하고 Tracking Event Processor를 사용한다. DB는 H2를 사용한다. Token Store는 InMemoryTokenStore를 사용한다. Token은 이벤트 스트림을 열 때 특정 이벤트의 위치를 특정하거나 특정한 위치에서 이벤트 스트림을 열기 위해 사용된다.

@SpringBootApplication
@EnableScheduling
class KafkaAxonApplication {
    @Bean
    fun tokenStore() = InMemoryTokenStore()
}

@Configuration
@ConditionalOnProperty(value = ["axon.kafka.consumer.event-processor-mode"], havingValue = "tracking")
class TrackingConfiguration {
    @Autowired
    fun configureStreamableKafkaSource(
        configurer: EventProcessingConfigurer,
        streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>
    ) {
        // Tracking Event Processor의 이름을 KAFKA_GROUP으로 설정하고 StreamableKafkaMessageSource를 사용하여 등록
        configurer.registerTrackingEventProcessor(KAFKA_GROUP) { streamableKafkaMessageSource }
    }
}

3. Bank Client

1초에 한번씩 입금요청을 처리하도록 한다.

@Component
public class BankClient {
    // ...
    @Scheduled(initialDelay = 10_000, fixedDelay = 100)
    fun deposit() {
        logger.debug { "depositing $amount from account $accountId" }
        commandGateway.send<CompletableFuture<String>>(
                DepositMoneyCommand(
                        bankAccountId = accountId,
                        amountOfMoney = amount.toLong()
                )
        )
        amount = amount.inc()
    }
}