기본 개념

RabbitMQ는 송신자Publisher와 수신자Subscriber끼리 메시지를 주고 받기 위해 사용되는 메시지 브로커이다.

메시징 방법

아래의 6개 예제를 SpringAMQP를 사용하여 구현하고 마지막 예제는 코틀린으로 구현할 것이다. 예제는 RabbitMQ 공식 튜토리얼을 참고하였다.

1. "Hello World"

가장 간단한 1:1로 메시지를 주고 받는 모델을 만들어보자. 송신자가 수신자의 우편함에 직접 편지를 넣어주는 걸 생각하면 된다.

송신자Publisher가 특정 큐로 "Hello World!"라는 메시지를 보낸다. 이 때 메시지는 동일한 큐(=우편함)에 연결된 수신자Subscriber에게 도착한다. 큐인만큼 메시지는 FIFO 방식으로 전달된다. 큐는 빈으로 생성하여 사용한다.

@Bean
	public Queue hello() {
	return new Queue("hello"); // 큐의 이름을 "hello"라고 지정한다.
}

끝이다.

코드로 살펴보자.

public void send() {
	// 메시지 정의
	String message = "Hello World!";
    // 지정된 큐로 발송한다
	this.template.convertAndSend(queue.getName(), message);
    // 발송한 메시지 출력
	System.out.println(" [x] Sent '" + message + "'");
}

여기서 this.template으로 참조된 RabbitTemplate은 여러 설정들을 자동으로 지원하는 등 보일러플레이트 코드를 줄이고 가독성 있는 코드를 작성할 수 있게 해준다.

전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.

2. Work Queues

마치 택배 물류창고처럼 하나의 메시지 생산자Producer가 메시지를 보내면 임시저장소에 쌓이게 되고 다수의 소비자Consumer가 분배하여 가져가는 패턴도 있다.

이런 모델은 경쟁적 소비자 패턴(Competing consumer pattern)의 기본적인 형태이며 다수의 메시지를 최대한 빠르게 처리해야 할 때 사용된다.

1의 기본모델과 무엇이 다를까?

QoS(Quality of Service)

가장 먼저 각 컨슈머에게 어떻게 분배할지를 정해야 한다. 하나의 소비자가 너무 많은 메시지를 소비하여 지연을 발생시키고 다른 컨슈머들에 할당된 자원을 낭비하는 것은 바람직하지 않기 때문이다.

기본적으로 RabbitMQ는 컨슈머가 여러 대일 때 Round-Robin 방식으로 클라이언트 사이드 로드밸런싱을 시도한다. 즉, 각 메시지를 차례대로 다음 컨슈머에게 전달한다는 뜻이다.

위 이미지는 해당 튜토리얼의 실행 결과이다. 별도의 설정이 없어도 1,3,5번 메시지와 2,4,6번 메시지가 각 2 개의 Worker에 나눠서 처리되는 것을 볼 수 있다.

그러나 이 방식은 어떤 컨슈머가 아직 작업 중인지 여부는 확인하지 않는다는 문제가 있다. 따라서 균등 분배Fair Dispatch를 위해서는 컨슈머의 채널Channel에 한번에 몇 개의 메시지를 받을 것인지를 정의해야 한다.

Spring AMQP에서는 Fair Dispatch 방식을 디폴트로 제공하고 있다.한번에 받을 수 있는 메시지 개수 Prefetch Count는 디폴트 250개이며 AbstractMessageListenerContainer.setPrefetchCount(int prefetchCount)에서 변경할 수 있다.

메시지 확인(ACK)

만약 여러 대의 컨슈머 중 하나가 죽는다면 어떻게 할까? 일단 메시지가 성공적으로 수신됐는지 확인을 해야 그 다음의 처리가 가능하다.

RabbitMQ에서는 autoAck을 false로 설정하여 정상적인 메시지에 대해 응답 확인(ACK)을 보낼 수 있다.

큐에 올라간 메시지가 처리되면 리스너에서 아래와 같이 확인ACK 처리를 한다.

channel.basicAck()

응답이 확인되지 않으면 메시지는 unack 상태가 된다.

만약 autoAck 설정이 false인 경우 컨슈머가 위 메서드를 호출하지 않으면 메모리에 UnAcked가 쌓여 심각한 문제가 발생할 수 있다.

ACK 되지 않은 메시지는 위 이미지처럼 RabbitMQ management UI에서도 확인할 수 있고 아래의 명령어로도 확인할 수 있다.

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Spring AMQP에서는 디폴트로 channel.basicAck()을 호출해 확인 처리를 하므로 위와 같은 이슈를 신경쓰지 않아도 되지만 java 라이브러리 등을 혼합하여 사용한다면 주의해야 한다.

