Rabbit listener error handler

RabbitMQ Spring Boot #05-error handling 1/2 — basic concepts Hi! In this story I’m going to cover error handling strategies for RabbitMq consumers, and I’ll show You some examples using Spring Boot. In my opinion this is a really important aspect , and I strongly recommend to gain some knowledge about RabbitMQ error handling […]

Содержание

  1. RabbitMQ Spring Boot #05-error handling 1/2 — basic concepts
  2. Default behaviour on NACK- requeue the message
  3. Infinite loop demo
  4. DLX (Dead Letter Exchange) and DLQ (Dead Letter Queue) concepts
  5. When to use it
  6. DLX and DLQ with retry
  7. When to retry
  8. Parking Lot Queue
  9. Обработка ошибок с помощью Spring AMQP
  10. 1. введение
  11. 2. Настройка среды
  12. 3. Сценарий Отказа
  13. 4. Настройка проекта
  14. 5. Очередь Мертвых писем
  15. 5.1. Базовая конфигурация
  16. 5.2. Неудачная Маршрутизация Сообщений
  17. 5.3. Обмен Мертвыми письмами
  18. 5.4. Обработка Сообщений Очереди Мертвых Писем
  19. 5.5. Очередь на Парковку
  20. 6. Пользовательская Обработка Ошибок
  21. 6.1. Глобальный обработчик ошибок
  22. 6.2. Стратегия Фатальных Исключений
  23. 7. Заключение

RabbitMQ Spring Boot #05-error handling 1/2 — basic concepts

Hi! In this story I’m going to cover error handling strategies for RabbitMq consumers, and I’ll show You some examples using Spring Boot. In my opinion this is a really important aspect , and I strongly recommend to gain some knowledge about RabbitMQ error handling behaviour before using this message broker on production 🙂

Default behaviour on NACK- requeue the message

By default, if consumer will return a negative acknowledgement of a message, the message is going to be requeued back to the queue. Reasons of negative acknowledgement might be different, e.g.

  • consumer application might shut down in a middle of processing
  • consumer application might have a temporal problem — it will be working again in a few minutes, but right now it won’t be able to process any new messages
  • business requirements haven’t been met, and consumer can’t accept the received message

Is this a good error handling strategy? As usually, it depends:) If the problem is temporal on consumer side (let’s say that consumer application will restart itself and it’ll be working again in a few seconds/minutes), then it‘s not the worst case scenario. However, if consumer’s application issue needs more time to be solved, or the message is never going to be accepted, then this strategy is terrible, because it will cause infinite loop on a consumer side.

Infinite loop demo

Message is going to be requeued on NACK by default, so to present this behaviour there is no need to write any extra error handling configuration.

Flow is very simple:

  1. Producer produces the message and sends it to x.error-handling-demo exchange using infinite-loop routing key
  2. Exchange routes the message to queue q.error-handling-demo.infinite-loop based on messages’ key
  3. Consumer consumes the message and throws an exception
  4. Message will be requeued back to the queue
  5. Back to point 3 — here we have infinite loop problem
  • exchange x.error-handling-demo is being declared
  • queue q.error-handling-demo.infinite-loop is being declared
  • queue q.error-handling-demo.infinite-loop is bound to an exchange x.error-handling-demo using infinite-loop routing key
  • producer sends a message to x.error-handling-demo exchange using infinite-loop routing key
  • consumer receives a message from q.error-handling-demo.infinite-loop and throws an exception — message won’t be acknowledged, parameter requeue hasn’t been modified, so it’s value is going to be true by default

As a result, You are going to see a lot of similar logs:

  1. consumer received the message — message id is b308a8d8–4eb8–4d65-a627–91c39474d50b and message time is 2021–10–31T14:06:47.351079
  2. message was rejected — stack trace will be printed
  3. the same message was consumed again — message id and time matches values from the bullet point above

After running this demo, remember to clear messages from queue q.error-handling-demo.infinite-loop. You can do it in management interface, by going to Queue tab, selecting queue q.error-handling-demo.infinite-loop, and clicking Purge Messages button

TIP: If You want to reject a message but You don’t want to requeue it, then You can use a built-in AmqpRejectAndDontRequeueException. It’s going to be used later

DLX (Dead Letter Exchange) and DLQ (Dead Letter Queue) concepts

Messages from a queue can be dead-lettered , which means, that they will be republished to a different exchange in case of:

  • message negative acknowledgment or rejection, but with requeue parameter set to false
  • message expiration due to configured TTL parameter
  • message drop due to queue length limit

Dead letter exchanges are normal exchanges — if You want to learn more about exchanges in general, You can read my story 🙂

How to configure DLX for queue? You can do it either by defining server’s policy or by passing x-dead-letter-exchange argument during queue declaration.

There is also an option to override message’s original routing key by using x-dead-letter-routing-key argument. If not set, then original routing key will be used.

Messages send to DLX will contain a set of x-death headers, which You can access to find an information about:

  • the reason why message was send to DLX
  • original queue
  • how many times this message was dead-lettered
  • … and others — here You can read more about these headers

Dead letter queue is the regular queue, bound to dead letter exchange. If You would like to read more about queues in RabbitMq, I wrote a story about it.

When to use it

DLX and DLQ are being used to collect dropped messages. What to do later is up to You. You can:

  • monitor DLQs size and present it in Grafana dashboard or other tool
  • send an email to the support team
  • …or do something different 🙂 In general it is important to have some kind of monitoring on these queues, so that You are going to know if there is something wrong happening to Your system
  1. exchange x.error-handling-demo.dlx-dlq-example is being declared
  2. queue q.error-handling-demo.dlx-dlq-example is being declared with x-dead-letter-exchange and x-dead-letter-routing-key arguments
  3. queue q.error-handling-demo.dlx-dlq-example is bound to an exchange x.error-handling-demo.dlx-dlq-example using dlx-before routing key
  4. exchange x.error-handling-demo.dlx-dlq-example.dlx is being declared
  5. queue q.error-handling-demo.dlx-dlq-example.dlq is being declared
  6. queue q.error-handling-demo.dlx-dlq-example.dlq is bound to an exchange x.error-handling-demo.dlx-dlq-example.dlx using dlx-after routing key
  1. Producer sends a message to x.error-handling-demo.dlx-dlq-example , using dlx-before routing key
  2. Consumer listens on q.error-handling-demo.dlx-dlq-example queue, and throws AmqpRejectAndDontRequeueException. This means, that message won’t be acknowledged, but it won’t be rejected back to the queue
  • consumer received the message — message id is 2e05f7a6-be9d-40ba-8a35–89d497f57316 and message time is 2021–10–31T15:47:17.364577
  • message was rejected — stack trace will be printed
  • the same message was consumed again , but this time from dead letter queue — message id and time matches values from the bullet point above
  • and… that’s over! There was no loop, and message was consumed twice — firstly from an original queue, and secondly from dead-lettered queue

DLX and DLQ with retry

Sometimes it’s worth to retry the operation. Why ?

  • consumer application might had a temporal problem — e.g. it run out of memory and had to be restarted
  • maybe consumer application uses other external service which is currently unavailable, but it will work soon — let’s consider e-mail sending service, which uses external e-mail server provided. Producer sends a command to send an e-mai. Company’s e-mail server is currently restarting — it will take about 2 minutes. Without the retry mechanism, e-mails won’t be send, messages will end up in DLQ, and they might require manual actions

When to retry

In general, it makes sense to retry a message if:

  • an issue on consumer side is temporary — like in above example with restarting e-mail server. Message itself is correct, but some required external services are currently unavailable. Who knows, maybe it will be available in a second?
  • it still makes sense to retry the message — e.g. it rather doesn’t makes sense to book a tickets for a show, which has already finished

