非Reactiveな非同期処理をFluxで扱う

はじめに

Reactive Springを読んでいて、Flux.create()を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。

Flux.create()?

Flux.create(Consumer<? super FluxSink> emitter)プログラマティックにFluxを生成するためのFactory で、非同期や同期で生成した値をFluxSink APIを通してエミットすることができます。このFactoryを使えばJMS Brockerのイベントやマルチスレッドアプリケーションで生成した値をエレメントとしてエミットすることができます。

実際にやってみる

それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書く

準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。

public class FluxCreateTest {

    private final ExecutorService es = Executors.newFixedThreadPool(1);

    @Test
    void exec() {
        System.out.println("Thread Name = " + Thread.currentThread().getName());
        Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5));

        StepVerifier
                .create(values.doFinally(signalType -> this.es.shutdown()))
                .expectNext("exec number: 1")
                .expectNext("exec number: 2")
                .expectNext("exec number: 3")
                .expectNext("exec number: 4")
                .expectNext("exec number: 5")
                .verifyComplete();
    }

    private void execTask(FluxSink<String> stringFluxSink, int maxNum) {
        this.es.submit(() -> {
            System.out.println("Thread Name = " + Thread.currentThread().getName());

            AtomicInteger num = new AtomicInteger();

            while (num.get() < maxNum) {
                String emitValue = "exec number: " + num.incrementAndGet();
                System.out.println("emitValue = " + emitValue);
                stringFluxSink.next(emitValue);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            stringFluxSink.complete();
        });
    }
}

実行するとテストが成功しコンソールに以下の出力がされます。

Thread Name = main
Thread Name = pool-1-thread-1
emitValue = exec number: 1
emitValue = exec number: 2
emitValue = exec number: 3
emitValue = exec number: 4
emitValue = exec number: 5

上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1という名前のスレッドで、1秒に1回、exec number: *というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。

ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFERが設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyEnumを渡してやります。

2020年振り返り

はじめに

なんとなくいろんな人の振り返りみて、自分も振り返ってみようかとふと思ったので書いてみようかと。

ざっくり今年やったこと、あったこと

今年やったとことあったことをまとめると、以下のようになるかと思います。

  • 転職して一年
  • ブログを平均して週一ぐらいでは書く
  • 本を読む

転職して一年

先月の11月で今の会社に来て一年が経ちました。もともとは、転職ドラフトというサイトで声をかけていただいて今に至りますが、気がつけば一年たってました。
前職とは働き方や開発のやり方が全く違い、いろいろ学びが多かったと思います。
今の組織は、アジャイル開発を行う上で、XPのプラクティスを大切にしています。その中で自分の中の今まで知識でしかなかった部分が実践できたり、知らなかったことをたくさん学ぶことができました。

その他の面で、コミュニケーション面に置いて自分の中で1つのブレイクスルーみたいなのがあった気がします。作業が基本ペア作業で行うので一般的な組織に比べてコミュニケーションを撮る機会がかなり多いと思います。その中であまり上手くコミュニケーションを取れない事があると、感じていたのですが、いろいろ考える中で気づきみたいなものがあり少しだけコミュニケーション力がマシになった気がします。
とはいえ、まだまだ上手くできない面がかなりおおく、特に、ビジネスサイド方々などのコンテキストが共有されてない相手とのコミュニケーションに課題を感じているので、来年以降その力をつけていかないとなぁと思っています。
(ただ、自分の強みとして伸ばしたい方向性とは少し違う気もしているのですが…)

ブログを平均して週一ぐらいでは書く

前職の先輩でかなりのブログを書かれる人がいます。その人に触発されてなんとなく、最低週一ぐらいでは書いてみようと思ったのが、今年の3月です。こから42記事書いてるので目的は達成したかなと。
まぁ、今後も週一でゆるゆると書いていこうかなぁとは思います。
やってみて思ったことは以下のことです。

  • ブログをネタを探すようになるので、発見が多くなる
  • 文章にする事で理解度が上がる

前者に関して、ブログを書くことを意識してるとより多くのことを学べるなぁと感じました。
後者に関しては、ドキュメントを読んでなんとなく理解した気になっても書いてみると結構書けないみたいなのがあったのがたくさんありました。その箇所を自分の言葉で文に起こすことで理解度が断然上がるのを感じました。

本を読む

本を読むのは、新人の頃からずっと続けているのですが今年は少し読む量が減った気がします。 思い出せるだけ羅列してみようと思います。

本屋で買ったのとかで覚えてないの合わせると、多分もうちょいある気がしますが、Amazonの購入履歴やらなんやらで思い出せるのはこんだけです。
こうして眺めてみると、Javaに結構寄ってるのがわかりますね。
来年はもう少し、Linuxやネットワーク、ディスクなどの足回りの知識をもう少し固めたいなーと思いました。
後は、Java以外の言語をもう少しやって、そのパラダイムを学びたいなぁと考えてます。
特に関数型に関しては自分はかなり弱い感じがするので「すごいHaskellたのしく学ぼう!」は読み直したいなと思います。
あとは、Rustなんかも学びが多そうだなぁと思ってます。

来年やりたいこと

来年は、以下のようなことをやろうかなと。

  • 継続してブログを書く
  • テーマを考えて読む本を選んでみる
  • 資格を何か取る

ブログを書くことで、自分の技術力が上がるのは実感できたので今後も続けていたいとおもいます。

