rxjava

rxjava

Make your web application reactive with RxJava.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

dependency

<dependency>
 <groupId>org.jooby</groupId>
 <artifactId>jooby-rxjava</artifactId>
 <version>1.6.6</version>
</dependency>

exports

  • map route operator: Rx.rx() that converts Observable (and family) into Deferred API.
  • manage the lifecycle of Schedulers and make sure they go down on application shutdown time.

usage

...
import org.jooby.rx.Rx;
...
{
  use(new Rx());

  get("/", () -> Observable.from("reactive programming in jooby!"));

}

how it works?

Previous example is translated to:

{
  use(new Rx());

  get("/", req -> {
   return new Deferred(deferred -> {
     Observable.from("reactive programming in jooby!")
       .subscribe(deferred::resolve, deferred::reject);
   });
  });

}

Translation is done via Rx.rx() route operator. If you are a RxJava programmer then you don’t need to worry for learning a new API and semantic. The Rx.rx() route operator deal and take cares of the Deferred API.

rx mapper

Advanced observable configuration is allowed via adapter function:

...
import org.jooby.rx.Rx;
...
{
  use(new Rx()
      .withObservable(observable -> observable.observeOn(Schedulers.io())));

  get("/observable", req -> Observable...);

}

The observable adapter function allow you to customize observables from routes.

schedulers

This module provides the default Scheduler from RxJava. But also let you define your own Scheduler using the executor module.

rx.schedulers.io = forkjoin
rx.schedulers.computation = fixed
rx.schedulers.newThread = "fixed = 10"

The previous example defines a:

  • forkjoin pool for Schedulers#io()
  • fixed thread pool equals to the number of available processors for Schedulers#computation()
  • fixed thread pool with a max of 10 for Schedulers#newThread()

Of course, you can define/override all, some or none of them. In any case the Scheduler will be shutdown at application shutdown time.