Spring WebFluxでブロッキング処理を書く

はじめに

てっきり、WebFluxのようなReactiveの場合、ハンドラーみたいなのはEventLoopとは別スレッドで動いているもんだとばかり思っていたのですが、どうやらそうではないようで(冷静に考えれば当たり前、と言うかそうしてしまったら意味がない)、悪気なくリソースクラスとかでブロッキングするような処理を書いてしまいリクエストをつまらせるというやらかしをしてしまいました。
そもそも、この辺の理解がかなり曖昧だったのでちょっと調べてみようと思います。あと、一応ブロッキングの処理を書くこともできるようなのでそのやり方についてもまとめます。

ハンドラーでブロッキングした際のThreadの動きを確認する

環境

動作する環境は以下のような感じ

$ java --version
openjdk 11.0.8 2020-07-14
OpenJDK Runtime Environment 18.9 (build 11.0.8+10)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.8+10, mixed mode)


$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

$ uname -srvmpio
Linux 5.4.0-48-generic #52-Ubuntu SMP Thu Sep 10 10:58:49 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

プロジェクトはSpring Initializrで作成して、 Spring Bootは 2.3.4.RELEASE、依存はSpring Reactive Webだけ追加しました。
また、詳細に使い方は説明しませんが、VisualVMを使ってThreadの動きを確認したいと思います。(VisualVMについては以前記事を書いたのでそちらを見ていただけると嬉しいです。)

ハンドラーを実装する

リソースクラスを作成し、ブロッキングを行なうハンドラーと行わないハンドラーを書きます。

@Component
public class HelloResource {

    private HelloService service;

    public HelloResource(HelloService service) {
        this.service = service;
    }

    public Mono<ServerResponse> blockingGreeting(ServerRequest request) {
        try {
            TimeUnit.MINUTES.sleep(30);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return ServerResponse.ok().body(Mono.just("hello"), String.class);
    }

    public Mono<ServerResponse> asyncGreeting(ServerRequest request) {
        return ServerResponse.ok().contentType(MediaType.APPLICATION_STREAM_JSON).body(
                Flux.interval(Duration.ofSeconds(5)).take(300), Long.class);
    }
}

そして、ハンドラーをRouterFunctionへ登録します。

@Bean
public RouterFunction<ServerResponse> route(HelloResource helloResource) {
    return RouterFunctions
            .route(GET("/block"), helloResource::blockingGreeting)
            .andRoute(GET("/async"), helloResource::asyncGreeting);
}

ここまでで、下準備はOK。

リクエストを送りスレッドを確認する

アプリを起動し、ブロッキングしている方のハンドラーにリクエスト送ってみます。

$ curl localhost:8080/block

すると以下のようにreactor-http-epoll-2というEventLoopのスレッドが作成されて、Sleep状態に入ってしまったことがわかります。

f:id:yuya_hirooka:20201003005107p:plain

reactor-http-epoll-2のThread Dumpをみると確かにハンドラーがEventLoopのスレッドで実行されて、処理がブロッキングされているのがわかります。

"reactor-http-epoll-2" #35 daemon prio=5 os_prio=0 cpu=85.35ms elapsed=434.60s tid=0x00007f76f4005000 nid=0x131db waiting on condition  [0x00007f77081e6000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(java.base@11.0.6/Native Method)
        at dev.hirooka.demo.flux.HelloResource.blockingGreeting(HelloResource.java:24)
        at dev.hirooka.demo.flux.Application$$Lambda$416/0x00000008402d5040.handle(Unknown Source)
        at org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61)
        at org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:161)
        at org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:146)
        at org.springframework.web.reactive.DispatcherHandler$$Lambda$575/0x00000008403a9840.apply(Unknown Source)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:176)
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2026)
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
        at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:65)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:518)
        at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:272)
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:462)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:172)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(java.base@11.0.6/Thread.java:834)

今度は非同期の方にリクエストを送ってみます。

$ curl localhost:8080/async
0
1
2

この場合では、reactor-http-epoll-2はSleepされずparallel-1と呼ばれるTreadが作成されます。

f:id:yuya_hirooka:20201003011323p:plain

parallel-1はSchedulers.parallel()で作成されるスレッドで、Fluxのイベント発行時にはこのスレッドで処理が行われるようです。
ログからはJacsonの処理が行われているのを確認することができます。

2020-10-03 01:24:30.073 DEBUG 80149 --- [     parallel-1] o.s.http.codec.json.Jackson2JsonEncoder  : [12112c2a-1] Encoding [149]

EventLoopのスレッドを食いつぶす

EventLoopのスレッド最大数のデフォルト値はLoopResourcesで設定されているようで、最小4で、利用可能なプロセッサーの数がデフォルトとして設定されるようです。
私の環境で論理プロセッサーの数は8個なのでデフォルトでは、最大8このスレッドをプールします。

$ grep processor /proc/cpuinfo | wc -l
8

このプール食いつぶした際には後のリクエストがハングしてしまいます
8回のcurl localhost:8080/blockのリクエストを送信しEventLoopを食いつぶします。

f:id:yuya_hirooka:20201003024438p:plain

reactor-http-epoll-1~reactor-http-epoll-8までがすべてSleepしていることが確認できます。
この状況でasyncのリクエストを送信します。

$ curl localhost:8080/async

この場合、EventPoolのスレッドを食いつぶしてしまっているため、いつまで立っても値は返ってきません。

ブロッキングAPIを呼び出すためにどうするか?

どうしてもハンドラーやその呼び出し先でブロッキングAPIを利用したい場合、 ドキュメントによるとpublishOnオペレータを使えということだったので、先程のハンドラーをちょっと書き直して見ます。

public Mono<ServerResponse> blockingGreeting(ServerRequest request) {
    return ServerResponse.ok().body(Mono.fromCallable(() -> {
                TimeUnit.MINUTES.sleep(30);
                return "hello";
             }).publishOn(Schedulers.elastic())
             , String.class);
}

Mono.fromCallable()の中でブロッキングな処理を書き、publishOnにスケジューラーを登録します。ちなみにelasticは必要に応じて、新しいWorkerを生成するSchedulerです。
リクエストを送り再度、Threadの状態を確認してみます。

$ curl localhost:8080/block

スレッドは以下のようになりました。

f:id:yuya_hirooka:20201003093645p:plain

elastic-2というスレッドとelastic-evictor-1というスレッドが作成され、reactor-http-epoll-*はブロックされてないのが確認できます。
ログも以下のとおりになり、Writeが別スレッドで行われるようになっていました。

2020-10-03 09:24:39.380 DEBUG 7498 --- [      elastic-2] o.s.core.codec.CharSequenceEncoder       : [cef65a9b-1] Writing "hello"

参考資料