For the simplest scenario, above properties are enough to turn on retry mechanism:

  • spring.rabbitmq.listener.simple.retry.enabled — enables/disabled properties
  • spring.rabbitmq.listener.simple.retry.initial-interval — duration between the first and second attempt to deliver a message
  • spring.rabbitmq.listener.simple.retry.max-attempts — maximum number of attempts to deliver a message
  • spring.rabbitmq.listener.simple.retry.max-interval — maximum duration between attempts
  • spring.rabbitmq.listener.simple.retry.multiplier — multiplier to apply to the previous retry interval

Based on above configuration, RabbitMQ listener will try to retry the nack-ed message.

  • there will be 3 attempts to deliver the message
  • initial interval is going to be 5 seconds
  • every other retry attempt will wait for previous interval time multiplied by multiplier, e.g. for 5,10, 20, 30,60 seconds etc.

It might be a good idea to use Exponential backoff algorithm for retry properties configuration.

Set up is the same as in the previous demo — the only difference is the configuration of above properties

  1. consumer received the message — message id is c086a5a1–251b-490f-931a-0e122ca33d35 and message time is 2021–10–31T19:27:20.371739
  2. message was rejected — first attempt
  3. after 5 seconds (configured in spring.rabbitmq.listener.simple.retry.initial-interval property) consumer receives the same message again
  4. message was rejected — second attempt
  5. after 10 seconds (previous interval multiplied by spring.rabbitmq.listener.simple.retry.multiplier property)
  6. message rejected — third attempt, spring.rabbitmq.listener.simple.retry.max-attempts threshold reached
  7. the same message was consumed again , but this time from dead letter queue
  8. and… that’s over!

Parking Lot Queue

Sometimes we might want to retry the messages from…DLQ ! In such case we are talking about another retry mechanism:

  • firstly we may retry an attempt to consume a message from the queue — like in a previous example, we try to consume message 3 times
  • after that, message will be send to DLX and DLQ
  • and here is where the second retry mechanism might take place — after some time You might want to send messages from the DLQ back to the original queue

Источник

Обработка ошибок с помощью Spring AMQP

Узнайте о различных способах обработки ошибок с помощью Spring AMQP с помощью RabbitMQ.

Автор: baeldung
Дата записи

1. введение

Асинхронный обмен сообщениями-это тип слабо связанной распределенной коммуникации, который становится все более популярным для реализации архитектуры, управляемой событиями . К счастью, Spring Framework предоставляет проект Spring AMQP, позволяющий нам создавать решения для обмена сообщениями на основе AMQP.

С другой стороны, работа с ошибками в таких средах может быть нетривиальной задачей . Поэтому в этом уроке мы рассмотрим различные стратегии обработки ошибок.

2. Настройка среды

Для этого урока мы будем использовать RabbitMQ , который реализует стандарт AMQP. Кроме того, Spring AMQP предоставляет модуль spring-rabbit , который делает интеграцию очень простой.

Давайте запустим RabbitMQ как автономный сервер. Мы запустим его в контейнере Docker, выполнив следующую команду:

Для получения подробной конфигурации и настройки зависимостей проекта, пожалуйста, обратитесь к нашей статье Spring AMQP .

3. Сценарий Отказа

Обычно существует больше типов ошибок, которые могут возникнуть в системах обмена сообщениями по сравнению с монолитными или однопакетными приложениями из-за их распределенной природы.

Мы можем указать на некоторые типы исключений:

  • Сеть- или Связанные с вводом-выводом/| – общие сбои сетевых соединений и операций ввода-вывода Протокол-
  • или связанные с инфраструктурой -ошибки, которые обычно представляют собой неправильную конфигурацию инфраструктуры обмена сообщениями Связанные с брокером
  • -сбои, предупреждающие о неправильной конфигурации между клиентами и брокером AMQP. Например, достижение определенных пределов или порога, аутентификация или недопустимая конфигурация политик Application-
  • и message-related – исключения, которые обычно указывают на нарушение некоторых бизнес-или прикладных правил

Конечно, этот список отказов не является исчерпывающим, но содержит наиболее распространенные типы ошибок.

Следует отметить, что Spring AMQP обрабатывает связанные с подключением и низкоуровневые проблемы из коробки, например, применяя политики retry или requeue . Кроме того, большинство отказов и неисправностей преобразуются в AmqpException или один из его подклассов.

В следующих разделах мы в основном сосредоточимся на специфичных для приложений и высокоуровневых ошибках, а затем рассмотрим глобальные стратегии обработки ошибок.

4. Настройка проекта

Теперь давайте определим простую конфигурацию очереди и обмена для начала:

Далее, давайте создадим простого производителя:

И, наконец, потребитель, который бросает исключение:

По умолчанию все неудачные сообщения будут немедленно запрашиваться в начале целевой очереди снова и снова.

Давайте запустим наш пример приложения, выполнив следующую команду Maven:

Теперь мы должны увидеть аналогичный результирующий результат:

Следовательно, по умолчанию мы будем видеть бесконечное количество таких сообщений в выходных данных.

Чтобы изменить это поведение у нас есть два варианта:

  • Установите параметр defaultrequeuerejected в значение false на стороне слушателя – spring.rabbitmq.listener.simple.default-requeue-rejected=false
  • Throw an AmqpRejectAndDontRequeueException – t his может быть полезен для сообщений, которые не будут иметь смысла в будущем, поэтому их можно отбросить.

Теперь давайте узнаем, как обрабатывать неудачные сообщения более разумным способом.

5. Очередь Мертвых писем

Очередь мертвых писем (DLQ) – это очередь, содержащая недоставленные или неудачные сообщения . DLQ позволяет нам обрабатывать неисправные или плохие сообщения, отслеживать шаблоны сбоев и восстанавливаться после исключений в системе.

Что еще более важно, это помогает предотвратить бесконечные циклы в очередях, которые постоянно обрабатывают плохие сообщения и снижают производительность системы.

Всего существует два основных понятия: Обмен мертвыми письмами (DLX) и сама очередь мертвых писем (DLQ). На самом деле, DLX-это обычный обмен, который мы можем определить как один из распространенных типов : direct , topic или fanout .

Очень важно понимать, что продюсер ничего не знает об очередях. Он знает только об обменах, и все произведенные сообщения маршрутизируются в соответствии с конфигурацией обмена и ключом маршрутизации сообщений .

Теперь давайте посмотрим, как обрабатывать исключения, применяя подход очереди мертвых писем.

5.1. Базовая конфигурация

Чтобы настроить DLQ, нам нужно указать дополнительные аргументы при определении нашей очереди:

В приведенном выше примере мы использовали два дополнительных аргумента: x-dead-letter-exchange и x-dead-letter-routing-key . Пустое строковое значение для параметра x-dead-letter-exchange указывает брокеру использовать обмен по умолчанию .

Второй аргумент столь же важен, как и установка ключей маршрутизации для простых сообщений. Эта опция изменяет начальный ключ маршрутизации сообщения для дальнейшей маршрутизации по DLX.

5.2. Неудачная Маршрутизация Сообщений

Поэтому, когда сообщение не доставляется, оно направляется на обмен мертвыми письмами. Но, как мы уже отмечали, DLX-это нормальный обмен. Поэтому, если ключ маршрутизации неудачного сообщения не совпадает с обменом, он не будет доставлен в DLQ.

Таким образом, если мы опустим аргумент x-dead-letter-routing-key в нашем примере, неудачное сообщение застрянет в бесконечном цикле повторных попыток.

Кроме того, исходная метаинформация сообщения доступна в x-death heaven:

То приведенная выше информация доступна в консоли управления RabbitMQ обычно работает локально на порту 15672.

