Rxjs subscribe error

Error handling is an essential part of RxJs, as we will need it in just about any reactive program that we write. Error handling in RxJS is likely not as well understood as other parts of the library, but it's actually quite simple to understand if we focus on understanding first the Observable contract in general. In this post, we are going to provide a complete guide containing the most common error handling strategies that you will need in order to cover most practical scenarios, starting w

Error handling is an essential part of RxJs, as we will need it in just about any reactive program that we write.

Error handling in RxJS is likely not as well understood as other parts of the library, but it’s actually quite simple to understand if we focus on understanding first the Observable contract in general.

In this post, we are going to provide a complete guide containing the most common error handling strategies that you will need in order to cover most practical scenarios, starting with the basics (the Observable contract).

Table Of Contents

In this post, we will cover the following topics:

  • The Observable contract and Error Handling
  • RxJs subscribe and error callbacks
  • The catchError Operator
  • The Catch and Replace Strategy
  • throwError and the Catch and Rethrow Strategy
  • Using catchError multiple times in an Observable chain
  • The finalize Operator
  • The Retry Strategy
  • Then retryWhen Operator
  • Creating a Notification Observable
  • Immediate Retry Strategy
  • Delayed Retry Strategy
  • The delayWhen Operator
  • The timer Observable creation function
  • Running Github repository (with code samples)
  • Conclusions

So without further ado, let’s get started with our RxJs Error Handling deep dive!

The Observable Contract and Error Handling

In order to understand error handling in RxJs, we need to first understand that any given stream can only error out once. This is defined by the Observable contract, which says that a stream can emit zero or more values.

The contract works that way because that is just how all the streams that we observe in our runtime work in practice. Network requests can fail, for example.

A stream can also complete, which means that:

  • the stream has ended its lifecycle without any error
  • after completion, the stream will not emit any further values

As an alternative to completion, a stream can also error out, which means that:

  • the stream has ended its lifecycle with an error
  • after the error is thrown, the stream will not emit any other values

Notice that completion or error are mutually exclusive:

  • if the stream completes, it cannot error out afterwards
  • if the streams errors out, it cannot complete afterwards

Notice also that there is no obligation for the stream to complete or error out, those two possibilities are optional. But only one of those two can occur, not both.

This means that when one particular stream errors out, we cannot use it anymore, according to the Observable contract. You must be thinking at this point, how can we recover from an error then?

RxJs subscribe and error callbacks

To see the RxJs error handling behavior in action, let’s create a stream and subscribe to it. Let’s remember that the subscribe call takes three optional arguments:

  • a success handler function, which is called each time that the stream emits a value
  • an error handler function, that gets called only if an error occurs. This handler receives the error itself
  • a completion handler function, that gets called only if the stream completes

Completion Behavior Example

If the stream does not error out, then this is what we would see in the console:

HTTP response {payload: Array(9)}
HTTP request completed.

As we can see, this HTTP stream emits only one value, and then it completes, which means that no errors occurred.

But what happens if the stream throws an error instead? In that case, we will see the following in the console instead:

RxJs Error Handling console output

As we can see, the stream emitted no value and it immediately errored out. After the error, no completion occurred.

Limitations of the subscribe error handler

Handling errors using the subscribe call is sometimes all that we need, but this error handling approach is limited. Using this approach, we cannot, for example, recover from the error or emit an alternative fallback value that replaces the value that we were expecting from the backend.

Let’s then learn a few operators that will allow us to implement some more advanced error handling strategies.

The catchError Operator

In synchronous programming, we have the option to wrap a block of code in a try clause, catch any error that it might throw with a catch block and then handle the error.

Here is what the synchronous catch syntax looks like:

This mechanism is very powerful because we can handle in one place any error that happens inside the try/catch block.

The problem is, in Javascript many operations are asynchronous, and an HTTP call is one such example where things happen asynchronously.

RxJs provides us with something close to this functionality, via the RxJs catchError Operator.

How does catchError work?

As usual and like with any RxJs Operator, catchError is simply a function that takes in an input Observable, and outputs an Output Observable.

With each call to catchError, we need to pass it a function which we will call the error handling function.

The catchError operator takes as input an Observable that might error out, and starts emitting the values of the input Observable in its output Observable.

If no error occurs, the output Observable produced by catchError works exactly the same way as the input Observable.

What happens when an error is thrown?

However, if an error occurs, then the catchError logic is going to kick in. The catchError operator is going to take the error and pass it to the error handling function.

That function is expected to return an Observable which is going to be a replacement Observable for the stream that just errored out.

Let’s remember that the input stream of catchError has errored out, so according to the Observable contract we cannot use it anymore.

This replacement Observable is then going to be subscribed to and its values are going to be used in place of the errored out input Observable.

The Catch and Replace Strategy

Let’s give an example of how catchError can be used to provide a replacement Observable that emits fallback values:

Let’s break down the implementation of the catch and replace strategy:

  • we are passing to the catchError operator a function, which is the error handling function
  • the error handling function is not called immediately, and in general, it’s usually not called
  • only when an error occurs in the input Observable of catchError, will the error handling function be called
  • if an error happens in the input stream, this function is then returning an Observable built using the of([]) function
  • the of() function builds an Observable that emits only one value ([]) and then it completes
  • the error handling function returns the recovery Observable (of([])), that gets subscribed to by the catchError operator
  • the values of the recovery Observable are then emitted as replacement values in the output Observable returned by catchError

As the end result, the http$ Observable will not error out anymore! Here is the result that we get in the console:

HTTP response []
HTTP request completed.

As we can see, the error handling callback in subscribe() is not invoked anymore. Instead, here is what happens:

  • the empty array value [] is emitted
  • the http$ Observable is then completed