응답 확인을 받은 이후 RabbitMQ는 디폴트로 실패한 메시지를 다시 큐에 올린다(Requeue). 큐에 올라간 메시지는 다른 컨슈머 등에 의해 재처리될 수 있다. 동시간 대에 모든 컨슈머가 다운되어도 컨슈머가 복구되는 시점에 해당 큐와 연결되어 메시지를 재전송할 수 있다.

이 때 만약 다시 큐에 올리지 않고 예외를 던지도록 하고 싶다면 아래와 같이 프로퍼티에 명시적으로 설정하면

defaultRequeueRejected=false

AmqpRejectAndDontRequeueException를 던지게 된다.

메시지 확인을 하는 쪽에서는 기본적으로 메시지를 보낸 채널과 동일한 채널에 응답을 보낸다. 채널이 다를 경우 예외가 발생한다.

메시지 보존

Spring AMQP의 메시지는 기본적으로 durable(영속화persistence 되도록)하게 설정된다. 따라서 브로커가 리부팅되어도 큐와 메시지가 복구된다. 물론 이런 설정이 메시지 무손실을 보장하는 것은 아니다. 메시지가 영속화되기 전 다운될 경우도 있기 때문이다.

무손실 보장 정도를 높이고 싶다면 7의 publisher Confirms 설정을 이용해야 한다.

전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.

3. Publish/Subscribe

하나의 메시지 발신자Publisher와 다수의 구독자Subscriber로 구성된 Publish/Subscribe 패턴도 있다. SNS 또는 미디어를 구독하는 것과 비슷하다.

다수의 메시지 소비자가 있지만 2의 Work Queue 모델과 다른 점은 소비자들이 경쟁적으로 메시지를 소비하는 것이 아니라 구독자 모두가 동일한 메시지를 함께 받는다는 점이다.

여기서 메시지 발신자는 누가 자신의 메시지를 모르고 또 알 필요도 없이 모든 구독자에게 브로드캐스팅을 한다. 구독자는 언제든 자유롭게 구독/해제를 할 수 있다.

Exchange

그렇다면 구독자를 모르는 채로 어떻게 메시지를 전달할 수 있을까?

여기서 Exchange의 개념이 등장한다.

퍼블리셔는 큐가 아닌 Exchange에 메시지를 보낸다. 이 때 Exchange는 큐와 연결되어 있다. 구독자 또한 발신자에게서 메시지를 직접 받는 것이 아닌 자신이 구독하는 큐로부터 메시지를 받는다.

실제 발신자에게서 메시지를 받고 동시에 구독자와 연결된 큐에 메시지를 푸쉬하는 역할을 Exchange가 담당하며 큐는 메시지를 임시로 저장하는 버퍼의 역할을 한다.

Exchangedirect, topic, headers, fanout의 4개가 있으며 여기에서는 모든 큐에 무차별적으로 메시지를 보내야 하기에 fanout 타입의 Exchange를 사용한다.

Spring AMQP에서는 아래와 같이 빈으로 설정 후 주입하여 사용한다.

@Bean
public FanoutExchange fanout() {
	return new FanoutExchange("tut.fanout");
}

익명 큐

구독자는 언제든 자유롭게 구독을 시작할 수 있다.

또 오래된 메시지는 받을 필요가 없기 때문에 구독을 시작할 때마다 새로운 큐를 할당받아야 한다. 마찬가지로 구독을 끊어 더 이상 연결된 구독자가 없을 때 큐는 삭제된다.

하나의 큐를 여러 구독자가 공유할 필요가 없기에 하나의 큐는 하나의 구독자와 연결된다. 이러한 특성을 가진 큐를 Spring AMQP에서는 아래와 같이 간단하게 생성할 수 있다.

@Bean
public Queue autoDeleteQueue1() {
	return new AnonymousQueue();
}

위와 같이 큐의 이름을 지정하지 않고 서버에서 지정한 익명(server-named)의 자동삭제(auto delete)되면서 하나의 구독자와 연결되는(exclusive) 큐를 만든다.

바인딩

이제 위에서 만든 큐와 Exchange와 연결할 차례이다. 이러한 연결정보를 RabbitMQ에서는 바인딩Binding이라고 부른다.

@Bean
public Binding binding1(FanoutExchange fanout,
        Queue autoDeleteQueue1) {
	return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

구독자는 아래와 같이 익명큐를 리스너에 넘겨줄 수 있다.

@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
	...
}