本に関しては、振り返りでも思ったのですが今までは興味の向くままに本を選んでいたのですが来年は少し考えながら本を選んでみようかと思いました。本のところでも書いたのですが具体的には以下のような本を意識的に読んでみようかと。

  • ネットワーク、ディスク、Linuxなどの下回りの知識を深める
  • Javaとは全く違う言語パラダイムを学ぶ

最後の資格に関しては、資格を取ることが目的というよりは資格を取る事でその分野に対する知識を深めることができるのではとは前々から思っていたので、やってみようかと。具体的には、LPICやCKA/CKADあたりに挑戦してみようかなと思ってます。

K6のシナリオを使ってみる

はじめに

K6では負荷試験のシナリオを設定することが可能です。このブログではその機能を簡単に試してみてざっくりとした使い方を把握したいと思います。 このブログではK6をDockerを用いて起動します。また、テスト対象のアプリは以前のK6について書いたブログで利用したものを利用します。helloの文字列を返すだけのAPIです。

やってみる

環境

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal



$ uname -srvmpio
Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux


$ docker version 
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:45:36 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:44:07 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

シナリオで何ができるか

ざっくり言うとK6のシナリオではVirtual Users(VUs)、イテレーション、スケジュールの詳細な設定を行なうことができます。
シナリオを使うメリットは主に以下のようなものがあります。

  • 同じスクリプトに対して複数のシナリオを設定することができる。それぞれは独立したJavaScriptの関数として実行されることによってテストをより整理しやすくフレキシブルに作成することができる。
  • それぞれのシナリオがVUや、イテレーション、スケジューリングパターンを独立して利用することができる(この機能はExecutorsによって実装される)。この機能を活用することによってより高度な現実に近いリクエストを送ることができる
  • それぞれのシナリオは直列でも並列でも実行可能で、また、それらを混ぜたような実行も可能である
  • シナリオごとに違った環境変数やメトリクスのタグをりようすることができる

Executorsって何ぞ?

ExecutorsはK6のエンジンのワークホースです。それぞれのワークホースがVUsをスケジューリングし、それぞれが別で反復実行されます。

Executorsには以下のようなものがあります。

Executor名 説明
Shared iterations shared-iterations 固定値分だけ反復実行するExecutor。この固定値は複数のVUsの間でシェアされる
Per VU iterations per-vu-iterations 固定値の値の回数それぞれのVUsがスクリプトを反復実行するExecutor。この固定値は複数VUs間でシェアされない。
Constant VUs constant-vus 設定された時間内で、できるだけ反復を実行するVUsの固定値
Ramping VUs ramping-vus 設定された時間内で、できるだけ反復を実行するVUsの数の変数。開始時点でのVUs数や終了時点のVUs数等を設定できる。グローバルのstagesと同等のことができる
Constant arrival rate constant-arrival-rate 固定値の数のイテレーションを設定された時間内で実行するExecutor。VUsの数を変動させることでイテレーションレートを保証しようとする
Ramping Arrival Rate ramping-arrival-rate 区切られた時間の中で、変動するイテレーション回数実行するExecutor。Ramping VUsに似ているが、イテレーションのためのもので、VUsの数は動的に変動する
Externally Controlled externally-controlled externally-controlled|K6REST APICLIを通して、実行時に設定を変更することができるExecutor

すべてのシナリオで共通の一般的な設定項目

K6のシナリオではExecutor固有の設定項目とは別に、共有の設定項目がいくつかあります。
その設定項目を以下にまとめます。

説明 デフォルト値
executor* string ユニークなExecutorの名前 -
startTime string シナリオの実行が開始されるべき時間 0s
gracefulStop string イテレーションが強制的に終了される前の待ち時間 30s
exec string 実行されるJSの関数名 defualt
env object シナリオで利用される環境変数 {}
tags object シナリオのタグ {}

実際にシナリオを書いていく

基本的な書き方

シナリオの基本的な書き方は以下のようになります。

export let options = {
  scenarios: {
    example_scenario: {
      // 利用するExecutorの名前
      executor: 'shared-iterations',

      // すべてのシナリオで共通の一般的な設定項目
      startTime: '10s',
      gracefulStop: '5s',
      env: { EXAMPLEVAR: 'testing' },
      tags: { example_tag: 'testing' },

      // エクセキューターごとの設定項目  
      vus: 10,
      iterations: 200,
      maxDuration: '10s',
    },
    //二個目のExecutor  
    another_scenario: { ... }
  }
}

Executorを設定する際はscenariosにobjectを複数記述することが可能なようです。それぞれのシナリオのKey(example_scenarioanother_scenario) などは任意の値を利用することが可能ですが、scenariosセクションの中でユニークな値である必要があります。

ユーザが上昇するシナリオを書く

シナリオについてある程度まとめたところで実際に一つ書いてみたいと思います。
お題は以下のように

  • 開始10秒はリクエストを送らない
  • 10秒経過あとは時間経過とともにユーザが上昇する
    • 開始10秒で一秒間ウェイトを置いてリクエストを送るVUsが10から20まで段階的に上昇
    • 開始20秒から30秒でVUsが20から60まで段階的に上昇
  • 開始30秒間から10秒かけて段階的にVUsが減少

また、スクリプトは以下のものを利用します。

import http from 'k6/http';
import { sleep } from 'k6';

export default function () {
  
  http.get('http://{$APP_IP}:8080/hello');
  sleep(1);
}

それでは早速書いていきたいと思います。
今回利用するExecutorは Ramping VUsです。
お題のストーリーを記述すると以下のようになります。

