非Reactiveな非同期処理をFluxで扱う

はじめに

Reactive Springを読んでいて、Flux.create()を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。

Flux.create()?

Flux.create(Consumer<? super FluxSink> emitter)プログラマティックにFluxを生成するためのFactory で、非同期や同期で生成した値をFluxSink APIを通してエミットすることができます。このFactoryを使えばJMS Brockerのイベントやマルチスレッドアプリケーションで生成した値をエレメントとしてエミットすることができます。

実際にやってみる

それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書く

準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。

public class FluxCreateTest {

    private final ExecutorService es = Executors.newFixedThreadPool(1);

    @Test
    void exec() {
        System.out.println("Thread Name = " + Thread.currentThread().getName());
        Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5));

        StepVerifier
                .create(values.doFinally(signalType -> this.es.shutdown()))
                .expectNext("exec number: 1")
                .expectNext("exec number: 2")
                .expectNext("exec number: 3")
                .expectNext("exec number: 4")
                .expectNext("exec number: 5")
                .verifyComplete();
    }

    private void execTask(FluxSink<String> stringFluxSink, int maxNum) {
        this.es.submit(() -> {
            System.out.println("Thread Name = " + Thread.currentThread().getName());

            AtomicInteger num = new AtomicInteger();

            while (num.get() < maxNum) {
                String emitValue = "exec number: " + num.incrementAndGet();
                System.out.println("emitValue = " + emitValue);
                stringFluxSink.next(emitValue);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            stringFluxSink.complete();
        });
    }
}

実行するとテストが成功しコンソールに以下の出力がされます。

Thread Name = main
Thread Name = pool-1-thread-1
emitValue = exec number: 1
emitValue = exec number: 2
emitValue = exec number: 3
emitValue = exec number: 4
emitValue = exec number: 5

上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1という名前のスレッドで、1秒に1回、exec number: *というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。

ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFERが設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyEnumを渡してやります。