최종적으로 완성되는 형태는 아래와 같다.

전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.

4. Routing

만약 메시지 특정 조건에 따라 필터링하여 특정 구독자에게 보내야 한다면 어떻게 해야 할까? 마치 우체국에서 주소에 따라 알맞은 우편함에 편지를 넣어주듯이 말이다.

이 때 일괄적으로 모든 구독자에게 메시지를 보내는 Fanout Exchange가 아닌 Direct Exchange를 사용하여 큐를 바인딩하면 된다.

@Bean
public DirectExchange direct() {
    return new DirectExchange("tut.direct");
}
...
@Bean
public Binding binding1a(DirectExchange direct,
    Queue autoDeleteQueue1) {
    // 위에서 정의한 Exchange와 큐를 "orange"라는 routing key로 바인딩
    return BindingBuilder.bind(autoDeleteQueue1)
        .to(direct)
        .with("orange");
}

fanout과 다르게 with를 통해 routing key를 추가한다.

바인딩 정보는 RabbitMQ management UI에서 확인할 수 있다.

전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.

5. Topics

Topics Exchange는 지금까지 소개 된 Exchange 타입 중 가장 강력한 패턴을 제공한다. 만약 필터링 기준이 여러개라면 direct만으로는 관리가 어려울 것이다. 이 때 topics는 마치 디렉토리를 참조하듯 다양한 단어와 .의 조합으로 경로를 표시한다.

  • *는 정확하게 하나의 단어를 대체한다
  • #은 하나 이상의 단어를 대체한다.

만약 위 그림처럼 "speed"."colour"."species"로 이루어진 각기 다른 관심사를 필터링해 각 큐로 전달한다면 #, *를 적절하게 조합해 원하는 메시지만 받을 수 있다.

quick.orange.rabbit이라는 routing key를 가진 메시지는 모든 큐로 전달된다. lazy.orange.elephant도 모든 큐로 전달된다. 그러나 quick.orange.fox는 첫 번째 큐로만 전달되고 lazy.brown.fox는 두 번째 큐로만 전달된다. lazy.pink.rabbit는 두 개의 바인딩에 일치하지만 두 번째 큐로만 전달된다. quick.brown.fox는 어떤 바인딩과도 일치하지 않으므로 폐기된다.

위와 같은 방식으로 Topics Exchangedirectfanout 모두를 대체할 수 있다. #을 사용하면 전체 컨슈머에 전달되고 # 없이 *만을 사용하면 특정 컨슈머에만 전달할 수 있기 때문이다.

전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.

6. RPC

지금까지의 예시들은 모두 퍼블리셔 -> 컨슈머로 메시지가 발송되는 one-way 방식이었다.

하지만 클라이언트가 요청하고 서버가 응답하여 다시 클라이언트에게 돌아오는 two-way 방식이 필요할 때가 있다. EIP에서는 Request-Reply 패턴으로도 부르며 RabbitMQ에서는 RPC 패턴으로 소개한다.

전부 client-server 패턴인 것 같은데 왜이렇게 용어가 다양한걸까?

메시지 브로커를 이용한 RPC 패턴에서 가장 먼저 고려할 점은 고유한 메시지를 서버와 클라이언트 모두가 인식할 수 있도록 correlationId를 부여하는 것이다.

서로 다른 스레드에서 클라이언트와 서버들이 m:n의 관계로 메시지를 주고 받으므로 어떤 메시지가 누구에게서 왔는지 또 해당 응답이 어떤 메시지에 대한 응답인지 정확하게 매칭할 수 있어야 하기 때문이다.

Spring AMQP에서는 convertSendAndReceive라는 RabbitTemplate의 메서드를 활용해 별도의 correlationId 생성 및 설정 또는 매칭 과정 등을 대신 처리해주므로 간단하게 RPC 요청을 보낼 수 있다.

int start = 0;
...
Integer response = (Integer) template.convertSendAndReceive(
	exchange.getName(), // exchange 이름 
    "rpc", // routing key(reply_to)
    start++ // 메시지
);

서버에서는 아래와 같이 응답할 메서드에 @RabbitListener를 붙여준 뒤 리턴하면 클라이언트로 전달된다.

@RabbitListener(queues = "tut.rpc.requests")
public int fibonacci(int n) {
	System.out.println(" [x] Received request for " + n);
	int result = fib(n);
	System.out.println(" [.] Returned " + result);
	return result;
}