import http from 'k6/http';
import { sleep } from 'k6';

export let options = {
  scenarios: {
    ramping_up_scenario: {
      executor: 'ramping-vus',

      startVUs:10,
      stages: [
        {duration: '10s', target: 20},
        {duration: '10s', target: 60},
        {duration: '10s', target: 0},
      ]
    }

  }
};

export default function () {

  http.get('http://192.168.10.8:8080/hello');
  sleep(1);
}

上記の設定でスクリプトを実行すると以下のようになります。

$ docker run --rm --network k6net -i loadimpact/k6 run   - < k6script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: -
     output: -

  scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop):
           * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)


running (0m00.8s), 10/60 VUs, 0 complete and 0 interrupted iterations
ramping_up_scenario   [   3% ] 10/60 VUs  00.8s/30.0s

running (0m01.8s), 11/60 VUs, 10 complete and 0 interrupted iterations
ramping_up_scenario   [   6% ] 11/60 VUs  01.8s/30.0s

running (0m02.8s), 12/60 VUs, 21 complete and 0 interrupted iterations
ramping_up_scenario   [   9% ] 12/60 VUs  02.8s/30.0s


(中略)


running (0m08.8s), 18/60 VUs, 108 complete and 0 interrupted iterations
ramping_up_scenario   [  29% ] 18/60 VUs  08.8s/30.0s

running (0m09.8s), 19/60 VUs, 126 complete and 0 interrupted iterations
ramping_up_scenario   [  33% ] 19/60 VUs  09.8s/30.0s

running (0m10.8s), 23/60 VUs, 145 complete and 0 interrupted iterations
ramping_up_scenario   [  36% ] 23/60 VUs  10.8s/30.0s

(中略)

running (0m17.8s), 51/60 VUs, 389 complete and 0 interrupted iterations
ramping_up_scenario   [  59% ] 51/60 VUs  17.8s/30.0s

running (0m18.8s), 55/60 VUs, 440 complete and 0 interrupted iterations
ramping_up_scenario   [  63% ] 55/60 VUs  18.8s/30.0s

running (0m19.8s), 59/60 VUs, 494 complete and 0 interrupted iterations
ramping_up_scenario   [  66% ] 59/60 VUs  19.8s/30.0s

running (0m20.8s), 58/60 VUs, 553 complete and 0 interrupted iterations
ramping_up_scenario   [  69% ] 58/60 VUs  20.8s/30.0s

(中略)

running (0m28.8s), 12/60 VUs, 853 complete and 0 interrupted iterations
ramping_up_scenario   [  96% ] 12/60 VUs  28.8s/30.0s

running (0m29.8s), 06/60 VUs, 865 complete and 0 interrupted iterations
ramping_up_scenario   [  99% ] 06/60 VUs  29.8s/30.0s

running (0m30.1s), 00/60 VUs, 871 complete and 0 interrupted iterations
ramping_up_scenario ✓ [ 100% ] 00/60 VUs  30s

    data_received..............: 103 kB 3.4 kB/s
    data_sent..................: 77 kB  2.5 kB/s
    http_req_blocked...........: avg=52.36µs min=1.48µs   med=8.59µs  max=12.99ms  p(90)=14.6µs   p(95)=267.27µs
    http_req_connecting........: avg=37.05µs min=0s       med=0s      max=12.96ms  p(90)=0s       p(95)=222.62µs
    http_req_duration..........: avg=1.49ms  min=257.57µs med=1.37ms  max=11.37ms  p(90)=2.59ms   p(95)=3.11ms  
    http_req_receiving.........: avg=87.02µs min=11.56µs  med=83.98µs max=654.79µs p(90)=144.41µs p(95)=163.26µs
    http_req_sending...........: avg=41.95µs min=6.09µs   med=36.36µs max=719.74µs p(90)=67.04µs  p(95)=106.4µs 
    http_req_tls_handshaking...: avg=0s      min=0s       med=0s      max=0s       p(90)=0s       p(95)=0s      
    http_req_waiting...........: avg=1.36ms  min=229.33µs med=1.22ms  max=11.21ms  p(90)=2.42ms   p(95)=2.95ms  
    http_reqs..................: 871    28.917444/s
    iteration_duration.........: avg=1s      min=1s       med=1s      max=1.01s    p(90)=1s       p(95)=1s      
    iterations.................: 871    28.917444/s
    vus........................: 6      min=6  max=59
    vus_max....................: 60     min=60 max=60

最初に注目するのは以下の出力です。

  scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop):
           * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)

シナリオが読み込まれ、そのマックスVUsが60であることがわかります。
また、3つのステージに分けてVUsが上昇していくことも出力されています。実際に最後の10秒は減少ですがK6の出力ではそこは読み取れないみたいですね。

その後各リクエストの結果を読み込んでみると、最初の10秒程度でVUsがユーザが20まで上昇し、次の10秒で20から60まで上昇して(増加数が20以降から変わってます)そして、最後の10秒でユーザが0まで減少しています。
お題はクリアできたみたいですね。

参考資料

K6の負荷テスト結果をGrafana+InfluxDBで可視化する

はじめに

K6は実行結果を様々なプラットフォームに送信して可視化することが可能です。
その中の選択肢としてGrafana+InfluxDBを利用することが可能なようなので、試してみたいと思います。
このブログでは、それぞれのツールはDockerを利用して立ち上げることを想定しています。公式のdocker-composeを用意されていますが今回は利用せずにやってみようと思います。
また、今回使うK6スクリプト以前のブログで最後に作成したものを利用しようと思います。簡単にまとめると10のVUsで一秒間の1回10秒リクエストを送るスクリプトを使います。また、テスト対象のアプリも同様に以前のブログで利用したものを利用します。helloの文字列を返すだけのAPIです。