Помимо этой конфигурации, если мы используем Spring Cloud Stream , мы можем даже упростить процесс настройки, используя свойства конфигурации republishToDlq и autoBindDlq .

5.3. Обмен Мертвыми письмами

В предыдущем разделе мы видели, что ключ маршрутизации изменяется, когда сообщение направляется на обмен мертвыми буквами. Но такое поведение не всегда желательно. Мы можем изменить его, настроив DLX самостоятельно и определив его с помощью типа fanout :

На этот раз мы определили пользовательский обмен типа fan out , поэтому сообщения будут отправляться во все ограниченные очереди . Кроме того, мы установили значение аргумента x-dead-letter-exchange для имени нашего DLX. В то же время мы удалили аргумент x-dead-letter-routing-key .

Теперь, если мы запустим наш пример, неудачное сообщение должно быть доставлено в DLQ, но без изменения начального ключа маршрутизации:

5.4. Обработка Сообщений Очереди Мертвых Писем

Конечно, причина, по которой мы переместили их в очередь мертвых писем, заключается в том, что они могут быть переработаны в другое время.

Давайте определим слушателя для очереди мертвых писем:

Если мы запустим наш пример кода сейчас, то увидим вывод журнала:

Мы получили неудачное сообщение, но что нам делать дальше? Ответ зависит от конкретных системных требований, вида исключения или типа сообщения.

Например, мы можем просто запросить сообщение в исходное место назначения:

Но такая логика исключений не отличается от политики повторных попыток по умолчанию:

Общая стратегия может потребовать повторной обработки сообщения в течение n раз, а затем отклонить его. Давайте реализуем эту стратегию, используя заголовки сообщений:

Сначала мы получаем значение заголовка x-retries-count , затем сравниваем это значение с максимально допустимым значением. Впоследствии, если счетчик достигнет предельного числа попыток, сообщение будет отброшено:

Мы должны добавить, что мы также можем использовать заголовок x-message-ttl , чтобы установить время, после которого сообщение должно быть отброшено. Это может быть полезно для предотвращения бесконечного роста очередей.

5.5. Очередь на Парковку

С другой стороны, рассмотрим ситуацию, когда мы не можем просто отбросить сообщение, например, это может быть транзакция в банковской сфере. Кроме того, иногда сообщение может потребовать ручной обработки или нам просто нужно записать сообщения, которые потерпели неудачу более n раз.

Для подобных ситуаций существует понятие очереди на парковку . Мы можем переслать все сообщения из DLQ, которые вышли из строя более разрешенного количества раз, в очередь парковки для дальнейшей обработки .

Давайте теперь воплотим эту идею:

Во – вторых, давайте реорганизуем логику прослушивателя, чтобы отправить сообщение в очередь парковки:

В конце концов, нам также нужно обрабатывать сообщения, поступающие в очередь парковки:

Теперь мы можем сохранить неудачное сообщение в базе данных или, возможно, отправить уведомление по электронной почте.

Давайте проверим эту логику, запустив наше приложение:

Как видно из выходных данных, после нескольких неудачных попыток сообщение было отправлено в очередь парковки.

6. Пользовательская Обработка Ошибок

В предыдущем разделе мы видели, как обрабатывать сбои с помощью выделенных очередей и обменов. Однако иногда нам может потребоваться отловить все ошибки, например, для регистрации или сохранения их в базе данных.

6.1. Глобальный обработчик ошибок

До сих пор мы использовали по умолчанию SimpleRabbitListenerContainerFactory и эта фабрика по умолчанию использует ConditionalRejectingErrorHandler . Этот обработчик ловит различные исключения и преобразует их в одно из исключений в иерархии AmqpException .

Важно отметить, что если нам нужно обрабатывать ошибки подключения, то нам нужно реализовать интерфейс ApplicationListener .

Проще говоря, Условное отклонение ErrorHandler решает, отклонять ли конкретное сообщение или нет. Когда сообщение, вызвавшее исключение, отклоняется, оно не запрашивается.

Давайте определим пользовательский Обработчик ошибок , который будет просто запрашивать только Бизнес-исключение s:

Кроме того, поскольку мы выбрасываем исключение внутри нашего метода listener, оно оборачивается в ListenerExecutionFailedException . Итак, нам нужно вызвать метод getCause , чтобы получить исходное исключение.

6.2. Стратегия Фатальных Исключений

Под капотом этот обработчик использует стратегию Fatal Exception , чтобы проверить, следует ли считать исключение фатальным. Если это так, то неудачное сообщение будет отклонено.

По умолчанию эти исключения фатальны:

  • MessageConversionException
  • MessageConversionException
  • MethodArgumentNotValidException
  • Метод Аргумент TypeMismatchException
  • NoSuchMethodException
  • ClassCastException

Вместо реализации интерфейса Ierrorhandler мы можем просто предоставить нашу стратегию Фатальных исключений :

Наконец, нам нужно передать нашу пользовательскую стратегию конструктору Conditional Rejecting ErrorHandler :

7. Заключение

В этом уроке мы обсудили различные способы обработки ошибок при использовании Spring AMQP и, в частности, RabbitMQ.

Каждая система нуждается в определенной стратегии обработки ошибок. Мы рассмотрели наиболее распространенные способы обработки ошибок в архитектуре, управляемой событиями. Кроме того, мы убедились, что можем объединить несколько стратегий для создания более комплексного и надежного решения.

Как всегда, полный исходный код статьи доступен на GitHub .

Источник

In this article we will implement a RabbitMQ Error Handling.
Whenever any data in the message is
transmitted that the receiver does not accept, or when a message is sent to a queue that does not
exist. The message is retried and sent up to a set number of times. Even if the communication is not
received by the recipient but is sent from the sender’s end.
Now In such instances, the message queue is marked as undeliverable or deadLetter queue.

RabbitMQ provides a method for handling message failures in a really efficient way known as the Retry
and Error Handling feature
.

What is Dead Message in RabbitMQ.

If certain messages become undeliverable or unhandled even though when message received by the
broker. This can
occur whenever the amount of time that the message spent in a queue is longer than the time it takes
to live TTL, when the queue hits its capacity or the consumer admits a message negatively. Such a
message is known as dead
message
.

However there is a better way to handle this situation. Setting up a RabbitMQ dead letter
exchange and a dead letter queue enables for the storage and processing of orphaned messages.
Now there is no need to lose messages completely.

RabbitMQ Tutorial :

  1. Install RabbitMQ on Windows
  2. Install RabbitMQ using Docker on Windows
  3. Spring Boot RabbitMQ Example
  4. Spring Boot + RabbitMQ + Error Handling
    Example
  5. Spring Cloud Stream + RabbitMQ Example

  6. Spring Boot + Pivotal Cloud Foundry (PCF) + RabbitMQ Example
  7. RabbitMQ Interview Questions

In this case, if the employee experience provided in the RequestParam does not meet the criteria,
then retry and error handling feature is invoked, and the message queue is declared as deadLetter
queue.

Let’s start developing Spring Boot for Producer and Consumer applications.

Producer

Project Structure

Project Structure Producer

Maven Dependencies

We will use the Spring AMQP dependency to develop AMQP-based messaging solutions.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.5.1</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.techgeeknext</groupId>
	<artifactId>SpringBootRabbitMQProducer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringBootRabbitMQProducer</name>
	<description>Spring Boot RabbitMQ + Error Handling</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.20</version>
			<scope>provided</scope>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>
Take a look at our suggested posts:

Employee Model Class

Using lombok, which will generate getters and setters automatically.

package com.techgeeknext.model;


import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Setter
@ToString
@Getter
@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id"
		, scope = Employee.class)
public class Employee {

