MassTransit error queue

Previously, we built a simple Customer Portal application where a user could use an ASP.NET MVC app to open a new customer service ticket. The website created a TicketOpened message and published it to the MassTransit service bus. Then, we built a Windows Service, using the TopShelf library, that subscribed to TicketOpened messages and handled creating the ticket and emailing the customer a confirmation email. (I recommend you review the blog post if you aren’t familiar with it as we are going to build on that application here.)

But what happens when something goes wrong? Blog posts usually assume the happy path when showing code examples in order to keep them easily digestible. We all know, however, things can and will go wrong. Let’s look at how we can leverage the message queuing infrastructure to handle what may be transient errors as well as perhaps more permanent failures.

When It All Goes Horribly Wrong

So what will happen to our TicketOpened messages if there’s an error in the TicketOpenedConsumer? In our example, we’re only sending an email, but the email server could be down. If we were persisting to a data store, that could be down, or maybe there was a SQL deadlock. As you know, there’s a number of things that could go wrong. Let’s start by looking at what the default MassTransit behavior is when an exception occurs in your consumer.

To see what MassTransit does, let’s inject a way to get the consumer to throw an exception. Start by cloning the https://github.com/dprothero/Loosely.CustomerPortal repository (master branch) or by building the application in my previous blog post. The final code is in the same repository, but in the error-handling branch.

Here’s the new Consume method in our Loosely.CustomerPortal.Backend.TicketOpenedConsumer class:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                      "Message: " + envelope.Message.Message);

      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" +
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}

We just check to see if the text of the message contains the word “poison” and, if it does, throw an exception. Now we can run the app, open a ticket, and type “poison” into the message field to get our consumer to throw the exception:

[Screenshot: opening a ticket with "poison" in the message field]

Take a look at the log file (C:\Logs\Loosely.CustomerPortal.Backend.log) and you’ll see these entries:

[Screenshot: log file entries]

What’s going on here? By default, MassTransit retries any message that causes an exception to be thrown in its consumer exactly 4 more times. There’s no delay between retries (we’ll look at that later). Since our exception isn’t really transient, it’s going to try 5 times without success. Next question… where’s the TicketOpened message now?

Go into the RabbitMQ management interface (see this post for instructions – should be at http://localhost:15672) and click on the Queues tab. Notice we have our normal Loosely_CustomerPortal_Backend queue, but we also have a Loosely_CustomerPortal_Backend_error queue, and it should have 1 message in it:

[Screenshot: RabbitMQ Queues tab showing the error queue with one message]

Click on the error queue and scroll down to the “Get messages” section. Set Requeue to ‘No’ and click “Get Message(s).” This will remove the message from the queue and display it to us. You can see our poison message in JSON format:

[Screenshot: the poison message displayed in JSON format]

Sidebar: Changing Default Retry Limit

If you want to change MassTransit’s default retry limit of 5 to something else, add the SetDefaultRetryLimit call shown below to the Loosely.CustomerPortal.Backend.TicketService class, within your bus initializer code.

using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService()  {  }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.SetDefaultRetryLimit(1);
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}

That will set the retry limit to 1.

Requeuing Error Messages

If you end up with messages in the error queue, you may want to move them back to the primary queue to be processed. In RabbitMQ this can be accomplished using the Shovel plugin. First, make sure your consumer process isn’t running. Then, open up the “RabbitMQ Command Prompt (sbin dir)” item from your Start menu and run the following two commands to install the Shovel and corresponding management plugins:

> rabbitmq-plugins enable rabbitmq_shovel
> rabbitmq-plugins enable rabbitmq_shovel_management

After restarting the RabbitMQ Windows Service, take a look in the RabbitMQ management interface. Navigate to the Admin tab and go into “Shovel Management.” Click “Add a new shovel” and name it something like “Temporary Error Move.” Set the Source Queue to “Loosely_CustomerPortal_Backend_error” and the Destination Queue to “Loosely_CustomerPortal_Backend.” Click “Add shovel.”

This starts a shovel that runs in the background and will move all messages in the error queue back to the primary queue:

[Screenshot: the shovel shown in Shovel Management]

Now go back to the Admin tab, Shovel Management, and click on your “Temporary Error Move” shovel. From there, click on the “Delete this shovel” button. If you don’t delete the shovel, it will continue to move messages from the error queue back into the primary queue… essentially creating an infinite retry loop.

Obviously, when we start up our consumer again, it will try 5 times and fail again, and the message will be moved back to the error queue. What we have with our “poison” message is really a permanent failure.

Transient Versus Permanent Failures

With a permanent failure, we’re talking about a message that just can’t be processed – at least not with the code written the way it currently is. Perhaps there’s a message that invokes a code path that throws an exception due to a coding error. In this case, these messages would end up in the error queue and should probably stay there until the error is corrected.

Perhaps the error is such an edge case that we won’t fix it and so we’re ok with the occasional message going to the error queue (we should write something to periodically clean up the error queue). It just depends on your business requirements. If, however, the message is mission critical, then the likely scenario would be to fix the bug, redeploy the new code, move the error messages back into the primary queue, and then let them get processed.

Transient Failures

What about the examples of failures mentioned earlier? A down email or database server? A deadlock condition in the SQL Server? These could be considered transient failures – meaning, if we just were to retry later, the message could likely be processed just fine with no modifications to the message or the consumer code.

As we saw, MassTransit has a bit of a blunt method to try to account for transient failures… it tries the message 5 times. Perhaps in a deadlock situation, this would work great, but probably not in a network or server outage situation. You’d likely expect those to last a little longer. What would be ideal is if we could have the message retry after some timeout delay. Perhaps we could even escalate the delay if subsequent retries fail. For example, try 1 minute later on the first retry, then 5 minutes later on the second retry, and then perhaps fail.

NServiceBus, a commercial analog to MassTransit, has this retry delay ability built into it (called “second-level retries”). However, MassTransit does not. We will have to roll our own, but it won’t be difficult.

Roll Your Own Retry Delay Logic

Assuming this is a pattern you want to implement for a larger application with multiple message types, you will probably want to build the retry delay logic into a common helper class. However, for this example, let’s just build the logic into our TicketOpenedConsumer class.

Here’s the new TicketOpenedConsumer class with progressive retry delay logic:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
      {
        {0, 60}, {60, 300}, {300, -1}
      };

    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];
      bool sleepAndRepublish = false;
      
      try
      {
        // Here is where you would persist the ticket to a data store of some kind.
        // For this example, we'll just write it to the trace log.
        Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                        "Id: " + envelope.Message.Id + "\r\n" +
                        "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                        "Message: " + envelope.Message.Message + "\r\n" +
                        "Current/Next Retry Delay: " + retryDelay.ToString() + "/" +
                          nextRetryDelay.ToString() + "\r\n" +
                        "Current Time: " + DateTime.Now.ToString());

        CheckForContrivedErrorConditions(envelope);

        // Send email confirmation to the customer.
        var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                          "We will respond to your inquiry ASAP.\n\n" +
                          "Your Message:\n" + envelope.Message.Message;

        EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);

        // Here is where you would commit any open database transaction
        Trace.WriteLine("Message committed.");
      }
      catch (Exception ex)
      {
        Trace.WriteLine("Exception caught.");
        if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
          sleepAndRepublish = true;
        else throw;
      }

      if(sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
          x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    private void CheckForContrivedErrorConditions(IConsumeContext<TicketOpened> envelope)
    {
      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      if (envelope.Message.Message.Contains("server-blip"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-blip", 
          "server-online(blipped)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-down"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-down",
            "server-blip(downed)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-disaster"))
        throw (new Exception("The mail server is down."));

    }
  }
}

So let’s take a look at a few lines of code in isolation and discuss what’s happening. First, we set up a dictionary to indicate what we’d like the progression of delays to be.

static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
  {
    {0, 60}, {60, 300}, {300, -1}
  };

The key is the number of seconds we delayed last time and the value is the next delay to use. We start with 0, as you can see in the initialization code:

int retryDelay = 0;
int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
var nextRetryDelay = DelayProgression[retryDelay];

Then we check for a header on the message called “loosely.retry-delay-seconds.” Yes, I just made that up. Headers are meta-data you can attach to a message and can contain whatever string data you’d like. When we want to retry a message later, we’ll add a header with the number of seconds we just delayed so the next time through the code can know the next delay value to use if the message fails again.

Now we just have a method that can check for some magic strings in our message to see if it should trigger a contrived exception:

CheckForContrivedErrorConditions(envelope);

Within that function, we define three strings (in addition to the original “poison” string) for which we will scan.

“server-disaster” – simulates the mail server being down for a very long time.
“server-down” – simulates the mail server being down for less than 5 minutes.
“server-blip” – simulates the mail server being down for less than 30 seconds.

Finally, we wrap all of the actual message processing in a try…catch block. If an exception occurs, we check the exception message to see if it indicates a condition we know to be transient and whether the next retry delay value is not negative one (-1). Negative one is our trigger to tell us we need to give up on retrying.

Trace.WriteLine("Exception caught.");
if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
  sleepAndRepublish = true;
else throw;

If the condition is met, we set a flag to indicate we want to sleep (delay) for a bit and then republish the message so it will be retried later. If the condition is not met, we re-throw the exception and MassTransit will handle it per normal message processing rules (default being to retry 4 more times and then move to the error queue).

If we do want to sleep and republish, that code is simple:

if(sleepAndRepublish)
{
  Thread.Sleep(nextRetryDelay * 1000);
  envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
    x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
    x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
  });
}

We put the thread to sleep for the prescribed number of seconds (more on that later) and then, after the time has elapsed, we republish the message to the bus with a “loosely.retry-delay-seconds” header value of the amount of time we delayed before republishing the message. That will put the message back on the bus and our consumer will get called again with it. This time, the message will have the special header on it and we’ll know to move onto the next delay value (or stop retrying if that value is -1).

Did You Seriously Just Block the Consumer Thread?

Good catch. Yes, this can have performance implications. MassTransit has a setting called ConcurrentConsumerLimit, which defaults to the number of CPUs times 4 (so 16 on a 4-processor machine). We’re essentially “killing” one of these 16 (or however many) threads while we sleep, thus limiting the number of messages we can process while we’re waiting.
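
If the limit itself is your concern, the older MassTransit configuration API used in this series appears to expose a call to change it on the bus configurator. This is a sketch from memory; treat the method name as an assumption and verify it against the version you’re running:

_bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
{
  // Raise the concurrent consumer limit so a few sleeping threads leave
  // more headroom (assumption: SetConcurrentConsumerLimit is available
  // in your MassTransit build).
  x.SetConcurrentConsumerLimit(32);
  x.Subscribe(subs =>
  {
    subs.Consumer<TicketOpenedConsumer>().Permanent();
  });
});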

But is this really a problem? In this example, our service is only responsible for processing TicketOpened messages. Every TicketOpened message needs to trigger an email to be sent. If the email server is down, then none of the TicketOpened messages are going to be able to successfully be processed. In this case, it probably makes sense for the entire service to slow down and wait until the mail server is back online.

If the service were responsible for processing many different types of messages, then this would certainly be an issue. However, it raises the question of whether it makes sense for a single service to handle different types of messages. In some cases it might, particularly if they all need to be handled in much the same way. But in a lot of cases, it will make more sense to create separate services for your different message types.

What If the Service Crashes While Sleeping?

So we have our consumer sleeping on the consumer thread and the message is “in limbo” while we’re waiting. What happens to the message if the service crashes during the Thread.Sleep? If you send a “server-down” message to get the message to go through the retry logic, take a look at the queue in RabbitMQ:

image

It shows one message in the “Unacked” (unacknowledged) column. This means two things: 1) it won’t deliver the message to any other consumer threads or processes, and 2) it won’t remove the message unless it is acknowledged. If the process hosting our consumer service dies before acknowledging the message, RabbitMQ will move the message back to the “Ready” state.

Caveats and Disclaimers

These bloggers, sheesh. Always cutting corners in the code for the “sake of brevity.” It’s difficult to balance a good, crisp article with well-crafted code. First, you don’t see any unit tests. Bad programmer. Next, with a good suite of tests watching your back, the code in this example could be refactored into shorter methods and perhaps a helper class for the retry delay progression. Finally, the call to Thread.Sleep should probably be refactored into a loop that wakes up every couple of seconds to see if the service needs to stop.
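
For example, here’s a minimal sketch of what that interruptible delay might look like. The _stopRequested flag is hypothetical (it would be set from the TopShelf Stop() method) and isn’t part of the original code:

// Hypothetical flag, set to true from the service's Stop() method.
static volatile bool _stopRequested = false;

static void SleepUnlessStopping(int totalSeconds)
{
  // Sleep in short slices so a shutdown request isn't blocked for the
  // full retry delay.
  var remaining = TimeSpan.FromSeconds(totalSeconds);
  var slice = TimeSpan.FromSeconds(2);
  while (remaining > TimeSpan.Zero && !_stopRequested)
  {
    var wait = remaining < slice ? remaining : slice;
    Thread.Sleep(wait);
    remaining -= wait;
  }
}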

Other Options

Of course there are other ways to build delayed retry logic into your services. The method used in this post is just the simplest to illustrate, but you can take this further. For example, take a look at the MassTransit-Quartz project. This uses the open source Quartz.NET scheduler to enable delayed publishing of messages. It does, however, require an ADO.NET database to persist the scheduled jobs so you don’t lose your delayed messages. If you need scheduling and visibility into messages that were delayed, then this is your ticket.

Another pattern that could be implemented is that of moving the delayed messages to another RabbitMQ queue. Then you could write something that periodically polled that queue and moved the messages back into the primary queue after the desired delay.
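
Here’s a very rough sketch of that polling idea using the RabbitMQ .NET client directly. The “_retry” queue name is made up for illustration, and a real implementation would also need to track when each message’s delay has actually elapsed:

using System;
using RabbitMQ.Client;

class RetryQueueMover
{
  // Drain a hypothetical Loosely_CustomerPortal_Backend_retry queue back
  // into the primary queue. Run this periodically (e.g. from a timer).
  public static void MoveRetryMessages()
  {
    var factory = new ConnectionFactory { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
      BasicGetResult result;
      while ((result = channel.BasicGet("Loosely_CustomerPortal_Backend_retry", false)) != null)
      {
        // Republish to the default exchange, routed by queue name, then ack.
        channel.BasicPublish("", "Loosely_CustomerPortal_Backend", result.BasicProperties, result.Body);
        channel.BasicAck(result.DeliveryTag, false);
      }
    }
  }
}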

Next Stop…

Let’s take a look at how we can implement multiple consumers in a couple different use cases. In one case, we might want to spin up multiple, identical consumers on different machines to scale out message processing. In another case, we may want to have completely different consumers subscribing to the same message type but intending to do different things with the messages. After that, we’ll probably take a look at Sagas (chaining multiple message processing steps together) and then… who knows? Send me your thoughts and questions regarding anything you’d like to see here.

Introduction

In the previous post we explored how to inject a dependency into the registered consumer class in MassTransit. Consumer classes will often have at least some sort of dependency, such as a repository interface or another abstraction to propagate the changes made. Good software engineering dictates that a class should indicate what dependencies it needs through e.g. its constructor. This is the opposite of control-freak objects that construct all of their dependencies internally, hidden in their implementations.

In this post we’ll take a look at various failure handling options in MassTransit.

Exceptions in the consumer class

Sometimes an exception is thrown in the consumer class so that it cannot acknowledge the message. What happens then? Let’s see.

Currently we have a consumer called RegisterCustomerConsumer in our MassTransit.Receiver console demo application. Its implementation starts like this:

public Task Consume(ConsumeContext<IRegisterCustomer> context)
{
	IRegisterCustomer newCustomer = context.Message;
	Console.WriteLine("A new customer has signed up, it's time to register it in the command receiver. Details: ");

Insert the following line just before newCustomer is declared:

throw new ArgumentException("We pretend that an exception was thrown...");

Visual Studio will warn you that the rest of the implementation is unreachable, but we don’t care for now. Start the receiver application and then also start MassTransit.Publisher. The publisher will send the usual IRegisterCustomer object. The exception is thrown in the consumer and you’ll see the stacktrace printed in the consumer’s command window:

[Screenshot: consumer throws exception in MassTransit]

So what happened to the message? By default it ends up in another queue. This queue is named after the queue that the receiver is listening to with “_error” attached to it. Here’s our message:

[Screenshot: error queue for unacked messages in MassTransit]

As extra information, we can mention that there are also “_skipped” queues, like “mycompany.domains.queues_skipped” in our case. Skipped queues store messages that arrived at the endpoint but could not be handled by any registered consumer. Therefore it’s important to check the _error and _skipped queues periodically. You can even set up consumers for these queues if you want to process them further, e.g. log them for later inspection.
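
As a rough illustration (this isn’t from the original post), you could point a receive endpoint at the error queue and register a consumer that simply logs what it finds. The queue name is assumed from the earlier configuration, and wiring a consumer to an extra endpoint has routing side effects, so treat this purely as a sketch:

public class ErroredRegisterCustomerConsumer : IConsumer<IRegisterCustomer>
{
	public Task Consume(ConsumeContext<IRegisterCustomer> context)
	{
		// Log the failed registration for later inspection.
		Console.WriteLine("Errored customer registration: {0}", context.Message.Id);
		return Task.FromResult(0);
	}
}

rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues_error", conf =>
{
	conf.Consumer<ErroredRegisterCustomerConsumer>();
});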

Exception handling strategies

We saw above that the publisher published a message and the message ended up in the error queue after a single try. We can declare various retry policies in MassTransit but by default there is no retry policy at all. We can easily declare a retry policy for a specific consumer or for the service bus as a whole.

Probably the most basic retry policy is to tell MassTransit to resend a message a number of times before giving up, i.e. sending the message to the error queue. The retry policy for the receiver is declared in the ReceiveEndpoint method as follows:

rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.domains.queues", conf =>
{
	conf.Consumer<RegisterCustomerConsumer>(container);
	conf.UseRetry(Retry.Immediate(5));
});

The UseRetry extension method accepts an IRetryPolicy interface where the Retry class offers a number of shortcuts to build an object that implements the interface. The Immediate method accepts an integer and sets up a policy to resend a message that many times with no delay in between. The same UseRetry extension method is available within the Action supplied to the CreateUsingRabbitMq function:

IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
	//rest of code ignored
	rabbit.UseRetry(...)
});

This assigns the retry policy to the bus, so it’s a more general policy than on the consumer level. If you now run the same test then you’ll see that the same command message is re-sent 5 times in quick succession before it ends up in the _error queue.

The Retry class exposes a wide range of retry policies. If you type “Retry.” in the editor then IntelliSense will show you several options. The Except filter will apply the retry policy EXCEPT for the exception type specified:

conf.UseRetry(Retry.Except(typeof(ArgumentException)).Immediate(5));

If you test the code now you’ll see that the Immediate policy is bypassed since we want MassTransit to ignore argument exceptions. The Except function accepts an array of Exception types so you can specify as many as you want.
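
For example, something along these lines should exclude several exception types at once (the second exception type here is just illustrative):

conf.UseRetry(Retry.Except(typeof(ArgumentException), typeof(ArgumentNullException)).Immediate(5));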

The opposite case is the Selected function where we can provide the exception types for which the retry policy should be applied:

conf.UseRetry(Retry.Selected(typeof(ArgumentException)).Immediate(5));

Retry.All() will include all exception types in the retry policy. Finally we have a Filter method which accepts a Func that returns a bool. Here we can fine-tune our exception filtering logic. The Func has the exception as its input parameter. Here’s an example:

conf.UseRetry(Retry.Filter<Exception>(e => e.Message.IndexOf("We pretend that an exception was thrown") > -1).Immediate(5));

Filter, Selected, Except and All are all exception-related retry policy filters. On the other hand we have the time- and frequency-based retry policies like Immediate. This latter group includes a number of other functions.

Using the Exponential policy builder we can specify a min and max interval for the time between retries as follows:

conf.UseRetry(Retry.Exponential(5, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5)));

The first integer is the max number of retries. Then come the min and max intervals followed by the delta. If you test the code with this policy you’ll see that the wait times between retries keeps increasing exponentially. Exponential has an overload without the max number of retries if you want MassTransit to keep retrying forever.
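
Assuming that overload simply drops the retry count, using it would look something like this:

conf.UseRetry(Retry.Exponential(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5)));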

A similar policy is Incremental where we can provide a max number of retries, an initial delay and an interval increment:

conf.UseRetry(Retry.Incremental(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3)));

With the Interval function we can supply a retry count and an interval between each retry:

conf.UseRetry(Retry.Interval(5, TimeSpan.FromSeconds(5)));

Finally we have the Intervals function which accepts an array of retry intervals:

conf.UseRetry(Retry.Intervals(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(4)));

So we have a number of interesting options here.

Faults

When MassTransit has finished retrying with no success, it issues an object of type Fault<T> where T is the type of command or event for which all retries have failed. We can set up a consumer for it as follows:

using MyCompany.Messaging;
using System.Threading.Tasks;

namespace MassTransit.Receiver
{
	public class RegisterCustomerFaultConsumer : IConsumer<Fault<IRegisterCustomer>>
	{
		public Task Consume(ConsumeContext<Fault<IRegisterCustomer>> context)
		{
			IRegisterCustomer originalFault = context.Message.Message;
			ExceptionInfo[] exceptions = context.Message.Exceptions;
			return Task.FromResult(originalFault);
		}
	}
}

We can register the above consumer like we did before:

rabbit.ReceiveEndpoint(rabbitMqHost, "mycompany.queues.errors.newcustomers", conf =>
{
	conf.Consumer<RegisterCustomerFaultConsumer>();
});

For the above to work properly we have to set up a fault address when sending or publishing the message in the publisher. Here’s an example:

Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
{
	Address = "New Street",
	Id = Guid.NewGuid(),
	Preferred = true,
	RegisteredUtc = DateTime.UtcNow,
	Name = "Nice people LTD",
	Type = 1,
	DefaultDiscount = 0
}, c => c.FaultAddress = new Uri("rabbitmq://localhost:5672/accounting/mycompany.queues.errors.newcustomers"));			

The same FaultAddress property can be configured in the Publish method as well. An alternative to the FaultAddress is the ResponseAddress which MassTransit will inspect if a FaultAddress is not present.
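
On the publish side that would presumably look something like the following; the exact Publish overload taking a callback is an assumption on my part, so verify it against your MassTransit version:

Task publishTask = rabbitBusControl.Publish<IRegisterCustomer>(new
{
	Id = Guid.NewGuid(),
	Name = "Nice people LTD",
	RegisteredUtc = DateTime.UtcNow
}, c => c.FaultAddress = new Uri("rabbitmq://localhost:5672/accounting/mycompany.queues.errors.newcustomers"));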

If you test the above code then you’ll see that the fault consumer receives the Fault object after all retries have been exhausted.

You can find the exception handling documentation of MassTransit here.

We’ll continue with exploring how MassTransit handles types in the next post.

View the list of posts on Messaging here.

Using a Service Bus

We all know there are lots of benefits to using a service bus architecture, so whether you need a way to scale out or the ability to retry failed processes, it might be worth giving a service bus a try.
There are, however, limiting factors when using a service bus architecture; the main one most people hit is its asynchronous nature and how to relay this back to the end user in a way that will not cause frustration.

Happy case

Normally, without a service bus, we’d just do the actions on behalf of the user straight away on the user’s request, but when using a service bus we would send off a message onto the bus and then wait for a completed message:

static class Program
{
    static void Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), host =>
            {
                host.Username("guest");
                host.Password("guest");
            });

            cfg.ReceiveEndpoint("client", endpointCfg =>
            {
                endpointCfg.Consumer<SquareCompletedConsumer>();
            });
        });

        busControl.Start();

        for (;;)
        {
            Console.Write("Request a square size: ");
            int size = 0;
            if (int.TryParse(Console.ReadLine(), out size))
            {
                busControl.Publish(new SquareRequested() {Size = size});
                Console.WriteLine("Square requested");
            }
        }
    }
}
public class SquareCompletedConsumer : IConsumer<SquareCompleted>
{
    public Task Consume(ConsumeContext<SquareCompleted> context)
    {
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine();
        Console.WriteLine("Got you a square!");
        Console.WriteLine(context.Message.Square);
        Console.ResetColor();

        return Task.CompletedTask;
    }
}

The above will wait for user input and then raise a SquareRequested message. For the sake of this example we’ll assume drawing a square takes time and resources, which is why we’ve offloaded it onto the service bus for processing.
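
The message contracts themselves aren’t shown in the post; a minimal sketch, with property names inferred from how the consumers use them, might be:

public class SquareRequested
{
    public int Size { get; set; }
}

public class SquareCompleted
{
    public string Square { get; set; }
}

public class SquareFailed
{
}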

We’ll also have another process running a consumer that will handle our SquareRequested message:

static class Program
{
    static void Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(new Uri("rabbitmq://localhost"), host =>
            {
                host.Username("guest");
                host.Password("guest");
            });

            cfg.ReceiveEndpoint("drawer", endpointCfg =>
            {
                endpointCfg.Consumer<SquareRequestedConsumer>();
            });
        });

        busControl.Start();

        Console.ReadKey();
    }
}

public class SquareRequestedConsumer : IConsumer<SquareRequested>
{
    public async Task Consume(ConsumeContext<SquareRequested> context)
    {
        Console.WriteLine("Making square...");
        await Task.Delay(3000);
        var stringBuilder = new StringBuilder();
        var line = new string('*', context.Message.Size);
        for (int i = 0; i < context.Message.Size; i++)
        {
            stringBuilder.AppendLine(line);
        }

        await context.Publish(new SquareCompleted() {Square = stringBuilder.ToString()});
    }
}

This now allows us to notify the user once it’s completed:

[Demo animation]

Failure case

In our scenario, if something went wrong with generating a square within our drawer endpoint, the user would not be notified and the message would just sit in our error queue until it was manually worked. Try requesting a square of -1:

[Screenshot: drawer error]

I’ve seen cases where people just wrap the whole body of the handler in a try/catch and then raise another message if something went wrong:

// Bad example.
public class SquareRequestedConsumer : IConsumer<SquareRequested>
{
    public async Task Consume(ConsumeContext<SquareRequested> context)
    {
        try
        {
            // Do the work...
        }
        catch (Exception)
        {
            await context.Publish(new SquareFailed());
        }
    }
}

This isn’t ideal, as you’ll lose the exception details and the message won’t even get pushed into your error queue to investigate at a later date. But rest assured, MassTransit comes with some built-in filters for dealing with errors.

When MassTransit moves the message to the error queue it will also raise a Fault<T> message; in our case it would be a Fault<SquareRequested> message. So all we need to do in our client is create another consumer to handle the Fault<SquareRequested> message:

public class SquareRequestedFaultConsumer : IConsumer<Fault<SquareRequested>>
{
    public Task Consume(ConsumeContext<Fault<SquareRequested>> context)
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine();
        Console.WriteLine("There was an error with requesting a square of size {0}", context.Message.Message.Size);
        Console.ResetColor();

        return Task.CompletedTask;
    }
}

Now when there’s an error the user will be notified straight away:
[Screenshot: user notified]

And we will also have the full exception message and be able to replay it from the error queue if we wish:
[Screenshot: drawer error queue]

As you can see, MassTransit makes it a lot easier for your other endpoints to be notified if something went wrong; you could even use it as a way to push out notifications into Slack.
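
A rough sketch of that idea might look like the following; the webhook URL is a placeholder, the Slack incoming-webhook payload format is assumed, and this consumer isn’t part of the original example (it also needs System.Net.Http and System.Text):

public class SquareFailedSlackNotifier : IConsumer<Fault<SquareRequested>>
{
    private static readonly HttpClient Http = new HttpClient();

    public async Task Consume(ConsumeContext<Fault<SquareRequested>> context)
    {
        // Post a simple text message to a hypothetical Slack incoming webhook.
        var payload = "{\"text\": \"Square request of size " +
                      context.Message.Message.Size + " failed.\"}";
        await Http.PostAsync("https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
            new StringContent(payload, Encoding.UTF8, "application/json"));
    }
}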

Fault Messages in Azure Functions don’t end up in error queue (Closed)

ahocquet opened this issue Jul 1, 2019 · 15 comments

Comments

@ahocquet

Is this a bug report?

Yes. When consuming a message that throws an exception, the message doesn’t end up in either the dead letter queue or the error queue. MassTransit should move the message to the error queue.

Can you also reproduce the problem with the latest version?

Yes, version 5.5.2

Environment

  1. Operating system: Azure Cloud
  2. Visual Studio version: 16.1.4
  3. Dotnet version: .NET Core 2.2

Steps to Reproduce

  1. Create an Azure Service Bus trigger in an Azure Function
  2. Consume the message using Bus.Factory.CreateBrokeredMessageReceiver
  3. Throw an exception within the MassTransit consumer

Expected Behavior

The message should end up in an error queue.

Actual Behavior

The message is successfully consumed from the Azure Service Bus point of view.

Reproducible Demo

https://github.com/ahocquet/masstransit-error-handling-azure-functions

ahocquet changed the title from “Fault Messages in Azure Functions don’t end up in Dead Letter Queue” to “Fault Messages in Azure Functions don’t end up in error queue” on Aug 22, 2019

@rodeoboy

We are also running into this issue with a saga that we are developing. All the heavy lifting is done in individual consumers. When there are errors the expected Fault is queued, but the original message is lost and does not show up on the error queue. Without the ability to resubmit failed messages we can’t move these changes into production.

Any idea when this might be fixed?

@phatboyg

So, to resolve this issue, the call:

await receiver.Handle(message);

should throw an exception if the message is not consumed successfully?

@ahocquet

That would send the original message in the dead letter queue. It doesn’t respect the expect behavior where it should be forwarded to an _error suffixed queue, but at least we don’t loose the message.

@phatboyg

Hmm, okay. I think moving to the _error queue makes the most sense, honestly.

@sajibcefalo

@phatboyg I’m currently working with a similar situation. In the above situation, how can we consume faults? As described in the article, I have not found anything to configure the fault consumer in Azure Service Bus and an Azure Function.
My main goal is to log the exceptions once. If we put a try/catch block inside the consumer, log the exception and rethrow it, it’ll log the exception for each retry as well. I just want to log the exception once before it finally goes to the poison queue.

@gertjvr

@sajibcefalo if you create a consumer for IConsumer<Fault<{MessageType}>>, this consumer is called once before the message is moved to the _error queue, as described here.

@sajibcefalo

@gertjvr Thanks for the reply. How exactly do we configure the fault consumer? I created a fault consumer but have not found any way to configure it.

@gertjvr

@sajibcefalo just register the consumer under a receive endpoint as with normal consumers.

cfg.ReceiveEndpoint("process_orders", ec =>
{
    ec.Consumer<ReceiveOrdersConsumer>(context);
    ec.Consumer<ReceiveFaultedOrdersConsumer>(context);
});

public class ReceiveOrdersConsumer : IConsumer<ReceiveOrdersCommand>
public class ReceiveFaultedOrdersConsumer : IConsumer<Fault<ReceiveOrdersCommand>>

@sajibcefalo

@gertjvr How does this fit into the above example for an Azure Function?

By the way, I registered the consumers the way you mentioned inside the Bus.Factory.CreateBrokeredMessageReceiver but it didn’t work.

@gertjvr

Looking at the repo https://github.com/ahocquet/masstransit-error-handling-azure-functions, you could add the below, and then once the retries for the message consumer are exhausted any registered fault consumers should be called.

Below is the only alteration to the above repo.

    public class CommonFaultConsumer : IConsumer<Fault<OrderCreated>>
    {
        public Task Consume(ConsumeContext<Fault<OrderCreated>> context)
        {
            // log exception etc...
            return Task.CompletedTask;
        }
    }

    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            builder.Services.AddMassTransit(configurator =>
            {
                configurator.AddConsumer<CommonConsumer>();
                configurator.AddConsumer<CommonFaultConsumer>();
            });
        }
    }

@gertjvr

The above is a possible solution to only log once versus on each retry.

@phatboyg

That’s a pretty decent example for Azure Functions, I should use it as a base for a real function sample.

@sajibcefalo

@gertjvr Thank you very much for your reply and effort. I’m not sure whether I should mention another exceptional case, where I do not use an Azure Function but an Azure WebJob with .NET Framework. I configured the consumers as follows but the fault consumer was not triggered. Am I missing something? Can you please identify what’s wrong here?

Bus.Factory.CreateBrokeredMessageReceiver(binder, cfg =>
{
    cfg.CancellationToken = cancellationToken;
    cfg.SetLog(traceWriter);
    cfg.InputAddress = new Uri(serviceBusUri + queueName);
    cfg.Consumer(() => new CommonConsumer());
    cfg.Consumer(() => new CommonFaultConsumer());
});

@phatboyg

Fixed by #1661 I believe, and the PR that accompanied it.