やってみる

動作環境

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

$ uname -srvmpio
Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

$ docker version
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:45:36 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:44:07 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

InfluxDB を立ち上げる

最初に、諸々適当なDockerのネットワークを作成しておきます。

$ docker network create k6net

まずは、InfluxDBを立ち上げます。 InfluxDBは公式のイメージを利用して立ち上げます。
データの永続化の問題もありますが、今回は試したいだけなので、ボリュームを当てたりはしません。

$ docker run --network k6net --name influxdb -d  influxdb

K6の実行結果をInfluxDBに保存する

InfluxDBが立ち上がったところで早速テストを実行していきたいと思います。
あたらためて書くと今回利用するスクリプトとアプリは以前のブログで利用したものを再利用します。

K6でテストを行ってその実行結果をInfluxDBに送信するには次のコマンドを実行します。

$ docker run --rm --network k6net -i loadimpact/k6 run --out influxdb=http://influxdb:8086/k6db  - < k6script.js

ここで重要なのは--out influxdb=http://influxdb:8086/k6dbのところです。K6では様々なOUTPUTのプラグイン が用意されており、--outの引数に指定するだけで簡単に測定結果を送信することができます。
スクリプトの実行が完了されると、InfluxDBにk6dbという名前でDBが作成されます。

Grafanaでデータを可視化する

ここまででInfluxDBにテスト結果の格納することまでができたので、最後にその可視化を行っていきます。
まずは、Grafanaを立ち上げます以下のコマンドを叩きます。

docker run --network=k6net -d -p 3000:3000 grafana/grafana

起動するとlocalhost:3000でアクセスすることができます。

f:id:yuya_hirooka:20201104224941p:plain

初回、ユーザ/パスワードはadmin/adminです。

次にInfluxDBを起動します。

$ docker run --name influxdb quay.io/influxdb/influxdb:v2.0.3

Grafanaがたち上がったら、InfluxDBと連携させます。
Grafanaのサイドウィンドウから、[歯車]⇨[Add Data Source]を選択し、サーチボックスにInfluxDBと入力してInfluxDBの項目を選択します。

f:id:yuya_hirooka:20201217201320p:plain

設定は以下のように入力します。

f:id:yuya_hirooka:20201218114951p:plain

Databaseの項目はK6の実行の際に作成されたものを利用します。
これで連携は完了です。

あとはいい感じにダッシュボードを作成すればよいです。
(ちなみに以下の画像はk6 Load Testing Resultsのテンプレートを利用してます)

f:id:yuya_hirooka:20201218120543p:plain

InfluxDBのオプション

スクリプト実行時のInfluxDBに対するオプションがいくつかあるので以下にまとめます。   オプションは実行時の環境変数として設定することで利用することができます。

環境変数 説明
K6_INFLUXDB_USERNAME InfluxDBのユーザネーム
K6_INFLUXDB_PASSWORD InfluxDBのパスワード
K6_INFLUXDB_INSECURE tureになってる場合httpsの証明書検証をスキップする
K6_INFLUXDB_TAGS_AS_FIELDS K6のメトリクスの中で、インデックスできないフィールドをコンマ区切りで指定する。オプションでインデックスのvu:intのように型を指定できる。方はintboolfloatstringがある

参考資料

負荷テストツールK6を試す

はじめに

負荷テストのツールを何かしら勉強したいなと思って、K6というツールがあるというのを知って良さそうに感じたのでとりあえず動かしてみるところまでやってみようと思います。

K6とは

K6Load Impactという負荷テストのサービスを作っていた会社が、その経験を活かして作ったOSSみたいです。その機能に以下のようなものがあります。

Virtual User

K6にはVirtual Users(VUs)という概念があります。
Virtual Usersはそれぞれが分けられた環境で、並行でテストスクリプトを実行してくれます。
また、Virtual Userはリアルユーザの真似をするようなリクエストを送ることも可能みたいです。

使ってみる

早速使ってみたいと思います。
K6を実行するためにはそのツールセットをインストールする必要がありますが今回はDockerを利用してK6を実行したいと思います。

環境

実行環境は以下の通りです。

$ uname -srvmpio
Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

$ docker version
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:45:36 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:44:07 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

事前準備

インストール

前述の通り、K6を実行するにはそのツールセットをインストールする必要があります。様々な環境でインストール、動作させることが可能です。

Linux(Ubuntu/Debian)

DabianベースのLinuxの場合はプライベートのdebリポジトリからインストールすることができます。

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 379CE192D401AB61
echo "deb https://dl.bintray.com/loadimpact/deb stable main" | sudo tee -a /etc/apt/sources.list
sudo apt-get update
sudo apt-get install k6

Docker

Dockerイメージloadimpact/k6が公開されており、そのイメージを用いたテストの実行も可能です。

docker pull loadimpact/k6

Mac

Macではbrewを使ってインストールすることができるみたいです。

brew install k6

バイナリでのインストール

GitHubのページからバイナリを取得することも可能です。
リンクからバイナリをダウンロードしてPATHを通して通してください。

テストするアプリを用意する

インストールが終わったところで、テスト対象のアプリを準備しておきます。
今回は環境が用意できればなんでもよいので個人的に1番使い慣れてるSpringを使ってアプリを作ろうと思います。
Spring Initializrを使ってアプリを作成します。
設定は以下のように

