Reactor Publisherにおけるエラーハンドリング

はじめに

前回に引き続き、Reactive Springのを読んでいて、Publihsherのエラーハンドリングを行なうためのメソッド群が紹介されていたので、実際に動かして試してみようと思います。(書籍で紹介されていないものも結構あったので)

実際にやってみる

環境は以下の通り、

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書いていく

エラーハンドリング

まずはエラーハンドリングについて、FluxとMonoではonError**と言ったようなエラー時にフックされるようなメソッドがいくつか生えています。

onErrorResume()

onErrorResume()ですが、こいつは例外発生時のFallback処理を書くためのメソッドです。
以下のコードはintの3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorResume(e -> Flux.just(4));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

特定の例外に対して、Fallback処理を書きたい場合は以下のように第一引数に例外クラスを書いてやるようにします。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorResume(IllegalArgumentException.class, v -> Flux.just(10))
                .onErrorResume(RuntimeException.class, v -> Flux.just(4));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

また、第一引数は例外クラスの代わりにPredicate<? super Throwable> predicateを受け取ることができるので以下のように特定条件の例外にマッチした際のハンドリングを記述することもできます。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorResume(e -> e.getMessage().equals("Auchi"), v -> Flux.just(10))
                .onErrorResume(e -> e.getMessage().equals("Oops"), v -> Flux.just(4));


        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

onErrorReturn

onErrorReturn()は例外発生時に単一の値をエミットする関数です。
以下のコードは先程と同じく、3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。

    @Test
    void testOnErrorReturn() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorReturn(4);

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

特定の例外クラスにマッチした場合や、特定条件の例外にマッチした場合のハンドリングの書き方はonErrorResumeと同様の方法でできます。
すべてを動かしてみたわけでは無いですが、 onErro**系は大体同じような操作ができるっぽいです。

doOnError

まず、Flux(or Mono)でのストリーム処理時に発生したエラーをハンドリングして、ログなどの副作用の伴う処理の実行を行なう場合はdoOnError()を利用します。

    @Test
    void testDoOnError() {
        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .map(v -> {
                    if (v.equals(3)) throw new RuntimeException("Oops");
                    return v;
                })
                .doOnError(RuntimeException.class, v -> System.out.println("message = " + v.getMessage()));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectError(RuntimeException.class)
                .verify();
    }

標準出力への出力は以下の通り。

message = Oops

onErrorMap

処理時の例外をハンドリングして、他の例外に変換してダウンストリームに返す場合は、onErrorMap()を利用します。

    @Test
    void testOnErrorMap() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorMap(RuntimeException.class, e -> new IllegalArgumentException(e.getMessage()));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectError(IllegalArgumentException.class)
                .verify();
    }
}

onErrorContinue

例外発生時に、問題があるエミットを落として処理を続ける場合は、onErrorContinue()を利用します。

    @Test
    void testOnErrorContinue() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorContinue((e, v) -> System.out.printf("dropped value  = %s", v));


        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }
}

実行すると、標準出力に以下の文字列が出力されます。

dropped value  = 3