In life of every programmer comes time to test that code. This moment is especially hard with new concepts or even paradigms. Good example here is RxJava - you can do magic with it, but testing magic is not so trivial. So, here comes the most important thing: how to make all RxJava chains synchronous.

Let’s assume you want to test that code:

void displayImages() {
        Observable.fromArray("british", "siamese", "persian")
                .map(cat -> downloadAllImages(cat))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        images -> display(images),
                        error -> display(error)
                );
    }

Nothing fancy here - download all images (which is a long operation) for a few cat breeds. Because we are good programmers we do long running operations on background thread and just before displaying results we go back to main thread (of course any other thread will do for this example).

In test we might write something like

displayImages();
assertImagesAreDisplayed();

The problem is: this test will not work, because work is scheduled on background thread.

There are three possibilities, where first two are not related to the article title:

  1. Make your tested function return Observable/Flowable and use test() function. This is simple solution but good mainly for unit testing. And requires additional refactoring, which is sometimes not possible (or too complicated).
  2. Pass used Scheduler as dependency. This is it, right? When you have problem with testing something, just extract it as a dependency. But I think it’s an overkill in this case and would make even simple object require too many dependencies. I love dependency injection, but every love has a limit.
  3. Configure RxJava to make all schedulers synchronous. Yep, that’s a big win.

Unfortunately, it’s a win that you might never hear about, unless you explicitly search for it. But it’s there and it’s very simple. All you have to do is:

RxJavaPlugins.setInitIoSchedulerHandler(c -> Schedulers.from(Runnable::run));

And boom, your IO scheduler is now synchronous.

What is done here is intercepting creation of IO Scheduler and instead providing our own. And our own is created by calling static Scheduler from(Executor executor). Executor is very simple interface with one method only: void execute(Runnable r) so we can implement it with Runnable::run.

Usually Executor receives some task Runnable and puts in into a thread pool. Later on this task will be ran on another thread from that pool. In this case, we are saying ‘when I receive some task… just run it.’ No threads, nothing. That makes this solution synchronous - we run the task immediately on the current thread.

So basically it says that IO Scheduler should just run the task now on current thread. Which means it will be blocking - just what we need here.

For concrete use case I wrote before the solution would be to add these lines:

RxAndroidPlugins.setInitMainThreadSchedulerHandler(c -> Schedulers.from(Runnable::run));
RxJavaPlugins.setInitIoSchedulerHandler(c -> Schedulers.from(Runnable::run));

or even better

Scheduler synchronousScheduler = Schedulers.from(Runnable::run);
RxAndroidPlugins.setInitMainThreadSchedulerHandler(c -> synchronousScheduler);
RxJavaPlugins.setInitIoSchedulerHandler(c -> synchronousScheduler);

And analogically for every other Scheduler, apart from trampoline() but I don’t see it used very often, so that’s not a big issue.

Of course one might say that it’s not very clean UncleBob-y and Objected-Oriented solution, we are using global initialization. And one might be right. But I think that the simplicity of the solution far outweighs clean code drawbacks. It’s great that RxJava creators provided us with such handy testing tools.