As we can see, the replacement Observable was used to provide a default fallback value ([]) to the subscribers of http$, despite the fact that the original Observable did error out.

Notice that we could have also added some local error handling, before returning the replacement Observable!

And this covers the Catch and Replace Strategy, now let’s see how we can also use catchError to rethrow the error, instead of providing fallback values.

The Catch and Rethrow Strategy

Let’s start by noticing that the replacement Observable provided via catchError can itself also error out, just like any other Observable.

And if that happens, the error will be propagated to the subscribers of the output Observable of catchError.

This error propagation behavior gives us a mechanism to rethrow the error caught by catchError, after handling the error locally. We can do so in the following way:

Catch and Rethrow breakdown

Let’s break down step-by-step the implementation of the Catch and Rethrow Strategy:

  • just like before, we are catching the error, and returning a replacement Observable
  • but this time around, instead of providing a replacement output value like [], we are now handling the error locally in the catchError function
  • in this case, we are simply logging the error to the console, but we could instead add any local error handling logic that we want, such as for example showing an error message to the user
  • We are then returning a replacement Observable that this time was created using throwError
  • throwError creates an Observable that never emits any value. Instead, it errors out immediately using the same error caught by catchError
  • this means that the output Observable of catchError will also error out with the exact same error thrown by the input of catchError
  • this means that we have managed to successfully rethrow the error initially thrown by the input Observable of catchError to its output Observable
  • the error can now be further handled by the rest of the Observable chain, if needed

If we now run the code above, here is the result that we get in the console:

RxJs Error Handling console output

As we can see, the same error was logged both in the catchError block and in the subscription error handler function, as expected.

Using catchError multiple times in an Observable chain

Notice that we can use catchError multiple times at different points in the Observable chain if needed, and adopt different error strategies at each point in the chain.

We can, for example, catch an error up in the Observable chain, handle it locally and rethrow it, and then further down in the Observable chain we can catch the same error again and this time provide a fallback value (instead of rethrowing):

If we run the code above, this is the output that we get in the console:

RxJs Map Operator marble diagram

As we can see, the error was indeed rethrown initially, but it never reached the subscribe error handler function. Instead, the fallback [] value was emitted, as expected.

The Finalize Operator

Besides a catch block for handling errors, the synchronous Javascript syntax also provides a finally block that can be used to run code that we always want executed.

The finally block is typically used for releasing expensive resources, such as for example closing down network connections or releasing memory.

Unlike the code in the catch block, the code in the finally block will get executed independently if an error is thrown or not:

RxJs provides us with an operator that has a similar behavior to the finally functionality, called the finalize Operator.

Note: we cannot call it the finally operator instead, as finally is a reserved keyword in Javascript

Finalize Operator Example

Just like the catchError operator, we can add multiple finalize calls at different places in the Observable chain if needed, in order to make sure that the multiple resources are correctly released:

Let’s now run this code, and see how the multiple finalize blocks are being executed:

RxJs Error Handling console output

Notice that the last finalize block is executed after the subscribe value handler and completion handler functions.

The Retry Strategy

As an alternative to rethrowing the error or providing fallback values, we can also simply retry to subscribe to the errored out Observable.

Let’s remember, once the stream errors out we cannot recover it, but nothing prevents us from subscribing again to the Observable from which the stream was derived from, and create another stream.

Here is how this works:

  • we are going to take the input Observable, and subscribe to it, which creates a new stream
  • if that stream does not error out, we are going to let its values show up in the output
  • but if the stream does error out, we are then going to subscribe again to the input Observable, and create a brand new stream

When to retry?

The big question here is, when are we going to subscribe again to the input Observable, and retry to execute the input stream?

  • are we going to retry that immediately?
  • are we going to wait for a small delay, hoping that the problem is solved and then try again?
  • are we going to retry only a limited amount of times, and then error out the output stream?

In order to answer these questions, we are going to need a second auxiliary Observable, which we are going to call the Notifier Observable. It’s the Notifier
Observable that is going to determine when the retry attempt occurs.

The Notifier Observable is going to be used by the retryWhen Operator, which is the heart of the Retry Strategy.

RxJs retryWhen Operator Marble Diagram

To understand how the retryWhen Observable works, let’s have a look at its marble diagram:

RxJs retryWhen Operator

Notice that the Observable that is being re-tried is the 1-2 Observable in the second line from the top, and not the Observable in the first line.

The Observable on the first line with values r-r is the Notification Observable, that is going to determine when a retry attempt should occur.

Breaking down how retryWhen works

Let’s break down what is going in this diagram:

  • The Observable 1-2 gets subscribed to, and its values are reflected immediately in the output Observable returned by retryWhen
  • even after the Observable 1-2 is completed, it can still be re-tried
  • the notification Observable then emits a value r, way after the Observable 1-2 has completed
  • The value emitted by the notification Observable (in this case r) could be anything
  • what matters is the moment when the value r got emitted, because that is what is going to trigger the 1-2 Observable to be retried
  • the Observable 1-2 gets subscribed to again by retryWhen, and its values are again reflected in the output Observable of retryWhen
  • The notification Observable is then going to emit again another r value, and the same thing occurs: the values of a newly subscribed 1-2 stream are going to start to get reflected in the output of retryWhen
  • but then, the notification Observable eventually completes
  • at that moment, the ongoing retry attempt of the 1-2 Observable is completed early as well, meaning that only the value 1 got emitted, but not 2

As we can see, retryWhen simply retries the input Observable each time that the Notification Observable emits a value!

Now that we understand how retryWhen works, let’s see how we can create a Notification Observable.

