Reactor Publisherにおけるエラーハンドリング
はじめに
前回に引き続き、Reactive Springのを読んでいて、Publihsherのエラーハンドリングを行なうためのメソッド群が紹介されていたので、実際に動かして試してみようと思います。(書籍で紹介されていないものも結構あったので)
実際にやってみる
環境は以下の通り、
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書いていく
エラーハンドリング
まずはエラーハンドリングについて、FluxとMonoではonError**と言ったようなエラー時にフックされるようなメソッドがいくつか生えています。
onErrorResume()
onErrorResume()ですが、こいつは例外発生時のFallback処理を書くためのメソッドです。
以下のコードはintの3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorResume(e -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
特定の例外に対して、Fallback処理を書きたい場合は以下のように第一引数に例外クラスを書いてやるようにします。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorResume(IllegalArgumentException.class, v -> Flux.just(10)) .onErrorResume(RuntimeException.class, v -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
また、第一引数は例外クラスの代わりにPredicate<? super Throwable> predicate
を受け取ることができるので以下のように特定条件の例外にマッチした際のハンドリングを記述することもできます。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorResume(e -> e.getMessage().equals("Auchi"), v -> Flux.just(10)) .onErrorResume(e -> e.getMessage().equals("Oops"), v -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
onErrorReturn
onErrorReturn()は例外発生時に単一の値をエミットする関数です。
以下のコードは先程と同じく、3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。
@Test void testOnErrorReturn() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorReturn(4); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
特定の例外クラスにマッチした場合や、特定条件の例外にマッチした場合のハンドリングの書き方はonErrorResume
と同様の方法でできます。
すべてを動かしてみたわけでは無いですが、 onErro**系は大体同じような操作ができるっぽいです。
doOnError
まず、Flux(or Mono)でのストリーム処理時に発生したエラーをハンドリングして、ログなどの副作用の伴う処理の実行を行なう場合はdoOnError()を利用します。
@Test void testDoOnError() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .map(v -> { if (v.equals(3)) throw new RuntimeException("Oops"); return v; }) .doOnError(RuntimeException.class, v -> System.out.println("message = " + v.getMessage())); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectError(RuntimeException.class) .verify(); }
標準出力への出力は以下の通り。
message = Oops
onErrorMap
処理時の例外をハンドリングして、他の例外に変換してダウンストリームに返す場合は、onErrorMap()を利用します。
@Test void testOnErrorMap() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorMap(RuntimeException.class, e -> new IllegalArgumentException(e.getMessage())); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectError(IllegalArgumentException.class) .verify(); } }
onErrorContinue
例外発生時に、問題があるエミットを落として処理を続ける場合は、onErrorContinue()を利用します。
@Test void testOnErrorContinue() { Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorContinue((e, v) -> System.out.printf("dropped value = %s", v)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); } }
実行すると、標準出力に以下の文字列が出力されます。
dropped value = 3