f:id:yuya_hirooka:20201215083536p:plain

出来上がったプロジェクトをエディタ等で開き作成されているApplicationクラスを以下のように修正します。

@SpringBootApplication
@RestController
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @GetMapping("/hello")
    public String hello(){
        return "hello";
    }
}

アプリを起動してcURLでエンドポイントへアクセスするとHelloという文字列が返ってきます。

$ curl localhost:8080/hello
hello

テストコードを書く

それでは準備ができたところで実際にテストスクリプトを書いて行きたいと思います。 K6スクリプトを記述する場合基本的なスクリプトの構成は以下のようになります。

// init code

export default function() {
  // vu code
}

K6ではdefalt functionを定義してやる必要があり、これが一般的に言うメイン関数のようなテストコードのエントリーポイントとなります。
defalt functionの中のコードのことをVU codeと呼び、外側のコードのことをinit codeと呼びます。
ここで、Virtual Usersを利用して、並行化されるのはVU codeであり、init codeは一回のみ実行されます。VU codeの中ではHTTPなどのリクエストを送信しそのメトリクスを計測することは可能ですが、ローカルのファイルシステムを読み込んだり、他のモジュールを呼んだりすることはできません。それらはinit codeで実行することが必要です。

以上のことを踏まえて、リクエストを一回実行して1秒だけまつスクリプトを記述します。

k6script.js

import http from 'k6/http';
import { sleep } from 'k6';

export default function () {
  
  http.get('http://${HOST_IP}:8080/hello');
  sleep(1);

}

テストコードが記述できたので早速実行してみます。 実行は以下のようにコマンドを叩きます。

$ docker run -i loadimpact/k6 run - < ${SCRIPT_NAME}.js

今回の場合は次のようになります。

$ docker run -i loadimpact/k6 run - < k6script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: -
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)