Creating a Notification Observable

We need to create the Notification Observable directly in the function passed to the retryWhen operator. This function takes as input argument an Errors Observable, that emits as values the errors of the input Observable.

So by subscribing to this Errors Observable, we know exactly when an error occurs. Let’s now see how we could implement an immediate retry strategy using the Errors Observable.

Immediate Retry Strategy

In order to retry the failed observable immediately after the error occurs, all we have to do is return the Errors Observable without any further changes.

In this case, we are just piping the tap operator for logging purposes, so the Errors Observable remains unchanged:

Let’s remember, the Observable that we are returning from the retryWhen function call is the Notification Observable!

The value that it emits is not important, it’s only important when the value gets emitted because that is what is going to trigger a retry attempt.

Immediate Retry Console Output

If we now execute this program, we are going to find the following output in the console:

retryWhen console output

As we can see, the HTTP request failed initially, but then a retry was attempted and the second time the request went through successfully.

Let’s now have a look at the delay between the two attempts, by inspecting the network log:

RxJs retryWhen network log

As we can see, the second attempt was issued immediately after the error occurred, as expected.

Delayed Retry Strategy

Let’s now implement an alternative error recovery strategy, where we wait for example for 2 seconds after the error occurs, before retrying.

This strategy is useful for trying to recover from certain errors such as for example failed network requests caused by high server traffic.

In those cases where the error is intermittent, we can simply retry the same request after a short delay, and the request might go through the second time without any problem.

The timer Observable creation function

To implement the Delayed Retry Strategy, we will need to create a Notification Observable whose values are emitted two seconds after each error occurrence.

Let’s then try to create a Notification Observable by using the timer creation function. This timer function is going to take a couple of arguments:

  • an initial delay, before which no values will be emitted
  • a periodic interval, in case we want to emit new values periodically

Let’s then have a look at the marble diagram for the timer function:

The timer Operator

As we can see, the first value 0 will be emitted only after 3 seconds, and then we have a new value each second.

Notice that the second argument is optional, meaning that if we leave it out our Observable is going to emit only one value (0) after 3 seconds and then complete.

This Observable looks like its a good start for being able to delay our retry attempts, so let’s see how we can combine it with the retryWhen and delayWhen operators.

The delayWhen Operator

One important thing to bear in mind about the retryWhen Operator, is that the function that defines the Notification Observable is only called once.

So we only get one chance to define our Notification Observable, that signals when the retry attempts should be done.

We are going to define the Notification Observable by taking the Errors Observable and applying it the delayWhen Operator.

Imagine that in this marble diagram, the source Observable a-b-c is the Errors Observable, that is emitting failed HTTP errors over time:

The timer Operator

delayWhen Operator breakdown

Let’s follow the diagram, and learn how the delayWhen Operator works:

  • each value in the input Errors Observable is going to be delayed before showing up in the output Observable
  • the delay per each value can be different, and is going to be created in a completely flexible way
  • in order to determine the delay, we are going to call the function passed to delayWhen (called the duration selector function) per each value of the input Errors Observable
  • that function is going to emit an Observable that is going to determine when the delay of each input value has elapsed
  • each of the values a-b-c has its own duration selector Observable, that will eventually emit one value (that could be anything) and then complete
  • when each of these duration selector Observables emits values, then the corresponding input value a-b-c is going to show up in the output of delayWhen
  • notice that the value b shows up in the output after the value c, this is normal
  • this is because the b duration selector Observable (the third horizontal line from the top) only emitted its value after the duration selector Observable of c, and that explains why c shows up in the output before b

Delayed Retry Strategy implementation

Let’s now put all this together and see how we can retry consecutively a failing HTTP request 2 seconds after each error occurs:

Let’s break down what is going on here:

  • let’s remember that the function passed to retryWhen is only going to be called once
  • we are returning in that function an Observable that will emit values whenever a retry is needed
  • each time that there is an error, the delayWhen operator is going to create a duration selector Observable, by calling the timer function
  • this duration selector Observable is going to emit the value 0 after 2 seconds, and then complete
  • once that happens, the delayWhen Observable knows that the delay of a given input error has elapsed
  • only once that delay elapses (2 seconds after the error occurred), the error shows up in the output of the notification Observable
  • once a value gets emitted in the notification Observable, the retryWhen operator will then and only then execute a retry attempt

Retry Strategy Console Output

Let’s now see what this looks like in the console! Here is an example of an HTTP request that was retried 5 times, as the first 4 times were in error:

The timer Operator

And here is the network log for the same retry sequence:

The timer Operator

As we can see, the retries only happened 2 seconds after the error occurred, as expected!

And with this, we have completed our guided tour of some of the most commonly used RxJs error handling strategies available, let’s now wrap things up and provide some running sample code.

Running Github repository (with code samples)

In order to try these multiple error handling strategies, it’s important to have a working playground where you can try handling failing HTTP requests.

This playground contains a small running application with a backend that can be used to simulate HTTP errors either randomly or systematically. Here is what the application looks like:

RxJs sample application

Conclusions

As we have seen, understanding RxJs error handling is all about understanding the fundamentals of the Observable contract first.

We need to keep in mind that any given stream can only error out once, and that is exclusive with stream completion; only one of the two things can happen.

In order to recover from an error, the only way is to somehow generate a replacement stream as an alternative to the errored out stream, like it happens in the case of the catchError or retryWhen Operators.

I hope that you have enjoyed this post, if you would like to learn a lot more about RxJs, we recommend checking the RxJs In Practice Course course, where lots of useful patterns and operators are covered in much more detail.

Also, if you have some questions or comments please let me know in the comments below and I will get back to you.

