기본 개념
RabbitMQ
는 송신자Publisher
와 수신자Subscriber
끼리 메시지를 주고 받기 위해 사용되는 메시지 브로커이다.
메시징 방법
아래의 6개 예제를 SpringAMQP
를 사용하여 구현하고 마지막 예제는 코틀린으로 구현할 것이다. 예제는 RabbitMQ 공식 튜토리얼을 참고하였다.
1. "Hello World"
가장 간단한 1:1로 메시지를 주고 받는 모델을 만들어보자. 송신자가 수신자의 우편함에 직접 편지를 넣어주는 걸 생각하면 된다.
송신자Publisher
가 특정 큐로 "Hello World!"라는 메시지를 보낸다. 이 때 메시지는 동일한 큐(=우편함)에 연결된 수신자Subscriber
에게 도착한다. 큐인만큼 메시지는 FIFO 방식으로 전달된다. 큐는 빈으로 생성하여 사용한다.
@Bean
public Queue hello() {
return new Queue("hello"); // 큐의 이름을 "hello"라고 지정한다.
}
끝이다.
코드로 살펴보자.
public void send() {
// 메시지 정의
String message = "Hello World!";
// 지정된 큐로 발송한다
this.template.convertAndSend(queue.getName(), message);
// 발송한 메시지 출력
System.out.println(" [x] Sent '" + message + "'");
}
여기서 this.template
으로 참조된 RabbitTemplate
은 여러 설정들을 자동으로 지원하는 등 보일러플레이트 코드를 줄이고 가독성 있는 코드를 작성할 수 있게 해준다.
전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.
2. Work Queues
마치 택배 물류창고처럼 하나의 메시지 생산자Producer
가 메시지를 보내면 임시저장소에 쌓이게 되고 다수의 소비자Consumer
가 분배하여 가져가는 패턴도 있다.
이런 모델은 경쟁적 소비자 패턴(Competing consumer pattern)의 기본적인 형태이며 다수의 메시지를 최대한 빠르게 처리해야 할 때 사용된다.
1의 기본모델과 무엇이 다를까?
QoS(Quality of Service)
가장 먼저 각 컨슈머에게 어떻게 분배할지를 정해야 한다. 하나의 소비자가 너무 많은 메시지를 소비하여 지연을 발생시키고 다른 컨슈머들에 할당된 자원을 낭비하는 것은 바람직하지 않기 때문이다.
기본적으로 RabbitMQ
는 컨슈머가 여러 대일 때 Round-Robin
방식으로 클라이언트 사이드 로드밸런싱을 시도한다. 즉, 각 메시지를 차례대로 다음 컨슈머에게 전달한다는 뜻이다.
위 이미지는 해당 튜토리얼의 실행 결과이다. 별도의 설정이 없어도 1,3,5번 메시지와 2,4,6번 메시지가 각 2 개의 Worker에 나눠서 처리되는 것을 볼 수 있다.
그러나 이 방식은 어떤 컨슈머가 아직 작업 중인지 여부는 확인하지 않는다는 문제가 있다. 따라서 균등 분배Fair Dispatch
를 위해서는 컨슈머의 채널Channel
에 한번에 몇 개의 메시지를 받을 것인지를 정의해야 한다.
Spring AMQP
에서는 Fair Dispatch
방식을 디폴트로 제공하고 있다.한번에 받을 수 있는 메시지 개수 Prefetch Count는 디폴트 250개이며 AbstractMessageListenerContainer.setPrefetchCount(int prefetchCount)
에서 변경할 수 있다.
메시지 확인(ACK)
만약 여러 대의 컨슈머 중 하나가 죽는다면 어떻게 할까? 일단 메시지가 성공적으로 수신됐는지 확인을 해야 그 다음의 처리가 가능하다.
RabbitMQ
에서는 autoAck
을 false로 설정하여 정상적인 메시지에 대해 응답 확인(ACK)을 보낼 수 있다.
큐에 올라간 메시지가 처리되면 리스너에서 아래와 같이 확인ACK
처리를 한다.
channel.basicAck()
응답이 확인되지 않으면 메시지는 unack
상태가 된다.
만약 autoAck
설정이 false인 경우 컨슈머가 위 메서드를 호출하지 않으면 메모리에 UnAcked
가 쌓여 심각한 문제가 발생할 수 있다.
ACK
되지 않은 메시지는 위 이미지처럼 RabbitMQ management UI
에서도 확인할 수 있고 아래의 명령어로도 확인할 수 있다.
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Spring AMQP
에서는 디폴트로 channel.basicAck()
을 호출해 확인 처리를 하므로 위와 같은 이슈를 신경쓰지 않아도 되지만 java 라이브러리 등을 혼합하여 사용한다면 주의해야 한다.
응답 확인을 받은 이후 RabbitMQ
는 디폴트로 실패한 메시지를 다시 큐에 올린다(Requeue). 큐에 올라간 메시지는 다른 컨슈머 등에 의해 재처리될 수 있 다. 동시간 대에 모든 컨슈머가 다운되어도 컨슈머가 복구되는 시점에 해당 큐와 연결되어 메시지를 재전송할 수 있다.
이 때 만약 다시 큐에 올리지 않고 예외를 던지도록 하고 싶다면 아래와 같이 프로퍼티에 명시적으로 설정하면
defaultRequeueRejected=false
AmqpRejectAndDontRequeueException
를 던지게 된다.
메시지 확인을 하는 쪽에서는 기본적으로 메시지를 보낸 채널과 동일한 채널에 응답을 보낸다. 채널이 다를 경우 예외가 발생한다.
메시지 보존
Spring AMQP
의 메시지는 기본적으로 durable(영속화persistence
되도록)하게 설정된다. 따라서 브로커가 리부팅되어도 큐와 메시지가 복구된다. 물론 이런 설정이 메시지 무손실을 보장하는 것은 아니다. 메시지가 영속화되기 전 다운될 경우도 있기 때문이다.
무손실 보장 정도를 높이고 싶다면 7의 publisher Confirms
설정을 이용해야 한다.
전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.
3. Publish/Subscribe
하나의 메시지 발신자Publisher
와 다수의 구독자Subscriber
로 구성된 Publish/Subscribe 패턴도 있다. SNS 또는 미디어를 구독 하는 것과 비슷하다.
다수의 메시지 소비자가 있지만 2의 Work Queue
모델과 다른 점은 소비자들이 경쟁적으로 메시지를 소비하는 것이 아니라 구독자 모두가 동일한 메시지를 함께 받는다는 점이다.
여기서 메시지 발신자는 누가 자신의 메시지를 모르고 또 알 필요도 없이 모든 구독자에게 브로드캐스팅을 한다. 구독자는 언제든 자유롭게 구독/해제를 할 수 있다.
Exchange
그렇다면 구독자를 모르는 채로 어떻게 메시지를 전달할 수 있을까?
여기서 Exchange
의 개념이 등장한다.
퍼블리셔는 큐가 아닌 Exchange
에 메시지를 보낸다. 이 때 Exchange
는 큐와 연결되어 있다. 구독자 또한 발신자에게서 메시지를 직접 받는 것이 아닌 자신이 구독하는 큐로부터 메시지를 받는다.
실제 발신자에게서 메시지를 받고 동시에 구독자와 연결된 큐에 메시지를 푸쉬하는 역할을 Exchange
가 담당하며 큐는 메시지를 임시로 저장하는 버퍼의 역할을 한다.
Exchange
는 direct
, topic
, headers
, fanout
의 4개가 있으며 여기에서는 모든 큐에 무차별적으로 메시지를 보내야 하기에 fanout
타입의 Exchange
를 사용한다.
Spring AMQP
에서는 아래와 같이 빈으로 설정 후 주입하여 사용한다.
@Bean
public FanoutExchange fanout() {
return new FanoutExchange("tut.fanout");
}
익명 큐
구독자는 언제든 자유롭게 구독을 시작할 수 있다.
또 오래된 메시지는 받을 필요가 없기 때문에 구독을 시작할 때마다 새로운 큐를 할당받아야 한다. 마찬가지로 구독을 끊어 더 이상 연결된 구독자가 없을 때 큐는 삭제된다.
하나의 큐를 여러 구독자가 공유할 필요가 없기에 하나의 큐는 하나의 구독자와 연결된다.
이러한 특성을 가진 큐를 Spring AMQP
에서는 아래와 같이 간단하게 생성할 수 있다.
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
위와 같이 큐의 이름을 지정하지 않고 서버에서 지정한 익명(server-named)의 자동삭제(auto delete)되면서 하나의 구독자와 연결되는(exclusive) 큐를 만든다.
바인딩
이제 위에서 만든 큐와 Exchange
와 연결할 차례이다. 이러한 연결정보를 RabbitMQ
에서는 바인딩Binding
이라고 부른다.
@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}
구독자는 아래와 같이 익명큐를 리스너에 넘겨줄 수 있다.
@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
...
}
최종적으로 완성되는 형태는 아래와 같다.
전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.
4. Routing
만약 메시지 특정 조건에 따라 필터링하여 특정 구독자에게 보내야 한다면 어떻게 해야 할까? 마치 우체국에서 주소에 따라 알맞은 우편함에 편지를 넣어주듯이 말이다.
이 때 일괄적으로 모든 구독자에게 메시지를 보내는 Fanout Exchange
가 아닌 Direct Exchange
를 사용하여 큐를 바인딩하면 된다.
@Bean
public DirectExchange direct() {
return new DirectExchange("tut.direct");
}
...
@Bean
public Binding binding1a(DirectExchange direct,
Queue autoDeleteQueue1) {
// 위에서 정의한 Exchange와 큐를 "orange"라는 routing key로 바인딩
return BindingBuilder.bind(autoDeleteQueue1)
.to(direct)
.with("orange");
}
fanout
과 다르게 with
를 통해 routing key
를 추가한다.
바인딩 정보는 RabbitMQ management UI
에서 확인할 수 있다.
전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.
5. Topics
Topics Exchange
는 지금까지 소개 된 Exchange
타입 중 가장 강력한 패턴을 제공한다.
만약 필터링 기준이 여러개라면 direct
만으로는 관리가 어려울 것이다. 이 때 topics
는 마치 디렉토리를 참조하듯 다양한 단어와 .
의 조합으로 경로를 표시한다.
*
는 정확하게 하나의 단어를 대체한다#
은 하나 이상의 단어를 대체한다.
만약 위 그림처럼 "speed"."colour"."species"로 이루어진 각기 다른 관심사를 필터링해 각 큐로 전달한다면 #
, *
를 적절하게 조합해 원하는 메시지만 받을 수 있다.
quick.orange.rabbit
이라는 routing key
를 가진 메시지는 모든 큐로 전달된다. lazy.orange.elephant
도 모든 큐로 전달된다. 그러나 quick.orange.fox
는 첫 번째 큐로만 전달되고 lazy.brown.fox
는 두 번째 큐로만 전달된다. lazy.pink.rabbit
는 두 개의 바인딩에 일치하지만 두 번째 큐로만 전달된다. quick.brown.fox
는 어떤 바인딩과도 일치하지 않으므로 폐기된다.
위와 같은 방식으로 Topics Exchange
는 direct
와 fanout
모두를 대체할 수 있다. #
을 사용하면 전체 컨슈머에 전달되고 #
없이 *
만을 사용하면 특정 컨슈머에만 전달할 수 있기 때문이다.
전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.
6. RPC
지금까지의 예시들은 모두 퍼블리셔 -> 컨슈머로 메시지가 발송되는 one-way
방식이었다.
하지만 클라이언트가 요청하고 서버가 응답하여 다시 클라이언트에게 돌아오는 two-way
방식이 필요할 때가 있다. EIP에서는 Request-Reply 패턴으로도 부르며 RabbitMQ
에서는 RPC
패턴으로 소개한다.
전부 client-server 패턴인 것 같은데 왜이렇게 용어가 다양한걸까?
메시지 브로커를 이용한 RPC
패턴에서 가장 먼저 고려할 점은 고유한 메시지를 서버와 클라이언트 모두가 인식할 수 있도록 correlationId
를 부여하는 것이다.
서로 다른 스레드에서 클라이언트와 서버들이 m:n의 관계로 메시지를 주고 받으므로 어떤 메시지가 누구에게서 왔는지 또 해당 응답이 어떤 메시지에 대한 응답인지 정확하게 매칭할 수 있어야 하기 때문이다.
Spring AMQP
에서는 convertSendAndReceive
라는 RabbitTemplate
의 메서드를 활용해 별도의 correlationId
생성 및 설정 또는 매칭 과정 등을 대신 처리해주므로 간단하게 RPC 요청을 보낼 수 있다.
int start = 0;
...
Integer response = (Integer) template.convertSendAndReceive(
exchange.getName(), // exchange 이름
"rpc", // routing key(reply_to)
start++ // 메시지
);
서버에서는 아래와 같이 응답할 메서드에 @RabbitListener
를 붙여준 뒤 리턴하면 클라이언트로 전달된다.
@RabbitListener(queues = "tut.rpc.requests")
public int fibonacci(int n) {
System.out.println(" [x] Received request for " + n);
int result = fib(n);
System.out.println(" [.] Returned " + result);
return result;
}
기본적으로 RabbitMQ
의 RPC
는 큐에서 Fair Dispatch
를 지원하므로 트래픽이 몰릴 때 별도의 서버사이드 로드밸런싱이 없어도 새로운 서버를 가동할 수 있어 스케일업에 용이하다는 장점이 있다.
반면 다음과 같이 일반적으로 RPC
패턴에서 주의하여야 할 점을 고려하여 적용해야 한다.
- 만약 의존성이 문서화가 되어있지 않다면 remote 콜과 local 콜이 구분되지 않을 수 있다.
- 서버에서 실패한 메시지에 대한 retry 정책이나 timeout 등 예외 핸들링이 필요하다. 클라이언트에서도 마찬가지로 timeout 정책을 고려해야 한다.
- 서버에서 예외 전달을 어떻게 해야할 지 고민해야 한다.
- 서버에서 밸리데이션에 대한 고려가 필요하다.
전체 코드는 RabbitMQ 공식 튜토리얼 깃헙에서 확인할 수 있다.
7. Publisher Confirms
RabbitMQ
가 우체국이라면 Publisher Confirms
는 등기우편을 보내는 것과 같다. 반드시 수신확인이 되어야 하는 중요한 메시지가 정상적으로 발송되었는지, 수신되었는지를 확인하기 위한 절차이기 때문이다.
큐에서 컨슈머로 정상적으로 메시지가 꺼내졌는지를 확인하는 ACK
처리 만으로는 충분하지 않으며 퍼블리셔 입장에서 안정적인 메시지 전송 확인을 한 후 예외 처리까지 해야할 경우도 있기 때문이다.
다만 RabbitMQ
에서 공식적으로 Spring AMQP
를 지원하지 않기 때문에 (자바, C#, PHP만 지원) 자바 라이브러리를 활용해 코틀린으로 구현한 예시로 설명한다.
기본적으로 publisher confirms
는 채널 단위로 설정된다.
channel.confirmSelect()
Connection - 소켓 연결이 추상화된 객체이다. 물리적으로 단일 소켓을 통한 TCP 연결을 사용한다. Channel- Connection을 공유하는 논리적인 개념의 다중화된 경량 연결이다. 실제 api가 메시지를 보내고 받는 작업을 수행한다. Connection의 생명주기에 종속적이다.
다음은 메시지를 저장할 변수를 만든다.
// ConcurrentSkipListMap은 시퀀스 번호 순서로 메시지를 저장할 수 있고 동시성을 지원한다. 메시지의 시퀀스 번호를 키, 메시지의 내용은 값으로 사용된다.
val outstandingConfirms: ConcurrentNavigableMap<Long, String> = ConcurrentSkipListMap()
outstandingConfirms
은 확인되지 않은 메시지를 추적하는 데 사용된다. 정상적인 메시지 응답Ack
을 받았을 때의 다음의 콜백을 이용하여 삭제할 것이기 때문에 비정상적인 응답일 경우에 메시지가 남아있게 된다.
// 정상적으로 메시지가 확인되면 outstandingConfirms에서 해당 메시지를 제거하는 콜백이다.
val cleanOutstandingConfirms =
ConfirmCallback { sequenceNumber: Long, multiple: Boolean ->
if (multiple) { // 메시지가 다수일 때
val confirmed = outstandingConfirms.headMap(sequenceNumber, true)
confirmed.clear()
} else { // 메시지가 하나일 때
outstandingConfirms.remove(sequenceNumber)
}
}
위의 콜백을 컨펌리스너에 추가한다.
// ConfirmCallback을 사용하여 확인되지 않은 메시지를 추적하고, nack을 처리한다.
channel.addConfirmListener(
cleanOutstandingConfirms // Ack일 경우 콜백
) { sequenceNumber: Long, multiple: Boolean -> // Nack일 경우 콜백
val body = outstandingConfirms[sequenceNumber]
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
)
cleanOutstandingConfirms.handle(sequenceNumber, multiple)
}
아래와 같이 메시지를 발행할 때마다 저장한다.
for (i in 0 until MESSAGE_COUNT) {
val body = i.toString()
// 다음 발행 시퀀스 번호를 키로 사용하여 메시지를 outstandingConfirms에 저장한다.
outstandingConfirms[channel.nextPublishSeqNo] = body
channel.basicPublish("", queue, null, body.toByteArray())
}
한가지 주의하여야 할 점은 publishers confirm
이 비동기적으로 작동할 수 있도록 코드를 작성해야 성능의 저하가 없다는 것이다.
전체 코드는 개인 깃헙에 올려놓았다.