QuarkusでReactiveなWebAPIを書いてみる
はじめに
Reactive Stream系のものはSpring WebFluxしか触ったことが無かったのですが、QuarkusにもReactive Routeというのがあるというのを知り使ってみたくなったので、その機能を試してみたいと思います。
Reactive RouteとSmallrye Mutiny
まず、QuarkusでReactiveなAPIを書く際に重要な要素が2つあります。
- Reactive Route
- Samllrye Mutiny
Reactive Route
Quakusのリクエストとレスポンスにのルーティングは以下のような構成になっています。
まず、QuarkusのHTTPサポートは、Vert.xないしはNetty のノンブロッキングなリアクティブエンジンの上にできています。そのレイアーでは(図中一番下のレイヤー)Event Loopと呼ばれるIOスレッドがリクエストを受け取り、適切なコードへとルーティングされるようになっています。
そのルーティング先として、大まかに選択肢が2あり、そのうちの1つが今まで使ってきたServletのようなブロッキングをともなうリクエストハンドラーで、もう一つがノンブロッキングであるReactive Routeです。
Reactive RouteではリクエストをIOスレッドと言う並行制とパフォーマンス性が比較的高いスレッドで処理されます。
Samllrye Mutiny
Samllrye Mutinyはリアクティブなプログラミングを行なうためのライブラリです。Samllrye Mutinyは比較的後発のReactive Programmingライブラリですが、その分今までののライブラリで起こっていたような問題を解決するようにデザインされているらしいです。
Samllrye Mutinyには以下の二つのオブジェクトが用意されています。
- Multi
- Uni
言葉通りそれぞれ、Multiが1つ以上のストリームを扱い、Uniは0ないしは1のストリームを表すオブジェクトです。MultiもUniも非同期型でそれぞれイベントの発火を起点に起動します。
APIを書いてみる
利用する二つの要素を軽く説明したので実際にAPIを書いていきたいと思います。
環境
$ java --version openjdk 15-ea 2020-09-15 OpenJDK Runtime Environment (build 15-ea+25-1229) OpenJDK 64-Bit Server VM (build 15-ea+25-1229, mixed mode, sharing) $ mvn -v Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15-ea, vendor: Oracle Corporation, runtime: /home/someone/.sdkman/candidates/java/15.ea.25-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-42-generic", arch: "amd64", family: "unix"
プロジェクトの作成
Quarkus - Start coding with code.quarkus.ioでプロジェクトを作成します。
設定化以下のようにしました。
生成されたプロジェクトを開き、Pomを軽く見てみると諸々入っていますが以下の依存が追加されているのが確認されます。
<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-vertx-web</artifactId> </dependency>
メイン関数からQuarkusアプリを実行できるようにする
少し話題がそれますが、個人的にアプリをIntelliJのRun機能をつかって、Main関数から起動できるのが好みなので以下のコードを追加しておきます。
AppStart.java
@QuarkusMain public class AppStart { public static void main(String[] args) { Quarkus.run(); } }
@QuarkusMain
はQuarkusアプリのMainクラス付与するアノテーションです(深くは書きませんが、QuarkusApllicationを実装したクラスにも付与することができます)。
Main関数の中でQuarkus.run()
を呼び出すことで、アプリが起動するようになります。
更に余談ですが、Main関数を直接実行したときはIDELuncherというやつが内部的に呼び出され、諸々の設定を行ってくれます。その際、Devモードで起動する設定になっています。
まずはハンドラーを実装してみる
まずはReactive Routesのハンドラーを実装します。
コードとしては以下のようになります。
@ApplicationScoped public class EmployeeRoutes { @Route(path = "employee") void employees(RoutingExchange rx) { rx.ok("hello " + rx.getParam("name").orElse("World")); } }
JAX-RSのアノテーションではなく@Route
アノテーションを使うことでReactive Routeが利用できます。@Route
の引数にpath="employee"
と記述していますが、こうすることでURLのパス/employee
でこのRouteにディスパッチされるようになります。
@Route
が付与されている関数ではRoutingExchange
クラスを引数として受け取っています。このクラスはリクエストとレスポンスをハンドリングするためのクラスです。rx.ok()
を呼び出すことで200番の
RoutingExchange
の他にRouteingContext
というのが存在しますが、RoutingExchange
がそのラッパーとなり便利なAPIを用意してくれているのでそちらを使っています。
個々までできたところでアプリを起動してcURLを叩いてみます。
$ curl localhost:8080/ $ curl localhost:8080/employee hello World $ curl localhost:8080/employee?name=henoheno -v * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /employee?name=henoheno HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < content-length: 14 < * Connection #0 to host localhost left intact hello henoheno
これでReactive Routeのハンドラーの作成が完了しました。
Smallrye Mutinyを使って、Reactive Streamを返す
それでは次にレスポンスをストリーム形式にしてみます。
前述のMulit
を使って先程ハンドラーを以下のように書き換えてみます。
@ApplicationScoped public class EmployeeRoutes { @Route(path = "employee") @Produces(MediaType.SERVER_SENT_EVENTS) Multi<String> employees(RoutingExchange rx) { return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) .onItem().transform(n -> "employee ID is " + n + "\n") .transform().byTakingFirstItems(6); } }
先程は戻り値がVoidでしたがそこををMultiで来るんだString型にします。
さらに、@Produces(MediaType.SERVER_SENT_EVENTS)
にします。QuarkusでのStreamはチャンク方式のServer Sent Eventで送信されます。
以下のコードはMultiを使って1秒ごとにイベンドを発生させて、リクエスト側に送信するコードです。
Multi.createFrom().ticks().every(Duration.ofSeconds(1)) .onItem().transform(n -> "employee ID is " + n + "\n") .transform().byTakingFirstItems(6);
$ curl localhost:8080/employee -N -v * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /employee HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < transfer-encoding: chunked < employee ID is 0 employee ID is 1 employee ID is 2 employee ID is 3 employee ID is 4 employee ID is 5 * Connection #0 to host localhost left intact
上記のスニペットでは分かりづらいのですが、1秒ごとにemployee ID is [番号]
のストリングが返ってきています。
JsonのStreamを返す
最後、WebAPI通信の際にJsonが使われることが多いと思いますが、Jsonのリストでデータを返すことができます。
まずはJsonとして返したいEmployeeオプジェクトを作成します。
package dev.hirooka; public class Employee { public Employee(String name) { this.name = name; } final String name; public String getName() { return name; } }
次にハンドラーを以下のように書き換えます。
@ApplicationScoped public class EmployeeRoutes { @Route(path = "employee") @Produces(MediaType.SERVER_SENT_EVENTS) Multi<Employee> employees(RoutingExchange rx) { return ReactiveRoutes.asJsonArray( Multi.createFrom().ticks().every(Duration.ofSeconds(1)) .onItem().transform(n -> new Employee("employee nama:" + n.toString())) .transform().byTakingFirstItems(6)); } }
RactiveRoutesのスタティックメソッドであるasJsonArray()
にMultiを渡してやることでオブジェクトをJsonにマッピングして、返してくれます。この際に ContentTypeはapplication/json
を自動的に付与してくれるので、クライアント側では返ってきたレスポンスをJsonとして扱うことができます。
$ curl localhost:8080/employee -N -v * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > GET /employee HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < transfer-encoding: chunked < content-type: application/json < [{"name":"0"},{"name":"1"},{"name":"2"},{"name":"3"},{"name":"4"},{"name":"5"}]* Connection #0 to host localhost left intact
これも分かりづらいですが一秒ごとにレスポンスが返ってくるような形になっています。