To get notified of upcoming posts on RxJs and other Angular topics, I invite you to subscribe to our newsletter:

If you are just getting started learning Angular, have a look at the Angular for Beginners Course:

In Rxjs, when we work with observables handling the errors is a bit confusing for beginners because you can think of a try-catch, but Rxjs comes with operators to manage it, so what can I use and when?

Let’s move to each step with code, the example uses angular httpClient, but it applies to any data stream.

The scenario

Our app uses a service to get the list of beers and show the first one as the title.

import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable()
export class BeerService {
  private apiUrl = 'https://api.punkapi.com/v2/beers';
  constructor(private HTTP: HttpClient) {}

  getBeers(): Observable<any> {
    return this.http.get(this.apiUrl);
  }
}

The app component subscribes to it, shows the beer list, and takes the first one.

import { Component, OnInit } from '@angular/core';
import { BeerService } from './beer.service';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css'],
})
export class AppComponent implements OnInit {
  title = 'my first beer';
  beers = [];
  constructor(private beerService: BeerService) {}

  ngOnInit() {
    try {
      this.beerService.getBeers().subscribe((beers) => {
        console.log(beers);
        this.beers = beers;
        this.title = beers[0].name;
      });
    } catch (err) {
      this.title = 'Ups a error';
    }
  }
}

What happens if the API fails? , We changed the URL to a failed URL to catch the error with some strategies.

Using try-catch

In JavaScript, we use a try-catch to validate a piece of code, and if something comes with an error, it catches.

But the try-catch is useless with our rxjs code because the errors happen in the subscribe scope, so try-catch doesn’t solve anything, so we need to use Rxjs operators.

export class AppComponent implements OnInit {
  title = 'my first beer';
  beers = [];
  constructor(private beerService: BeerService) {}

  ngOnInit() {
    try {
      this.beerService.getBeers().subscribe((beers) => {
        console.log(beers);
        this.beers = beers;
        this.title = beers[0].name;
      });
    } catch (err) {
      this.title = 'Us a error';
    }
  }
}

Read more about try-cath

So, who is to catch the error in the subscription?

To understand why it is not working, remember that when we subscribe to an observable, the subscribe call takes three optional arguments.

      this.beerService
      .getBeers()
      .subscribe({
        next: (beers) => {
          console.log(beers);
          this.beers = beers;
          this.title = beers[0].name;
        },
        error: (e) => {
          console.log(e);
          this.title = 'ups';
        },
        complete: () => console.log('done'),
      });
  • next or success function is called each time the stream emits a value.
  • error: is a function called when an error occurs and gets the error.
  • complete: is a function that gets called only if the stream completes

So the error is in the subscribe function scope, so how can we manage the case?

Using Rxjs Operators

Rxjs provide some operators to help us with the errors, each of them is used in the scenario, so let’s use each.

We are going to play with cathError, throwError, and EMPTY.

cathError

It catches the error, but emits the value. In short, it takes the error and returns another observable.

I removed the previous strategy about three callback functions and used the pipe to work with the catchError operator.

When the API fails, I return an array with the default beer Observable object.

Learn more about pipe

this.beerService
      .getBeers()
      .pipe(catchError(() => of([{ name: 'my default beer' }])))
      .subscribe((beers) => {
        console.log(beers);
        this.beers = beers;
        this.title = beers[0].name;
      });

The catchError is perfect for emitting a default value if something happens in our code, and to subscribe can take the default value as an emission.

throwError

Sometimes we don’t want to emit the error but want to notify the error; for those scenarios, the throwError helps us.

