Hands-On Reactive Programming in Spring 5
上QQ阅读APP看书,第一时间看更新

Producing and consuming streams

At this point, we should be familiar enough with the RxJava library to create our first small application. Let's define a stream that is represented by the Observable class. At the moment, we may assume that the Observable is a sort of generator that knows how to propagate events for subscribers as soon as they subscribe:

Observable<String> observale = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) { // (1)
sub.onNext("Hello, reactive world!"); // (2)
sub.onCompleted(); // (3)
}
}
);

So, here we create an Observable with a callback that will be applied as soon as the Subscriber appears (1). At that moment, our Observer will produce a one string value (2) and then signal the end of the stream to the subscriber (3). We can also improve this code using the Java 8 lambdas:

Observable<String> observable = Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);

In contrast with the Java Stream API, Observable is reusable, and each subscriber will receive the Hello, reactive world! event just after the subscription.

Note that, from RxJava 1.2.7 onward, the  Observable creation has been deprecated and treated as unsafe because it may generate too many elements and overload the subscriber. In other words, this approach does not support backpressure, a concept that we are going to examine later in detail. However, that code is still valid for the sake of introduction.

So, now we need a Subscriber, as shown in the following code:

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { // (1)
System.out.println(s);
}

@Override
public void onCompleted() { // (2)
System.out.println("Done!");
}

@Override
public void onError(Throwable e) { // (3)
System.err.println(e);
}
};

As we can see, the Subscriber has to implement the Observer methods and define the reactions for new events (1), stream completion (2), and errors (3). Now, let's hook the observable and subscriber instances together:

observable.subscribe(subscriber);

When running the mentioned code, the program generates the following output:

Hello, reactive world!
Done!

Hooray! We have just written a small and simple reactive hello-world application! As we may suspect, we may rewrite this example using lambdas, as shown in the following code:

Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Done!")
);

The RxJava library gives a lot of flexibility in order to create Observable and Subscriber instances. It is possible to create an Observable instance just by referencing elements, by using an old-style array, or from the Iterable collection, as follows:

Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());

It is also possible to reference a Callable (1) or even a Future (2), as shown in the following code:

Observable<String> hello = Observable.fromCallable(() -> "Hello ");  // (1)
Future<String> future =
Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future); // (2)

Moreover, along with the plain creational functionality, the Observable stream may be created by combining other Observable instances, which allows for easy implementation of pretty complicated workflows. For example, the concat() operator for each of the incoming streams consumes all items by re-sending them to the downstream observer. Incoming streams will then be processed until a terminal operation (onComplete(), onError()) occurs, and the order of processing is the same as the order of the concat() arguments. The following code demonstrates an example of the concat() usage:

Observable.concat(hello, world, Observable.just("!"))
.forEach(System.out::print);

Here, as part of a straightforward combination of a few Observable instances that use different origins, we also iterate through the result with the Observable.forEach() method in a way that is similar to the Java 8 Stream API. Such a program generates the following output:

Hello World!
Note that even though it is convenient to not define handlers for exceptions, in the case where an error occurs, the default Subscriber implementation throws rx.exceptions.OnErrorNotImplementedException .