running (00m01.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
default   [   0% ] 1 VUs  00m01.0s/10m0s  0/1 iters, 1 per VU

running (00m01.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [ 100% ] 1 VUs  00m01.0s/10m0s  1/1 iters, 1 per VU

    data_received..............: 118 B 114 B/s
    data_sent..................: 89 B  86 B/s
    http_req_blocked...........: avg=1.67ms  min=1.67ms  med=1.67ms  max=1.67ms  p(90)=1.67ms  p(95)=1.67ms 
    http_req_connecting........: avg=1.61ms  min=1.61ms  med=1.61ms  max=1.61ms  p(90)=1.61ms  p(95)=1.61ms 
    http_req_duration..........: avg=2ms     min=2ms     med=2ms     max=2ms     p(90)=2ms     p(95)=2ms    
    http_req_receiving.........: avg=83.92µs min=83.92µs med=83.92µs max=83.92µs p(90)=83.92µs p(95)=83.92µs
    http_req_sending...........: avg=72.25µs min=72.25µs med=72.25µs max=72.25µs p(90)=72.25µs p(95)=72.25µs
    http_req_tls_handshaking...: avg=0s      min=0s      med=0s      max=0s      p(90)=0s      p(95)=0s     
    http_req_waiting...........: avg=1.84ms  min=1.84ms  med=1.84ms  max=1.84ms  p(90)=1.84ms  p(95)=1.84ms 
    http_reqs..................: 1     0.96781/s
    iteration_duration.........: avg=1s      min=1s      med=1s      max=1s      p(90)=1s      p(95)=1s     
    iterations.................: 1     0.96781/s
    vus........................: 1     min=1 max=1
    vus_max....................: 1     min=1 max=1

コンソールに実行結果が出力されているのがわかります。
ちょっとだけ細かく見てみると、送信したデータ量(B/s)、受信したデータ量(B/s)、リクエストでブロックされた時間、接続にかかった時間、等々の平均値やパーセントタイルの値などが表示されていますね。

VirtualUsersを追加して、テストの実行時間を変更する

次に、Virtual Usersを追加してリクエストを並行で送ってみるようにしてみたいと思います。 方法は以下の4つほどあります

  • テスト実行時オプション(--vus and -duration)として渡す
  • 環境変数として定義しておく
  • .jsonの設定ファイルを作成し、実行時に読み込ませる
  • init calloptionsJsonオブジェクトを定義する

今回は4つ目のJsonオブジェクトを定義する方でやろうと思います。
具体的には、「10 users で10秒間リクエストを送る」ように、先程のコードを以下のように書き換えます。

import http from 'k6/http';
import { sleep } from 'k6';

export let options = {
  vus: 10,
  duration: '10s',
};

export default function () {
  
  http.get('http://${HOST_IP}:8080/hello');
  sleep(1);

}

option.vusoption.durationをそれぞれinit codeの領域で設定しています。
テストを実行すると先ほどと違って、10並行で、10秒間リクエストが送られていることがわかります。

$ docker run -i loadimpact/k6 run - < k6script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: -
     output: -

  scenarios: (100.00%) 1 scenario, 10 max VUs, 40s max duration (incl. graceful stop):
           * default: 10 looping VUs for 10s (gracefulStop: 30s)


running (01.0s), 10/10 VUs, 0 complete and 0 interrupted iterations
default   [   9% ] 10 VUs  00.9s/10s

running (02.0s), 10/10 VUs, 10 complete and 0 interrupted iterations
default   [  19% ] 10 VUs  01.9s/10s


(略)

default   [  99% ] 10 VUs  09.9s/10s

running (10.1s), 00/10 VUs, 100 complete and 0 interrupted iterations
default ✓ [ 100% ] 10 VUs  10s

    data_received..............: 12 kB  1.2 kB/s
    data_sent..................: 8.9 kB 880 B/s
    http_req_blocked...........: avg=32.42µs min=2.11µs   med=11.88µs  max=855.09µs p(90)=49.84µs  p(95)=172.34µs
    http_req_connecting........: avg=19.37µs min=0s       med=0s       max=814.16µs p(90)=4.29µs   p(95)=144.09µs
    http_req_duration..........: avg=3.67ms  min=704.32µs med=3.94ms   max=8.48ms   p(90)=6.71ms   p(95)=7.35ms  
    http_req_receiving.........: avg=99.37µs min=16.55µs  med=103.15µs max=223.89µs p(90)=173.99µs p(95)=182.34µs
    http_req_sending...........: avg=49.94µs min=9.15µs   med=47.83µs  max=334.41µs p(90)=70.58µs  p(95)=87.42µs 
    http_req_tls_handshaking...: avg=0s      min=0s       med=0s       max=0s       p(90)=0s       p(95)=0s      
    http_req_waiting...........: avg=3.52ms  min=658.68µs med=3.75ms   max=8.3ms    p(90)=6.52ms   p(95)=6.92ms  
    http_reqs..................: 100    9.895886/s
    iteration_duration.........: avg=1s      min=1s       med=1s       max=1s       p(90)=1s       p(95)=1s      
    iterations.................: 100    9.895886/s
    vus........................: 10     min=10 max=10

ここまでで、ざっくりとK6の利用方法は把握できた気がします。

テスト結果の出力

テスト結果はデフォルトではコンソールに出力されますが、以下のような出力先を変更するプラグインがいくつか用意されています。

この他にもNew Relicなどもあります。

オプション

最後に、その他にどんなことができるのかざっくり把握するためオプションで気になったところをいくつかまとめたいと思います。
全量はここで確認することができます。

Duration

Durationでは、テストが実行される時間を設定することができます。
実行時間中はそれぞれのVUsでスクリプトがループして呼ばれ続けます。
例えば以下のような設定の場合3分間vu codeがループで実行され続けます。

export let options = {
  duration: '3m',
};

Iterations

IterationsオプションはVUsがそれぞれ何度スクリプトを実行するかを指定することができます。
この値は、VUsで単純に割り算され、例えば以下のような設定である場合10Vusがそれぞれ10回のリクエストを実行します。

export let options = {
  vus: 10,
  iterations: 100,
};

No VU Connection Reuse

No VU Connection ReuseではTCPをVUsの実行の中でTCPコネクションを再利用するかをbooleanで設定できます。 設定は以下のように行います。

export let options = {
  noVUConnectionReuse: true,
};

RPS

RPSではVUsをまたいだ秒間での最大リクエスト数を設定することができます。

export let options = {
  rps: 500,
};

Scenarios

Scenariosでは、1つ以上の実行パターンを定義することができます。シナリオは以下のような特徴があります。

  • 同じスクリプトに対して複数のシナリオを定義できる
  • それぞれのシナリオは個別のVUsやスケジュールパターンを持つことができる
  • それぞれのシナリオは直列でも並行でも実行することができる
  • シナリオごとに違った環境変数やメトリクスタグをセットすることができる

より詳細な情報はこちらを参考にしてください。
利用イメージは以下のようになります。

export let options = {
  scenarios: {
    my_api_scenario: {
      // arbitrary scenario name
      executor: 'ramping-vus',
      startVUs: 0,
      stages: [
        { duration: '5s', target: 100 },
        { duration: '5s', target: 0 },
      ],
      gracefulRampDown: '10s',
      env: { MYVAR: 'example' },
      tags: { my_tag: 'example' },
    },
  },
};

Stages

StagesではVUsの上昇や下降を制御することができます。
例えば以下のような設定の場合、最初の3分間でVUsを1から10まで上昇させ5分間そのままリクエストを続けその他とに10から35ユーザ10分間で増やしていき、最後の3分でVUsを0まで減らします。

export let options = {
  stages: [
    { duration: '3m', target: 10 },
    { duration: '5m', target: 10 },
    { duration: '10m', target: 35 },
    { duration: '3m', target: 0 },
  ],
};

参考資料

Nettyの主要コンポーネントを整理する

はじめに

Nettyを勉強していると、EventLoopやHandlerなど様々なコンポーネントがでてきて、混乱したのでちょっと図でまとめてみようと思います。このブログでは以下のようなNettyの主要コンポーネント(クラス or インターフェース)の役割をまとめそれぞれの関係を図で整理しようと思います。

  • EventLoop、EventLoopGroupe
  • Channel
  • ChannelHandler、ChannelPipeline

Netty In Actionなどの書籍を参考にしていますが、あくまで自分の理解をまとめますので、正しい理解を行いたい場合は書籍やドキュメント実装を当たるようにしてください。

非同期I/O? 同期I/O?

前提としてJavaのネットワーク通信における、同期I/Oと非同期I/Oについてちょっと整理してみようと思います。
Javaにおけるネットワーキングプログラムは、はじめ、ネイティブのソケットライブラリーを使った同期的なAPI(java.net)が用意されていました。その、同期的なAPIを使った複数通信を扱う際のイメージは以下のようになります。

f:id:yuya_hirooka:20201214193101j:plain

このような同期I/Oでは、ソケットごとにスレッドが作成され、それらのスレッドはI/Oオペレーションを行っている間はスレッドは休眠状態に入ります。そうなるとリソースの無駄が発生してしまいます。更に、それぞれのスレッドはOSによって決められたサイズのメモリを確保しますのでそうったオーバーヘッドも発生してしまいます。

上記のような同期I/Oに対して、Java 1.4らNIO(java.nio)が導入されました。NIOでは、システムのイベント通知APIを用いて、データの読み取りや書き取りが行われたことを通知を受けることで、非同期的な通信を可能としています。
NIOを用いたJavaにおけるネットワーク通信は以下のようになります。

f:id:yuya_hirooka:20201214193223j:plain

class java.nio.channels.Selectorは複数の非同期ソケットが処理可能な状態にあるかのイベント通知を受け、ハンドリングします。同期IOに比べソケットに対して、1つだけののスレッドが割り当てられているため、コンテクストスイッチなどにおける、CPUリソースの無駄遣いの削減や、メモリ消費の削減に繋がり、また、スレッドも休眠せずI/O待ちの際に別のタスクを消費することができます。

Nettyとは

NettyはJavaのNIOの上に作られるネットワークアプリケーションを作るためのフレームワークです。NIOを直接使うよりも、より簡単にすばやくアプリケーションの開発を行なうことができます。また、プーリングと再利用によってJavaAPIを直接使うよりも低いレイテンシ、高スループットを実現してします。
NettyはNIO以外のライブラリー等には依存しておらず、3.x系ならJava 5、4.x系ならJava 6以上(一部、オプションの機能を利用する場合は7以上が求められます)で動きます。
ちなみにNIOに対して、同期的I/OをNettyではOIOと呼んでいるようです。

Nettyのコンポーネントと全体像

f:id:yuya_hirooka:20201214193323j:plain

Nettyの主要コンポーネントを一枚の図のような関係になります。 以降でそれぞれのコンポーネントについて見ていきたいと思います。

Channel

まず、NettyのChannelについてですが、これは、Socketを直接使う場合の複雑さを削減するためのコンポーネントで、 Reed、Write、Connect、BindなどのI/Oオペレーションを提供しています。
NettyではChannelインターフェースを用意しており、以下のような実装が存在します。

  • NioSocketChannel
    • NIO selectorベースの実装
  • OioSocketChannel

EventLoop、EventLoopGroupe

EventLoopGoupeは1つ以上のEventLoopを管理し、新しく作られたChannelにアロケートする役割を持っています。1つのChannelは1つのEventLoopが割り当てられ、そのライフサイクルの中で登録されたChannelのI/Oオペレーションをハンドルします。通常、1つのEventLoopに複数のChannelが登録されますが、実装によってはEventLoopとChannelが一体一になるものもあります。
EventLoopの実装には以下のようなものがあります。

  • NioEventLoop
    • ChannelをSelectorに登録して、それらの多重化を行います
  • ThreadPerChannelEventLoop
    • ChannelごとにEventLoopが割り当てられる同期OIOチャンネルをハンドリングします

ChannelHandler、ChannelPipeline

ChannelHandlerはChannelの状態の変化、データの取得などのネットワークイベントによって、フックされるメソッドを持ったコンポーネントです。Nettyでは、アプリケーションのロジックはこのHandlerに記述されます。 ソケットを外としたときの、データフローの方向によってInboundとOutboundそれぞれのハンドラーを作成することが可能で、 ChannelHandlerはChannelPipelineに登録され、Piplineの中で各方向のHandlerがチェインされていきます。このときHandlerはPiplelineに登録された順番でチェインされます。

f:id:yuya_hirooka:20201214193302j:plain

通常、Nettyをクライアントとして利用した場合はOutboundのデータフローを通り、サーバとして利用した場合はInboundのデータフローを通ることになります。
同じ、Pipeline内でInboundとOutbound両方のHandlerが登録されていた場合でも基本的にはそれぞれが交わってチェインされることがありません。ただし、InboundHanderでも、ChannelHandlerContext(後述)からChannelを取り出して直接ソケットへの書き込みを行った際にはOutboundのハンドラーがチェインされます。また、ChannelPilpelineへのハンドラーは動的に削除や追加を行なうことも可能です。

ChannelHandlerとChannelPipelineの説明をする上で、もう一つ欠かせないのが、ChannelHandlerContextです。ChannelHandlerContextは同じ方向のデータフローのHanlder群とChannelPipelineとのひも付きや、Channelそのものを持つコンテクストです。また、アプリケーションはこのコンテクストを通してHandlerはデータの送信を行ったり、他のハンドラーを呼び出したりすることが可能となります。Nettyでは2種類のデータ送信方法があり、前述したように直接Channelに書き込む方法と、このCannelHandlerContextを通して送信する方法があります。前者の場合はOutboundのHandlerがフックされ後者の場合はされません。
次の図はHandler群とChannelPipeline、そして、その関係性を示しています。

f:id:yuya_hirooka:20201214193437j:plain

図に書いてあるとおり、HandlerはChannelHandlerContextを通して、次のハンドラーへのチェインを行います。

終わりに

細かいところで言うと、Nettyをクライアントとして利用する場合とサーバとして利用する場合でEventLoopの数が違ったりはするですが、ざっとまとめた、Nettyの全体像はこんな感じになっていると理解しています。

参考資料

NettyのChannelInboundHandlerとChannelOutboundHandleについてまとめる

はじめに

Netty In Actionを読んでいて、それぞれがのハンドラーがどのどのタイミングで実行されるかが分かりづらかったので、自身の頭の中の整理を目的に自分の理解をまとめて見ようと思います。

Inbound? OutBound?

ChannelPipelineでは、ソケットとアプリケーションを両端として、それぞれの方向に流れるデータフローをInboundとOutBoundと呼んでいます。

  • ソケット⇨アプリケーション : Inbound
  • アプリケーション⇨ソケット: Outbound

また、それぞれのデータフローのイベントに対して、ChannelPipelineには別の種類のハンドラーを登録することが可能で、そのハンドラーを実装するためのインターフェースが、ChannelInboundHandlerChannelOutboundHandlerです。その名の通りそれぞれがソケットに空のInboundとOutboundのイベントで発火されます。
それぞれの方向で基本的に、Hndler間での関係性は無く、ChannelHandlertContextを通してお互いに相互作用はしません。

ChannelInboundHandler

ChannelInboundHandlerのデータに対するイベントで発火されるメソッドが定義されたInterfaceです。具体的には以下のようなものが定義されています。

メソッド名 説明
channelRegistered EventLoopとChannelHandlertContextがChannelに登録され、何かのI/Oのハンドルが可能となった際に実行される
channelUnregistered EventLoopとChannelHandlertContextがChannelから解除され、なにかのI/Oのハンドルが不可能となった際に実行される
channelActive ChannelのChannelHndlerContextがアクティブ、チャンネルが接続もしくは、バインドされ準備ができた状態になった際に実行される
channelInactive ChannelがActive状態ではなくなった際際に実行される。リモートの通信先と接続がキレている
channelReadComplete readオペレーションが終わった際に実行される
channelRead データがChannelから読み出された際に実行される
channelWritabilityChanged ChannelのWritabilityの状態が変わった際に呼び出される。ユーザは書き込みがが終わっていないことを確認できたり、書き込み可能になった際に書き込みを行なうようにすることができる。その確認を行なうために、Channelのメソッドである、isWritable()メソッドを利用することができる。
userEventTriggered ChannelnboundHandler.fireUserEventTriggered()が呼ばれた際に実行される。
exceptionCaught 例外が投げられた際に実行される。

例えば以下のようなハンドラーをChannelPipelineに登録します。

@ChannelHandler.Sharable
public class EchoInboundHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered inbound handler called");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered inbound handler called");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive inbound handler called");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive inbound handler called");
        super.channelInactive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead inbound handler called");

        //送信されたデータをコンソールに出力してそのまま返す。
        ByteBuf input = (ByteBuf) msg;
        System.out.println(input.toString(CharsetUtil.UTF_8));
        ctx.write(input);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete inbound handler called");

        //読み込みが終了の際にコネクションをクローズする
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

}