throwError does not emit the data to the next, and it uses the error on the subscriber callbacks. If we want to catch a custom error or notify the backend, we can use the error callback in the subscriber.

 ngOnInit() {
    this.beerService
      .getBeers()
      .pipe(
        catchError(() => {
          return throwError(() => new Error('ups something happened));
        })
      )
      .subscribe({
        next: (beers) => {
          console.log(beers);
          this.beers = beers;
          this.title = beers[0].name;
        },
        error: (err) => {
          console.log(err);
        },
      });
  }

Read more about throwError

EMPTY

Sometimes we don’t want to propagate the error in our component. Rxjs provide an EMPTY constant and return an empty Observable without emitting any data to the subscriber callbacks.

this.beerService
      .getBeers()
      .pipe(
        catchError(() => {
          return EMPTY;
        })
      )
      .subscribe({
        next: (beers) => {
          this.beers = beers;
          this.title = beers[0].name;
        },
        error: (err) => console.log(err),
      });

Read more about EMPTY

Conclusion

In short, we learn how to pipe the data and catch the errors using catchError, to modify the return observable or use EMPTY not to trigger the error to the component.

Play with the code in:

  • Stackbliz

Error Handling in the Reactive Extensions

One of the most difficult tasks in asynchronous programming is dealing with errors. Unlike interactive style programming, we cannot simply use the try/catch/finally approach that we use when dealing with blocking code.

try {
  for (var obj in objs) {
    doSomething(obj);
  }
} catch (e) {
  handleError(e);
} finally {
  doCleanup();
}

These actions mirror exactly our Observer class which has the following contract for handing zero to infinite items with onNext and optionally handling either an Error with onError or successful completion with onCompleted.

interface Observer<T> {
  onNext(value: T) : void
  onError(error: Error) : void
  onCompleted() : void
}

But the try/catch/finally approach won’t work with asynchronous code. Instead, we have a myriad of ways of handling errors as they occur, and ensure proper disposal of resources.

For example, we might want to do the following:

  • swallow the error and switch over to a backup Observable to continue the sequence
  • swallow the error and emit a default item
  • swallow the error and immediately try to restart the failed Observable
  • swallow the error and try to restart the failed Observable after some back-off interval

We’ll cover each of those scenarios and more in this section.

Catching Errors

The first topic is catching errors as they happen with our streams. In the Reactive Extensions, any error is propogated through the onError channel which halts the sequence. We can compensate for this by using the catch operator, at both the class and instance level.

Using the class level catch method, we can catch errors as they happen with the current sequence and then move to the next sequence should there be an error. For example, we could try getting data from several URLs, it doesn’t matter which since they all have the same data, and then if that fails, default to a cached version, so an error should never propagate. One thing to note is that if get('url') calls succeed, then it will not move onto the next sequence in the list.

var source = Rx.Observable.catch(
  get('url1'),
  get('url2'),
  get('url3'),
  getCachedVersion()
);

var subscription = source.subscribe(
  data => {
    // Display the data as it comes in
  }
);

We also have an instance version of catch which can be used two ways. The first way is much like the example above, where we can take an existing stream, catch the error and move onto the next stream or Promise.

var source = get('url1').catch(getCachedVersion());

var subscription = source.subscribe(
  data => {
    // Display the data as it comes in
  }
);

The other overload of catch allows us to inspect the error as it comes in so we can decide which route to take. For example, if an error status code of 500 comes back from our web server, we can assume it is down and then use a cached version.

var source = get('url1').catch(e => {
  if (e.status === 500) {
    return cachedVersion();
  } else {
    return get('url2');
  }
});

var subscription = source.subscribe(
  data => {
    // Display the data as it comes in
  }
);

This isn’t the only way to handle errors as there are plenty of others as you’ll see below.

Ignoring Errors with onErrorResumeNext

The Reactive Extensions borrowed from a number of languages in our design. One of those features is bringing On Error Resume Next from Microsoft Visual Basic. This operation specifies that when a run-time error occurs, control goes to the statement immediately following the statement where the error occurred, and execution continues from that point. There are some instances with stream processing that you simply want to skip a stream which produces an error and move to the next stream. We can achieve this with a class based and instance based onErrorResumeNext method.

The class based onErrorResumeNext continues a stream that is terminated normally or by an Error with the next stream or Promise. Unlike catch, onErrorResumeNext will continue to the next sequence regardless of whether the previous was in error or not. To make this more concrete, let’s use a simple example of mixing error sequences with normal sequences.

var source = Rx.Observable.onErrorResumeNext(
  Rx.Observable.just(42),
  Rx.Observable.throw(new Error()),
  Rx.Observable.just(56),
  Rx.Observable.throw(new Error()),
  Rx.Observable.just(78)
);

var subscription = source.subscribe(
  data => console.log(data)
);
// => 42
// => 56
// => 78

The instance based onErrorResumeNext is similar to the class based version, the only difference being that it is attached to the prototype, but can take another sequence or Promise and continue.

Retrying Sequences

When catching errors isn’t enough and we want to retry our logic, we can do so with retry or retryWhen operators. With the retry operator, we can try a certain operation a number of times before an error is thrown. This is useful when you need to get data from a resource which may have intermittent failures due to load or any other issue.

Let’s take a look at a simple example of trying to get some data from a URL and giving up after three tries.

// Try three times to get the data and then give up
var source = get('url').retry(3);

var subscription = source.subscribe(
  data => console.log(data),
  err => console.log(err)
);

In the above example, it will give up after three tries and thus call onError if it continues to fail after the third try. We can remedy that by adding catch to use an alternate source.

// Try three times to get the data and then return cached data if still fails
var source = get('url').retry(3).catch(cachedVersion());

var subscription = source.subscribe(
  data => {
    // Displays the data from the URL or cached data
    console.log(data);
  }
);

The above case retries immediately upon failure. But what if you want to control when a retry happens? We have the retryWhen operator which allows us to deeply control when the next try happens. We incrementally back off trying again by using the following method:

var source = get('url').retryWhen(
   attempts =>
    attempts
      .zip(Observable.range(1, 3), (_, i) => i)
      .flatMap(i => {
        console.log('delay retry by ' + i + ' second(s)');
        return Rx.Observable.timer(i * 1000);
      });
);

var subscription = source.subscribe(
  data => {
    // Displays the data from the URL or cached data
    console.log(data);
  }
);
// => delay retry by 1 second(s)
// => delay retry by 2 second(s)
// => Data

Ensuring Cleanup with Finally

We’ve already covered the try/catch part of try/catch/finally, so what about finally? We have the finally operator which calls a function after the source sequence terminates gracefully or exceptionally. This is useful if you are using external resources or need to free up a particular variable upon completion.

In this example, we can ensure that our WebSocket will indeed be closed once the last message is processed.

var socket = new WebSocket('ws://someurl', 'xmpp');

var source = Rx.Observable.from(data)
  .finally(() => socket.close());

var subscription = source.subscribe(
  data => {
    socket.send(data);
  }
);

But we can do a better job in terms of managing resources if need be by using the using method.

Ensuring Resource Disposal

As stated above, finally can be used to ensure proper cleanup of any resources or perform any side effects as necessary. There is a cleaner approach we can take by creating a disposable wrapper around our object with a dispose method so that when our scope is complete, then the resource is automatically disposed through the using operator.

function DisposableWebSocket(url, protocol) {
  var socket = new WebSocket(url, protocol);

  // Create a way to close the WebSocket upon completion
  var d = Rx.Disposable.create(() => socket.close());

  d.socket = socket;

  return d;
}

var source = Rx.Observable.using(
  () => new DisposableWebSocket('ws://someurl', 'xmpp'),
  d => 
    Rx.Observable.from(data)
      .tap(data => d.socket.send(data));
  }
);

var subscription = source.subscribe();

Delaying Errors with mergeDelayError

Another issue may arise when you are dealing with flattening sequences into a single sequence and there may be errors along the way. We want a way to flatten without being interrupted by one of our sources being in error. This is much like the other operator mergeAll but the main difference is, instead of immediately bubbling up the error, it holds off until the very end.

To illustrate, we can create this little sample that has an errored sequence in the middle when it is trying to flatten the sequences.

var source1 = Rx.Observable.of(1,2,3);
var source2 = Rx.Observable.throwError(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6);

var source = Rx.Observable.mergeDelayError(source1, source2, source3);

var subscription = source.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e),
  () => console.log('onCompleted'));

// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => Error: Error: woops

Further Reading

  • Using Generators For Try/Catch Operations
  • Testing and Debugging Your RxJS Application

Errors are first class citizens in reactive programming. The error flow is adopted as a valid case of all observables. This is an essential and great feature but also one of the less understood parts. In this post we explore some of the most common RxJS error handling strategies.

For more error handling example look at this helpful page: learnrxjs.

Observable contract

The following code snippet is typical for RxJS. The subscribe method on the observable http$ has 3 optional handler functions for the value-, error- and complete-flow. These are 3 events an observable can emit. Note that error and complete event are mutually exclusive by the observable contract.

const http$ = this.http.get<Person[]>('/api/persons'); 

http$.subscribe(
    res => console.log('Response', res),
    err => console.log('Error', err),
    () => console.log('Completed')
);

But the above subscribe method has its limitations when it comes to error handling. What if we want to recover from an error? In the next part we discuss some other ways to handle errors.

Operator: catchError

The catchError operator will pass through all value and complete events from the source observable. Only when an error is emitted the operator will return a Replacement Observable.

const http$ = this.http.get<Person[]>('/api/persons');

http$
    .pipe(
        catchError(err => of([]))
    )
    .subscribe(
        res => console.log('Response', res),
        err => console.log('Error', err),
        () => console.log('Completed')
    );

In this example we see that the replacement observable in case of an error is of([]), this creates an observable with only one value: []. In the case that the http call returns an error the result is:

Response []
Completed

No error! The replacement observable is returned instead.

Another strategy could be to rethrow the error. This could be the case when you want to have your own custom error to be handled further on.

const http$ = this.http.get<Person[]>('/api/persons');

http$
    .pipe(
        catchError(err => {
            return throwError('This is a custom error!');
        })
    )
    .subscribe(
        res => console.log('Response', res),
        err => console.log('Error', err),
        () => console.log('Completed')
    );

Operator: finalize

Sometimes a piece of code has to be executed no matter what the case. “Error or not this resource/memory has to be released.” RxJS has the finalize operator that will always be executed. Let’s combine the previous examples in the following.

const http$ = this.http.get<Person[]>('/api/persons');

http$
    .pipe(
        catchError(err => {
            console.log('rethrowing error', err);
            return throwError('This is a custom error!');
        }),
        finalize(() => console.log("first finalize")),
        catchError(err => {
             console.log('provide replacement');
             return of([]);
        }),
        finalize(() => console.log("second finalize"))
    )
    .subscribe(
        res => console.log('Response', res),
        err => console.log('Error', err),
        () => console.log('Completed')
    );

An error of the http observable will output the following. Note that the last finalize is executed after the value handler and complete handler functions.

GET http://localhost:4200/api/persons 500
   (Internal Server Error)
rethrowing error
   (HttpErrorResponse {headers: ... })
provide replacement
first finalize
Response []
Completed
second finalize

Operator: retryWhen

Yet another strategy is to retry. In the case of an error we can catch it and just subscribe to the source observable again. The only question is when to retry. Do we want to retry immediately? Or do we want a small delay? We will create a Notifier Observable to signal when to retry. Take a look at the marble diagram of the retryWhen operator:

rxjs retrywhen marble diagram - RXJS ERROR HANDLING

The first line is the Notifier Observable where the r is a signal to retry. The second line is the source observable that errors. At the bottom we have the result: After each retry event we subscribe again te the source and all events are received again.

The power of this construction is that we can build the notifier observable in whatever way we want. In the next example we will retry every second if the http call emits an error.

const http$ = this.http.get<Person[]>('/api/persons');

http$
    .pipe(
        tap(() => console.log("HTTP request executed")),
        retryWhen(errors => {
            return errors.pipe(
                       tap(() => console.log('retrying...')),
                       delay(1000)
                   );
        })
    )
    .subscribe(
        res => console.log('Response', res),
        err => console.log('Error', err),
        () => console.log('Completed')
    );

Now let’s say the api is temporarily unavailable and after 1 second it is back up. This would be the output:

GET http://localhost:4200/api/persons 500
   (Internal Server Error)
retrying...
HTTP request executed
Response {...}
Completed

Operator: delayWhen

When we want some logic to determine how long the delay should be, we can use the delayWhen operator. The following code snippet will have the same result as the previous. But the setup allows to expand the function as desired.

retryWhen(errors => {
    return errors.pipe(
        tap(() => console.log('retrying...')),
        delayWhen(() => timer(1000))
    );
})

Hopefully you are now able to use these different strategies of RxJS error handling. Handle with care.

 
 
 
 
 

iFour Logo iFour Logo

Get in touch

  • info@ifourtechnolab.com
  • 601 & 612, The Times square Arcade, Near Baghban party plot, Thaltej — Shilaj Road, Thaltej,
    Ahmedabad, Gujarat — 380059

iFour Team — February 05, 2021

