RxJava on Android - Where To Start The Journey

Deprecation Warning

This article talks about RxJava 1. Since it was written, RxJava 2 has come out, bringing a lot of breaking changes and new or completely rewritten APIs. So while the basics are still the same, rely on the specifics in this article only if you use RxJava 1!

If you are in the process of upgrading from RxJava 1 to RxJava 2, I recommend reading an article like this about it first.

UPDATE 2016.09.09.

Fixed errors in the example code snippets and added a link to the sample repo. Thanks to Alvaro Crespo for the notice!

If you often read through developer forums, blogs, and other sites, you have most likely come across RxJava. It creates such a buzz among developers that it is pretty hard to miss, and there is a good reason for that. Sadly, almost every article about the topic begins with the notion that learning RxJava is very hard, that even approaching it is difficult, and that there is a lack of good materials. I think this discourages many developers from even starting to look into it. So instead of writing another technical introductory tutorial, I will try to talk about how you can begin exploring the Vast Unknown Reactive Territory of Android Programming, pointing at good learning resources and talking about some of the hurdles you are likely to come across.

I went through this myself when I started learning RxJava, and as a junior developer I think I have a good understanding of what it is like to encounter it without extensive prior knowledge.

First of all, the saying that there is not enough good material is becoming less true every day. So much so that, at the time of writing this article, I feel that finding the really good ones to start with is the harder part, because new articles are appearing everywhere. The basics are rarely explained well, so I will begin with that.

Grasping the basics

The big mistake most people make when talking about RxJava is that they treat its core principles as some kind of magic that you have to become a sensei to fully understand. I think that if you don’t overcomplicate it and look at just the basics, it is quite simple. It incorporates three things: the Observer Pattern, the Iterator Pattern and Functional Programming.

A sequence diagram of the Java Observer pattern implementation

In the Observer Pattern, there are two important players: the Subject (sometimes called Observable) and the Observer. The subject has a list of objects that are interested in its state changes: these are the observers. An observer subscribes to the changes of the subject/observable, so a change in its state will call a method of the observer, which can handle this event in any way it wants. As long as the observer is subscribed, the subject/observable will continue to notify it of every state change. When an observer is no longer interested in these events, it can unsubscribe from the subject/observable.
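
To make the subscribe / notify / unsubscribe cycle concrete, here is a minimal plain-Java sketch of the pattern. It does not use RxJava at all, and the names StateSubject, StateObserver and ObserverPatternDemo are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for illustration; none of these types come from RxJava.
interface StateObserver<T> {
    void onStateChanged(T newState);
}

final class StateSubject<T> {
    private final List<StateObserver<T>> observers = new ArrayList<>();

    // An observer registers its interest in future state changes.
    void subscribe(StateObserver<T> observer) {
        observers.add(observer);
    }

    // An observer that is no longer interested removes itself.
    void unsubscribe(StateObserver<T> observer) {
        observers.remove(observer);
    }

    // Every state change is pushed to all currently subscribed observers.
    void setState(T newState) {
        // Iterate over a copy so observers may unsubscribe during notification.
        for (StateObserver<T> observer : new ArrayList<>(observers)) {
            observer.onStateChanged(newState);
        }
    }
}

final class ObserverPatternDemo {
    // Returns the states seen by an observer that unsubscribes after two changes.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        StateSubject<String> subject = new StateSubject<>();
        StateObserver<String> observer = seen::add;

        subject.subscribe(observer);
        subject.setState("loading");
        subject.setState("ready");
        subject.unsubscribe(observer);
        subject.setState("closed"); // not delivered, nobody is subscribed

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [loading, ready]
    }
}
```

The observer never asks the subject for its state. It only reacts when it is called, which is the same "push" style RxJava builds on.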

RxJava calls things a little bit differently. The Subject in the Observer Pattern is always called Observable in RxJava, and the Observer in the Observer Pattern is called Subscriber. Unfortunately these names are regularly confused, so make sure you remember the correct ones used in RxJava (Observable and Subscriber).

Similarly to the Observer Pattern, RxJava’s Observables emit events, and the Subscribers that subscribed to them receive these events. The way these events are emitted is similar to an Iterator (in the Iterator Pattern): new events can be handled inside the onNext method. The one unique thing is that observables may not emit any events as long as no one has subscribed to them (these are called cold observables, I will touch on these later), and can reach a state where they will not emit anything else at all. On top of that, RxJava adds a ton of different operators, which can be used to apply different transformations to the emitted events, or even to combine different events in some way. Multiple operators can be applied to events, and they can be chained together in any desired way (more on that in a bit).

// Creating a simple Subscriber
Subscriber<String> mySubscriber = new Subscriber<String>() {  
    @Override
    public void onNext(String s) {
        // This is where you can do something with
        // the incoming value from the observable
    }

    @Override
    public void onCompleted() {
        // This method is called when the observable 
        // finished emitting events
    }

    @Override
    public void onError(Throwable e) {
        // This method is called when the observable encountered
        // an unrecoverable error
    }
};

// Creating a simple Observable from a method
// (which supposedly takes a long time to execute
// and returns a String array, an Iterable<String>
// or a Future<String>, as mySubscriber's
// generic type parameter is declared as String),
// subscribing to it
// and storing the resulting subscription in a variable
Subscription s = Observable.from(longOperation())
    .subscribe(mySubscriber);

// Unsubscribe when no longer interested on the events 
// emitted by the observable
s.unsubscribe();

// Creating an Observable another way
// The just method only emits one event
// (one string in this example)
// then completes
Observable.just("Hello, world!");  
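
The Iterator analogy mentioned above can be sketched in plain Java. This is only my illustration of the idea, not how RxJava is implemented: the EventCallbacks interface and the PushFromIterable class are hypothetical names. They walk an Iterable and push every result into three callbacks shaped like the Subscriber methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical callback interface mirroring the three methods of an RxJava 1 Subscriber.
interface EventCallbacks<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable error);
}

final class PushFromIterable {
    // Walks the Iterable like a for-each loop would, but pushes each result
    // to the callbacks instead of handing it back to the caller.
    static <T> void emit(Iterable<T> source, EventCallbacks<T> callbacks) {
        try {
            for (T value : source) {
                callbacks.onNext(value);   // Iterator.next() returned a value
            }
        } catch (RuntimeException e) {
            callbacks.onError(e);          // the iteration threw an exception
            return;
        }
        callbacks.onCompleted();           // Iterator.hasNext() returned false
    }

    // Collects a readable trace of the callbacks triggered by the given values.
    static List<String> trace(String... values) {
        List<String> log = new ArrayList<>();
        emit(Arrays.asList(values), new EventCallbacks<String>() {
            @Override public void onNext(String value) { log.add("next:" + value); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable error) { log.add("error"); }
        });
        return log;
    }
}
```

Calling PushFromIterable.trace("a", "b") gives the list [next:a, next:b, completed]: one onNext per element, then a single onCompleted.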

One thing that is a bit harder to grasp is hot and cold observables. Basically, a hot observable is one that can emit events whenever it wants, even when no one is subscribed to it. When a Subscriber subscribes to a hot observable, it only gets the events emitted after the subscription. A cold observable, on the other hand, only emits events when someone is subscribed to it. The values from a hot observable are shared between all its subscribers, while the values from a cold observable are not shared. The difficulty is that nothing differentiates these two types of observables in code, so the difference is only in behaviour. You should look into examples to fully understand the difference.

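// Observable.interval on its own is cold: every Subscriber gets its own timer.
// Calling publish() and then connect() turns it into a hot observable
// that emits events every 10 seconds, even without any subscribers
ConnectableObservable<Long> hot = Observable.interval(10, TimeUnit.SECONDS).publish();
hot.connect();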

// A cold observable that only emits events 
// when a Subscriber subscribes to it
// The defer operator does not create an Observable 
// until someone subscribes to it
// and creates a fresh observable for each Subscriber
// You can wrap any Observables with the defer operator
Observable<String> o = Observable.defer(new Func0<Observable<String>>() {  
    @Override
    public Observable<String> call() {
        return Observable.just("Hello, world!");
    }
});

// The above observable will only emit events 
// when a Subscriber subscribes to it
Subscription s = o.subscribe(mySubscriber);  
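
If the difference is still fuzzy, the following plain-Java sketch may help. It models only the behaviour and deliberately avoids RxJava, so HotColdSketch, ColdCounter and HotCounter are hypothetical names and not library classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class HotColdSketch {

    // Cold: nothing runs until subscribe() is called, and every subscriber
    // triggers its own independent run of the sequence 1, 2, 3.
    static final class ColdCounter {
        void subscribe(Consumer<Integer> subscriber) {
            for (int i = 1; i <= 3; i++) {
                subscriber.accept(i);
            }
        }
    }

    // Hot: values are produced by calls to emit(), whether or not anybody
    // listens, and are shared by whoever is subscribed at that moment.
    static final class HotCounter {
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(int value) {
            for (Consumer<Integer> subscriber : new ArrayList<>(subscribers)) {
                subscriber.accept(value);
            }
        }
    }

    static List<Integer> lateColdSubscriber() {
        List<Integer> received = new ArrayList<>();
        ColdCounter cold = new ColdCounter();
        cold.subscribe(value -> { });      // an earlier subscriber gets its own run
        cold.subscribe(received::add);     // a later subscriber still sees every value
        return received;
    }

    static List<Integer> lateHotSubscriber() {
        List<Integer> received = new ArrayList<>();
        HotCounter hot = new HotCounter();
        hot.emit(1);                       // emitted with no subscribers, so it is lost
        hot.subscribe(received::add);
        hot.emit(2);
        hot.emit(3);
        return received;
    }
}
```

Here lateColdSubscriber() returns [1, 2, 3] while lateHotSubscriber() returns [2, 3]: the late subscriber of the hot source missed the first value, the late subscriber of the cold source missed nothing.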

So in principle, it is really not that hard to understand. The complicated part comes when you try to use it and go deeper into the more complicated scenarios, because even if it is simple at its core, RxJava requires at least some understanding of functional programming (the transformation with chained operators). In turn, it enables you to write concise solutions to complicated problems and handle asynchronous calls like a pro. So let’s look into that.

Smooth Operator

Operators are a big part of RxJava, and it comes with a huge collection of them. They are there to make transformations on the emitted data, independently of the observables or the subscribers. They take the events from the input observable or observables and transform them into a new observable. And that’s the key thing about them: you can chain multiple operators and manipulate data without the subscriber depending on any other element. The only requirement is that each operator must work with the correct input and output types. They are like pipes and filters on Unix.

// An Observable emitting a single number
// and a map operator which does some kind of transformation on
// the input, or returns a completely different thing.
// Here we return an object by calling an API with its id
Observable.just(getMyObjectId())
    .map(new Func1<Long, MyObject>() {
        @Override
        public MyObject call(Long l) {
            return myApi.getObjectWithId(l);
        }
    })
    .subscribe(new Action1<MyObject>() {
        @Override
        public void call(MyObject o) {
            Log.d(TAG, "my object " + o.toString());
        }
    });
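
To show why chaining works, here is a rough plain-Java sketch of the pipes-and-filters idea: every operator wraps the previous stage and returns a new stage. MiniStream is a toy class I made up for this article. It is not how RxJava implements its operators, and it ignores completion, errors and unsubscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// A deliberately tiny stand-in for an observable; not RxJava's implementation.
final class MiniStream<T> {
    // The source is just "something that pushes values into a consumer".
    private final Consumer<Consumer<T>> source;

    private MiniStream(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    @SafeVarargs
    static <T> MiniStream<T> just(T... values) {
        List<T> items = Arrays.asList(values);
        return new MiniStream<T>(downstream -> items.forEach(downstream));
    }

    // map returns a NEW stream that transforms each value before passing it on.
    <R> MiniStream<R> map(Function<T, R> mapper) {
        return new MiniStream<R>(downstream ->
                source.accept(value -> downstream.accept(mapper.apply(value))));
    }

    // filter returns a NEW stream that only passes on matching values.
    MiniStream<T> filter(Predicate<T> predicate) {
        return new MiniStream<T>(downstream ->
                source.accept(value -> {
                    if (predicate.test(value)) {
                        downstream.accept(value);
                    }
                }));
    }

    void subscribe(Consumer<T> subscriber) {
        source.accept(subscriber);
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        MiniStream.just(1, 2, 3, 4)
                .filter(n -> n % 2 == 0)   // keeps 2 and 4
                .map(n -> n * 10)          // 20 and 40
                .map(n -> "value " + n)    // the type changes from Integer to String
                .subscribe(out::add);
        return out;
    }
}
```

MiniStream.demo() returns [value 20, value 40]. The subscriber only ever sees Strings and knows nothing about the filter or the two map steps in front of it.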

Be advised that there are tons of RxJava operators, and it is not necessary to learn all of them at once. Most likely you cannot even keep most of them in mind, and they will only stick when you come across a particular problem which can be solved with one of them. A good approach is to learn the operators covered in the basic tutorials (like Map, FlatMap, Zip, Filter), and later, when you have a particular problem, try to find a tutorial or Stack Overflow question about that problem - it is likely that someone will say There’s an operator for that!.

When trying to understand how different operators work, I think the best way is to look at marble diagrams, which visualize the operation nicely. These diagrams show what each operator outputs for a certain input. At the top there are always the events that serve as an input to the operator or operators, then the transformation the operator applies to them, and the output is shown at the bottom. The little circles represent the particular events. Most of the marble diagrams on the RxJava page are even interactive, so you can more easily understand how it works.

A marble diagram explaining the Map operator: in this example it multiplies every number the input observable emits by 10

Going async with RxJava

Mobile apps most likely do a lot of network calls and slow database reads and writes, during which we don’t want to block the GUI of the application. On Android, we also have the Activity and Fragment lifecycle to complicate our lives, so reactive programming can be really handy. Instead of the ugliness of AsyncTask or manual handling of threads, RxJava provides a simple way to do work in the background and then present the result in the foreground. An even better part is error handling, which is a huge headache when you try to do it manually with multiple threads at play. In RxJava, when an observable reaches an unrecoverable error state (e.g. an Exception is thrown), it calls the onError method of the subscriber, so you can do all the error handling there. Operators don’t have to worry about handling exceptions: when one occurs, the remaining operators are skipped and the event goes straight to onError.

Observable.just("Hello, world!")  
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return potentialException(s);
        }
    })
    .map(new Func1<Integer, String>() {
        @Override
        public String call(Integer i) {
            return anotherPotentialException(i);
        }
    })
    .subscribe(new Subscriber<String>() {
        @Override
        public void onNext(String s) { 
            Log.d(TAG, "next string: " + s);
        }

        @Override
        public void onCompleted() {
            Log.d(TAG, "completed");
        }

        @Override
        public void onError(Throwable e) { 
            // Exceptions thrown in either map call above end up here
            Log.e(TAG, "something went wrong", e);
        }
    });
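
The "skip straight to onError" behaviour can be imitated in a few lines of plain Java. This is a simplified sketch under my own assumptions (a single value, two steps, only unchecked exceptions), and ErrorChannelSketch with its Callbacks interface is a hypothetical helper, not an RxJava type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ErrorChannelSketch {

    // Hypothetical callbacks, shaped like the three Subscriber methods.
    interface Callbacks<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
    }

    // Runs a single value through two steps. If either step throws, the
    // remaining steps and onNext are skipped, and only onError is called.
    static <A, B, C> void run(A input,
                              Function<A, B> first,
                              Function<B, C> second,
                              Callbacks<C> callbacks) {
        C result;
        try {
            result = second.apply(first.apply(input));
        } catch (RuntimeException e) {
            callbacks.onError(e);
            return;
        }
        callbacks.onNext(result);
        callbacks.onCompleted();
    }

    static List<String> trace(String input) {
        List<String> log = new ArrayList<>();
        // Integer.parseInt throws NumberFormatException on bad input.
        Function<String, Integer> parse = Integer::parseInt;
        Function<Integer, String> describe = number -> "parsed " + number;
        run(input, parse, describe, new Callbacks<String>() {
            @Override public void onNext(String value) { log.add("next: " + value); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable error) {
                log.add("error: " + error.getClass().getSimpleName());
            }
        });
        return log;
    }
}
```

ErrorChannelSketch.trace("42") returns [next: parsed 42, completed], while ErrorChannelSketch.trace("oops") returns [error: NumberFormatException]: the second step and onNext never run, and there is a single place to handle the failure.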

So you would like to specify which thread to use and when, right? The two methods for this are subscribeOn and observeOn. Do not confuse these two. You can think about it as a newspaper subscription: you subscribe to it to get it delivered to you, and then you observe the result when the actual newspaper arrives. You can do these things on separate threads: maybe you subscribe on a background thread, but observe on the main thread and display the result on the UI. By calling subscribeOn, you say that everything from the point of subscribing will run on the thread (or rather scheduler, I will explain in a bit) specified as a parameter. If you also call observeOn with a different thread (scheduler), observing the result will happen on that thread. subscribeOn and observeOn are just operators, so they can be attached to any Observable. subscribeOn decides where the source observable starts doing its work, wherever you put it in the chain, and everything keeps running on that scheduler until an observeOn is reached; all of the operators below observeOn use the scheduler passed to observeOn. These two operators take care of all the threading.

Observable.from(longOperation())  
    .subscribeOn(thread1)
    .observeOn(thread2)
    .subscribe(mySubscriber);

// Subscribing on thread1 (which is in reality a scheduler)
// Observing on thread2 (which is in reality a scheduler)

How threads are switched during execution of the methods above

Observable.from(longOperation())  
    .subscribeOn(thread1)
    .subscribe(mySubscriber);

// Subscribing on thread1
// Observing on thread1

Observable.from(longOperation())  
    .observeOn(thread2)
    .subscribe(mySubscriber);

// Subscribing on the thread you are currently on
// Observing on thread2

I mentioned Schedulers. In RxJava, the concrete threading is abstracted away from you by the use of schedulers. These are backed by some kind of thread or thread pool, and a couple of them are provided out of the box (like Schedulers.io() from RxJava - a thread pool for running IO type operations, and AndroidSchedulers.mainThread() from RxAndroid - which is just the well-known Android main thread).
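
If you know java.util.concurrent, it may help to think of schedulers as something close to executors. The sketch below is only an analogy written with the plain JDK, not RxJava code: two named single-thread executors stand in for Schedulers.io() and the Android main thread, and SchedulerSketch is a made-up class name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SchedulerSketch {

    // Produces a value on the "background" executor and consumes it on the
    // "main-like" executor, then reports which thread ran each step.
    static String[] run() {
        // Single-threaded executors with named threads, standing in for
        // Schedulers.io() and AndroidSchedulers.mainThread().
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // comparable to subscribeOn: where the work itself runs
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // comparable to observeOn: where the result is delivered
                    .thenApplyAsync(producer ->
                            new String[] { producer, Thread.currentThread().getName() }, main)
                    .join();
        } finally {
            background.shutdown();
            main.shutdown();
        }
    }
}
```

SchedulerSketch.run() returns the thread names of the two steps, "background" for producing the value and "main-like" for receiving it. That is the same split you get from subscribeOn(Schedulers.io()) followed by observeOn(AndroidSchedulers.mainThread()).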

RxJava on Android

RxJava is a simple Java library that you can also use on Android, but some awesome developers created RxAndroid and its extensions, which add a lot of extra features to it, like binding to lifecycle events (so observers can auto-unsubscribe from observables when the Fragment or Activity is destroyed), Android specific Schedulers (like the AndroidSchedulers.mainThread I mentioned earlier) and more. You should use that in your Android applications.

// Getting a potentially large resource (like an image) from
// an API that returns an observable on the IO threadpool
// and showing it on the UI using the Android main thread
myApi.getSomeResource(url)  
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<SomeResource>() {
        @Override
        public void call(SomeResource res) {
            showResourceOnUi(res);
        }
    });

Where should you start using RxJava?

As I mentioned, RxJava makes asynchronous calls easier and more concise. A good place to start using it is the domain layer of your application. This way, you can do all the hard work on background threads and display all your results on the UI using the main thread. Network calls and database access are all good candidates to be done with RxJava. A well structured and clean architecture is achievable with RxJava, and it works well with architectural patterns like MVP and MVVM.

RxJava can even facilitate advanced features like exponential backoff, retrying, caching and data binding. You can find good examples of all of these.
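
In case exponential backoff is new to you: it means retrying a failed call with a delay that doubles after every failure. The sketch below shows the idea in plain, blocking Java. It is not the RxJava way of doing it (there you would build it from the retry operators), and BackoffSketch, its methods and the 10 ms base delay are all assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class BackoffSketch {

    // Delay before retry number `attempt` (0-based): base, 2*base, 4*base, ...
    // Only meant for small attempt counts; large shifts would overflow.
    static long delayMillis(long baseMillis, int attempt) {
        return baseMillis << attempt;
    }

    // Calls the task until it succeeds or maxRetries retries have been used.
    // Every delay that was applied is appended to `delays` so callers can inspect it.
    static <T> T retry(Callable<T> task, int maxRetries, long baseMillis, List<Long> delays)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) {
                    throw e; // out of retries, give up with the last error
                }
                long delay = delayMillis(baseMillis, attempt);
                delays.add(delay);
                Thread.sleep(delay);
            }
        }
    }

    // A flaky task that fails twice and succeeds on the third call.
    static List<Long> demo() {
        List<Long> delays = new ArrayList<>();
        int[] calls = {0};
        try {
            retry(() -> {
                if (++calls[0] < 3) {
                    throw new IllegalStateException("flaky");
                }
                return "ok";
            }, 5, 10, delays);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
        return delays;
    }
}
```

BackoffSketch.demo() returns [10, 20]: the task failed twice, so it waited 10 ms and then 20 ms before the third, successful call.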

Where not to use RxJava

That is so awesome - you might say - so let’s use RxJava everywhere, right? No. If you really get into the stuff, you may feel the need to use RxJava as a solution for use cases like for-each loops as well (with the From operator), but please be deliberate about when to use it. Although I said at the beginning that the basics of RxJava are simple - and I meant that - applying it everywhere can in fact lead to code that is really hard to read, maintain and debug.

Where to go next?

This introductory article is only meant to whet your appetite and encourage you to learn about RxJava. If you liked what you read, I advise you to listen to a good podcast or watch a presentation about RxJava where the lecturer (or lecturers) gives a good introduction to the topic. I linked a bunch of these at the end. Then reading a couple of tutorials should get you going, but most importantly: write code. Reading will only get you so far, it’s better to try out what you learned.

If you plan to use RxJava in your project, first try to find the best candidates where you can apply it. I said that domain logic with asynchronous calls and data binding are great examples, but you may find other applications for it. Just avoid overusing it, and try not to lose simplicity.

This is not the end of my journey either. I just started experimenting with RxJava and I would like to write more about the topic. If you enjoyed reading this, you should come back later.

Sample project

Here is a sample Android repository which contains all the examples appearing in this post:
https://github.com/AutSoft/RxJavaOnAndroidArticleExamples

Appendix: RetroLambda

There is a nice library that goes well with RxJava: RetroLambda. It basically plugs into the Gradle build process and translates Java 8 lambdas to Java 7 code. It allows you to write code using lambda expressions, thus making your code much more concise. It is useful as Android is still stuck on Java 7, but it can produce some hard to debug problems. I personally suggest that you start learning RxJava without it, and then - when you are confident and interested enough - start using RetroLambda.

But I leave it up to you. If you have no problem learning two things at once - and can be careful not to use other Java 8 features by accident (as with RetroLambda, you have to set the Java version to 8, but the library can only translate lambdas and not other Java 8-only code features) - you may be better off with the better looking code. Powerful IDEs like Android Studio provide code folding, so the problem that RetroLambda solves is really not that big. Be aware though that a lot of articles about RxJava are written using RetroLambda.

// A simple RxJava example that prints Hello, world to LogCat
Observable.just("Hello, world!")  
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.d(TAG, s);
        }
    });

// The same example with lambdas
Observable.just("Hello, world!")  
    .subscribe(s -> Log.d(TAG, s));

Resources

The Observer Pattern
https://en.wikipedia.org/wiki/Observer_pattern

The Iterator Pattern
https://en.wikipedia.org/wiki/Iterator_pattern

Functional Programming
https://en.wikipedia.org/wiki/Functional_programming

ReactiveX official website
http://reactivex.io/

RxJava GitHub page
https://github.com/ReactiveX/RxJava

RxAndroid GitHub page
https://github.com/ReactiveX/RxAndroid

The RxJava Show with Dan Lew (Fragmented Podcast) - Two Parts
http://fragmentedpodcast.com/episodes/3/
http://fragmentedpodcast.com/episodes/4/

Learning RxJava (for Android) by Example
https://www.youtube.com/watch?v=k3D0cWyNno4
https://github.com/kaushikgopal/RxJava-Android-Samples

Grokking RxJava - Four Parts
http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/
http://blog.danlew.net/2014/09/22/grokking-rxjava-part-2/
http://blog.danlew.net/2014/09/30/grokking-rxjava-part-3/
http://blog.danlew.net/2014/10/08/grokking-rxjava-part-4/

The introduction to Reactive Programming you've been missing
https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

RxJava and Retrolambda Making Android
https://www.youtube.com/watch?v=vRl3u1I9v2M

Interactive Marble Diagrams
http://rxmarbles.com/

Cold vs hot observables (JavaScript language, but a good explanation)
https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-vs-hot-observables

RxJava Android Samples
https://github.com/kaushikgopal/RxJava-Android-Samples

Deferring observable code until subscription in RxJava
http://blog.danlew.net/2015/07/23/deferring-observable-code-until-subscription-in-rxjava/