Grokking Android

Getting Down to the Nitty Gritty of Android Development

RxJava’s Side Effect Methods

By Wolfram Rittmeyer

RxJava’s Observable class has plenty of methods that can be used to transform the stream of emitted items to the kind of data that you need. Those methods are at the very core of RxJava and form a big part of its attraction.

But there are other methods that do not change the stream of items in any way – I call those methods side effect methods.

What do I mean by side effect methods?

Side effect methods do not affect your stream in itself. Instead they are invoked when certain events occur to allow you to react to those events.

For example: if you’re interested in doing something outside of your Subscriber’s callbacks whenever some error occurs, you would use the doOnError() method and pass to it the functional interface to be used whenever an error occurs:

      .doOnError(new Action1<Throwable>() {
         @Override
         public void call(Throwable t) {
            // use this callback to clean up resources,
            // log the event or report the
            // problem to the user
         }
      })

The most important part is the call() method. The code of this method will be executed before the Subscriber’s onError() method is called.

In addition to errors, RxJava offers many more events to which you can react:

Events and their corresponding side effect operations

| Method | Functional Interface | Event |
|---|---|---|
| doOnSubscribe() | Action0 | A subscriber subscribes to the Observable |
| doOnUnsubscribe() | Action0 | A subscriber unsubscribes from the subscription |
| doOnNext() | Action1<T> | The next item is emitted |
| doOnCompleted() | Action0 | The Observable will emit no more items |
| doOnError() | Action1<T> | An error occurred |
| doOnTerminate() | Action0 | Either an error occurred or the Observable will emit no more items – will be called before the termination methods |
| finallyDo() | Action0 | Either an error occurred or the Observable will emit no more items – will be called after the termination methods |
| doOnEach() | Action1<Notification<T>> | Either an item was emitted, the Observable completes or an error occurred. The Notification object contains information about the type of event |
| doOnRequest() | Action1<Long> | A downstream operator requests to emit more items |

The <T> refers either to the type of the item emitted or, in the case of the doOnError() method, to the type of the Throwable thrown.

The functional interfaces are all of type Action0 or Action1. This means that the single methods of these interfaces do not return anything and take either zero arguments or one argument, depending on the specific event.

Since those methods do not return anything, they cannot be used to change the emitted items and thus do not change the stream of items in any way. Instead these methods are intended to cause side effects like writing something on disk, cleaning up state or anything else that manipulates the state of the system itself instead of the stream of events.

Note: The side effect methods themselves (doOnNext(), doOnCompleted() and so on) do return an Observable. That’s to keep the interface fluent. But the returned Observable is of the same type and emits the same items as the source Observable.
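To make the point about void callbacks concrete, here is a minimal plain-Java sketch. It does not use RxJava at all: MiniSource and SideEffectSketch are hypothetical stand-ins invented for this illustration, and the real operators are implemented differently. It only shows why a callback that returns nothing cannot alter the items, while the wrapping method can still return a source of the same type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class SideEffectSketch {
    static String run() {
        List<String> log = new ArrayList<>();
        List<Integer> received = new ArrayList<>();
        MiniSource.from(Arrays.asList(2, 3, 5))
                .doOnNext(item -> log.add("saw " + item)) // side effect only
                .subscribe(received::add);                // items arrive unchanged
        return log + " -> " + received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [saw 2, saw 3, saw 5] -> [2, 3, 5]
    }
}

// Hypothetical stand-in for an Observable; NOT RxJava's implementation.
interface MiniSource<T> {
    void subscribe(Consumer<? super T> onNext);

    static <T> MiniSource<T> from(List<T> items) {
        return onNext -> items.forEach(onNext);
    }

    // Analogue of doOnNext(): returns a source of the same type T.
    default MiniSource<T> doOnNext(Consumer<? super T> sideEffect) {
        return onNext -> subscribe(item -> {
            sideEffect.accept(item); // returns nothing, so it cannot replace the item
            onNext.accept(item);     // the very same item travels on
        });
    }
}
```

The side effect shows up in the log list, while the subscriber still receives exactly the items the source emitted.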

What are they useful for?

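Since they do not change the stream of items, there must be other uses for them. Here are three examples of what you can achieve using these methods:

- Use doOnNext() for debugging
- Use doOnError() within flatMap()
- Use doOnNext() to save/cache network results

So let’s see these examples in detail.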

Use doOnNext() for debugging

With RxJava you sometimes wonder why your Observable isn’t working as expected, especially when you are just starting out. Since you use a fluent API to transform some source into something that you want to subscribe to, you only see what you get at the end of this transformation pipeline.

When I was learning about RxJava I had some initial experience with Java’s Streams. Basically you have the same problem there. You have a fluent API to move from one Stream of some type to another Stream of another type. But what if it doesn’t work as expected?

With Java 8 Streams you have the peek() method. So when starting out with RxJava I wondered if something comparable is available. Well, there is. Actually, RxJava offers much more!
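For comparison, the peek() approach from Java 8 Streams mentioned above might look like the following minimal sketch. The class name PeekDemo, the trace list and the "prime" label are illustrative assumptions, not taken from any library or from the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class PeekDemo {
    // Records what passes each stage of the pipeline in the given trace list.
    static List<String> run(List<String> trace) {
        return Arrays.asList(2, 3, 5, 7, 11).stream()
                .filter(n -> n % 2 != 0)
                .peek(n -> trace.add("filtered: " + n)) // look at intermediate items
                .map(n -> "prime " + n)
                .peek(s -> trace.add("mapped: " + s))   // look again after map()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        List<String> result = run(trace);
        trace.forEach(System.out::println);
        System.out.println(result);
    }
}
```

Like the side effect methods discussed here, peek() leaves the elements untouched and only lets you observe them as they pass by.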

You can use the doOnNext() method anywhere in your processing pipeline to see what is happening and what the intermediary result is.

Here’s an example of this:

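Observable<String> someObservable = Observable
            .from(Arrays.asList(new Integer[]{2, 3, 5, 7, 11}))
            .filter(prime -> prime % 2 != 0)
            .doOnNext(prime -> System.out.println("filtered: " + prime))
            .count()
            .doOnNext(count -> System.out.println("count: " + count))
            .map(number -> String.format("Contains %d elements", number));

Subscription subscription = someObservable.subscribe(
            result -> System.out.println(result),
            error -> error.printStackTrace(),
            () -> System.out.println("Completed!"));

And here is the output of that code:

filtered: 3
filtered: 5
filtered: 7
filtered: 11
count: 4
Contains 4 elements
Completed!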

That way you can glean valuable information about what is going on when your Observable doesn’t behave as you expected.

The doOnError() and doOnCompleted() methods can also be useful for debugging the state of your pipeline.

Note: If you’re using RxJava while developing for Android, please have a look at Frodo and at Fernando Cejas’s post explaining the motivation for and usage of Frodo. With Frodo you can use annotations to debug your Observables and Subscribers.

The shown way of using doOnNext() and doOnError() does not change much of the system state – apart from bloating your log and slowing everything down. 🙂

But there are other uses for these operators. And in those cases you use those methods to actually change the state of your system. Let’s have a look at them.

Use doOnError() within flatMap()

Say you’re using Retrofit to access some resource over the network. Since Retrofit supports observables, you can easily use those calls within your processing chain using flatMap().

Alas, network related calls can go wrong in many ways – especially on mobiles. In this case you might not want the Observable to stop working, which it would if you were to rely on your subscriber’s onError() callback alone.

But keep in mind that you have an Observable within your flatMap() method. Thus you could use the doOnError() method to change the UI in some way, yet still have a working Observable stream for future events.

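Here is what this looks like. Note that doOnError() only reports the problem; the onErrorResumeNext() call after it is what keeps the error from reaching the outer Observable:

flatMap(id -> service.getPost()
       .doOnError(t -> {
          // report problem to UI
       })
       .onErrorResumeNext(Observable.empty()))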

This method is especially useful if you query your remote resource as a result of potentially recurring UI events.
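The idea of reporting an inner failure and then carrying on can be sketched without RxJava. The following is only a synchronous analogy built on Java 8 Streams: InnerErrorSketch, fetch() and the "offline" query are hypothetical names made up for this illustration. The catch block plays the role of doOnError() (the side effect) followed by an empty fallback, so later queries are still processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class InnerErrorSketch {
    // Hypothetical stand-in for a network call: fails for the query "offline".
    static List<String> fetch(String query) {
        if (query.equals("offline")) {
            throw new IllegalStateException("no network");
        }
        return Arrays.asList(query + "-1", query + "-2");
    }

    static List<String> search(List<String> queries, List<String> reported) {
        return queries.stream()
                .flatMap(query -> {
                    try {
                        return fetch(query).stream();
                    } catch (IllegalStateException e) {
                        // Side effect: report the problem (the doOnError() role).
                        reported.add(query + ": " + e.getMessage());
                        // Swallow the failure so the outer pipeline keeps going.
                        return Stream.<String>empty();
                    }
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> reported = new ArrayList<>();
        System.out.println(search(Arrays.asList("a", "offline", "b"), reported));
        System.out.println(reported);
    }
}
```

The failing query contributes no results but is recorded in reported, and the query after it is still served.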

Use doOnNext() to save/cache network results

If at some point in your chain you make network calls, you could use doOnNext() to store the incoming results to your local database or put them in some cache.

It would be as simple as the following lines:

// getOrderById fetches a fresh order
// from the net and returns an observable of orders
// Observable<Order> getOrderById(long id) {…}

         .flatMap(id -> getOrderById(id)
                              .doOnNext(order -> cacheOrder(order)))
         // carry on with more processing

See this pattern applied in more detail in Daniel Lew’s excellent blog post about accessing multiple sources.

Wrap up

As you’ve seen, you can use the side effect methods of RxJava in multiple ways. Even though they do not change the stream of emitted items, they change the state of your overall system. This can be something as simple as logging the current items of your Observable at a certain point within your processing pipeline, up to writing objects to your database as a result of a network call.

In my next post I am going to show you how to use RxJava’s hooks to get further insights. Stay tuned!

Wolfram Rittmeyer lives in Germany and has been developing with Java for many years.

In recent years he shifted his attention to Android and blogs about anything interesting that came up while developing for Android.

You can find him on Google+ and Twitter.

14 thoughts on “RxJava’s Side Effect Methods”

  1. Thanks for this article. It explains several things about RxJava that I have been using, but did not know how it actually works 😀

    Couple of points:

    > The most important part is the call() method. The code of this method will be executed before the Subscriber‘s doOnError() method is called.

    Don’t you mean the Subscriber’s `onError()` method?

    Secondly, in your code example:

    flatMap(id -> service.getPost()
    .doOnError(t -> {
    // report problem to UI

    I have been placing the code to report the problem to UI within `onErrorResumeNext` itself – that is to say, I do not have the `doOnError()` method – instead I merge that functionality in `onErrorResumeNext()`. Are there any drawbacks of doing this? I guess I’m not able to understand the difference between these two methods at a conceptual level.

    1. Thanks! Yes, you are right of course. I have fixed this.

      You can return a specific value in onErrorReturn() as well. But then you must use the same type and most likely evaluate the incoming values in some way in your subscriber.

      I mostly use Observables within my presenters and use the doOnError() method to call a method on my view object.

  2. I have a question regarding “Use doOnError() within flatMap()” section
    In my apps, I usually process errors in the Subscriber’s onError() method, so the code looks like this:

    .subscribe( success -> {show data}, error -> {report problem to UI})

    What are the benefits of using your method? Are those two methods interchangeable?

    1. What you’re doing is the right approach for anything that really should terminate after an error condition occurs. But if you want to survive the problem, then this won’t work. What I suggest here is for repeated items issued to flatMap() – for example when the user enters text into a search field.

      With your approach any network problem would immediately stop the Observable. With the approach suggested here, a temporary network problem would not cancel the outer Observable you subscribe to and thus further user input could trigger new network requests that might run successfully.

  3. Hi, thanks for your article! Can you give a link to some example code with error handling and caching network results with Rx?

  4. The title of the article is a tad misleading

    Utility Operators would probably be a better term

    1. I think not. I present them in this post exactly as in the Wikipedia definition. I show usages of these methods that cause side effects, that change the overall state of the system. Actually without any side effects, there’s no use for these methods at all. That’s different from other operators which are also useful if used with pure functions.

      Utility operators are way more than those presented here. And the RxJava wiki page mixes different types of operators.

  5. I really appreciate this article. This kind of knowledge is very important to have in mind when we are debugging our app, and even for finding many difficult bugs. Thanks a lot.

  6. The Use doOnError() within flatMap() section saved my sanity. I’m trying to redo an old, really poorly done application with RxJava and learn Rx in the process. Was stuck on trying to somehow get rid of long and convoluted async call chains. Now I think I got it.
    Thanks Wolfram

    1. Thanks for the feedback, Marko! I’m always glad when my posts are of help 🙂

  7. I think the filter should be modified in order to match the output in your example.

    .filter(prime -> prime % 2 != 0)

    1. Thanks. Somehow missed this. It’s now corrected.

  8. Yes, @zhen, I observed the same thing. The example should be corrected to

    Observable<String> someObservable = Observable
    .from(Arrays.asList(new Integer[]{2, 3, 5, 7, 11}))
    .filter(prime -> prime % 2 != 0)
    .doOnNext(integer -> {
    System.out.println("filtered" + integer);
    })
    .count()
    .doOnNext(integer -> {
    System.out.println("count" + integer);
    })
    .map(number -> String.format("Contains %d elements", number));

    Subscription subscription = someObservable.subscribe(
    result -> System.out.println(result),
    error -> error.printStackTrace(),
    () -> System.out.println("Completed!"));

    1. Fixed it. Thanks.