Listening is fun too.

Straighten your back and cherish with coffee — PLAY !

Error Handling using Angular RxJS

RxJS is the biggest part of Angular. With a well understanding of how to implement error handling the right way with RxJS, you are sure to run with strange problems down the line when an error does occur. By contrast, if you know what you are doing up a cover, you can remove those strange problems and save yourself some debugging distress.

 

Table of Content

  • 1. Infinite Observables
  • 2. DOM Event Case Study
  • 3. Bad Error Handling
  • 4. Good Error Handling
  • 5. Output
  • 6. NgRX Effect Case Study
  • 7. CallWithoutError Effect
  • 8. CallWithError Effect
  • 9. CallWithErrorKeepListening Effect
  • 10. CallWithErrorNotCaughtEffect
  • 11. Conclusion

In this blog, we will discuss the type of RxJS observables to be most involved, how to incorrectly handle an error through RxJS, what happens when you do not handle an error applying RxJS and how to precisely handle errors applying RxJS.

Infinite Observables

This blog will be linked with infinite observables, those that you hope to keep getting values from. That is because if you do error handling incorrectly, they stop to be infinite observables and finish, which will be very bad. After all, your app is expecting it to be infinite.

These will be considered:

  • DOM Event: A DOM Event ‘keyup’ that you wanna debounce as the user types on the page and then look up through an API.

  • NgRx Effect: An NgRx Effect that you hope will forever be listening for dispatched actions

DOM Event Case Study

That first consideration will concentrate on handling DOM events and doing searches based on them. There will be two input boxes that you type in a character’s name of Star Wars. While you stop typing for 300 milliseconds and the text is changed. It will search for those names through the Star Wars API and display results. The first input box will keep working after an error. The second input box will halt working after an error.

Here is the interface:

ai-Hiring-banner

Figure: DOM Event case study interface

We have gamed this a pretty bit so that if you type “error”, it will search over a wrong URL, therefore creating an error.

Here is the appropriate HTML:


                                        

The key up event normally emits through the “next” method of the Subject.

Following is the component code:

 

  import { Component } from '@angular/core';
  import { Subject } from 'rxjs';
  import { finalize } from 'rxjs/operators';
  import { RxJSDemoService } from './rx-jsdemo.service';
  
  @Component({
    selector: 'app-root',
    templateUrl: './app.component.html',
    styleUrls: ['./app.component.css']
  })
  export class AppComponent {
    title = 'RxJSErrorHandlingDemo';
    searchTerm$ = new Subject();
    searchTermError$ = new Subject();
    resultsError: any;
    results: any;
    constructor(
      private rxjsDemoService: RxJSDemoService,  
    ) {}
  
    ngOnInit(): void {
      this.rxjsDemoService
        .searchBadCatch(this.searchTermError$)
        .pipe(
          finalize(() =>
            console.log("searchTermError$ (bad catch) finalize called!")
          )
        )
        .subscribe(results => {
          console.log("Got results from search (bad catch)");
          this.resultsError = results.results;
        });
  
      this.rxjsDemoService
        .search(this.searchTerm$)
        .pipe(finalize(() => console.log("searchTerm$ finalize called!")))
        .subscribe(results => {
          console.log("Got results from search (good catch)");
          this.results = results.results;
        });
    }
  }
  
                                        

This code essentially will be reporting results to the page and logging either it was called or not. Note that we are calling two various service methods and passing in two several subjects.

Read More: Getting Started With Angular Animations

The error handling code for this case study is in the following rxjsDemoService :

 

import { Injectable } from '@angular/core';
import { of } from 'rxjs';
import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';
import { catchError } from 'rxjs/operators'; 
import { Observable } from 'rxjs';
import { HttpClient } from "@angular/common/http";

@Injectable({
  providedIn: 'root'
})
export class RxJSDemoService {
  constructor(private http: HttpClient){}    
  
  searchBadCatch(terms: Observable) {
    return terms.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(term => this.searchStarWarsNames(term)),
      catchError(error => {
        console.log("Caught search error the wrong way!");
        return of({ results: null });
      })
    );
  }

  search(terms: Observable) {
    return terms.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(term =>
        this.searchStarWarsNames(term).pipe(
          catchError(error => {
            console.log("Caught search error the right way!");
            return of({ results: null });
          })
        )
      )
    );
  }

  private searchStarWarsNames(term) {
    let url = `https://swapi.co/api/people/?search=${term}`;
    if (term === "error") {
      url = `https://swapi.co/apix/people/?search=${term}`;
}

    return this.http.get(url);
  }  
}

                                        

Bad Error Handling

The searchBadCatch method has our bad error handling code. Just looking at it, it looks fine. It’s debouncing for 300 ms, has the distinctUntilChanged to ensure we do not search for the same thing twice in a row. There is a switchMap that calls the searchStarWarsName method and we are catching errors through the catchError method.

If you catch the error through catchError at the first step of the Observables pipe method. It will enable you to handle the error and return one more result in the stream but will then finish the observable stream. And that indicates it won’t listen to key up events anymore. Thus, at all costs never allow an error to percolate to this step.

Remark that if catchError is given on the initial level of the Observable pipe method, the finalize operator will be called. You can see this up in the component code.

Following is the code of it:

ai-Hiring-banner

Figure: Component code

Never let an error permeate to the level of the red line.

Good Error Handling

The search method has our RxJS finest practice error handling code:

Always put the catchError operator within a switchMap so that it only ends the API call stream and then returns the stream to the switchMap, which remains the Observable.

If you are not calling an API, ensure to add a try or catch block so that you can handle the error in the catch block and not allow it to permeate to the first level pipe. Do not believe your code will never fail, use a try or catch block.

