rxjava
Make your web application reactive with RxJava.
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
dependency
<dependency>
<groupId>org.jooby</groupId>
<artifactId>jooby-rxjava</artifactId>
<version>1.6.6</version>
</dependency>
exports
- map route operator: Rx.rx() that converts
Observable
(and family) into Deferred API. - manage the lifecycle of
Schedulers
and make sure they go down on application shutdown time.
usage
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
get("/", () -> Observable.from("reactive programming in jooby!"));
}
how it works?
Previous example is translated to:
{
use(new Rx());
get("/", req -> {
return new Deferred(deferred -> {
Observable.from("reactive programming in jooby!")
.subscribe(deferred::resolve, deferred::reject);
});
});
}
Translation is done via Rx.rx() route operator. If you are a RxJava programmer then you don’t need to worry for learning a new API and semantic. The Rx.rx() route operator deal and take cares of the Deferred API.
rx mapper
Advanced observable configuration is allowed via adapter function:
...
import org.jooby.rx.Rx;
...
{
use(new Rx()
.withObservable(observable -> observable.observeOn(Schedulers.io())));
get("/observable", req -> Observable...);
}
The observable adapter function allow you to customize observables from routes.
schedulers
This module provides the default Scheduler
from RxJava. But also let you define your own Scheduler
using the executor module.
rx.schedulers.io = forkjoin
rx.schedulers.computation = fixed
rx.schedulers.newThread = "fixed = 10"
The previous example defines a:
- forkjoin pool for
Schedulers#io()
- fixed thread pool equals to the number of available processors for
Schedulers#computation()
- fixed thread pool with a max of 10 for
Schedulers#newThread()
Of course, you can define/override all, some or none of them. In any case the Scheduler
will be shutdown at application shutdown time.