非Reactiveな非同期処理をFluxで扱う
はじめに
Reactive Springを読んでいて、Flux.create()
を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。
Flux.create()?
Flux.create(Consumer<? super FluxSink
実際にやってみる
それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書く
準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。
public class FluxCreateTest { private final ExecutorService es = Executors.newFixedThreadPool(1); @Test void exec() { System.out.println("Thread Name = " + Thread.currentThread().getName()); Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5)); StepVerifier .create(values.doFinally(signalType -> this.es.shutdown())) .expectNext("exec number: 1") .expectNext("exec number: 2") .expectNext("exec number: 3") .expectNext("exec number: 4") .expectNext("exec number: 5") .verifyComplete(); } private void execTask(FluxSink<String> stringFluxSink, int maxNum) { this.es.submit(() -> { System.out.println("Thread Name = " + Thread.currentThread().getName()); AtomicInteger num = new AtomicInteger(); while (num.get() < maxNum) { String emitValue = "exec number: " + num.incrementAndGet(); System.out.println("emitValue = " + emitValue); stringFluxSink.next(emitValue); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } stringFluxSink.complete(); }); } }
実行するとテストが成功しコンソールに以下の出力がされます。
Thread Name = main Thread Name = pool-1-thread-1 emitValue = exec number: 1 emitValue = exec number: 2 emitValue = exec number: 3 emitValue = exec number: 4 emitValue = exec number: 5
上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1
という名前のスレッドで、1秒に1回、exec number: *
というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。
ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFER
が設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyのEnumを渡してやります。