So, you can see in the code that we add a pipe to the searchStarWarsNames call so that within of there, we catchError and therefore not allow the error to permeate to the first level pipe.

And the best practice code is the following:

ai-Hiring-banner

Figure: Best practice code

Always catch errors within the switchMap/mergeMap/concatMap, etc.

Output

Now it is time to see how this works on the website. We can imagine that it works at first. The fun starts when an error is generated from the API call.

First,we will type error in both input boxes as shown:

ai-Hiring-banner

Figure: Output

We will quite it as an exercise for you to see the console output. Now for the actual test cantype in something and get results later handling the error?

Here we see that the first one works and the second one does not work anymore:

ai-Hiring-banner

Figure: console output

The second input box is what we were talking about over a Weird problem in the intro. You would have a difficult time estimating out why your search quit working.

NgRX Effect Case Study

The reason we began writing this blog was that we had a weird problem with one of our apps that was through NgRX and Effects. See for information on effects.

The interface is the following:

ai-Hiring-banner

Figure: NgRX and Effects Interface

Nothing desires here:

  1. The success button calls the Star Wars API with «person/1» (Luke Skywalker) and gives the name in the output on the screen.

  2. Error-stops listening button calls the API with an incorrect URL so it generates an error message «catch is done wrong so it stops listening for the effect».

  3. Error-Don’t catch error button calls the API with an incorrect URL so it generates an error message «not catching the error».

  4. Error-Keeps Listening button calls the API with an incorrect URL so it generates an error message «properly catching the error so you can click it multiple times».

We will skip the HTML for this because it is just buttons calling component methods. Following is the component code:

 

import { Component, OnInit } from "@angular/core";
import { catchError } from "rxjs/operators";
import { of, throwError, Observable } from "rxjs";
import { Store, select } from "@ngrx/store";

import { RxjsService } from "./services/rxjs.service";
import * as fromRoot from "./store/reducers";
import {
  CallWithoutError,
  CallWithError,
  CallWithErrorKeepListening,
  CallWithErrorNotCaught,
  EffectReturnTest
} from "./store/app.actions";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
  title = "RxJS Playground";
  name$: Observable;
  someString$: Observable;

  constructor(
    private rxjsService: RxjsService,
    private store: Store
  ) {}

  ngOnInit(): void {
    this.rxjsService.subject
      .pipe(
        catchError(error => {
          console.log("Error in catchError", error);
          return of(error);
        })
      )
      .subscribe(
        value => {
          console.log("Subject value:", value);
        },
        error => console.log("Error!", error)
      );

    this.name$ = this.store.pipe(select(fromRoot.getName));
    this.someString$ = this.store.pipe(select(fromRoot.getSomeString));
  }

  pokeSubject(value: boolean) {
    this.rxjsService.nextSubject(value);
  }

  errorSubject() {
    this.rxjsService.errorSubject();
  }

  ngrxSuccess() {
    this.store.dispatch(new CallWithoutError());
  }

  ngrxError() {
    this.store.dispatch(new CallWithError());
  }

  ngrxErrorKeepListening() {
    this.store.dispatch(new CallWithErrorKeepListening());
  }
  ngrxErrorDontCatch() {
    this.store.dispatch(new CallWithErrorNotCaught());
  }

  ngrxEffectReturnTest(actionNum) {
    this.store.dispatch(new EffectReturnTest(actionNum));
  }
}

                                        

The error handling excellent, evil, and ugly is in the effected code.

CallWithoutError Effect

Following is our success case:

This individual will work each time but if it failed, it would remain working because the catchError is inside the http.get pipe. For the resolution case, the SetName reducer will add the “name” to the store. The UI plucks that up and displays it.

CallWithError Effect

This effect will call the API with the wrong URL so an error is caused. The error handling is done wrongly so once called, this will never work repeatedly unless the app is refreshed.

In this case, the catchError at the first level of this action$.pipe will get called, thus ending the effect since its Observable stream will end. This is much like in the case study above through many RxJS Observables. We should see the message «Error-You’re Fasted» on the page after clicking it. If we try clicking the button return, it will not fire the effect.

Following is the output for this:

ai-Hiring-banner

Figure: Output of callWithoutError Effect

CallWithErrorKeepListening Effect

This effect will call the API through the incorrect URL so an error is generated. Despite that, it will handle the error properly so that it can be called again.

The correct way to handle the RxJS error is by putting the catchError within the http.get pipe. It will end the http.get observable but that does not matter because it’s a finite observable anyways and only emits one value. While it returns the SetName action, the switchMap will release it and remain the Observable stream. Notice that the finalize there will never be called.

Following is the output of this:

ai-Hiring-banner

Figure: Output of callWithErrorListening Effect

CallWithErrorNotCaughtEffect

This our last effect that describes what happens if we do not catch the error? It behaves the same way as if we handled the error incorrectly. It is just that you are not looking into that error stream.

Also, the name on the UI will not be set after you aren’t calling setName in the catchError operator. So, you will moreover not see any output if it was the first button clicked or you will see the last name that was set. Another mysterious problem that would be difficult to debug.

Conclusion

As you can tell from this blog, knowing how to appropriately handle RxJS errors in your Angular app will help you to prevent those weird problems you could see when your infinite Observable stream ends unexpectedly. Through this knowledge, you should be able to make sure that your infinite observables never and unless you decide they are finished.

Blog Our insights

Понравилась статья? Поделить с друзьями:
  • Rx580 8gb ошибка 43
  • Rx580 2048sp ошибка 43
  • Rx500 epson scanner error
  • Rx470 ошибка 43
  • Rx200s charge error