TCPソケットの通信でaaaというデータを送った際には以下のような出力が標準出力にされます。channelActiveまでが、コネクションを確立されるまでに出力され、

channelRegistered inbound handler called
channelActive inbound handler called
channelRead inbound handler called
aaa

channelReadComplete inbound handler called
channelInactive inbound handler called
channelUnregistered inbound handler called

userEventTriggeredchannelWritabilityChangedなどは、前述の表のようなタイミングで実行されるため、今回のケースでは実行されないようです。

ChannelOutboundHandler

Inboundに対して、ChannelOutboundHandlerに関しては、以下のようなメソッドが定義されています。

メソッド名 説明
bind(ChannelHandlerContext, SocketAddress,ChannelPromise) ローカルアドレスにバインドがリクエストされた際に実行される
connect(ChannelHandlerContext, SocketAddress,SocketAddress,ChannelPromise) リモートの通信先にChannelと接続のリクエストの際に実行される
disconnect(ChannelHandlerContext, ChannelPromise) Channelのリモートの通信先とのコネクションを切断するリクエストを送る際に実行されます
close(ChannelHandlerContext,ChannelPromise) Channelのクローズのリクエストの際に
disconnect(ChannelHandlerContext, ChannelPromise) EventLoopからChannelの登録解除がリクエストされた際に実行されます。
read(ChannelHandlerContext) Channelのリードがリクエストされた際に実行されます
flush(ChannelHandlerContext) Channelのキューに溜まったデータをフラッシュのリクエストの際に実行されます
write(ChannelHandlerContext,Object, ChannelPromise Channelを通して書き込みがリクエストされる際に実行されます

例えば以下の2つのようなハンドラーをChannelPipelineに登録します。

@ChannelHandler.Sharable
public class EchoOutBoundHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("bind outbound handler called");
        super.bind(ctx, localAddress, promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("connect outbound handler called");
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("disconnect outbound handler called");
        super.disconnect(ctx, promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("close outbound handler called");
        super.close(ctx, promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        System.out.println("deregister outbound handler called");
        super.deregister(ctx, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("read outbound handler called");
        super.read(ctx);
    }



    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("write outbound handler called");
        super.write(ctx, msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        System.out.println("flush outbound handler called");
        super.flush(ctx);
    }
}
public class EchoClient extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 通信先にhelloのメッセージを送信        
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
        super.channelActive(ctx);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  // 通信先からのレスポンスを表示
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }
}

その状態でNettyをクライアントとして実行し、通信を行なうとコンソールに以下のような出力が行われます。

connect outbound handler called
write outbound handler called
flush outbound handler called
read outbound handler called
hello
read outbound handler called
read outbound handler called