    private String name;
    private String domain;
    private int experience;

}

RabbitMQ Configuration Class

We require x-dead-letter-exchange and x-dead-letter-routing-key in order
to configure a dead letter.
It instructs the broker to use the standard exchange.
The deadLetterExchange is used to bind dlq, and the techgeeknextExchange
is used to bind
techgeeknext.

The Jackson2JsonMessageConverter translates an object to JSON and then from JSON to a Java object.

package com.techgeeknext.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQProducerConfig {

    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange("deadLetterExchange");
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange("techgeeknextExchange");
    }

    @Bean
    Queue dlq() {
        return QueueBuilder.durable("deadLetter.queue").build();
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable("techgeeknext.queue")
                .withArgument("x-dead-letter-exchange", "deadLetterExchange")
                .withArgument("x-dead-letter-routing-key", "deadLetter").build();
    }

    @Bean
    Binding DLQbinding() {
        return BindingBuilder.bind(dlq())
                .to(deadLetterExchange()).with("deadLetter");
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(exchange()).with("techgeeknext");
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

Controller

Produceer method is used to send the message to the consumer through AMQP message.

package com.techgeeknext.controller;

import com.techgeeknext.model.Employee;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController

public class RabbitMQProducerController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @GetMapping(value = "/sendMessage")
    public String producer(@RequestParam("name") String name, @RequestParam("domain") String domain,
						   @RequestParam("exp") int exp) {
        Employee emp = new Employee();
        emp.setName(name);
        emp.setDomain(domain);
        emp.setExperience(exp);

        //The convertAndSend method converts the java object to an amqp message,
        // and then sends this message via the routing key to the exchange.
        amqpTemplate.convertAndSend("techgeeknextExchange", "techgeeknext", emp);
        return "Message sent to RabbitMQ server successfully!!";
    }
}

Consumer

Project Structure

Project Structure Consumer

Maven Dependencies

We will use the Spring AMQP dependency to develop AMQP-based messaging solutions.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.5.1</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.techgeeknext</groupId>
	<artifactId>SpringBootRabbitMQConsumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>SpringBootRabbitMQConsumer</name>
	<description>Spring Boot RabbitMQ + Error Handling Example</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-logging</artifactId>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<scope>provided</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Properties

Here in application.yml file, we have activated the RabbitMQ retry mechanism for Spring Boot.

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true

##After an interval of 3s the message should be retried.
          initial-interval: 3s

##Maximum 6 times the message should be retried.
##It is then sent to the dead letter Queue.
          max-attempts: 6

##The maximum interval should not be more than 10s between two retries.
          max-interval: 10s

##The gap is multiplied by 2 between second repetition.
## But the maximum interval can never be exceeded
          multiplier: 2
##So the retry interval values will be 3s, 6s, 10s, 10s, 10s. As 10 sec is the max interval specified.

server:
  port: 8081

Custom Checked Exception

Create custom checked exception called CustomInvalidException.

package com.techgeeknext.exception;

public class CustomInvalidException extends Exception {

    private static final long serialVersionUID = -3154618962130084535L;

}

RabbitMQ Consumer

Create the class EmployeeConsumerService that uses RabbitListener to consume message from RabbitMQ.
For
incoming messages the RabbitMQ Listener listens for RabbitMQ Queueue. The Queue/Topic Name (name of
the queue/topic where the message should be consumed) is used for the basic configuration.

We’ll also check the employee experience field here and throw CustomInvalidException in the
case of
a negative or invalid experience.

package com.techgeeknext.service;

import com.techgeeknext.exception.CustomInvalidException;
import com.techgeeknext.model.Employee;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class EmployeeConsumerService {

	private static final Logger logger = LoggerFactory.getLogger(EmployeeConsumerService.class);

	@RabbitListener(queues = "techgeeknext.queue")
	public void consumeMessage(Employee employee) throws CustomInvalidException {
		logger.info("Recieved Message From RabbitMQ techgeeknextExchange: " + employee);
		if (employee.getExperience() < 0 || employee.getExperience() > 30 ) {
			throw new CustomInvalidException();
		}
	}
}

Test

  1. Now, test this application at local. Start local RabbitMQ.
  2. Rabbitmq Login Page

  3. Start the Producer Spring Boot Application.
  4.   .   _____  __ _ _
     /\ / ___'_ __ _ _(_)_ __  __ _    
    ( ( )___ | '_ | '_| | '_ / _` |    
     \/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |___, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::(v2.5.1)
    
    16:00:18.798  INFO 17000 --- [ main] c.t.SpringBootRabbitMQProducer : Starting SpringBootRabbitMQProducer using Java 12.0.2 on LAPTOP-S6R44CQL with PID 17000 (D:SpringBootRabbitMQProducertargetclasses started in D:SpringBootRabbitMQProducer)
    16:00:18.801  INFO 17000 --- [ main] c.t.SpringBootRabbitMQProducer : No active profile set, falling back to default profiles: default
    16:00:19.922  INFO 17000 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
    16:00:19.932  INFO 17000 --- [ main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
    16:00:19.933  INFO 17000 --- [ main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.46]
    16:00:20.008  INFO 17000 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
    16:00:20.008  INFO 17000 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1174 ms
    16:00:20.525  INFO 17000 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer  :
    Tomcat started on port(s): 8080 (http) with context path ''
    16:00:20.534  INFO 17000 --- [ main] c.t.SpringBootRabbitMQProducer :
    Started SpringBootRabbitMQProducer in 2.073 seconds (JVM running for 2.362)
    16:00:44.371  INFO 17000 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/] :
    Initializing Spring DispatcherServlet 'dispatcherServlet'
    16:00:44.372  INFO 17000 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet  :
    Initializing Servlet 'dispatcherServlet'
    16:00:44.374  INFO 17000 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet  :
    Completed initialization in 2 ms
    16:01:04.917  INFO 17000 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory :
    Attempting to connect to: [localhost:5672]
    16:01:05.029  INFO 17000 --- [nio-8080-exec-3] o.s.a.r.c.CachingConnectionFactory :
    Created new connection: rabbitConnectionFactory#14229fa7:0/SimpleConnection@58b43bdf
    [delegate=amqp://guest@127.0.0.1:5672/, localPort= 61153]
    
    
    
  5. Produce/send the message with invalid Employee experience as 35 to consumer by using endpoint as
    http://localhost:8080/sendMessage?name=techgeeknext&domain=XYZ&exp=35
  6. Rabbitmq Sender

  7. Now start the Consumer Spring Boot Application.
  8. Consumer : The message is sent by rabbitMQ queue to the consumer’s application, which is designated
    techgeeknext.queue. Since the employee experience is invalid,
    it’ll throw CustomInvalidException
    . This message is tried again 6 times and then placed in the queue of the dead letter.

     .   ____    _ __ _ _
     /\ / ___'_ __ _ _(_)_ __  __ _    
    ( ( )___ | '_ | '_| | '_ / _` |    
     \/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |___, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::     (v2.5.1)
    
    16:00:31.805  INFO 8532 --- [main] c.t.SpringBootRabbitMQConsumer: Starting SpringBootRabbitMQConsumer using Java 12.0.2 on LAPTOP-S6R44CQL with PID 8532 (D:SpringBootRabbitMQConsumertargetclasses started in D:SpringBootRabbitMQConsumer)
    16:00:31.808  INFO 8532 --- [main] c.t.SpringBootRabbitMQConsumer: No active profile set, falling back to default profiles: default
    16:00:32.848  INFO 8532 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8081 (http)
    16:00:32.856  INFO 8532 --- [main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
    16:00:32.856  INFO 8532 --- [main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.46]
    16:00:32.939  INFO 8532 --- [main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
    16:00:32.939  INFO 8532 --- [main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1088 ms
    16:00:33.417  INFO 8532 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
    16:00:33.418  INFO 8532 --- [main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
    16:00:33.458  INFO 8532 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#37b56ac7:0/SimpleConnection@6ebc9573 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64346]
    16:00:33.510  INFO 8532 --- [main] c.t.SpringBootRabbitMQConsumer: Started SpringBootRabbitMQConsumer in 2.069 seconds (JVM running for 2.353)
    16:03:10.713  INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService:
    Recieved Message From RabbitMQ techgeeknextExchange:
    Employee(name=techgeeknext, domain=XYZ, experience=35)
    16:03:13.719  INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService:
    Recieved Message From RabbitMQ techgeeknextExchange:
    Employee(name=techgeeknext, domain=XYZ, experience=35)
    16:03:19.735  INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService:
    Recieved Message From RabbitMQ techgeeknextExchange:
    Employee(name=techgeeknext, domain=XYZ, experience=35)
    16:03:29.746  INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService:
    Recieved Message From RabbitMQ techgeeknextExchange:
    Employee(name=techgeeknext, domain=XYZ, experience=35)
    16:03:39.758  INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService:
    Recieved Message From RabbitMQ techgeeknextExchange:
    Employee(name=techgeeknext, domain=XYZ, experience=35)
    16:03:49.760  INFO 8532 --- [ntContainer#0-1] c.t.service.EmployeeConsumerService:
    Recieved Message From RabbitMQ techgeeknextExchange:
    Employee(name=techgeeknext, domain=XYZ, experience=35)
    16:03:49.784  WARN 8532 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  :
    Retries exhausted for message (Body:'{"@id":1,"name":"techgeeknext","domain":"XYZ","experience":35}' MessageProperties [headers={__TypeId__=com.techgeeknext.model.Employee}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=techgeeknextExchange, receivedRoutingKey=techgeeknext, deliveryTag=2, consumerTag=amq.ctag-u4Xd3zuiVqOKGf8FzwA7qg, consumerQueue=techgeeknext.queue])
    
    org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void
    com.techgeeknext.service.EmployeeConsumerService.recievedMessage(com.techgeeknext.model.Employee)
    throws com.techgeeknext.exception.CustomInvalidException' threw exception
    	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:247) ~[spring-rabbit-2.3.8.jar:2.3.8]
    	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:191) ~[spring-rabbit-2.3.8.jar:2.3.8]
    	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.8.jar:2.3.8]
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.8.jar:2.3.8]
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.8.jar:2.3.8]
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    	at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.8.jar:5.3.8]
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.8.jar:5.3.8]
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.8.jar:5.3.8]
    	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93) ~[spring-retry-1.3.1.jar:na]
    	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar:na]
    	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.1.jar:na]
    	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116) ~[spring-retry-1.3.1.jar:na]
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.8.jar:5.3.8]
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.8.jar:5.3.8]
    	at org.springframework.amqp.rabbit.listener.$Proxy65.invokeListener(Unknown Source) ~[na:2.3.8]
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.8.jar:2.3.8]
    	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) 
  9. Rabbitmq Exchange

    Rabbitmq Exchange

  10. Rabbitmq Queues

    Rabbitmq Queues

