Reactive Streams
Reactive = Observer pattern + Iterator pattern + Functional programming
Codes Like Sync, Works Like Async
Subscriber側
基準に従う場合
// Java 9
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
// RxJava 2
import io.reactivex.FlowableSubscriber;
import org.reactivestreams.Subscription;
// AkkaまたはReactor
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
 * Subscriber that pulls {@code News} items one at a time and cancels its
 * subscription once {@code MAX_NEWS} items have been received.
 *
 * <p>NOTE(review): {@code {↑}} appears to be a placeholder for whichever
 * Subscriber interface is imported above — confirm before compiling.
 */
class NewsSubscriber implements {↑}<News> {
    private static final int MAX_NEWS = 3;

    private Subscription upstream;
    private int receivedCount = 0;

    /** Keeps the subscription for later use and asks for the first item. */
    @Override
    public void onSubscribe(Subscription subscription) {
        upstream = subscription;
        upstream.request(1);
    }

    /**
     * Counts the received item; requests one more while fewer than
     * {@code MAX_NEWS} items have arrived, otherwise cancels the subscription.
     */
    @Override
    public void onNext(News item) {
        receivedCount += 1;
        if (receivedCount < MAX_NEWS) {
            upstream.request(1);
        } else {
            upstream.cancel();
        }
    }

    /** Prints the error message followed by the stack trace to standard error. */
    @Override
    public void onError(Throwable throwable) {
        System.err.printf("error occurred: %s\n", throwable.getMessage());
        throwable.printStackTrace(System.err);
    }

    /** Prints a completion notice to standard output. */
    @Override
    public void onComplete() {
        System.out.println("completed");
    }
}
特殊な場合(RxJava 1)
import rx.Subscriber;
/**
 * RxJava 1 variant of the news subscriber: pulls {@code News} items one at a
 * time and unsubscribes once {@code MAX_NEWS} items have been received.
 *
 * <p>{@code rx.Subscriber} is an abstract class, so it must be extended rather
 * than implemented; {@code onStart()}, {@code request(long)} and
 * {@code unsubscribe()} are inherited from it.
 */
class NewsSubscriber extends Subscriber<News> {
    private static final int MAX_NEWS = 3;
    private int newsReceived = 0;

    /** Requests the first item when the subscription starts. */
    @Override
    public void onStart() {
        request(1);
    }

    /**
     * Counts the received item; unsubscribes once {@code MAX_NEWS} items have
     * arrived, otherwise requests one more.
     */
    @Override
    public void onNext(News item) {
        newsReceived++;
        if (newsReceived >= MAX_NEWS) {
            unsubscribe();
            return;
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {...}

    @Override
    public void onCompleted() {...}
}
Publisher側
Java 9
import java.util.concurrent.SubmissionPublisher;
// Publish two news items to a NewsSubscriber. The try-with-resources statement
// closes the publisher on exit; per the SubmissionPublisher documentation,
// close() issues onComplete to the current subscribers.
try (SubmissionPublisher<News> newsPublisher = new SubmissionPublisher<>()) {
    newsPublisher.subscribe(new NewsSubscriber());
    List.of(
            News.create("News A"),
            News.create("News B"))
        .forEach(newsPublisher::submit);
    // Wait only while a subscriber is still attached AND submitted items are
    // still buffered. Waiting on hasSubscribers() alone never terminates when
    // the subscriber does not cancel (fewer items submitted than it waits for),
    // because close() is reached only after this loop.
    while (newsPublisher.hasSubscribers() && newsPublisher.estimateMaximumLag() > 0) {
        Thread.onSpinWait();
    }
}
RxJava 2
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.8</version>
</dependency>
import io.reactivex.Flowable;
Flowable.just(
News.create("News A"),
News.create("News B"))
.subscribe(new NewsSubscriber());
Akka
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream_2.12</artifactId>
<version>2.5.3</version>
</dependency>
</dependencies>
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.reactivestreams.Publisher;
// Create the actor system and materializer, run a two-item Source into a
// Sink that yields a Reactive Streams Publisher, and subscribe to that publisher.
final ActorSystem system = ActorSystem.create("sample-system");
final Materializer materializer = ActorMaterializer.create(system);
final List<News> newsItems = List.of(News.create("News A"), News.create("News B"));
final Publisher<News> publisher = Source.from(newsItems)
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
publisher.subscribe(new NewsSubscriber());
Reactor
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.2.RELEASE</version>
</dependency>
import reactor.core.publisher.Flux;
Flux.just(
News.create("News A"),
News.create("News B"))
.subscribe(new NewsSubscriber());
RxJava 1
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.3.4</version>
</dependency>
import rx.Observable;
Observable.just(
News.create("News A"),
News.create("News B"))
.subscribe(new NewsSubscriber());
RxJava