Reactive Streams

Reactive = Observer pattern + Iterator pattern + Functional programming

Codes Like Sync,Works Like Async

Subscriber側

基準に従う場合

// Java 9

import java.util.concurrent.Flow.Subscriber;

import java.util.concurrent.Flow.Subscription;

// RxJava 2

import io.reactivex.FlowableSubscriber;

import org.reactivestreams.Subscription;

// AkkaまたはReactor

import org.reactivestreams.Subscriber;

import org.reactivestreams.Subscription;

class NewsSubscriber implements {↑}<News> {

private Subscription subscription;

private static final int MAX_NEWS = 3;

private int newsReceived = 0;

@Override

public void onSubscribe(Subscription subscription) {

this.subscription = subscription;

subscription.request(1);

}

@Override

public void onNext(News item) {

newsReceived++;

if (newsReceived >= MAX_NEWS) {

subscription.cancel();

return;

}

subscription.request(1);

}

@Override

public void onError(Throwable throwable) {

System.err.printf("error occurred: %s\n", throwable.getMessage());

throwable.printStackTrace(System.err);

}

@Override

public void onComplete() {

System.out.println("completed");

}

}

特殊な場合(RxJava 1)

import rx.Subscriber;

class NewsSubscriber implements Subscriber<News> {

private static final int MAX_NEWS = 3;

private int newsReceived = 0;

@Override

public void onStart() {

request(1);

}

@Override

public void onNext(News item) {

newsReceived++;

if (newsReceived >= MAX_NEWS) {

unsubscribe();

return;

}

request(1);

}

@Override

public void onError(Throwable throwable) {...}

@Override

public void onCompleted() {...}

}

Publisher側

Java 9

import java.util.concurrent.SubmissionPublisher;

try (SubmissionPublisher<News> newsPublisher = new SubmissionPublisher()) {

newsPublisher.subscribe(new NewsSubscriber());

List.of(

News.create("News A"),

News.create("News B"))

.forEach(newsPublisher::submit);

while (newsPublisher.hasSubscribers()) {

// wait

}

}

RxJava 2

<dependency>

<groupId>io.reactivex.rxjava2</groupId>

<artifactId>rxjava</artifactId>

<version>2.1.8</version>

</dependency>

import io.reactivex.Flowable;

Flowable.just(

News.create("News A"),

News.create("News B"))

.subscribe(new NewsSubscriber());

Akka

<dependencies>

<dependency>

<groupId>com.typesafe.akka</groupId>

<artifactId>akka-actor_2.12</artifactId>

<version>2.5.3</version>

</dependency>

<dependency>

<groupId>com.typesafe.akka</groupId>

<artifactId>akka-stream_2.12</artifactId>

<version>2.5.3</version>

</dependency>

</dependencies>

import akka.actor.ActorSystem;

import akka.stream.ActorMaterializer;

import akka.stream.Materializer;

import akka.stream.javadsl.AsPublisher;

import akka.stream.javadsl.Sink;

import akka.stream.javadsl.Source;

import org.reactivestreams.Publisher;

final ActorSystem system = ActorSystem.create("sample-system");

final Materializer materializer = ActorMaterializer.create(system);

final Publisher<News> publisher =

Source.from(List.of(

News.create("News A"),

News.create("News B")))

.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

publisher.subscribe(new NewsSubscriber());

Reactor

<dependency>

<groupId>io.projectreactor</groupId>

<artifactId>reactor-core</artifactId>

<version>3.1.2.RELEASE</version>

</dependency>

import reactor.core.publisher.Flux;

Flux.just(

News.create("News A"),

News.create("News B"))

.subscribe(new NewsSubscriber());

RxJava 1

<dependency>

<groupId>io.reactivex</groupId>

<artifactId>rxjava</artifactId>

<version>1.3.4</version>

</dependency>

import rx.Observable;

Observable.just(

News.create("News A"),

News.create("News B"))

.subscribe(new NewsSubscriber());

RxJava