Download Source Code

The full source code for this article can be found on below.
Download it here —

  1. Producer Spring Boot + RabbitMQ + Error Handling Example
  2. Consumer Spring Boot + RabbitMQ + Error Handling Example

Table of Contents

  • Introduction
  • What Is Dead Letter Queue
  • Prerequisites
  • Project Setup
  • Application Properties
  • Custom Exception
  • Producer or Sender
  • Consumer or Receiver
  • Config Class
  • Spring Boot Main Class
  • Testing RabbitMQ Dead Letter Queue
  • Source Code

Introduction

In this example I am going to show you how to retry the failed messages in RabbitMQ message broker. So, here I am going to retry messages for few times and if error still exists after retrying the maximum attempts then the message will be put into the dead letter queue.

Related Posts:

  • RabbitMQ Spring Boot Publish Subscribe Example
  • RabbitMQ Spring Boot Work Queues Example
  • RabbitMQ Spring Boot Producer Consumer Example
  • RabbitMQ Spring Boot Routing Example
  • RabbitMQ Spring Boot Message Pattern Example
  • RabbitMQ Spring Boot RPC Example

What is Dead Letter Queue?

A dead letter queue is an undelivered message queue, so a dead letter queue is a service implementation used to store messages that meet one or more of the following criteria:

  1. Message that is sent to a queue that does not exist.
  2. Queue length limit exceeded.
  3. Message length limit exceeded.
  4. Message is rejected by another queue exchange.
  5. Message reaches a threshold read counter number, because it is not consumed. Sometimes this is called a “back out queue”.
  6. The message expires due to per-message TTL (time to live).
  7. Message is not processed successfully.

The dead letter queue storing these messages allows developers to analyze messages to look for the problems why the messages are failing to be delivered.

Prerequisites

Java 1.8+, Maven 3.8.2, Spring Boot 2.6.2, Spring AMQP 2.6.2, RabbitMQ Server 3.9.4 – 3.9.13

Project Setup

The following pom.xml file can be used for the maven based project creation in your favorite IDE or tool.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-rabbitmq-error-retry-dlq</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>11</maven.compiler.source>
		<maven.compiler.target>11</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Application Properties

The application.properties file is kept with the following content under classpath folder src/main/resources.

exchange.dl=dlExchange
exchange.roytuts=roytutsExchange
queue.dl=dl.queue
queue.roytuts=roytuts.queue
routing.key.dl=dl
routing.key.roytuts=roytuts

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=2s
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.max-interval=6s
spring.rabbitmq.listener.simple.retry.multiplier=2

I have enabled retry mechanism using the property key spring.rabbitmq.listener.simple.retry.enabled=true. The spring.rabbitmq.listener.simple.retry.initial-interval=2s tells to retry the message after initial interval of 2 seconds.

spring.rabbitmq.listener.simple.retry.max-attempts=3 tells that the maximum of 3 retries will be happened and after that the message will be put into dead letter queue.

spring.rabbitmq.listener.simple.retry.max-interval=6s says that the maximum interval between two retries is 6 seconds.

The interval in subsequent retry gets multiplied by 2 using the key/value pair spring.rabbitmq.listener.simple.retry.multiplier=2. Therefore, the retry interval will be 2s, 4s, 6s, etc.

Custom Exception

I am creating a custom exception class to throw an exception for invalid string.

package com.roytuts.spring.rabbitmq.error.retry.dlq.exception;

public class InvalidNameException extends Exception {

	private static final long serialVersionUID = 1L;

}

Producer or Sender

The producer or sender class produces or sends a message.

package com.roytuts.spring.rabbitmq.error.retry.dlq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class Producer {

	@Value("${exchange.roytuts}")
	private String roytutsExchange;

	@Value("${routing.key.roytuts}")
	private String routingKeyRoytuts;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void sendName(final String name) {
		rabbitTemplate.convertAndSend(roytutsExchange, routingKeyRoytuts, name);
		System.out.println("Sent: " + name);
	}

}

Consumer or Receiver

The consumer or receiver consumes or receives a message.

package com.roytuts.spring.rabbitmq.error.retry.dlq.consumer;

import java.util.regex.Pattern;

import javax.naming.InvalidNameException;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "${queue.roytuts}")
public class Consumer {

	@RabbitHandler
	public void receiveMsg(final String name) throws InvalidNameException {
		if (!Pattern.matches("[a-zA-Z]+", name)) {
			throw new InvalidNameException("Name should contain only alphabets");
		}

		System.out.println("Received: " + name);
	}

}

