반응형

dependencies

implementation("org.springframework.boot:spring-boot-starter-amqp")

application.yml

spring:
  rabbitmq:
    addresses: localhost:5672
    username: guest
    password: guest
    virtual-host: test
    listener:
      simple:
        acknowledge-mode: auto # manual X auto O
        retry:
          enabled: true         # 재시도
          initial-interval: 3s  # 처음 메세지 처리 실패시 N초 후에 다시 해당 메세지를 처리
          max-interval: 10s     # 최대 N초 후에 실패한 메세지를 처리
          max-attempts: 2       # 최대 N번까지만 메세지 처리를 시도. 만약 N을 넘어가면 해당 메세지는 dlx exchange로 넘긴다.
          multiplier: 2         # 동일 메세지에 대한 처리 시도 횟수가 증가할 수록 interval 시간에 N을 곱한다. 예를 들어 처음엔 3초를 기다린 후 재시도했다면 다음 횟수에서는 6초를, 다음 횟수에서는 12초를 기다린 후 메세지 처리를 시도한다. 만약 initial-interval * multiplier > max-interval 일 경우에는 max-interval 값을 사용한다.

RabbitMqConfig

@Configuration
public class RabbitMqConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DirectExchange directExchange() {
        return ExchangeBuilder
            .directExchange("direct-exchange")
            .build();
    }

    @Bean
    public Queue studentQueue(DirectExchange directExchange, Queue studentDeadLetterQueue) {
        return QueueBuilder
            .durable("student-queue")        // durable : 메시지를 디스크에 저장하여 브로커가 재시작되어도 메시지 유지
            .autoDelete()                           // 마지막 큐 연결이 해제되면 삭제
            .deadLetterExchange(directExchange.getName())
            .deadLetterRoutingKey(studentDeadLetterQueue.getName())
            .build();
    }

    @Bean
    public Binding studentBinding(DirectExchange directExchange, Queue studentQueue) {
        return BindingBuilder
            .bind(studentQueue)
            .to(directExchange)
            .with(studentQueue.getName());
    }

    @Bean
    public Queue studentDeadLetterQueue() {         // studentQueue 컨슈밍 재처리까지 실패시 해당 메시지를 쌓는 큐
        return QueueBuilder
            .durable("student-queue-dlx")
            .autoDelete()
            .build();
    }

    @Bean
    public Binding studentDeadLetterQueueBinding(DirectExchange directExchange, Queue studentDeadLetterQueue) {
        return BindingBuilder
            .bind(studentDeadLetterQueue)
            .to(directExchange)
            .with(studentDeadLetterQueue.getName());
    }
}

Student

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Student {
    private String name;
}

RabbitMqController

@Slf4j
@RequiredArgsConstructor
@RestController
public class RabbitMqController {
    private final RabbitTemplate rabbitTemplate;
    private final DirectExchange directExchange;
    private final Queue studentQueue;

    @PostMapping("/api/students")
    public void produceStudent() {
        rabbitTemplate.convertAndSend(directExchange.getName(), studentQueue.getName(), new Student("john"));
    }

    @RabbitListener(queues = "#{studentQueue.name}")
    public void consumeStudent(Message message, Student student) {
        log.info("student : {}, student : {}", message, student);
        throw new IllegalStateException("for test");
    }

    @RabbitListener(queues = "#{studentDeadLetterQueue.name}")
    public void consumeStudentDlx(Message message, Student student) {
        log.info("studentDlx : {}, student : {}", message, student);
    }
}

참고

반응형

'Development > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] Web UI  (0) 2020.12.30
[RabbitMQ] 클러스터 구성하기  (0) 2020.12.30
[RabbitMQ] 설치하기  (0) 2020.12.30
[RabbitMQ] RabbitMQ란?  (0) 2020.12.30

+ Recent posts