기본적으로 RabbitMQRPC는 큐에서 Fair Dispatch를 지원하므로 트래픽이 몰릴 때 별도의 서버사이드 로드밸런싱이 없어도 새로운 서버를 가동할 수 있어 스케일업에 용이하다는 장점이 있다.

반면 다음과 같이 일반적으로 RPC 패턴에서 주의하여야 할 점을 고려하여 적용해야 한다.

  • 만약 의존성이 문서화가 되어있지 않다면 remote 콜과 local 콜이 구분되지 않을 수 있다.
  • 서버에서 실패한 메시지에 대한 retry 정책이나 timeout 등 예외 핸들링이 필요하다. 클라이언트에서도 마찬가지로 timeout 정책을 고려해야 한다.
  • 서버에서 예외 전달을 어떻게 해야할 지 고민해야 한다.
  • 서버에서 밸리데이션에 대한 고려가 필요하다.

전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.

7. Publisher Confirms

RabbitMQ가 우체국이라면 Publisher Confirms는 등기우편을 보내는 것과 같다. 반드시 수신확인이 되어야 하는 중요한 메시지가 정상적으로 발송되었는지, 수신되었는지를 확인하기 위한 절차이기 때문이다.

큐에서 컨슈머로 정상적으로 메시지가 꺼내졌는지를 확인하는 ACK 처리 만으로는 충분하지 않으며 퍼블리셔 입장에서 안정적인 메시지 전송 확인을 한 후 예외 처리까지 해야할 경우도 있기 때문이다.

다만 RabbitMQ에서 공식적으로 Spring AMQP를 지원하지 않기 때문에 (자바, C#, PHP만 지원) 자바 라이브러리를 활용해 코틀린으로 구현한 예시로 설명한다.

기본적으로 publisher confirms는 채널 단위로 설정된다.

channel.confirmSelect()

Connection - 소켓 연결이 추상화된 객체이다. 물리적으로 단일 소켓을 통한 TCP 연결을 사용한다. Channel- Connection을 공유하는 논리적인 개념의 다중화된 경량 연결이다. 실제 api가 메시지를 보내고 받는 작업을 수행한다. Connection의 생명주기에 종속적이다.

다음은 메시지를 저장할 변수를 만든다.

// ConcurrentSkipListMap은 시퀀스 번호 순서로 메시지를 저장할 수 있고 동시성을 지원한다. 메시지의 시퀀스 번호를 키, 메시지의 내용은 값으로 사용된다.
val outstandingConfirms: ConcurrentNavigableMap<Long, String> = ConcurrentSkipListMap()

outstandingConfirms은 확인되지 않은 메시지를 추적하는 데 사용된다. 정상적인 메시지 응답Ack을 받았을 때의 다음의 콜백을 이용하여 삭제할 것이기 때문에 비정상적인 응답일 경우에 메시지가 남아있게 된다.

// 정상적으로 메시지가 확인되면 outstandingConfirms에서 해당 메시지를 제거하는 콜백이다.
val cleanOutstandingConfirms =
	ConfirmCallback { sequenceNumber: Long, multiple: Boolean ->
    if (multiple) { // 메시지가 다수일 때
    	val confirmed = outstandingConfirms.headMap(sequenceNumber, true)
    	confirmed.clear()
    } else { // 메시지가 하나일 때
        outstandingConfirms.remove(sequenceNumber)
    }
}

위의 콜백을 컨펌리스너에 추가한다.

// ConfirmCallback을 사용하여 확인되지 않은 메시지를 추적하고, nack을 처리한다.
channel.addConfirmListener(
	cleanOutstandingConfirms // Ack일 경우 콜백
) { sequenceNumber: Long, multiple: Boolean -> // Nack일 경우 콜백
	val body = outstandingConfirms[sequenceNumber]
    System.err.format(
    	"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
    	body, sequenceNumber, multiple
    )
	cleanOutstandingConfirms.handle(sequenceNumber, multiple)
}

아래와 같이 메시지를 발행할 때마다 저장한다.

for (i in 0 until MESSAGE_COUNT) {
    val body = i.toString()
    // 다음 발행 시퀀스 번호를 키로 사용하여 메시지를 outstandingConfirms에 저장한다.
	outstandingConfirms[channel.nextPublishSeqNo] = body
    channel.basicPublish("", queue, null, body.toByteArray())
}

한가지 주의하여야 할 점은 publishers confirm이 비동기적으로 작동할 수 있도록 코드를 작성해야 성능의 저하가 없다는 것이다.

전체 코드는 개인 깃헙에 올려놓았다.