QuarkusでReactiveなWebAPIを書いてみる

はじめに

Reactive Stream系のものはSpring WebFluxしか触ったことが無かったのですが、QuarkusにもReactive Routeというのがあるというのを知り使ってみたくなったので、その機能を試してみたいと思います。

Reactive RouteとSmallrye Mutiny

まず、QuarkusでReactiveなAPIを書く際に重要な要素が2つあります。

  • Reactive Route
  • Samllrye Mutiny

Reactive Route

Quakusのリクエストとレスポンスにのルーティングは以下のような構成になっています。


https://quarkus.io/guides/images/http-architecture.png

まず、QuarkusのHTTPサポートは、Vert.xないしはNetty のノンブロッキングなリアクティブエンジンの上にできています。そのレイアーでは(図中一番下のレイヤー)Event Loopと呼ばれるIOスレッドがリクエストを受け取り、適切なコードへとルーティングされるようになっています。 そのルーティング先として、大まかに選択肢が2あり、そのうちの1つが今まで使ってきたServletのようなブロッキングをともなうリクエストハンドラーで、もう一つがノンブロッキングであるReactive Routeです。
Reactive RouteではリクエストをIOスレッドと言う並行制とパフォーマンス性が比較的高いスレッドで処理されます。

Samllrye Mutiny

Samllrye Mutinyはリアクティブなプログラミングを行なうためのライブラリです。Samllrye Mutinyは比較的後発のReactive Programmingライブラリですが、その分今までののライブラリで起こっていたような問題を解決するようにデザインされているらしいです。
Samllrye Mutinyには以下の二つのオブジェクトが用意されています。

  • Multi
  • Uni

言葉通りそれぞれ、Multiが1つ以上のストリームを扱い、Uniは0ないしは1のストリームを表すオブジェクトです。MultiもUniも非同期型でそれぞれイベントの発火を起点に起動します。

APIを書いてみる

利用する二つの要素を軽く説明したので実際にAPIを書いていきたいと思います。

環境

$ java --version
openjdk 15-ea 2020-09-15
OpenJDK Runtime Environment (build 15-ea+25-1229)
OpenJDK 64-Bit Server VM (build 15-ea+25-1229, mixed mode, sharing)


$ mvn -v
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15-ea, vendor: Oracle Corporation, runtime: /home/someone/.sdkman/candidates/java/15.ea.25-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-42-generic", arch: "amd64", family: "unix"

プロジェクトの作成

Quarkus - Start coding with code.quarkus.ioでプロジェクトを作成します。
設定化以下のようにしました。

f:id:yuya_hirooka:20200821081928p:plain

生成されたプロジェクトを開き、Pomを軽く見てみると諸々入っていますが以下の依存が追加されているのが確認されます。

 <dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-vertx-web</artifactId>
</dependency>

メイン関数からQuarkusアプリを実行できるようにする

少し話題がそれますが、個人的にアプリをIntelliJのRun機能をつかって、Main関数から起動できるのが好みなので以下のコードを追加しておきます。

AppStart.java

@QuarkusMain
public class AppStart {
    public static void main(String[] args) {
        Quarkus.run();
    }
}

@QuarkusMainはQuarkusアプリのMainクラス付与するアノテーションです(深くは書きませんが、QuarkusApllicationを実装したクラスにも付与することができます)。
Main関数の中でQuarkus.run()を呼び出すことで、アプリが起動するようになります。
更に余談ですが、Main関数を直接実行したときはIDELuncherというやつが内部的に呼び出され、諸々の設定を行ってくれます。その際、Devモードで起動する設定になっています。

まずはハンドラーを実装してみる

まずはReactive Routesのハンドラーを実装します。
コードとしては以下のようになります。

@ApplicationScoped
public class EmployeeRoutes {

    @Route(path = "employee")
    void employees(RoutingExchange rx) {
        rx.ok("hello " + rx.getParam("name").orElse("World"));
    }
}

JAX-RSアノテーションではなく@Routeアノテーションを使うことでReactive Routeが利用できます。@Routeの引数にpath="employee"と記述していますが、こうすることでURLのパス/employeeでこのRouteにディスパッチされるようになります。
@Routeが付与されている関数ではRoutingExchangeクラスを引数として受け取っています。このクラスはリクエストとレスポンスをハンドリングするためのクラスです。rx.ok()を呼び出すことで200番の RoutingExchangeの他にRouteingContextというのが存在しますが、RoutingExchangeがそのラッパーとなり便利なAPIを用意してくれているのでそちらを使っています。

個々までできたところでアプリを起動してcURLを叩いてみます。

$ curl localhost:8080/

$ curl localhost:8080/employee
hello World

$ curl localhost:8080/employee?name=henoheno -v
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /employee?name=henoheno HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 14
< 
* Connection #0 to host localhost left intact
hello henoheno

これでReactive Routeのハンドラーの作成が完了しました。

Smallrye Mutinyを使って、Reactive Streamを返す

それでは次にレスポンスをストリーム形式にしてみます。
前述のMulitを使って先程ハンドラーを以下のように書き換えてみます。

@ApplicationScoped
public class EmployeeRoutes {

    @Route(path = "employee")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    Multi<String> employees(RoutingExchange rx) {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(n -> "employee ID is " + n + "\n")
                .transform().byTakingFirstItems(6);
    }
}

先程は戻り値がVoidでしたがそこををMultiで来るんだString型にします。
さらに、@Produces(MediaType.SERVER_SENT_EVENTS)にします。QuarkusでのStreamはチャンク方式のServer Sent Eventで送信されます。
以下のコードはMultiを使って1秒ごとにイベンドを発生させて、リクエスト側に送信するコードです。

Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(n -> "employee ID is " + n + "\n")
                .transform().byTakingFirstItems(6);

それでは、cURLでリクエストを送ってみます。

$ curl localhost:8080/employee -N -v
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /employee HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< 
employee ID is 0
employee ID is 1
employee ID is 2
employee ID is 3
employee ID is 4
employee ID is 5
* Connection #0 to host localhost left intact

上記のスニペットでは分かりづらいのですが、1秒ごとにemployee ID is [番号]のストリングが返ってきています。

JsonのStreamを返す

最後、WebAPI通信の際にJsonが使われることが多いと思いますが、Jsonのリストでデータを返すことができます。
まずはJsonとして返したいEmployeeオプジェクトを作成します。

package dev.hirooka;

public class Employee {
    public Employee(String name) {
        this.name = name;
    }

    final String name;

    public String getName() {
        return name;
    }
}

次にハンドラーを以下のように書き換えます。

@ApplicationScoped
public class EmployeeRoutes {

    @Route(path = "employee")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    Multi<Employee> employees(RoutingExchange rx) {
        return ReactiveRoutes.asJsonArray(
                Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                        .onItem().transform(n -> new Employee("employee nama:" + n.toString()))
                        .transform().byTakingFirstItems(6));
    }
}

RactiveRoutesのスタティックメソッドであるasJsonArray()にMultiを渡してやることでオブジェクトをJsonマッピングして、返してくれます。この際に ContentTypeはapplication/jsonを自動的に付与してくれるので、クライアント側では返ってきたレスポンスをJsonとして扱うことができます。

$ curl localhost:8080/employee -N -v
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /employee HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< content-type: application/json
< 
[{"name":"0"},{"name":"1"},{"name":"2"},{"name":"3"},{"name":"4"},{"name":"5"}]* Connection #0 to host localhost left intact

これも分かりづらいですが一秒ごとにレスポンスが返ってくるような形になっています。

参考資料