非Reactiveな非同期処理をFluxで扱う
はじめに
Reactive Springを読んでいて、Flux.create()
を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。
Flux.create()?
Flux.create(Consumer<? super FluxSink
実際にやってみる
それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書く
準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。
public class FluxCreateTest { private final ExecutorService es = Executors.newFixedThreadPool(1); @Test void exec() { System.out.println("Thread Name = " + Thread.currentThread().getName()); Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5)); StepVerifier .create(values.doFinally(signalType -> this.es.shutdown())) .expectNext("exec number: 1") .expectNext("exec number: 2") .expectNext("exec number: 3") .expectNext("exec number: 4") .expectNext("exec number: 5") .verifyComplete(); } private void execTask(FluxSink<String> stringFluxSink, int maxNum) { this.es.submit(() -> { System.out.println("Thread Name = " + Thread.currentThread().getName()); AtomicInteger num = new AtomicInteger(); while (num.get() < maxNum) { String emitValue = "exec number: " + num.incrementAndGet(); System.out.println("emitValue = " + emitValue); stringFluxSink.next(emitValue); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } stringFluxSink.complete(); }); } }
実行するとテストが成功しコンソールに以下の出力がされます。
Thread Name = main Thread Name = pool-1-thread-1 emitValue = exec number: 1 emitValue = exec number: 2 emitValue = exec number: 3 emitValue = exec number: 4 emitValue = exec number: 5
上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1
という名前のスレッドで、1秒に1回、exec number: *
というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。
ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFER
が設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyのEnumを渡してやります。
2020年振り返り
はじめに
なんとなくいろんな人の振り返りみて、自分も振り返ってみようかとふと思ったので書いてみようかと。
ざっくり今年やったこと、あったこと
今年やったとことあったことをまとめると、以下のようになるかと思います。
- 転職して一年
- ブログを平均して週一ぐらいでは書く
- 本を読む
転職して一年
先月の11月で今の会社に来て一年が経ちました。もともとは、転職ドラフトというサイトで声をかけていただいて今に至りますが、気がつけば一年たってました。
前職とは働き方や開発のやり方が全く違い、いろいろ学びが多かったと思います。
今の組織は、アジャイル開発を行う上で、XPのプラクティスを大切にしています。その中で自分の中の今まで知識でしかなかった部分が実践できたり、知らなかったことをたくさん学ぶことができました。
その他の面で、コミュニケーション面に置いて自分の中で1つのブレイクスルーみたいなのがあった気がします。作業が基本ペア作業で行うので一般的な組織に比べてコミュニケーションを撮る機会がかなり多いと思います。その中であまり上手くコミュニケーションを取れない事があると、感じていたのですが、いろいろ考える中で気づきみたいなものがあり少しだけコミュニケーション力がマシになった気がします。
とはいえ、まだまだ上手くできない面がかなりおおく、特に、ビジネスサイド方々などのコンテキストが共有されてない相手とのコミュニケーションに課題を感じているので、来年以降その力をつけていかないとなぁと思っています。
(ただ、自分の強みとして伸ばしたい方向性とは少し違う気もしているのですが…)
ブログを平均して週一ぐらいでは書く
前職の先輩でかなりのブログを書かれる人がいます。その人に触発されてなんとなく、最低週一ぐらいでは書いてみようと思ったのが、今年の3月です。こから42記事書いてるので目的は達成したかなと。
まぁ、今後も週一でゆるゆると書いていこうかなぁとは思います。
やってみて思ったことは以下のことです。
- ブログをネタを探すようになるので、発見が多くなる
- 文章にする事で理解度が上がる
前者に関して、ブログを書くことを意識してるとより多くのことを学べるなぁと感じました。
後者に関しては、ドキュメントを読んでなんとなく理解した気になっても書いてみると結構書けないみたいなのがあったのがたくさんありました。その箇所を自分の言葉で文に起こすことで理解度が断然上がるのを感じました。
本を読む
本を読むのは、新人の頃からずっと続けているのですが今年は少し読む量が減った気がします。 思い出せるだけ羅列してみようと思います。
- Clean Agile
- Amazon Web Services負荷試験入門
- Netty In Action
- Java並行プログラミング
- Real World HTTP 第2版
- イラストでわかるDockerとKubernetes
- マイクロサービスパターン
- 監視入門
- オブジェクト指向UIデザイン
- Linuxで動かしながら学ぶネットワーク入門
- ドメイン駆動設計入門
- みんなのJava
- 新世代Javaプログラミングガイド
- みんなのKotlin
- 7つの習慣
- kubernetesで実践するクラウドネイティブdevops
本屋で買ったのとかで覚えてないの合わせると、多分もうちょいある気がしますが、Amazonの購入履歴やらなんやらで思い出せるのはこんだけです。
こうして眺めてみると、Javaに結構寄ってるのがわかりますね。
来年はもう少し、Linuxやネットワーク、ディスクなどの足回りの知識をもう少し固めたいなーと思いました。
後は、Java以外の言語をもう少しやって、そのパラダイムを学びたいなぁと考えてます。
特に関数型に関しては自分はかなり弱い感じがするので「すごいHaskellたのしく学ぼう!」は読み直したいなと思います。
あとは、Rustなんかも学びが多そうだなぁと思ってます。
来年やりたいこと
来年は、以下のようなことをやろうかなと。
- 継続してブログを書く
- テーマを考えて読む本を選んでみる
- 資格を何か取る
ブログを書くことで、自分の技術力が上がるのは実感できたので今後も続けていたいとおもいます。
本に関しては、振り返りでも思ったのですが今までは興味の向くままに本を選んでいたのですが来年は少し考えながら本を選んでみようかと思いました。本のところでも書いたのですが具体的には以下のような本を意識的に読んでみようかと。
最後の資格に関しては、資格を取ることが目的というよりは資格を取る事でその分野に対する知識を深めることができるのではとは前々から思っていたので、やってみようかと。具体的には、LPICやCKA/CKADあたりに挑戦してみようかなと思ってます。
K6のシナリオを使ってみる
はじめに
K6では負荷試験のシナリオを設定することが可能です。このブログではその機能を簡単に試してみてざっくりとした使い方を把握したいと思います。 このブログではK6をDockerを用いて起動します。また、テスト対象のアプリは以前のK6について書いたブログで利用したものを利用します。helloの文字列を返すだけのAPIです。
やってみる
環境
$ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ uname -srvmpio Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux $ docker version Client: Docker Engine - Community Version: 19.03.12 API version: 1.40 Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:45:36 2020 OS/Arch: linux/amd64 Experimental: false Server: Docker Engine - Community Engine: Version: 19.03.12 API version: 1.40 (minimum version 1.12) Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:44:07 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 runc: Version: 1.0.0-rc10 GitCommit: dc9208a3303feef5b3839f4323d9beb36df0a9dd docker-init: Version: 0.18.0 GitCommit: fec3683
シナリオで何ができるか
ざっくり言うとK6のシナリオではVirtual Users(VUs)、イテレーション、スケジュールの詳細な設定を行なうことができます。
シナリオを使うメリットは主に以下のようなものがあります。
- 同じスクリプトに対して複数のシナリオを設定することができる。それぞれは独立したJavaScriptの関数として実行されることによってテストをより整理しやすくフレキシブルに作成することができる。
- それぞれのシナリオがVUや、イテレーション、スケジューリングパターンを独立して利用することができる(この機能はExecutorsによって実装される)。この機能を活用することによってより高度な現実に近いリクエストを送ることができる
- それぞれのシナリオは直列でも並列でも実行可能で、また、それらを混ぜたような実行も可能である
- シナリオごとに違った環境変数やメトリクスのタグをりようすることができる
Executorsって何ぞ?
ExecutorsはK6のエンジンのワークホースです。それぞれのワークホースがVUsをスケジューリングし、それぞれが別で反復実行されます。
Executorsには以下のようなものがあります。
Executor名 | 値 | 説明 |
---|---|---|
Shared iterations | shared-iterations | 固定値分だけ反復実行するExecutor。この固定値は複数のVUsの間でシェアされる |
Per VU iterations | per-vu-iterations | 固定値の値の回数それぞれのVUsがスクリプトを反復実行するExecutor。この固定値は複数VUs間でシェアされない。 |
Constant VUs | constant-vus | 設定された時間内で、できるだけ反復を実行するVUsの固定値 |
Ramping VUs | ramping-vus | 設定された時間内で、できるだけ反復を実行するVUsの数の変数。開始時点でのVUs数や終了時点のVUs数等を設定できる。グローバルのstages と同等のことができる |
Constant arrival rate | constant-arrival-rate | 固定値の数のイテレーションを設定された時間内で実行するExecutor。VUsの数を変動させることでイテレーションレートを保証しようとする |
Ramping Arrival Rate | ramping-arrival-rate | 区切られた時間の中で、変動するイテレーション回数実行するExecutor。Ramping VUsに似ているが、イテレーションのためのもので、VUsの数は動的に変動する |
Externally Controlled | externally-controlled | externally-controlled|K6のREST APIやCLIを通して、実行時に設定を変更することができるExecutor |
すべてのシナリオで共通の一般的な設定項目
K6のシナリオではExecutor固有の設定項目とは別に、共有の設定項目がいくつかあります。
その設定項目を以下にまとめます。
値 | 型 | 説明 | デフォルト値 |
---|---|---|---|
executor* | string | ユニークなExecutorの名前 | - |
startTime | string | シナリオの実行が開始されるべき時間 | 0s |
gracefulStop | string | イテレーションが強制的に終了される前の待ち時間 | 30s |
exec | string | 実行されるJSの関数名 | defualt |
env | object | シナリオで利用される環境変数 | {} |
tags | object | シナリオのタグ | {} |
実際にシナリオを書いていく
基本的な書き方
シナリオの基本的な書き方は以下のようになります。
export let options = { scenarios: { example_scenario: { // 利用するExecutorの名前 executor: 'shared-iterations', // すべてのシナリオで共通の一般的な設定項目 startTime: '10s', gracefulStop: '5s', env: { EXAMPLEVAR: 'testing' }, tags: { example_tag: 'testing' }, // エクセキューターごとの設定項目 vus: 10, iterations: 200, maxDuration: '10s', }, //二個目のExecutor another_scenario: { ... } } }
Executorを設定する際はscenarios
にobjectを複数記述することが可能なようです。それぞれのシナリオのKey(example_scenario
、another_scenario
) などは任意の値を利用することが可能ですが、scenariosセクションの中でユニークな値である必要があります。
ユーザが上昇するシナリオを書く
シナリオについてある程度まとめたところで実際に一つ書いてみたいと思います。
お題は以下のように
- 開始10秒はリクエストを送らない
- 10秒経過あとは時間経過とともにユーザが上昇する
- 開始10秒で一秒間ウェイトを置いてリクエストを送るVUsが10から20まで段階的に上昇
- 開始20秒から30秒でVUsが20から60まで段階的に上昇
- 開始30秒間から10秒かけて段階的にVUsが減少
また、スクリプトは以下のものを利用します。
import http from 'k6/http'; import { sleep } from 'k6'; export default function () { http.get('http://{$APP_IP}:8080/hello'); sleep(1); }
それでは早速書いていきたいと思います。
今回利用するExecutorは Ramping VUs
です。
お題のストーリーを記述すると以下のようになります。
import http from 'k6/http'; import { sleep } from 'k6'; export let options = { scenarios: { ramping_up_scenario: { executor: 'ramping-vus', startVUs:10, stages: [ {duration: '10s', target: 20}, {duration: '10s', target: 60}, {duration: '10s', target: 0}, ] } } }; export default function () { http.get('http://192.168.10.8:8080/hello'); sleep(1); }
上記の設定でスクリプトを実行すると以下のようになります。
$ docker run --rm --network k6net -i loadimpact/k6 run - < k6script.js /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io execution: local script: - output: - scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop): * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s) running (0m00.8s), 10/60 VUs, 0 complete and 0 interrupted iterations ramping_up_scenario [ 3% ] 10/60 VUs 00.8s/30.0s running (0m01.8s), 11/60 VUs, 10 complete and 0 interrupted iterations ramping_up_scenario [ 6% ] 11/60 VUs 01.8s/30.0s running (0m02.8s), 12/60 VUs, 21 complete and 0 interrupted iterations ramping_up_scenario [ 9% ] 12/60 VUs 02.8s/30.0s (中略) running (0m08.8s), 18/60 VUs, 108 complete and 0 interrupted iterations ramping_up_scenario [ 29% ] 18/60 VUs 08.8s/30.0s running (0m09.8s), 19/60 VUs, 126 complete and 0 interrupted iterations ramping_up_scenario [ 33% ] 19/60 VUs 09.8s/30.0s running (0m10.8s), 23/60 VUs, 145 complete and 0 interrupted iterations ramping_up_scenario [ 36% ] 23/60 VUs 10.8s/30.0s (中略) running (0m17.8s), 51/60 VUs, 389 complete and 0 interrupted iterations ramping_up_scenario [ 59% ] 51/60 VUs 17.8s/30.0s running (0m18.8s), 55/60 VUs, 440 complete and 0 interrupted iterations ramping_up_scenario [ 63% ] 55/60 VUs 18.8s/30.0s running (0m19.8s), 59/60 VUs, 494 complete and 0 interrupted iterations ramping_up_scenario [ 66% ] 59/60 VUs 19.8s/30.0s running (0m20.8s), 58/60 VUs, 553 complete and 0 interrupted iterations ramping_up_scenario [ 69% ] 58/60 VUs 20.8s/30.0s (中略) running (0m28.8s), 12/60 VUs, 853 complete and 0 interrupted iterations ramping_up_scenario [ 96% ] 12/60 VUs 28.8s/30.0s running (0m29.8s), 06/60 VUs, 865 complete and 0 interrupted iterations ramping_up_scenario [ 99% ] 06/60 VUs 29.8s/30.0s running (0m30.1s), 00/60 VUs, 871 complete and 0 interrupted iterations ramping_up_scenario ✓ [ 100% ] 00/60 VUs 30s data_received..............: 103 kB 3.4 kB/s data_sent..................: 77 kB 2.5 kB/s http_req_blocked...........: avg=52.36µs min=1.48µs med=8.59µs max=12.99ms p(90)=14.6µs p(95)=267.27µs http_req_connecting........: avg=37.05µs min=0s med=0s max=12.96ms p(90)=0s p(95)=222.62µs http_req_duration..........: avg=1.49ms min=257.57µs med=1.37ms max=11.37ms p(90)=2.59ms p(95)=3.11ms http_req_receiving.........: avg=87.02µs min=11.56µs med=83.98µs max=654.79µs p(90)=144.41µs p(95)=163.26µs http_req_sending...........: avg=41.95µs min=6.09µs med=36.36µs max=719.74µs p(90)=67.04µs p(95)=106.4µs http_req_tls_handshaking...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s http_req_waiting...........: avg=1.36ms min=229.33µs med=1.22ms max=11.21ms p(90)=2.42ms p(95)=2.95ms http_reqs..................: 871 28.917444/s iteration_duration.........: avg=1s min=1s med=1s max=1.01s p(90)=1s p(95)=1s iterations.................: 871 28.917444/s vus........................: 6 min=6 max=59 vus_max....................: 60 min=60 max=60
最初に注目するのは以下の出力です。
scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop): * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)
シナリオが読み込まれ、そのマックスVUsが60であることがわかります。
また、3つのステージに分けてVUsが上昇していくことも出力されています。実際に最後の10秒は減少ですがK6の出力ではそこは読み取れないみたいですね。
その後各リクエストの結果を読み込んでみると、最初の10秒程度でVUsがユーザが20まで上昇し、次の10秒で20から60まで上昇して(増加数が20以降から変わってます)そして、最後の10秒でユーザが0まで減少しています。
お題はクリアできたみたいですね。
参考資料
K6の負荷テスト結果をGrafana+InfluxDBで可視化する
はじめに
K6は実行結果を様々なプラットフォームに送信して可視化することが可能です。
その中の選択肢としてGrafana+InfluxDBを利用することが可能なようなので、試してみたいと思います。
このブログでは、それぞれのツールはDockerを利用して立ち上げることを想定しています。公式のdocker-composeを用意されていますが今回は利用せずにやってみようと思います。
また、今回使うK6のスクリプトは以前のブログで最後に作成したものを利用しようと思います。簡単にまとめると10のVUsで一秒間の1回10秒リクエストを送るスクリプトを使います。また、テスト対象のアプリも同様に以前のブログで利用したものを利用します。hello
の文字列を返すだけのAPIです。
やってみる
動作環境
$ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ uname -srvmpio Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux $ docker version Client: Docker Engine - Community Version: 19.03.12 API version: 1.40 Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:45:36 2020 OS/Arch: linux/amd64 Experimental: false Server: Docker Engine - Community Engine: Version: 19.03.12 API version: 1.40 (minimum version 1.12) Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:44:07 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 runc: Version: 1.0.0-rc10 GitCommit: dc9208a3303feef5b3839f4323d9beb36df0a9dd docker-init: Version: 0.18.0 GitCommit: fec3683
InfluxDB を立ち上げる
最初に、諸々適当なDockerのネットワークを作成しておきます。
$ docker network create k6net
まずは、InfluxDBを立ち上げます。
InfluxDBは公式のイメージを利用して立ち上げます。
データの永続化の問題もありますが、今回は試したいだけなので、ボリュームを当てたりはしません。
$ docker run --network k6net --name influxdb -d influxdb
K6の実行結果をInfluxDBに保存する
InfluxDBが立ち上がったところで早速テストを実行していきたいと思います。
あたらためて書くと今回利用するスクリプトとアプリは以前のブログで利用したものを再利用します。
K6でテストを行ってその実行結果をInfluxDBに送信するには次のコマンドを実行します。
$ docker run --rm --network k6net -i loadimpact/k6 run --out influxdb=http://influxdb:8086/k6db - < k6script.js
ここで重要なのは--out influxdb=http://influxdb:8086/k6db
のところです。K6では様々なOUTPUTのプラグイン が用意されており、--out
の引数に指定するだけで簡単に測定結果を送信することができます。
スクリプトの実行が完了されると、InfluxDBにk6db
という名前でDBが作成されます。
Grafanaでデータを可視化する
ここまででInfluxDBにテスト結果の格納することまでができたので、最後にその可視化を行っていきます。
まずは、Grafanaを立ち上げます以下のコマンドを叩きます。
docker run --network=k6net -d -p 3000:3000 grafana/grafana
起動するとlocalhost:3000
でアクセスすることができます。
初回、ユーザ/パスワードはadmin/admin
です。
次にInfluxDBを起動します。
$ docker run --name influxdb quay.io/influxdb/influxdb:v2.0.3
Grafanaがたち上がったら、InfluxDBと連携させます。
Grafanaのサイドウィンドウから、[歯車]⇨[Add Data Source]を選択し、サーチボックスにInfluxDB
と入力してInfluxDBの項目を選択します。
設定は以下のように入力します。
Database
の項目はK6の実行の際に作成されたものを利用します。
これで連携は完了です。
あとはいい感じにダッシュボードを作成すればよいです。
(ちなみに以下の画像はk6 Load Testing Resultsのテンプレートを利用してます)
InfluxDBのオプション
スクリプト実行時のInfluxDBに対するオプションがいくつかあるので以下にまとめます。 オプションは実行時の環境変数として設定することで利用することができます。
環境変数 | 説明 |
---|---|
K6_INFLUXDB_USERNAME | InfluxDBのユーザネーム |
K6_INFLUXDB_PASSWORD | InfluxDBのパスワード |
K6_INFLUXDB_INSECURE | tureになってる場合httpsの証明書検証をスキップする |
K6_INFLUXDB_TAGS_AS_FIELDS | K6のメトリクスの中で、インデックスできないフィールドをコンマ区切りで指定する。オプションでインデックスのvu:int のように型を指定できる。方はint 、bool 、float 、string がある |
参考資料
負荷テストツールK6を試す
はじめに
負荷テストのツールを何かしら勉強したいなと思って、K6というツールがあるというのを知って良さそうに感じたのでとりあえず動かしてみるところまでやってみようと思います。
K6とは
K6はLoad Impactという負荷テストのサービスを作っていた会社が、その経験を活かして作ったOSSみたいです。その機能に以下のようなものがあります。
- ES6でのスクリプティング
- テストも設定オプションもJSのコードで記述できる
- CIでの自動化サポート(アサーションや成功/失敗を判定するしきい値の設定が可能)
- HTTP/1.1、2、WebSocket、gRPCのサポート
- TLS機能のサポート
- ビルドインのHARコンバーター
- InfluxDB (+Grafana)やJson等を利用したメトリクスの公開
- その他様々な機能
- クッキー
- 暗号化
- メトリクスのカスタマイズ
- エンコーディング
- HTMLフォーム
Virtual User
K6にはVirtual Users(VUs)という概念があります。
Virtual Usersはそれぞれが分けられた環境で、並行でテストスクリプトを実行してくれます。
また、Virtual Userはリアルユーザの真似をするようなリクエストを送ることも可能みたいです。
使ってみる
早速使ってみたいと思います。
K6を実行するためにはそのツールセットをインストールする必要がありますが今回はDockerを利用してK6を実行したいと思います。
環境
実行環境は以下の通りです。
$ uname -srvmpio Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ docker version Client: Docker Engine - Community Version: 19.03.12 API version: 1.40 Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:45:36 2020 OS/Arch: linux/amd64 Experimental: false Server: Docker Engine - Community Engine: Version: 19.03.12 API version: 1.40 (minimum version 1.12) Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:44:07 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 runc: Version: 1.0.0-rc10 GitCommit: dc9208a3303feef5b3839f4323d9beb36df0a9dd docker-init: Version: 0.18.0 GitCommit: fec3683
事前準備
インストール
前述の通り、K6を実行するにはそのツールセットをインストールする必要があります。様々な環境でインストール、動作させることが可能です。
Linux(Ubuntu/Debian)
DabianベースのLinuxの場合はプライベートのdebリポジトリからインストールすることができます。
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 379CE192D401AB61 echo "deb https://dl.bintray.com/loadimpact/deb stable main" | sudo tee -a /etc/apt/sources.list sudo apt-get update sudo apt-get install k6
Docker
Dockerイメージloadimpact/k6が公開されており、そのイメージを用いたテストの実行も可能です。
docker pull loadimpact/k6
Mac
Macではbrewを使ってインストールすることができるみたいです。
brew install k6
バイナリでのインストール
GitHubのページからバイナリを取得することも可能です。
リンクからバイナリをダウンロードしてPATHを通して通してください。
テストするアプリを用意する
インストールが終わったところで、テスト対象のアプリを準備しておきます。
今回は環境が用意できればなんでもよいので個人的に1番使い慣れてるSpringを使ってアプリを作ろうと思います。
Spring Initializrを使ってアプリを作成します。
設定は以下のように
出来上がったプロジェクトをエディタ等で開き作成されているApplication
クラスを以下のように修正します。
@SpringBootApplication @RestController public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @GetMapping("/hello") public String hello(){ return "hello"; } }
アプリを起動してcURLでエンドポイントへアクセスするとHelloという文字列が返ってきます。
$ curl localhost:8080/hello hello
テストコードを書く
それでは準備ができたところで実際にテストスクリプトを書いて行きたいと思います。 K6でスクリプトを記述する場合基本的なスクリプトの構成は以下のようになります。
// init code export default function() { // vu code }
K6ではdefalt
functionを定義してやる必要があり、これが一般的に言うメイン関数のようなテストコードのエントリーポイントとなります。
defalt
functionの中のコードのことをVU code
と呼び、外側のコードのことをinit code
と呼びます。
ここで、Virtual Usersを利用して、並行化されるのはVU code
であり、init code
は一回のみ実行されます。VU code
の中ではHTTPなどのリクエストを送信しそのメトリクスを計測することは可能ですが、ローカルのファイルシステムを読み込んだり、他のモジュールを呼んだりすることはできません。それらはinit code
で実行することが必要です。
以上のことを踏まえて、リクエストを一回実行して1秒だけまつスクリプトを記述します。
k6script.js
import http from 'k6/http'; import { sleep } from 'k6'; export default function () { http.get('http://${HOST_IP}:8080/hello'); sleep(1); }
テストコードが記述できたので早速実行してみます。 実行は以下のようにコマンドを叩きます。
$ docker run -i loadimpact/k6 run - < ${SCRIPT_NAME}.js
今回の場合は次のようになります。
$ docker run -i loadimpact/k6 run - < k6script.js /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io execution: local script: - output: - scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop): * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s) running (00m01.0s), 1/1 VUs, 0 complete and 0 interrupted iterations default [ 0% ] 1 VUs 00m01.0s/10m0s 0/1 iters, 1 per VU running (00m01.0s), 0/1 VUs, 1 complete and 0 interrupted iterations default ✓ [ 100% ] 1 VUs 00m01.0s/10m0s 1/1 iters, 1 per VU data_received..............: 118 B 114 B/s data_sent..................: 89 B 86 B/s http_req_blocked...........: avg=1.67ms min=1.67ms med=1.67ms max=1.67ms p(90)=1.67ms p(95)=1.67ms http_req_connecting........: avg=1.61ms min=1.61ms med=1.61ms max=1.61ms p(90)=1.61ms p(95)=1.61ms http_req_duration..........: avg=2ms min=2ms med=2ms max=2ms p(90)=2ms p(95)=2ms http_req_receiving.........: avg=83.92µs min=83.92µs med=83.92µs max=83.92µs p(90)=83.92µs p(95)=83.92µs http_req_sending...........: avg=72.25µs min=72.25µs med=72.25µs max=72.25µs p(90)=72.25µs p(95)=72.25µs http_req_tls_handshaking...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s http_req_waiting...........: avg=1.84ms min=1.84ms med=1.84ms max=1.84ms p(90)=1.84ms p(95)=1.84ms http_reqs..................: 1 0.96781/s iteration_duration.........: avg=1s min=1s med=1s max=1s p(90)=1s p(95)=1s iterations.................: 1 0.96781/s vus........................: 1 min=1 max=1 vus_max....................: 1 min=1 max=1
コンソールに実行結果が出力されているのがわかります。
ちょっとだけ細かく見てみると、送信したデータ量(B/s)、受信したデータ量(B/s)、リクエストでブロックされた時間、接続にかかった時間、等々の平均値やパーセントタイルの値などが表示されていますね。
VirtualUsersを追加して、テストの実行時間を変更する
次に、Virtual Usersを追加してリクエストを並行で送ってみるようにしてみたいと思います。 方法は以下の4つほどあります
- テスト実行時オプション(
--vus
and-duration
)として渡す - 環境変数として定義しておく
.json
の設定ファイルを作成し、実行時に読み込ませるinit call
にoptions
のJsonオブジェクトを定義する
今回は4つ目のJsonオブジェクトを定義する方でやろうと思います。
具体的には、「10 users で10秒間リクエストを送る
」ように、先程のコードを以下のように書き換えます。
import http from 'k6/http'; import { sleep } from 'k6'; export let options = { vus: 10, duration: '10s', }; export default function () { http.get('http://${HOST_IP}:8080/hello'); sleep(1); }
option.vus
とoption.duration
をそれぞれinit code
の領域で設定しています。
テストを実行すると先ほどと違って、10並行で、10秒間リクエストが送られていることがわかります。
$ docker run -i loadimpact/k6 run - < k6script.js /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io execution: local script: - output: - scenarios: (100.00%) 1 scenario, 10 max VUs, 40s max duration (incl. graceful stop): * default: 10 looping VUs for 10s (gracefulStop: 30s) running (01.0s), 10/10 VUs, 0 complete and 0 interrupted iterations default [ 9% ] 10 VUs 00.9s/10s running (02.0s), 10/10 VUs, 10 complete and 0 interrupted iterations default [ 19% ] 10 VUs 01.9s/10s (略) default [ 99% ] 10 VUs 09.9s/10s running (10.1s), 00/10 VUs, 100 complete and 0 interrupted iterations default ✓ [ 100% ] 10 VUs 10s data_received..............: 12 kB 1.2 kB/s data_sent..................: 8.9 kB 880 B/s http_req_blocked...........: avg=32.42µs min=2.11µs med=11.88µs max=855.09µs p(90)=49.84µs p(95)=172.34µs http_req_connecting........: avg=19.37µs min=0s med=0s max=814.16µs p(90)=4.29µs p(95)=144.09µs http_req_duration..........: avg=3.67ms min=704.32µs med=3.94ms max=8.48ms p(90)=6.71ms p(95)=7.35ms http_req_receiving.........: avg=99.37µs min=16.55µs med=103.15µs max=223.89µs p(90)=173.99µs p(95)=182.34µs http_req_sending...........: avg=49.94µs min=9.15µs med=47.83µs max=334.41µs p(90)=70.58µs p(95)=87.42µs http_req_tls_handshaking...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s http_req_waiting...........: avg=3.52ms min=658.68µs med=3.75ms max=8.3ms p(90)=6.52ms p(95)=6.92ms http_reqs..................: 100 9.895886/s iteration_duration.........: avg=1s min=1s med=1s max=1s p(90)=1s p(95)=1s iterations.................: 100 9.895886/s vus........................: 10 min=10 max=10
ここまでで、ざっくりとK6の利用方法は把握できた気がします。
テスト結果の出力
テスト結果はデフォルトではコンソールに出力されますが、以下のような出力先を変更するプラグインがいくつか用意されています。
この他にもNew Relicなどもあります。
オプション
最後に、その他にどんなことができるのかざっくり把握するためオプションで気になったところをいくつかまとめたいと思います。
全量はここで確認することができます。
Duration
Durationでは、テストが実行される時間を設定することができます。
実行時間中はそれぞれのVUsでスクリプトがループして呼ばれ続けます。
例えば以下のような設定の場合3分間vu code
がループで実行され続けます。
export let options = { duration: '3m', };
Iterations
IterationsオプションはVUsがそれぞれ何度スクリプトを実行するかを指定することができます。
この値は、VUsで単純に割り算され、例えば以下のような設定である場合10Vusがそれぞれ10回のリクエストを実行します。
export let options = { vus: 10, iterations: 100, };
No VU Connection Reuse
No VU Connection ReuseではTCPをVUsの実行の中でTCPコネクションを再利用するかをbooleanで設定できます。 設定は以下のように行います。
export let options = { noVUConnectionReuse: true, };
RPS
RPSではVUsをまたいだ秒間での最大リクエスト数を設定することができます。
export let options = { rps: 500, };
Scenarios
Scenariosでは、1つ以上の実行パターンを定義することができます。シナリオは以下のような特徴があります。
- 同じスクリプトに対して複数のシナリオを定義できる
- それぞれのシナリオは個別のVUsやスケジュールパターンを持つことができる
- それぞれのシナリオは直列でも並行でも実行することができる
- シナリオごとに違った環境変数やメトリクスタグをセットすることができる
より詳細な情報はこちらを参考にしてください。
利用イメージは以下のようになります。
export let options = { scenarios: { my_api_scenario: { // arbitrary scenario name executor: 'ramping-vus', startVUs: 0, stages: [ { duration: '5s', target: 100 }, { duration: '5s', target: 0 }, ], gracefulRampDown: '10s', env: { MYVAR: 'example' }, tags: { my_tag: 'example' }, }, }, };
Stages
StagesではVUsの上昇や下降を制御することができます。
例えば以下のような設定の場合、最初の3分間でVUsを1から10まで上昇させ5分間そのままリクエストを続けその他とに10から35ユーザ10分間で増やしていき、最後の3分でVUsを0まで減らします。
export let options = { stages: [ { duration: '3m', target: 10 }, { duration: '5m', target: 10 }, { duration: '10m', target: 35 }, { duration: '3m', target: 0 }, ], };
参考資料
Nettyの主要コンポーネントを整理する
はじめに
Nettyを勉強していると、EventLoopやHandlerなど様々なコンポーネントがでてきて、混乱したのでちょっと図でまとめてみようと思います。このブログでは以下のようなNettyの主要コンポーネント(クラス or インターフェース)の役割をまとめそれぞれの関係を図で整理しようと思います。
- EventLoop、EventLoopGroupe
- Channel
- ChannelHandler、ChannelPipeline
Netty In Actionなどの書籍を参考にしていますが、あくまで自分の理解をまとめますので、正しい理解を行いたい場合は書籍やドキュメント実装を当たるようにしてください。
非同期I/O? 同期I/O?
前提としてJavaのネットワーク通信における、同期I/Oと非同期I/Oについてちょっと整理してみようと思います。
Javaにおけるネットワーキングプログラムは、はじめ、ネイティブのソケットライブラリーを使った同期的なAPI(java.net
)が用意されていました。その、同期的なAPIを使った複数通信を扱う際のイメージは以下のようになります。
このような同期I/Oでは、ソケットごとにスレッドが作成され、それらのスレッドはI/Oオペレーションを行っている間はスレッドは休眠状態に入ります。そうなるとリソースの無駄が発生してしまいます。更に、それぞれのスレッドはOSによって決められたサイズのメモリを確保しますのでそうったオーバーヘッドも発生してしまいます。
上記のような同期I/Oに対して、Java 1.4らNIO(java.nio
)が導入されました。NIOでは、システムのイベント通知APIを用いて、データの読み取りや書き取りが行われたことを通知を受けることで、非同期的な通信を可能としています。
NIOを用いたJavaにおけるネットワーク通信は以下のようになります。
class java.nio.channels.Selector
は複数の非同期ソケットが処理可能な状態にあるかのイベント通知を受け、ハンドリングします。同期IOに比べソケットに対して、1つだけののスレッドが割り当てられているため、コンテクストスイッチなどにおける、CPUリソースの無駄遣いの削減や、メモリ消費の削減に繋がり、また、スレッドも休眠せずI/O待ちの際に別のタスクを消費することができます。
Nettyとは
NettyはJavaのNIOの上に作られるネットワークアプリケーションを作るためのフレームワークです。NIOを直接使うよりも、より簡単にすばやくアプリケーションの開発を行なうことができます。また、プーリングと再利用によってJavaのAPIを直接使うよりも低いレイテンシ、高スループットを実現してします。
NettyはNIO以外のライブラリー等には依存しておらず、3.x系ならJava 5、4.x系ならJava 6以上(一部、オプションの機能を利用する場合は7以上が求められます)で動きます。
ちなみにNIOに対して、同期的I/OをNettyではOIOと呼んでいるようです。
Nettyのコンポーネントと全体像
Nettyの主要コンポーネントを一枚の図のような関係になります。 以降でそれぞれのコンポーネントについて見ていきたいと思います。
Channel
まず、NettyのChannelについてですが、これは、Socketを直接使う場合の複雑さを削減するためのコンポーネントで、 Reed、Write、Connect、BindなどのI/Oオペレーションを提供しています。
NettyではChannelインターフェースを用意しており、以下のような実装が存在します。
- NioSocketChannel
- NIO selectorベースの実装
- OioSocketChannel
- 古いブロッキングI/Oを利用した実装
EventLoop、EventLoopGroupe
EventLoopGoupeは1つ以上のEventLoopを管理し、新しく作られたChannelにアロケートする役割を持っています。1つのChannelは1つのEventLoopが割り当てられ、そのライフサイクルの中で登録されたChannelのI/Oオペレーションをハンドルします。通常、1つのEventLoopに複数のChannelが登録されますが、実装によってはEventLoopとChannelが一体一になるものもあります。
EventLoopの実装には以下のようなものがあります。
- NioEventLoop
- ChannelをSelectorに登録して、それらの多重化を行います
- ThreadPerChannelEventLoop
- ChannelごとにEventLoopが割り当てられる同期OIOチャンネルをハンドリングします
ChannelHandler、ChannelPipeline
ChannelHandlerはChannelの状態の変化、データの取得などのネットワークイベントによって、フックされるメソッドを持ったコンポーネントです。Nettyでは、アプリケーションのロジックはこのHandlerに記述されます。 ソケットを外としたときの、データフローの方向によってInboundとOutboundそれぞれのハンドラーを作成することが可能で、 ChannelHandlerはChannelPipelineに登録され、Piplineの中で各方向のHandlerがチェインされていきます。このときHandlerはPiplelineに登録された順番でチェインされます。
通常、Nettyをクライアントとして利用した場合はOutboundのデータフローを通り、サーバとして利用した場合はInboundのデータフローを通ることになります。
同じ、Pipeline内でInboundとOutbound両方のHandlerが登録されていた場合でも基本的にはそれぞれが交わってチェインされることがありません。ただし、InboundHanderでも、ChannelHandlerContext(後述)からChannelを取り出して直接ソケットへの書き込みを行った際にはOutboundのハンドラーがチェインされます。また、ChannelPilpelineへのハンドラーは動的に削除や追加を行なうことも可能です。
ChannelHandlerとChannelPipelineの説明をする上で、もう一つ欠かせないのが、ChannelHandlerContextです。ChannelHandlerContextは同じ方向のデータフローのHanlder群とChannelPipelineとのひも付きや、Channelそのものを持つコンテクストです。また、アプリケーションはこのコンテクストを通してHandlerはデータの送信を行ったり、他のハンドラーを呼び出したりすることが可能となります。Nettyでは2種類のデータ送信方法があり、前述したように直接Channelに書き込む方法と、このCannelHandlerContextを通して送信する方法があります。前者の場合はOutboundのHandlerがフックされ後者の場合はされません。
次の図はHandler群とChannelPipeline、そして、その関係性を示しています。
図に書いてあるとおり、HandlerはChannelHandlerContextを通して、次のハンドラーへのチェインを行います。
終わりに
細かいところで言うと、Nettyをクライアントとして利用する場合とサーバとして利用する場合でEventLoopの数が違ったりはするですが、ざっとまとめた、Nettyの全体像はこんな感じになっていると理解しています。
参考資料
NettyのChannelInboundHandlerとChannelOutboundHandleについてまとめる
はじめに
Netty In Actionを読んでいて、それぞれがのハンドラーがどのどのタイミングで実行されるかが分かりづらかったので、自身の頭の中の整理を目的に自分の理解をまとめて見ようと思います。
Inbound? OutBound?
ChannelPipelineでは、ソケットとアプリケーションを両端として、それぞれの方向に流れるデータフローをInboundとOutBoundと呼んでいます。
- ソケット⇨アプリケーション : Inbound
- アプリケーション⇨ソケット: Outbound
また、それぞれのデータフローのイベントに対して、ChannelPipelineには別の種類のハンドラーを登録することが可能で、そのハンドラーを実装するためのインターフェースが、ChannelInboundHandler
とChannelOutboundHandler
です。その名の通りそれぞれがソケットに空のInboundとOutboundのイベントで発火されます。
それぞれの方向で基本的に、Hndler間での関係性は無く、ChannelHandlertContextを通してお互いに相互作用はしません。
ChannelInboundHandler
ChannelInboundHandler
のデータに対するイベントで発火されるメソッドが定義されたInterfaceです。具体的には以下のようなものが定義されています。
メソッド名 | 説明 |
---|---|
channelRegistered | EventLoopとChannelHandlertContextがChannelに登録され、何かのI/Oのハンドルが可能となった際に実行される |
channelUnregistered | EventLoopとChannelHandlertContextがChannelから解除され、なにかのI/Oのハンドルが不可能となった際に実行される |
channelActive | ChannelのChannelHndlerContextがアクティブ、チャンネルが接続もしくは、バインドされ準備ができた状態になった際に実行される |
channelInactive | ChannelがActive状態ではなくなった際際に実行される。リモートの通信先と接続がキレている |
channelReadComplete | readオペレーションが終わった際に実行される |
channelRead | データがChannelから読み出された際に実行される |
channelWritabilityChanged | ChannelのWritabilityの状態が変わった際に呼び出される。ユーザは書き込みがが終わっていないことを確認できたり、書き込み可能になった際に書き込みを行なうようにすることができる。その確認を行なうために、Channelのメソッドである、isWritable() メソッドを利用することができる。 |
userEventTriggered | ChannelnboundHandler.fireUserEventTriggered()が呼ばれた際に実行される。 |
exceptionCaught | 例外が投げられた際に実行される。 |
例えば以下のようなハンドラーをChannelPipelineに登録します。
@ChannelHandler.Sharable public class EchoInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered inbound handler called"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered inbound handler called"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive inbound handler called"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive inbound handler called"); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead inbound handler called"); //送信されたデータをコンソールに出力してそのまま返す。 ByteBuf input = (ByteBuf) msg; System.out.println(input.toString(CharsetUtil.UTF_8)); ctx.write(input); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete inbound handler called"); //読み込みが終了の際にコネクションをクローズする ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } }
TCPソケットの通信でaaa
というデータを送った際には以下のような出力が標準出力にされます。channelActive
までが、コネクションを確立されるまでに出力され、
channelRegistered inbound handler called channelActive inbound handler called channelRead inbound handler called aaa channelReadComplete inbound handler called channelInactive inbound handler called channelUnregistered inbound handler called
userEventTriggered
やchannelWritabilityChanged
などは、前述の表のようなタイミングで実行されるため、今回のケースでは実行されないようです。
ChannelOutboundHandler
Inboundに対して、ChannelOutboundHandler
に関しては、以下のようなメソッドが定義されています。
メソッド名 | 説明 |
---|---|
bind(ChannelHandlerContext, SocketAddress,ChannelPromise) | ローカルアドレスにバインドがリクエストされた際に実行される |
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) | リモートの通信先にChannelと接続のリクエストの際に実行される |
disconnect(ChannelHandlerContext, ChannelPromise) | Channelのリモートの通信先とのコネクションを切断するリクエストを送る際に実行されます |
close(ChannelHandlerContext,ChannelPromise) | Channelのクローズのリクエストの際に |
disconnect(ChannelHandlerContext, ChannelPromise) | EventLoopからChannelの登録解除がリクエストされた際に実行されます。 |
read(ChannelHandlerContext) | Channelのリードがリクエストされた際に実行されます |
flush(ChannelHandlerContext) | Channelのキューに溜まったデータをフラッシュのリクエストの際に実行されます |
write(ChannelHandlerContext,Object, ChannelPromise | Channelを通して書き込みがリクエストされる際に実行されます |
例えば以下の2つのようなハンドラーをChannelPipelineに登録します。
@ChannelHandler.Sharable public class EchoOutBoundHandler extends ChannelOutboundHandlerAdapter { @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("bind outbound handler called"); super.bind(ctx, localAddress, promise); } @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { System.out.println("connect outbound handler called"); super.connect(ctx, remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("disconnect outbound handler called"); super.disconnect(ctx, promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("close outbound handler called"); super.close(ctx, promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("deregister outbound handler called"); super.deregister(ctx, promise); } @Override public void read(ChannelHandlerContext ctx) throws Exception { System.out.println("read outbound handler called"); super.read(ctx); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("write outbound handler called"); super.write(ctx, msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { System.out.println("flush outbound handler called"); super.flush(ctx); } }
public class EchoClient extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 通信先にhelloのメッセージを送信 ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)); super.channelActive(ctx); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // 通信先からのレスポンスを表示 System.out.println(msg.toString(CharsetUtil.UTF_8)); } }
その状態でNettyをクライアントとして実行し、通信を行なうとコンソールに以下のような出力が行われます。
connect outbound handler called write outbound handler called flush outbound handler called read outbound handler called hello read outbound handler called read outbound handler called