Config Class

The configuration class will create several beans required for the applications.

package com.roytuts.spring.rabbitmq.error.retry.dlq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Config {

	@Value("${exchange.dl}")
	private String dlExchange;

	@Value("${exchange.roytuts}")
	private String roytutsExchange;

	@Value("${queue.dl}")
	private String dlQueue;

	@Value("${queue.roytuts}")
	private String roytutsQueue;

	@Value("${routing.key.dl}")
	private String routingKeyDl;

	@Value("${routing.key.roytuts}")
	private String routingKeyRoytuts;

	@Bean
	public DirectExchange dlExchange() {
		return new DirectExchange(dlExchange);
	}

	@Bean
	public DirectExchange roytutsExchange() {
		return new DirectExchange(roytutsExchange);
	}

	@Bean
	public Queue dlq() {
		return QueueBuilder.durable(dlQueue).build();
	}

	@Bean
	public Queue queue() {
		return QueueBuilder.durable(roytutsQueue).withArgument("x-dead-letter-exchange", dlExchange)
				.withArgument("x-dead-letter-routing-key", routingKeyDl).build();
	}

	@Bean
	public Binding binding() {
		return BindingBuilder.bind(queue()).to(roytutsExchange()).with(routingKeyRoytuts);
	}

	@Bean
	public Binding dlqBinding() {
		return BindingBuilder.bind(dlq()).to(dlExchange()).with(routingKeyDl);
	}

}

Spring Boot Main Class

A class with a main method and @SpringBootApplication annotation is enough to deploy the application into embedded Tomcat server.

package com.roytuts.spring.rabbitmq.error.retry.dlq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.roytuts.spring.rabbitmq.error.retry.dlq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqDlqApp implements CommandLineRunner {

	@Autowired
	private Producer producer;

	public static void main(String[] args) {
		SpringApplication.run(SpringRabbitMqDlqApp.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		producer.sendName("Soumitra");
		producer.sendName("Roy");
		producer.sendName("Roy Tutorials");
		producer.sendName("Soumitra 2");
		producer.sendName("Roy Tutorials2");
	}

}

Testing RabbitMQ Dead Letter Queue

Once you run the above main class you will see the following output:

20.378  INFO 22836 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
20.430  INFO 22836 --- [  restartedMain] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#5fe18d6d:0/SimpleConnection@18cce147 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56240]
21.131  INFO 22836 --- [  restartedMain] c.r.s.r.e.r.dlq.SpringRabbitMqDlqApp     : Started SpringRabbitMqDlqApp in 2.001 seconds (JVM running for 2.433)
Sent: Soumitra
Sent: Roy
Sent: Roy Tutorials
Sent: Soumitra 2
Sent: Roy Tutorials2
Received: Soumitra
Received: Roy
27.167  WARN 22836 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'Roy Tutorials' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roytutsExchange, receivedRoutingKey=roytuts, deliveryTag=3, consumerTag=amq.ctag-d6XWSD3q37Fmbo6o9tqthw, consumerQueue=roytuts.queue])

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

27.168  WARN 22836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

33.194  WARN 22836 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'Soumitra 2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roytutsExchange, receivedRoutingKey=roytuts, deliveryTag=4, consumerTag=amq.ctag-d6XWSD3q37Fmbo6o9tqthw, consumerQueue=roytuts.queue])

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

33.195  WARN 22836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

39.209  WARN 22836 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'Roy Tutorials2' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roytutsExchange, receivedRoutingKey=roytuts, deliveryTag=5, consumerTag=amq.ctag-d6XWSD3q37Fmbo6o9tqthw, consumerQueue=roytuts.queue])

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

39.213  WARN 22836 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Retry Policy Exhausted
	at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:76) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(java.lang.String) throws javax.naming.InvalidNameException' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:270) ~[spring-rabbit-2.4.2.jar:2.4.2]
	...
Caused by: javax.naming.InvalidNameException: Name should contain only alphabets
	at com.roytuts.spring.rabbitmq.error.retry.dlq.consumer.Consumer.receiveMsg(Consumer.java:18) ~[classes/:na]
	...

In the above output you will see that only Soumitra and Roy were received by the receiver because these two strings do not contain any space or other than alphabets. The rest of the strings were discarded and hence exceptions thrown and moved to the dead letter queue as shown in the following image:

spring rabbitmq dead letter queue

Source Code

Download

Exception 1: Method does not match exception

Listener Method ‘no match’ threw Exception flushed

