public class Rx extends Exec
Reactive programming via the RxJava library.
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
This module provides a route mapper that converts Observable (and family) into the Deferred API. It also manages schedulers and makes sure they are shut down at application shutdown time.
...
import org.jooby.rx.Rx;
import rx.Observable;
...
{
use(new Rx());
get("/", req -> Observable.just("reactive programming in jooby!"));
}
The previous example is translated to:
{
use(new Rx());
get("/", req -> {
return new Deferred(deferred -> {
Observable.just("reactive programming in jooby!")
.subscribe(deferred::resolve, deferred::reject);
});
});
}
Translation is done via the rx() route mapper. If you are an RxJava programmer, you don't need to learn a new API and its semantics. The rx() route mapper takes care of the Deferred API.
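The translation described above can be pictured with JDK types only. The following is a minimal sketch, not the module's implementation: `java.util.concurrent.Flow.Publisher` stands in for `rx.Observable`, `CompletableFuture` stands in for Jooby's `Deferred`, and the `DeferredBridge` class with its `just` and `error` helpers is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Stand-ins only: Flow.Publisher plays the role of rx.Observable and
// CompletableFuture plays the role of Jooby's Deferred.
final class DeferredBridge {

  /** Subscribes to the source and settles the future with its first item or its error. */
  static <T> CompletableFuture<T> toDeferred(Flow.Publisher<T> source) {
    CompletableFuture<T> deferred = new CompletableFuture<>();
    source.subscribe(new Flow.Subscriber<T>() {
      @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
      // Counterpart of deferred::resolve in the example above.
      @Override public void onNext(T item) { deferred.complete(item); }
      // Counterpart of deferred::reject in the example above.
      @Override public void onError(Throwable t) { deferred.completeExceptionally(t); }
      // Only has an effect for an empty source; a settled future ignores it.
      @Override public void onComplete() { deferred.complete(null); }
    });
    return deferred;
  }

  /** Hypothetical single-item publisher, loosely analogous to Observable.just(value). */
  static <T> Flow.Publisher<T> just(T value) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private boolean done;
      @Override public void request(long n) {
        if (!done && n > 0) {
          done = true;
          subscriber.onNext(value);
          subscriber.onComplete();
        }
      }
      @Override public void cancel() { done = true; }
    });
  }

  /** Hypothetical publisher that fails as soon as it is subscribed to. */
  static <T> Flow.Publisher<T> error(Throwable cause) {
    return subscriber -> {
      subscriber.onSubscribe(new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
      });
      subscriber.onError(cause);
    };
  }

  public static void main(String[] args) {
    // prints "reactive programming in jooby!"
    System.out.println(toDeferred(just("reactive programming in jooby!")).join());
  }
}
```

The point of the sketch is the shape of the bridge: the route handler keeps returning a reactive type, and a single subscription forwards the value or the failure to the deferred result.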
Advanced observable configuration is allowed via function adapters:
...
import org.jooby.rx.Rx;
...
{
use(new Rx()
.withObservable(observable -> observable.observeOn(Schedulers.io()))
.withSingle(single -> single.observeOn(Schedulers.io()))
.withCompletable(completable -> completable.observeOn(Schedulers.io())));
get("/observable", req -> Observable...);
get("/single", req -> Single...);
....
get("/completable", req -> Completable...);
}
Here every Observable/Single/Completable from a route handler will observe on the io scheduler.
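A function adapter is simply a function applied to whatever a route handler returns before the value is consumed. The following is a rough sketch of that idea using JDK types only: `CompletableFuture` stands in for the Rx types, `thenApplyAsync` with an executor stands in for `observeOn`, and the `AdapterSketch` class and its `withAdapter` and `handle` methods are hypothetical names, not part of the module.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

// Stand-in sketch: CompletableFuture replaces the Rx types and AdapterSketch
// is a hypothetical class, not Jooby's Rx module.
final class AdapterSketch<T> {
  // The default adapter leaves the handler result untouched.
  private Function<CompletableFuture<T>, CompletableFuture<T>> adapter = Function.identity();

  /** Registers an adapter, in the spirit of withObservable/withSingle/withCompletable. */
  AdapterSketch<T> withAdapter(Function<CompletableFuture<T>, CompletableFuture<T>> adapter) {
    this.adapter = Objects.requireNonNull(adapter, "adapter");
    return this;
  }

  /** Runs a route handler and passes its result through the adapter. */
  CompletableFuture<T> handle(Supplier<CompletableFuture<T>> routeHandler) {
    return adapter.apply(routeHandler.get());
  }

  public static void main(String[] args) {
    // Single daemon thread with a fixed name so the effect of the adapter is visible.
    ExecutorService io = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "io-worker");
      t.setDaemon(true);
      return t;
    });
    AdapterSketch<String> module = new AdapterSketch<String>()
        // Roughly analogous to observable.observeOn(Schedulers.io()).
        .withAdapter(f -> f.thenApplyAsync(v -> v + " @ " + Thread.currentThread().getName(), io));
    // prints "hello @ io-worker"
    System.out.println(module.handle(() -> CompletableFuture.completedFuture("hello")).join());
    io.shutdown();
  }
}
```

Because the adapter is registered once on the module, individual route handlers stay free of scheduling concerns.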
This module provides the default schedulers from RxJava, but it also lets you define your own schedulers using the Exec module.
rx.schedulers.io = forkjoin
rx.schedulers.computation = fixed
rx.schedulers.newThread = "fixed = 10"
The previous example defines:
- a fork-join pool for Schedulers.io()
- a fixed thread pool for Schedulers.computation()
- a fixed thread pool of 10 threads for Schedulers.newThread()
Of course, you can define or override all, some or none of them. In any case, the schedulers will be shut down at application shutdown time.
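The lifecycle described above (build one executor per configured scheduler, then shut every one of them down when the application stops) can be sketched with plain JDK executors. This is an illustration under stated assumptions, not the Exec module's code: the `SchedulerRegistry` class is hypothetical, the spec strings mirror the configuration shown earlier, and the default size of one thread per available processor is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

// Hypothetical registry; it only illustrates the create-then-shutdown lifecycle.
final class SchedulerRegistry {
  private final Map<String, ExecutorService> executors = new LinkedHashMap<>();

  /** Builds one executor per entry, e.g. "io" -> "forkjoin" or "newThread" -> "fixed = 10". */
  SchedulerRegistry(Map<String, String> specs) {
    specs.forEach((name, spec) -> executors.put(name, create(spec)));
  }

  /** Parses "type" or "type = size" into an executor. */
  static ExecutorService create(String spec) {
    String[] parts = spec.split("=");
    String type = parts[0].trim();
    // Assumed default for this sketch: one thread per available processor.
    int size = parts.length > 1
        ? Integer.parseInt(parts[1].trim())
        : Runtime.getRuntime().availableProcessors();
    switch (type) {
      case "forkjoin":
        return new ForkJoinPool(size);
      case "fixed":
        return Executors.newFixedThreadPool(size);
      default:
        throw new IllegalArgumentException("Unknown executor type: " + type);
    }
  }

  ExecutorService get(String name) {
    return executors.get(name);
  }

  /** Called at application shutdown: no executor is left running. */
  void stop() {
    executors.values().forEach(ExecutorService::shutdown);
  }

  public static void main(String[] args) {
    Map<String, String> specs = new LinkedHashMap<>();
    specs.put("io", "forkjoin");
    specs.put("computation", "fixed");
    specs.put("newThread", "fixed = 10");
    SchedulerRegistry registry = new SchedulerRegistry(specs);
    registry.stop();
    System.out.println(registry.get("io").isShutdown()); // prints "true"
  }
}
```

Keeping every executor in one registry is what makes the shutdown guarantee cheap: stopping the application is a single pass over the map.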
Modifier and Type | Method and Description |
---|---|
com.typesafe.config.Config | config() |
void | configure(Env env, com.typesafe.config.Config conf, com.google.inject.Binder binder): Configures and produces bindings for the underlying application. |
static Route.Mapper<Object> | rx() |
static Route.Mapper<Object> | rx(Function<rx.Observable,rx.Observable> observable, Function<rx.Single,rx.Single> single) |
static Route.Mapper<Object> | rx(Function<rx.Observable,rx.Observable> observable, Function<rx.Single,rx.Single> single, Function<rx.Completable,rx.Completable> completable) |
Rx | withCompletable(Function<rx.Completable,rx.Completable> adapter): Applies the given function adapter to completables returned by routes. |
Rx | withObservable(Function<rx.Observable,rx.Observable> adapter): Applies the given function adapter to observables returned by routes. |
Rx | withSingle(Function<rx.Single,rx.Single> adapter): Applies the given function adapter to singles returned by routes. |
public Rx()
Creates a new Rx module.
public static Route.Mapper<Object> rx()
Converts an Observable, Single or Completable into a Deferred object.
...
import org.jooby.rx.Rx;
...
{
with(() -> {
get("/1", req -> Observable...);
get("/2", req -> Single...);
....
get("/N", req -> Completable...);
}).map(Rx.rx());
}
public static Route.Mapper<Object> rx(Function<rx.Observable,rx.Observable> observable, Function<rx.Single,rx.Single> single)
Converts an Observable, Single or Completable into a Deferred object.
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
with(() -> {
get("/1", req -> Observable...);
get("/2", req -> Observable...);
....
get("/N", req -> Observable...);
}).map(Rx.rx(
observable -> observable.observeOn(Schedulers.io()),
single -> single.observeOn(Schedulers.io())));
}
observable - Observable adapter.
single - Single adapter.
public static Route.Mapper<Object> rx(Function<rx.Observable,rx.Observable> observable, Function<rx.Single,rx.Single> single, Function<rx.Completable,rx.Completable> completable)
Converts an Observable, Single or Completable into a Deferred object.
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
with(() -> {
get("/1", req -> Observable...);
get("/2", req -> Observable...);
....
get("/N", req -> Observable...);
}).map(Rx.rx(
observable -> observable.observeOn(Schedulers.io()),
single -> single.observeOn(Schedulers.io()),
completable -> completable.observeOn(Schedulers.io())));
}
observable - Observable adapter.
single - Single adapter.
completable - Completable adapter.
public Rx withObservable(Function<rx.Observable,rx.Observable> adapter)
Applies the given function adapter to observables returned by routes:
{
use(new Rx().withObservable(observable -> observable.observeOn(Schedulers.io())));
get("observable", req -> {
return Observable...
});
}
adapter - Observable adapter.
public Rx withSingle(Function<rx.Single,rx.Single> adapter)
Applies the given function adapter to singles returned by routes:
{
use(new Rx().withSingle(single -> single.observeOn(Schedulers.io())));
get("single", req -> {
return Single...
});
}
adapter - Single adapter.
public Rx withCompletable(Function<rx.Completable,rx.Completable> adapter)
Applies the given function adapter to completables returned by routes:
{
use(new Rx().withCompletable(completable -> completable.observeOn(Schedulers.io())));
get("completable", req -> {
return Completable...
});
}
adapter - Completable adapter.
public void configure(Env env, com.typesafe.config.Config conf, com.google.inject.Binder binder)
Description copied from interface: Jooby.Module
Configures and produces bindings for the underlying application. A module can read or interact with the application env and/or the current application properties available from Config.
Specified by: configure in interface Jooby.Module
Overrides: configure in class Exec
env - The current application's env. Not null.
conf - The current config object. Not null.
binder - A guice binder. Not null.
public com.typesafe.config.Config config()
Specified by: config in interface Jooby.Module
Overrides: config in class Exec
Copyright © 2019. All rights reserved.