Spring WebFluxでブロッキング処理を書く
はじめに
てっきり、WebFluxのようなReactiveの場合、ハンドラーみたいなのはEventLoopとは別スレッドで動いているもんだとばかり思っていたのですが、どうやらそうではないようで(冷静に考えれば当たり前、と言うかそうしてしまったら意味がない)、悪気なくリソースクラスとかでブロッキングするような処理を書いてしまいリクエストをつまらせるというやらかしをしてしまいました。
そもそも、この辺の理解がかなり曖昧だったのでちょっと調べてみようと思います。あと、一応ブロッキングの処理を書くこともできるようなのでそのやり方についてもまとめます。
ハンドラーでブロッキングした際のThreadの動きを確認する
環境
動作する環境は以下のような感じ
$ java --version openjdk 11.0.8 2020-07-14 OpenJDK Runtime Environment 18.9 (build 11.0.8+10) OpenJDK 64-Bit Server VM 18.9 (build 11.0.8+10, mixed mode) $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ uname -srvmpio Linux 5.4.0-48-generic #52-Ubuntu SMP Thu Sep 10 10:58:49 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
プロジェクトはSpring Initializrで作成して、
Spring Bootは 2.3.4.RELEASE、依存はSpring Reactive Webだけ追加しました。
また、詳細に使い方は説明しませんが、VisualVMを使ってThreadの動きを確認したいと思います。(VisualVMについては以前記事を書いたのでそちらを見ていただけると嬉しいです。)
ハンドラーを実装する
リソースクラスを作成し、ブロッキングを行なうハンドラーと行わないハンドラーを書きます。
@Component public class HelloResource { private HelloService service; public HelloResource(HelloService service) { this.service = service; } public Mono<ServerResponse> blockingGreeting(ServerRequest request) { try { TimeUnit.MINUTES.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } return ServerResponse.ok().body(Mono.just("hello"), String.class); } public Mono<ServerResponse> asyncGreeting(ServerRequest request) { return ServerResponse.ok().contentType(MediaType.APPLICATION_STREAM_JSON).body( Flux.interval(Duration.ofSeconds(5)).take(300), Long.class); } }
そして、ハンドラーをRouterFunctionへ登録します。
@Bean public RouterFunction<ServerResponse> route(HelloResource helloResource) { return RouterFunctions .route(GET("/block"), helloResource::blockingGreeting) .andRoute(GET("/async"), helloResource::asyncGreeting); }
ここまでで、下準備はOK。
リクエストを送りスレッドを確認する
アプリを起動し、ブロッキングしている方のハンドラーにリクエスト送ってみます。
$ curl localhost:8080/block
すると以下のようにreactor-http-epoll-2
というEventLoopのスレッドが作成されて、Sleep状態に入ってしまったことがわかります。
reactor-http-epoll-2
のThread Dumpをみると確かにハンドラーがEventLoopのスレッドで実行されて、処理がブロッキングされているのがわかります。
"reactor-http-epoll-2" #35 daemon prio=5 os_prio=0 cpu=85.35ms elapsed=434.60s tid=0x00007f76f4005000 nid=0x131db waiting on condition [0x00007f77081e6000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(java.base@11.0.6/Native Method) at dev.hirooka.demo.flux.HelloResource.blockingGreeting(HelloResource.java:24) at dev.hirooka.demo.flux.Application$$Lambda$416/0x00000008402d5040.handle(Unknown Source) at org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61) at org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:161) at org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:146) at org.springframework.web.reactive.DispatcherHandler$$Lambda$575/0x00000008403a9840.apply(Unknown Source) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:176) at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2026) at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) at reactor.core.publisher.Mono.subscribe(Mono.java:4213) at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207) at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) at reactor.core.publisher.Mono.subscribe(Mono.java:4213) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441) at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161) at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) at reactor.core.publisher.Mono.subscribe(Mono.java:4213) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:65) at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:518) at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:272) at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:462) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:172) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@11.0.6/Thread.java:834)
今度は非同期の方にリクエストを送ってみます。
$ curl localhost:8080/async 0 1 2
この場合では、reactor-http-epoll-2
はSleepされずparallel-1
と呼ばれるTreadが作成されます。
parallel-1
はSchedulers.parallel()で作成されるスレッドで、Fluxのイベント発行時にはこのスレッドで処理が行われるようです。
ログからはJacsonの処理が行われているのを確認することができます。
2020-10-03 01:24:30.073 DEBUG 80149 --- [ parallel-1] o.s.http.codec.json.Jackson2JsonEncoder : [12112c2a-1] Encoding [149]
EventLoopのスレッドを食いつぶす
EventLoopのスレッド最大数のデフォルト値はLoopResourcesで設定されているようで、最小4で、利用可能なプロセッサーの数がデフォルトとして設定されるようです。
私の環境で論理プロセッサーの数は8個なのでデフォルトでは、最大8このスレッドをプールします。
$ grep processor /proc/cpuinfo | wc -l 8
このプール食いつぶした際には後のリクエストがハングしてしまいます。
8回のcurl localhost:8080/block
のリクエストを送信しEventLoopを食いつぶします。
reactor-http-epoll-1
~reactor-http-epoll-8
までがすべてSleepしていることが確認できます。
この状況でasync
のリクエストを送信します。
$ curl localhost:8080/async
この場合、EventPoolのスレッドを食いつぶしてしまっているため、いつまで立っても値は返ってきません。
ブロッキングAPIを呼び出すためにどうするか?
どうしてもハンドラーやその呼び出し先でブロッキングなAPIを利用したい場合、
ドキュメントによるとpublishOn
オペレータを使えということだったので、先程のハンドラーをちょっと書き直して見ます。
public Mono<ServerResponse> blockingGreeting(ServerRequest request) { return ServerResponse.ok().body(Mono.fromCallable(() -> { TimeUnit.MINUTES.sleep(30); return "hello"; }).publishOn(Schedulers.elastic()) , String.class); }
Mono.fromCallable()
の中でブロッキングな処理を書き、publishOn
にスケジューラーを登録します。ちなみにelasticは必要に応じて、新しいWorkerを生成するSchedulerです。
リクエストを送り再度、Threadの状態を確認してみます。
$ curl localhost:8080/block
スレッドは以下のようになりました。
elastic-2
というスレッドとelastic-evictor-1
というスレッドが作成され、reactor-http-epoll-*
はブロックされてないのが確認できます。
ログも以下のとおりになり、Writeが別スレッドで行われるようになっていました。
2020-10-03 09:24:39.380 DEBUG 7498 --- [ elastic-2] o.s.core.codec.CharSequenceEncoder : [cef65a9b-1] Writing "hello"