Grokking Android

Getting Down to the Nitty Gritty of Android Development

RxJava’s Side Effect Methods

By

RxJava's Observable class has plenty of methods that can be used to transform the stream of emitted items to the kind of data that you need. Those methods are at the very core of RxJava and form a big part of it's attraction.

But there are other methods, that do not change the stream of items in any way - I call those methods side effect methods.

What do I mean by side effect methods?

Side effect methods do not affect your stream in itself. Instead they are invoked when certain events occur to allow you to react to those events.

For example: if you're interested in doing something outside of your Subscriber's callbacks whenever some error occurs, you would use the doOnError() method and pass to it the functional interface to be used whenever an error occurs:


someObservable
      .doOnError(new Action1() {
         @Override
         public void call(Throwable t) {
            // use this callback to clean up resources,
            // log the event or or report the 
            // problem to the user
         }
      })
      //...

The most important part is the call() method. The code of this method will be executed before the Subscriber's onError() method is called.

In addition to exceptions RxJava offers many more events to which you can react:

Events and their corresponding side effect operations
Method Functional Interface Event
doOnSubscribe() Action0 A subscriber subscribes to the Observable
doOnUnsubscribe() Action0 A subscriber unsubscribes from the subscription
doOnNext() Action1<T> The next item is emitted
doOnCompleted() Action0 The Observable will emit no more items
doOnError() Action1<T> An error occurred
doOnTerminate() Action0 Either an error occurred or the Observable will emit no more items - will be called before the termination methods
finallyDo() Action0 Either an error occurred or the Observable will emit no more items - will be called after the termination methods
doOnEach() Action1<Notification<T>> Either an item was emitted, the Observable completes or an error occurred. The Notification object contains information about the type of event
doOnRequest() Action1<Long> A downstream operator requests to emit more items

The <T> refers either to the type of the item emitted or, in the case of the onError() method, the type of the Throwable thrown.

The functional interfaces are all of type Action0 or Action1. This means that the single methods of these interfaces do not return anything and take either zero arguments or one argument, depending on the specific event.

Since those methods do not return anything, they cannot be used to change the emitted items and thus do not change the stream of items in any way. Instead these methods are intended to cause side effects like writing something on disk, cleaning up state or anything else that manipulates the state of the system itself instead of the stream of events.

Note: The side effect methods themselves (doOnNext(), doOnCompleted() and so on) do return an Observable. That's to keep the interface fluent. But the returned Observable is of the same type and emits the same items as the source Observable.

What are they useful for?

Now since they do not change the stream of items there must be other uses for them. I present here three examples of what you can achieve using these methods:

So let's see these examples in detail.

Use doOnNext() for debugging

With RxJava you sometimes wonder why your Observable isn't working as expected. Especially when you are just starting out. Since you use a fluent API to transform some source into something that you want to subscribe to, you only see what you get at the end of this transformation pipeline.

When I was learning about RxJava I had some initial experience with Java's Streams. Basically you have the same problem there. You have a fluid API to move from one Stream of some type to another Stream of another type. But what if it doesn't work as expected?

With Java 8 Streams you have the peek() method. So when starting out with RxJava I wondered if something comparable is available. Well, there is. Actually, RxJava offers much more!

You can use the doOnNext() method anywhere in your processing pipeline to see what is happening and what the intermediary result is.

Here's an example of this:


Observable someObservable = Observable
            .from(Arrays.asList(new Integer[]{2, 3, 5, 7, 11}))
            .doOnNext(System.out::println)
            .filter(prime -> prime % 2 != 0)
            .doOnNext(System.out::println)
            .count()
            .doOnNext(System.out::println)
            .map(number -> String.format("Contains %d elements", number));

Subscription subscription = o.subscribe(
            System.out::println,
            System.out::println,
            () -> System.out.println("Completed!"));

And here is the output of that code:


2
3
3
5
5
7
7
11
11
4
Contains 4 elements
Completed!

That way you can glean valuable information about what is going on when your Observable doesn't behave as you expected.

The doOnError() and doOnCompleted() methods can also be useful for debugging the state of your pipeline.

Note: If you're using RxJava while developing for Android please have a look at the Frodo and Fernando Ceja's post explaining about the motivation for and usage of Frodo. With Frodo you can use annotations to debug your Observables and Subscribers.

The shown way of using doOnNext() and doOnError() does not change much of the system state - apart from bloating your log and slowing everything down. 🙂

But there are other uses for these operators. And in those cases you use those methods to actually change the state of your system. Let’s have a look at them.

Use doOnError() within flatMap()

Say you're using Retrofit to access some resource over the network. Since Retrofit supports observables, you can easily use those calls within your processing chain using flatMap().

Alas, network related calls can go wrong in many ways - especially on mobiles. In this case you might not want the Observable to stop working, which it would if you were to rely on your subscriber's onError() callback alone.

But keep in mind that you have an Observable within your flatMap() method. Thus you could use the doOnError() method to change the UI in some way, yet still have a working Observable stream for future events.

So what this looks like is this:


flatMap(id -> service.getPost()
       .doOnError(t -> {
          // report problem to UI
       })
       .onErrorResumeNext(Observable.empty())
)

This method is especially useful if you query your remote resource as a result of potentially recurring UI events.

Use doOnNext() to save/cache network results

If at some point in your chain you make network calls, you could use doOnNext() to store the incoming results to your local database or put them in some cache.

It would be as simple as the following lines:


// getOrderById is getting a fresh order 
// from the net and returns an observable of orders
// Observable<Order> getOrderById(long id) {...}

Observable.from(aListWithIds)
         .flatMap(id -> getOrderById(id)
                              .doOnNext(order -> cacheOrder(order))
         // carry on with more processing

See this pattern applied in more detail in Daniel Lew's excellent blog post about accessing multiple sources.

Wrap up

As you've seen, you can use the side effect methods of RxJava in multiple ways. Even though they do not change the stream of emitted items, they change the state of your overall system. This can be something as simple as logging the current items of your Observable at a certain point within your processing pipeline, up to writing objects to your database as a result of a network call.

In my next post I am going to show you how to use RxJava's hooks to get further insights. Stay tuned!

Wolfram Rittmeyer lives in Germany and has been developing with Java for many years.

He has been interested in Android for quite a while and has been blogging about all kind of topics around Android.

You can find him on Google+ and Twitter.