The 2020-03-26 11:00:07. 747 [org. Springframework. Closer. Rabbit. RabbitListenerEndpointContainer#1-1] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.log(117) - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'no match'threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:204) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapt er.java:129) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContai ner.java:1542) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerCo ntainer.java:1468) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContaine r.java:1456) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerConta iner.java:1451) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContain er.java:1400) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContain er.java:870) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer .java:854) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.accessThe $1600(SimpleMessageListenerContainer.java:78)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found forclass [B at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHand ler.java:149) at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.invoke(DelegatingInvocableHandler.java:129) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:61) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:196) ... 12 common frames omittedCopy the code

Start with my code:

@Slf4j
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {

    @Autowired
    private OmsPortalOrderService portalOrderService;

    @Resource
    private CancelOrderSender cancelOrderSender;

    @RabbitHandler
    public void handler(Long orderId){
        try {
            log.info("process orderId:{}", orderId);
            portalOrderService.cancelOrder(orderId);
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            // Exception handlingcancelOrderSender.sendFailedMessage(orderId); }}}Copy the code

Why do methods not match? The reason for this is @RabbitListener. When this annotation is placed on a class, it matches the @Rabbithandler annotation’s method by data type. My handler method has an input of type Long. The Listener method ‘no match’ threw Exception is reported when the Listener method is a String.

Solution 1:

Add a handler(String orderId) method that will be processed regardless of whether a Long or String is passed in. This method is clumsy and not recommended

Solution 2 (Recommended) :

Add the @rabbitListener annotation to the method as follows:

@Slf4j
@Component
public class CancelOrderReceiver {

    @Autowired
    private OmsPortalOrderService portalOrderService;

    @Resource
    private CancelOrderSender cancelOrderSender;

    @RabbitHandler
    @RabbitListener(queues = "mall.order.cancel")
    public void handler(Long orderId){
        try {
            log.info("process orderId:{}", orderId);
            portalOrderService.cancelOrder(orderId);
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            // Exception handlingcancelOrderSender.sendFailedMessage(orderId); }}}Copy the code

So whatever data type you pass in, it goes into this method and you don’t have to worry about flushing. Of course, if the incoming type does not match, an error will be reported. Therefore, it is necessary to constrain the data type with the consumer sender.

Solution 3:

Set Requeue to false for RabbitMQ. This method is simple and crude, but it is difficult to catch exceptions and difficult to handle subsequent exceptions

Exception 2: Listen for exceptions in the method

Listener Method ‘***’ threw Exception flushed

The 2020-03-26 11:46:22. 789 [org. Springframework. Closer. Rabbit. RabbitListenerEndpointContainer#0-1] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.log(117) - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.macro.mall.portal.component.CancelOrderReceiver.handler(java.lang.Long)'threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:204) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapt er.java:129) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContai ner.java:1542) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerCo ntainer.java:1468) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContaine r.java:1456) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerConta iner.java:1451) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContain er.java:1400) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContain er.java:870) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer .java:854) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.accessThe $1600(SimpleMessageListenerContainer.java:78)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "Hey hey hey."at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at com.macro.mall.portal.component.CancelOrderReceiver.handler(CancelOrderReceiver.java:31) at sun.reflect.GeneratedMethodAccessor122.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerA dapter.java:196) ... 12 common frames omittedCopy the code

Look at my code:

    @RabbitHandler
    @RabbitListener(queues = "mall.order.cancel")
    public void handler(Long orderId){
        Integer.parseInt("Hey hey hey."); // demonstrate exception usage
        try {
            log.info("process orderId:{}", orderId);
            portalOrderService.cancelOrder(orderId);
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            // Exception handlingcancelOrderSender.sendFailedMessage(orderId); }}Copy the code

Clearly integer.parseint (» hey, hey, hey «) will report an error, causing an abnormal flush

Solutions:

Call all code inside the listening method with a try… Add exception handler to cache.

    @RabbitHandler
    @RabbitListener(queues = "mall.order.cancel")
    public void handler(Long orderId){
        try {
            Integer.parseInt("Hey hey hey."); // demonstrate exception usage
            log.info("process orderId:{}", orderId);
            portalOrderService.cancelOrder(orderId);
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            // Exception handlingcancelOrderSender.sendFailedMessage(orderId); }}Copy the code

Write so much for now, and I will continue to update you if you have any problems. If you have a good plan, please leave a message to me, so that we can learn and make progress together

1. Introduction

Asynchronous messaging is a type of loosely-coupled distributed communication that is becoming increasingly popular for implementing event-driven architectures. Fortunately, the Spring Framework provides the Spring AMQP project allowing us to build AMQP-based messaging solutions.

On the other hand, dealing with errors in such environments can be a non-trivial task. So in this tutorial, we’ll cover different strategies for handling errors.

2. Environment Setup

For this tutorial, we’ll use RabbitMQ which implements the AMQP standard. Also, Spring AMQP provides the spring-rabbit module which makes integration really easy.

Let’s run RabbitMQ as a standalone server. We’ll run it in a Docker container by executing the following command:

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

For detailed configuration and project dependencies setup, please refer to our Spring AMQP article.

3. Failure Scenario

Usually, there are more types of errors that can occur in messaging-based systems compared to a monolith or single-packaged applications due to its distributed nature.

We can point out some of the types of exceptions:

  • Network- or I/O-related – general failures of network connections and I/O operations
  • Protocol- or infrastructure-related – errors that usually represent misconfiguration of the messaging infrastructure
  • Broker-related – failures that warn about improper configuration between clients and an AMQP broker. For instance, reaching defined limits or threshold, authentication or invalid policies configuration
  • Application- and message-related – exceptions that usually indicate a violation of some business or application rules

Certainly, this list of failures is not exhaustive but contains the most common type of errors.

We should note that Spring AMQP handles connection-related and low-level issues out of the box, for example by applying retry or requeue policies. Additionally, most of the failures and faults are converted into an AmqpException or one of its subclasses.

In the next sections, we’ll mostly focus on application-specific and high-level errors and then cover global error handling strategies.

4. Project Setup

Now, let’s define a simple queue and exchange configuration to start:

public static final String QUEUE_MESSAGES = "baeldung-messages-queue";
public static final String EXCHANGE_MESSAGES = "baeldung-messages-exchange";

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .build();
}
 
@Bean
DirectExchange messagesExchange() {
    return new DirectExchange(EXCHANGE_MESSAGES);
}
 
@Bean
Binding bindingMessages() {
    return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(QUEUE_MESSAGES);
}

Next, let’s create a simple producer:

public void sendMessage() {
    rabbitTemplate
      .convertAndSend(SimpleDLQAmqpConfiguration.EXCHANGE_MESSAGES,
        SimpleDLQAmqpConfiguration.QUEUE_MESSAGES, "Some message id:" + messageNumber++);
}

And finally, a consumer that throws an exception:

@RabbitListener(queues = SimpleDLQAmqpConfiguration.QUEUE_MESSAGES)
public void receiveMessage(Message message) throws BusinessException {
    throw new BusinessException();
}

By default, all failed messages will be immediately requeued at the head of the target queue over and over again.

Let’s run our sample application by executing the next Maven command:

mvn spring-boot:run -Dstart-class=com.baeldung.springamqp.errorhandling.ErrorHandlingApp

Now we should see the similar resulting output:

WARN 22260 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
Caused by: com.baeldung.springamqp.errorhandling.errorhandler.BusinessException: null

Consequently, by default, we will see an infinite number of such messages in the output.

To change this behavior we have two options:

  • Set the default-requeue-rejected option to false on the listener side – spring.rabbitmq.listener.simple.default-requeue-rejected=false
  • Throw an AmqpRejectAndDontRequeueException – this might be useful for messages that won’t make sense in the future, so they can be discarded.

Now, let’s discover how to process failed messages in a more intelligent way.

5. Dead Letter Queue

A Dead Letter Queue (DLQ) is a queue that holds undelivered or failed messages. A DLQ allows us to handle faulty or bad messages, monitor failure patterns and recover from exceptions in a system.

More importantly, this helps to prevent infinite loops in queues that are constantly processing bad messages and degrading system performance.

Altogether, there are two main concepts: Dead Letter Exchange (DLX) and a Dead Letter Queue (DLQ) itself. In fact, DLX is a normal exchange that we can define as one of the common types: direct, topic or fanout.

It’s very important to understand that a producer doesn’t know anything about queues. It’s only aware of exchanges and all produced messages are routed according to the exchange configuration and the message routing key.

Now let’s see how to handle exceptions by applying the Dead Letter Queue approach.

5.1. Basic Configuration

In order to configure a DLQ we need to specify additional arguments while defining our queue:

@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", "")
      .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGES_DLQ)
      .build();
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}

In the above example, we’ve used two additional arguments: x-dead-letter-exchange and x-dead-letter-routing-key. The empty string value for the x-dead-letter-exchange option tells the broker to use the default exchange.

The second argument is as equally important as setting routing keys for simple messages. This option changes the initial routing key of the message for further routing by DLX.

5.2. Failed Messages Routing

So, when a message fails to deliver, it’s routed to the Dead Letter Exchange. But as we’ve already noted, DLX is a normal exchange. Therefore, if the failed message routing key doesn’t match the exchange, it won’t be delivered to the DLQ.

Exchange: (AMQP default)
Routing Key: baeldung-messages-queue.dlq

So, if we omit the x-dead-letter-routing-key argument in our example, the failed message will be stuck in an infinite retry loop.

Additionally, the original meta information of the message is available in the x-death header:

x-death:
  count: 1
  exchange: baeldung-messages-exchange
  queue: baeldung-messages-queue 
  reason: rejected
  routing-keys: baeldung-messages-queue 
  time: 1571232954

The information above is available in the RabbitMQ management console usually running locally on port 15672.

Besides this configuration, if we are using Spring Cloud Stream we can even simplify the configuration process by leveraging configuration properties republishToDlq and autoBindDlq.

5.3. Dead Letter Exchange

In the previous section, we’ve seen that the routing key is changed when a message is routed to the dead letter exchange. But this behavior is not always desirable. We can change it by configuring DLX by ourselves and defining it using the fanout type:

public static final String DLX_EXCHANGE_MESSAGES = QUEUE_MESSAGES + ".dlx";
 
@Bean
Queue messagesQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES)
      .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_MESSAGES)
      .build();
}
 
@Bean
FanoutExchange deadLetterExchange() {
    return new FanoutExchange(DLX_EXCHANGE_MESSAGES);
}
 
@Bean
Queue deadLetterQueue() {
    return QueueBuilder.durable(QUEUE_MESSAGES_DLQ).build();
}
 
@Bean
Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}

This time we’ve defined a custom exchange of the fanout type, so messages will be sent to all bounded queues. Furthermore, we’ve set the value of the x-dead-letter-exchange argument to the name of our DLX. At the same time, we’ve removed the x-dead-letter-routing-key argument.

Now if we run our example the failed message should be delivered to the DLQ, but without changing the initial routing key:

Exchange: baeldung-messages-queue.dlx
Routing Key: baeldung-messages-queue

5.4. Processing Dead Letter Queue Messages

Of course, the reason we moved them to the Dead Letter Queue is so they can be reprocessed at another time.

Let’s define a listener for the Dead Letter Queue:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessages(Message message) {
    log.info("Received failed message: {}", message.toString());
}

If we run our code example now, we should see the log output:

WARN 11752 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 11752 --- [ntContainer#1-1] c.b.s.e.consumer.SimpleDLQAmqpContainer  : 
  Received failed message:

We’ve got a failed message, but what should we do next? The answer depends on specific system requirements, the kind of the exception or type of the message.

For instance, we can just requeue the message to the original destination:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRequeue(Message failedMessage) {
    log.info("Received failed message, requeueing: {}", failedMessage.toString());
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

But such exception logic is not dissimilar from the default retry policy:

INFO 23476 --- [ntContainer#0-1] c.b.s.e.c.RoutingDLQAmqpContainer        :
  Received message: 
WARN 23476 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler :
  Execution of Rabbit message listener failed.
INFO 23476 --- [ntContainer#1-1] c.b.s.e.c.RoutingDLQAmqpContainer        : 
  Received failed message, requeueing:

A common strategy may need to retry processing a message for n times and then reject it. Let’s implement this strategy by leveraging message headers:

public void processFailedMessagesRetryHeaders(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Discarding message");
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

At first, we are getting the value of the x-retries-count header, then we compare this value with the maximum allowed value. Subsequently, if the counter reaches the attempts limit number the message will be discarded:

WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 1 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Retrying message for the 2 time
WARN 1224 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 1224 --- [ntContainer#1-1] c.b.s.e.consumer.DLQCustomAmqpContainer  : 
  Discarding message

We should add that we can also make use of the x-message-ttl header to set a time after that the message should be discarded. This might be helpful for preventing queues to grow infinitely.

5.5. Parking Lot Queue

On the other hand, consider a situation when we cannot just discard a message, it could be a transaction in the banking domain for example. Alternatively, sometimes a message may require manual processing or we simply need to record messages that failed more than n times.

For situations like this, there is a concept of a Parking Lot Queue. We can forward all messages from the DLQ, that failed more than the allowed number of times, to the Parking Lot Queue for further processing.

Let’s now implement this idea:

public static final String QUEUE_PARKING_LOT = QUEUE_MESSAGES + ".parking-lot";
public static final String EXCHANGE_PARKING_LOT = QUEUE_MESSAGES + "exchange.parking-lot";
 
@Bean
FanoutExchange parkingLotExchange() {
    return new FanoutExchange(EXCHANGE_PARKING_LOT);
}
 
@Bean
Queue parkingLotQueue() {
    return QueueBuilder.durable(QUEUE_PARKING_LOT).build();
}
 
@Bean
Binding parkingLotBinding() {
    return BindingBuilder.bind(parkingLotQueue()).to(parkingLotExchange());
}

Secondly, let’s refactor the listener logic to send a message to the parking lot queue:

@RabbitListener(queues = QUEUE_MESSAGES_DLQ)
public void processFailedMessagesRetryWithParkingLot(Message failedMessage) {
    Integer retriesCnt = (Integer) failedMessage.getMessageProperties()
      .getHeaders().get(HEADER_X_RETRIES_COUNT);
    if (retriesCnt == null) retriesCnt = 1;
    if (retriesCnt > MAX_RETRIES_COUNT) {
        log.info("Sending message to the parking lot queue");
        rabbitTemplate.send(EXCHANGE_PARKING_LOT, 
          failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
        return;
    }
    log.info("Retrying message for the {} time", retriesCnt);
    failedMessage.getMessageProperties()
      .getHeaders().put(HEADER_X_RETRIES_COUNT, ++retriesCnt);
    rabbitTemplate.send(EXCHANGE_MESSAGES, 
      failedMessage.getMessageProperties().getReceivedRoutingKey(), failedMessage);
}

Eventually, we also need to process messages that arrive at the parking lot queue:

@RabbitListener(queues = QUEUE_PARKING_LOT)
public void processParkingLotQueue(Message failedMessage) {
    log.info("Received message in parking lot queue");
    // Save to DB or send a notification.
}

Now we can save the failed message to the database or perhaps send an email notification.

Let’s test this logic by running our application:

WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 1 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Retrying message for the 2 time
WARN 14768 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : 
  Execution of Rabbit message listener failed.
INFO 14768 --- [ntContainer#1-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Sending message to the parking lot queue
INFO 14768 --- [ntContainer#2-1] c.b.s.e.c.ParkingLotDLQAmqpContainer     : 
  Received message in parking lot queue

As we can see from the output, after several failed attempts, the message was sent to the Parking Lot Queue.

6. Custom Error Handling

In the previous section, we’ve seen how to handle failures with dedicated queues and exchanges. However, sometimes we may need to catch all errors, for example for logging or persisting them to the database.

6.1. Global ErrorHandler

Until now, we’ve used the default SimpleRabbitListenerContainerFactory and this factory by default uses ConditionalRejectingErrorHandler. This handler catches different exceptions and transforms them into one of the exceptions within the AmqpException hierarchy.

It’s important to mention that if we need to handle connection errors, then we need to implement the ApplicationListener interface.

Simply put, ConditionalRejectingErrorHandler decides whether to reject a specific message or not. When the message that caused an exception is rejected, it won’t be requeued.

Let’s define a custom ErrorHandler that will simply requeue only BusinessExceptions:

public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof BusinessException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
        }
    }
}

Furthermore, as we are throwing the exception inside our listener method it is wrapped in a ListenerExecutionFailedException. So, we need to call the getCause method to get a source exception.

6.2. FatalExceptionStrategy

Under the hood, this handler uses the FatalExceptionStrategy to check whether an exception should be considered fatal. If so, the failed message will be rejected.

By default these exceptions are fatal:

  • MessageConversionException
  • MessageConversionException
  • MethodArgumentNotValidException
  • MethodArgumentTypeMismatchException
  • NoSuchMethodException
  • ClassCastException

Instead of implementing the ErrorHandler interface, we can just provide our FatalExceptionStrategy:

public class CustomFatalExceptionStrategy 
      extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    @Override
    public boolean isFatal(Throwable t) {
        return !(t.getCause() instanceof BusinessException);
    }
}

Finally, we need to pass our custom strategy to the ConditionalRejectingErrorHandler constructor:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
  ConnectionFactory connectionFactory,
  SimpleRabbitListenerContainerFactoryConfigurer configurer) {
      SimpleRabbitListenerContainerFactory factory = 
        new SimpleRabbitListenerContainerFactory();
      configurer.configure(factory, connectionFactory);
      factory.setErrorHandler(errorHandler());
      return factory;
}
 
@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(customExceptionStrategy());
}
 
@Bean
FatalExceptionStrategy customExceptionStrategy() {
    return new CustomFatalExceptionStrategy();
}

7. Conclusion

In this tutorial, we’ve discussed different ways of handling errors while using Spring AMQP, and RabbitMQ in particular.

Every system needs a specific error handling strategy. We’ve covered the most common ways of error handling in event-driven architectures. Furthermore, we’ve seen that we can combine multiple strategies to build a more comprehensive and robust solution.

As always, the full source code of the article is available over on GitHub.


Понравилась статья? Поделить с друзьями:
  • Rabbit error destiny 2
  • Rails rescue error
  • Rails flash error
  • R99 на брелке starline a93 как исправить
  • R6s error at hooking api