atコマンドで指定時間にJobを単発実行させる

はじめに

最近、CrontabみたいなJobを定期的にスケジュールするのでは無くて、指定時間に単発で実行したいみたいな要件にぶち当たって、ちょっと調べていたらUnix系のOSにatというコマンドがあることに気がついたので機能を試して見ようかと思います。

at コマンドとは

標準出力、もしくはファイルからコマンドを受け取って、/bin/shを使って指定時間にシェルを起動するコマンドです。Corntabなどは日時の指定のみで、最長一年に一回は実行されるのに対してatコマンドは年の指定まで行なうことが可能で、指定した時間に一回のみといった実行が行えます。
実行時の環境変数BASH_VERSINFODISPLAYEUID, GROUPSSHELLOPTSTERMUID以外)、ワークディレクトリ、umaskなどはコマンド実行時の状態でキープされます。
また、atsetuidプログラムで、LD_LIBRARY_PATHLD_PRELOADなどの環境変数も読み込まれないようです。

使ってみる

環境

コマンドの実行環境は以下の通り、

$ uname -srvmpio
Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

基本的な操作方法

プロンプトでコマンドを設定する。

atは以下のような形式で、実行できます。

$ at 07:55 01/31/21
warning: commands will be executed using /bin/sh
at> 

実行すると、>atという表示が現れプロンプトが起動します。
ここにコマンドを打ち込んでctrl+dを押すと設定が完了です。

$ at 10:46 01/22/21
warning: commands will be executed using /bin/sh
at> java --version > test.txt
at> <EOT>
job 6 at Fri Jan 22 10:46:00 2021

実施時間をすぎると以下のようにテキストが出力されているのが確認できます。

$ cat test.txt 
openjdk 15.0.1 2020-10-20
OpenJDK Runtime Environment AdoptOpenJDK (build 15.0.1+9)
Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.23.0, JRE 15 Linux amd64-64-Bit Compressed References 20201022_81 (JIT enabled, AOT enabled)
OpenJ9   - 0394ef754
OMR      - 582366ae5
JCL      - ad583de3b5 based on jdk-15.0.1+9)

ファイルを読み込んで実行する

atコマンドはファイルを読み込んで実行することもできます。
まずは、以下のようなシェルスクリプトを用意します。

hello.sh

#!/bin/sh

echo "hello, at command" > test.txt

このシェルスクリプトatコマンドで実行するには-fオプションで実行するファイルを指定します。

$ at 21:05 01/22/21 -f hello.sh 
warning: commands will be executed using /bin/sh
job 8 at Fri Jan 22 21:05:00 2021

$ cat test.txt
hello, at command

日付の指定の形式について

atPOSIX.2 standardを拡張した日付の指定が可能なようです。

  • HH:MM
    • 時間と分を指定できます。この場合、今日の時間にJobが実行され、もしその時間が今日すでに過去である場合は次の日の指定された時間にJobが実行されます。
  • MMDD[CC]YY, MM/DD/[CC]YY, DD.MM.[CC]YY or [CC]YY-MM-DD
    • オプションとして、上記のような形式で年月日を指定することができます。
  • midnightnoonteatime
    • それぞれの時間に実行されます。
  • todaytomorrow
    • サーフィクスとして上記の表記をつけるとそれぞれ今日と明日を示すことができます。

その他にも結構自由な時間の指定ができるようで、今日から3日後の4pmとかを指定する場合はat 4pm + 3 days、6月31日の10時AMを指定したい場合はat 10am Jul 31と行ったような感じで指定することができます。

他にできそうなこと

マニュアルでatコマンドを調べてみると、このコマンドには以下のような、仲間がいるようです。

  • atq

    • ペンディングされているUserのJobの一覧を取得します。superuserであるばあいはすべてのJobを出力します。-lオプションがエイリアスになっています。
  • atrm

    • job numberにとって特定されるジョブを削除します。-rオプションがエイリアスになっています。
  • batch

    • システムのロードアベレージが1.5(かもしくはatdコマンドで指定された値)以下になった際にJobを実行する。

他にも-m使うなどするとジョブの実行時にユーザにメールを送ることができるようです。

MinikubeにIstioをインストールする

はじめに

業務でKubernetesやIstioの機能の一部を利用することがあるですが、勉強のために試せる環境がほしかったのでローカルのMinikubeにインストールして見ようかと思います。

やってみる

動作環境

$ uname -srvmpio
Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

$ docker version
Client: Docker Engine - Community
 Version:           20.10.2
 API version:       1.41
 Go version:        go1.13.15
 Git commit:        2291f61
 Built:             Mon Dec 28 16:17:43 2020
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.2
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       8891c58
  Built:            Mon Dec 28 16:15:19 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

minikubeのインストール

ドキュメントに寄ると、いくつかインストール方法があるようですが、今回はバイナリのインストールを行おうと思います。
以下のコマンドを実行します。

$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 \
  && chmod +x minikube

$ sudo mkdir -p /usr/local/bin/
$ sudo install minikube /usr/local/bin/

$ minikube version
minikube version: v1.16.0
commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1

インストールが終わったのでMinikubeを起動します。
この際、MinikubeのドライバーはDockerを使い、メモリのCPUの設定は公式ドキュメントにしたがって、起動したいと思います。

$ minikube start --memory=16384 --cpus=4 --kubernetes-version=v1.18.10

$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured
timeToStop: Nonexistent

istioをインストール

PodとServiceに関する注意点

Istioを利用する上でPodとServiceには求められる条件があります。

Podの必須条件は以下のとおりです。

  • Podは最低一つのServiceに紐付いている必要がある
  • PodはアプリケーションをUIDが1337で実行していないこと
  • pod security policiesクラスターに設定されているかつIstio CNI Pluginを利用していない場合。Podに対してNET_ADMINNET_RAWの機能が利用できる必要がある(詳細はこちら)

また、以下のような推奨される条件があるようです。

  • Istioがメトリクスを収集する際に付加情報をつけることができるので、Podにappラベルと、versionラベルを付与しておくのがよい。
  • Serviceのspec.ports.nameプロトコルを指定して置くのが良い(指定できるプロトコルおよび詳細はこちら

Istioが利用するポート

別の注意点としてIstioが以下のようなポートを利用することを留意しておく必要があります。

Istioのサイドカープロキシ(Envoy)によって利用されるポートの一覧

ポート プロトコル 説明
15000 TCP Envoyのadmin ポート。ポッド内のみ
15001 TCP Envoyのアウトバウンドポート
15006 TCP Envoyのインバウンドポート
15008 TCP Envoyトンネルポート(インバウンド)
15020 HTTP アプリケーションやIstio agent、Envoyのテレメトリーのポイント
15021 HTTP ヘルスチェックのポイント
15090 Envoy Prometheusのテレメトリーポイント

Istioのコントロールプレーンによって利用されるポートの一覧

ポート プロトコル 説明
15010 GRPC XDSとCAサービス(プレーンテキスト)
15012 GRPC XDSとCAサービス(TLS)プロダクションで利用することが推奨される
8080 HTTP Debugインタフェース(非推奨)
443 HTTPS Webhook
15014 HTTP コントロールプレーンのモニタリング

Server First Protocols

サーバが最初のバイトを送信するようなServer First Protocolを利用する際は PREMISSIVE mTLSプロトコルの自動検出等など最初のバイトを利用する機能に影響を与える可能性があります。なので、明示的にServiceのプロトコルを選択肢てやる必要があります。
ただし、以下のようなよく知られたポートを使うプロトコルやサーバに関しては自動的にTCPが選択されるようです。

その他、アウトバウンドのアクセスでHostヘッダーを利用する場合などにも注意が必要なようです。詳細はこちらを確認ください

インストール

istioをインストールする方法は以下のような方法があるようです。

  • Istiocltを使ったマニュアルイントール
  • Istio Operatorを使ったインストール

あとは、マルチクラスターへのインストールやVMを使う方法(こいつはちょっとよくわからなかった)などがあるようです。
今回はIstiocltを使ったマニュアルイントールをやってみようと思います。

どちらにせよまずはIstioctlをインストールする必要があるようなのでインストールします。
istioctlのバージョンはIstioのコントロールプレーンのバージョンと合わせることが推奨されています。今回は最新のリリースのものをインストールします。

$ curl -sL https://istio.io/downloadIstioctl | sh -

$ istioctl version
no running Istio pods in "istio-system"
1.8.2

何度も実行するのが面倒なので、export PATH=$PATH:$HOME/.istioctl/bin.bash_profileに登録しておきます。

次にIstioをクラスターにインストールしていくのですが、まずIstioのインストールには[Profile]という概念があるみたいです。簡単にまとめるとプロファイルはIstioのコントローププレーンやIstioサイドカーに対するカスタマイズ方法を提供しています。Profileでできることの詳細に関してはこちらのページをご確認ください。
なにも設定しない場合defualtのプロファイルが利用されることになります。
今回は素直にインストールするので特にプロファイルは指定しません。

# プロファイルを指定したい場合は`--set profile=demo`などを指定する
$ istioctl install --context minikube

コマンドの実行が終わったら次にきちんとインストールが行われたか確認します。
verify-installコマンドを使ってこの確認を行なうことができます。
まずは、インストール時のさいに利用されたマニフェストを取得する必要があります。
事前に取得していない場合はistioctl manifest generate <インストール時に使用したオプション>で出力することができます。
今回の場合は以下のように実行します。

$ istioctl manifest generate >  generated-manifest.yaml

$ istioctl verify-install -f generated-manifest.yaml 
CustomResourceDefinition: authorizationpolicies.security.istio.io.default checked successfully
CustomResourceDefinition: destinationrules.networking.istio.io.default checked successfully
CustomResourceDefinition: envoyfilters.networking.istio.io.default checked successfully
CustomResourceDefinition: gateways.networking.istio.io.default checked successfully
CustomResourceDefinition: istiooperators.install.istio.io.default checked successfully
CustomResourceDefinition: peerauthentications.security.istio.io.default checked successfully
CustomResourceDefinition: requestauthentications.security.istio.io.default checked successfully
CustomResourceDefinition: serviceentries.networking.istio.io.default checked successfully
CustomResourceDefinition: sidecars.networking.istio.io.default checked successfully
CustomResourceDefinition: virtualservices.networking.istio.io.default checked successfully
CustomResourceDefinition: workloadentries.networking.istio.io.default checked successfully
CustomResourceDefinition: workloadgroups.networking.istio.io.default checked successfully
ServiceAccount: istio-ingressgateway-service-account.istio-system checked successfully
ServiceAccount: istio-reader-service-account.istio-system checked successfully
ServiceAccount: istiod-service-account.istio-system checked successfully
ClusterRole: istio-reader-istio-system.default checked successfully
ClusterRole: istiod-istio-system.default checked successfully
ClusterRoleBinding: istio-reader-istio-system.default checked successfully
ClusterRoleBinding: istiod-istio-system.default checked successfully
ValidatingWebhookConfiguration: istiod-istio-system.default checked successfully
EnvoyFilter: metadata-exchange-1.6.istio-system checked successfully
EnvoyFilter: metadata-exchange-1.7.istio-system checked successfully
EnvoyFilter: metadata-exchange-1.8.istio-system checked successfully
EnvoyFilter: stats-filter-1.6.istio-system checked successfully
EnvoyFilter: stats-filter-1.7.istio-system checked successfully
EnvoyFilter: stats-filter-1.8.istio-system checked successfully
EnvoyFilter: tcp-metadata-exchange-1.6.istio-system checked successfully
EnvoyFilter: tcp-metadata-exchange-1.7.istio-system checked successfully
EnvoyFilter: tcp-metadata-exchange-1.8.istio-system checked successfully
EnvoyFilter: tcp-stats-filter-1.6.istio-system checked successfully
EnvoyFilter: tcp-stats-filter-1.7.istio-system checked successfully
EnvoyFilter: tcp-stats-filter-1.8.istio-system checked successfully
ConfigMap: istio.istio-system checked successfully
ConfigMap: istio-sidecar-injector.istio-system checked successfully
MutatingWebhookConfiguration: istio-sidecar-injector.default checked successfully
Deployment: istio-ingressgateway.istio-system checked successfully
Deployment: istiod.istio-system checked successfully
PodDisruptionBudget: istio-ingressgateway.istio-system checked successfully
PodDisruptionBudget: istiod.istio-system checked successfully
Role: istio-ingressgateway-sds.istio-system checked successfully
Role: istiod-istio-system.istio-system checked successfully
RoleBinding: istio-ingressgateway-sds.istio-system checked successfully
RoleBinding: istiod-istio-system.istio-system checked successfully
HorizontalPodAutoscaler: istio-ingressgateway.istio-system checked successfully
HorizontalPodAutoscaler: istiod.istio-system checked successfully
Service: istio-ingressgateway.istio-system checked successfully
Service: istiod.istio-system checked successfully
Checked 12 custom resource definitions
Checked 1 Istio Deployments
Istio is installed successfully

最後の出力をみるとインストールはうまく行っているようです。

アーキテクチャについてちょっと見てみる

最後にほんの少しだけ、Istioのアーキテクチャについて見てみたいと思います。
istioのコンポーネントは論理的には以下の2つに分けられるようです。

  • data plane

    • サイドカーとしてデプロイされるEnvoyプロキシの集まり。マイクロサービス間のすべてネットワークコミュニケーションをプロキシする。また、メトリクスの収集なども行なう
  • controle plane

(公式ドキュメントより)

Envoy

Istioは拡張したEnvoy Proxyを利用します。
サービス間のインバウンドとアウトバウンドのやり取りをすべてプロキシする役割を持っており、サイドカーとしてPodにデプロイされます。

Istiod

サービスディスカバリ機能、設定、証明書マネジメントを提供します。
IstiodはハイレベルなルーティングルールをEnvoyの設定に変換し、ランタイムのサイドカーに拡散します。
以下のようなコンポーネントで構成されます。

  • Pilot
    • Envoyの構成管理を行なう
  • Citadel
    • CA。鍵の管理や証明書の発行を行なう
  • galley
    • ユーザ設定のマネジメントを行なう

参考資料

Reactor Publisherにおけるエラーハンドリング

はじめに

前回に引き続き、Reactive Springのを読んでいて、Publihsherのエラーハンドリングを行なうためのメソッド群が紹介されていたので、実際に動かして試してみようと思います。(書籍で紹介されていないものも結構あったので)

実際にやってみる

環境は以下の通り、

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書いていく

エラーハンドリング

まずはエラーハンドリングについて、FluxとMonoではonError**と言ったようなエラー時にフックされるようなメソッドがいくつか生えています。

onErrorResume()

onErrorResume()ですが、こいつは例外発生時のFallback処理を書くためのメソッドです。
以下のコードはintの3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorResume(e -> Flux.just(4));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

特定の例外に対して、Fallback処理を書きたい場合は以下のように第一引数に例外クラスを書いてやるようにします。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorResume(IllegalArgumentException.class, v -> Flux.just(10))
                .onErrorResume(RuntimeException.class, v -> Flux.just(4));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

また、第一引数は例外クラスの代わりにPredicate<? super Throwable> predicateを受け取ることができるので以下のように特定条件の例外にマッチした際のハンドリングを記述することもできます。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorResume(e -> e.getMessage().equals("Auchi"), v -> Flux.just(10))
                .onErrorResume(e -> e.getMessage().equals("Oops"), v -> Flux.just(4));


        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

onErrorReturn

onErrorReturn()は例外発生時に単一の値をエミットする関数です。
以下のコードは先程と同じく、3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。

    @Test
    void testOnErrorReturn() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorReturn(4);

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

特定の例外クラスにマッチした場合や、特定条件の例外にマッチした場合のハンドリングの書き方はonErrorResumeと同様の方法でできます。
すべてを動かしてみたわけでは無いですが、 onErro**系は大体同じような操作ができるっぽいです。

doOnError

まず、Flux(or Mono)でのストリーム処理時に発生したエラーをハンドリングして、ログなどの副作用の伴う処理の実行を行なう場合はdoOnError()を利用します。

    @Test
    void testDoOnError() {
        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .map(v -> {
                    if (v.equals(3)) throw new RuntimeException("Oops");
                    return v;
                })
                .doOnError(RuntimeException.class, v -> System.out.println("message = " + v.getMessage()));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectError(RuntimeException.class)
                .verify();
    }

標準出力への出力は以下の通り。

message = Oops

onErrorMap

処理時の例外をハンドリングして、他の例外に変換してダウンストリームに返す場合は、onErrorMap()を利用します。

    @Test
    void testOnErrorMap() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorMap(RuntimeException.class, e -> new IllegalArgumentException(e.getMessage()));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectError(IllegalArgumentException.class)
                .verify();
    }
}

onErrorContinue

例外発生時に、問題があるエミットを落として処理を続ける場合は、onErrorContinue()を利用します。

    @Test
    void testOnErrorContinue() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorContinue((e, v) -> System.out.printf("dropped value  = %s", v));


        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }
}

実行すると、標準出力に以下の文字列が出力されます。

dropped value  = 3

非Reactiveな非同期処理をFluxで扱う

はじめに

Reactive Springを読んでいて、Flux.create()を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。

Flux.create()?

Flux.create(Consumer<? super FluxSink> emitter)プログラマティックにFluxを生成するためのFactory で、非同期や同期で生成した値をFluxSink APIを通してエミットすることができます。このFactoryを使えばJMS Brockerのイベントやマルチスレッドアプリケーションで生成した値をエレメントとしてエミットすることができます。

実際にやってみる

それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書く

準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。

public class FluxCreateTest {

    private final ExecutorService es = Executors.newFixedThreadPool(1);

    @Test
    void exec() {
        System.out.println("Thread Name = " + Thread.currentThread().getName());
        Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5));

        StepVerifier
                .create(values.doFinally(signalType -> this.es.shutdown()))
                .expectNext("exec number: 1")
                .expectNext("exec number: 2")
                .expectNext("exec number: 3")
                .expectNext("exec number: 4")
                .expectNext("exec number: 5")
                .verifyComplete();
    }

    private void execTask(FluxSink<String> stringFluxSink, int maxNum) {
        this.es.submit(() -> {
            System.out.println("Thread Name = " + Thread.currentThread().getName());

            AtomicInteger num = new AtomicInteger();

            while (num.get() < maxNum) {
                String emitValue = "exec number: " + num.incrementAndGet();
                System.out.println("emitValue = " + emitValue);
                stringFluxSink.next(emitValue);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            stringFluxSink.complete();
        });
    }
}

実行するとテストが成功しコンソールに以下の出力がされます。

Thread Name = main
Thread Name = pool-1-thread-1
emitValue = exec number: 1
emitValue = exec number: 2
emitValue = exec number: 3
emitValue = exec number: 4
emitValue = exec number: 5

上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1という名前のスレッドで、1秒に1回、exec number: *というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。

ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFERが設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyEnumを渡してやります。

2020年振り返り

はじめに

なんとなくいろんな人の振り返りみて、自分も振り返ってみようかとふと思ったので書いてみようかと。

ざっくり今年やったこと、あったこと

今年やったとことあったことをまとめると、以下のようになるかと思います。

  • 転職して一年
  • ブログを平均して週一ぐらいでは書く
  • 本を読む

転職して一年

先月の11月で今の会社に来て一年が経ちました。もともとは、転職ドラフトというサイトで声をかけていただいて今に至りますが、気がつけば一年たってました。
前職とは働き方や開発のやり方が全く違い、いろいろ学びが多かったと思います。
今の組織は、アジャイル開発を行う上で、XPのプラクティスを大切にしています。その中で自分の中の今まで知識でしかなかった部分が実践できたり、知らなかったことをたくさん学ぶことができました。

その他の面で、コミュニケーション面に置いて自分の中で1つのブレイクスルーみたいなのがあった気がします。作業が基本ペア作業で行うので一般的な組織に比べてコミュニケーションを撮る機会がかなり多いと思います。その中であまり上手くコミュニケーションを取れない事があると、感じていたのですが、いろいろ考える中で気づきみたいなものがあり少しだけコミュニケーション力がマシになった気がします。
とはいえ、まだまだ上手くできない面がかなりおおく、特に、ビジネスサイド方々などのコンテキストが共有されてない相手とのコミュニケーションに課題を感じているので、来年以降その力をつけていかないとなぁと思っています。
(ただ、自分の強みとして伸ばしたい方向性とは少し違う気もしているのですが…)

ブログを平均して週一ぐらいでは書く

前職の先輩でかなりのブログを書かれる人がいます。その人に触発されてなんとなく、最低週一ぐらいでは書いてみようと思ったのが、今年の3月です。こから42記事書いてるので目的は達成したかなと。
まぁ、今後も週一でゆるゆると書いていこうかなぁとは思います。
やってみて思ったことは以下のことです。

  • ブログをネタを探すようになるので、発見が多くなる
  • 文章にする事で理解度が上がる

前者に関して、ブログを書くことを意識してるとより多くのことを学べるなぁと感じました。
後者に関しては、ドキュメントを読んでなんとなく理解した気になっても書いてみると結構書けないみたいなのがあったのがたくさんありました。その箇所を自分の言葉で文に起こすことで理解度が断然上がるのを感じました。

本を読む

本を読むのは、新人の頃からずっと続けているのですが今年は少し読む量が減った気がします。 思い出せるだけ羅列してみようと思います。

本屋で買ったのとかで覚えてないの合わせると、多分もうちょいある気がしますが、Amazonの購入履歴やらなんやらで思い出せるのはこんだけです。
こうして眺めてみると、Javaに結構寄ってるのがわかりますね。
来年はもう少し、Linuxやネットワーク、ディスクなどの足回りの知識をもう少し固めたいなーと思いました。
後は、Java以外の言語をもう少しやって、そのパラダイムを学びたいなぁと考えてます。
特に関数型に関しては自分はかなり弱い感じがするので「すごいHaskellたのしく学ぼう!」は読み直したいなと思います。
あとは、Rustなんかも学びが多そうだなぁと思ってます。

来年やりたいこと

来年は、以下のようなことをやろうかなと。

  • 継続してブログを書く
  • テーマを考えて読む本を選んでみる
  • 資格を何か取る

ブログを書くことで、自分の技術力が上がるのは実感できたので今後も続けていたいとおもいます。

本に関しては、振り返りでも思ったのですが今までは興味の向くままに本を選んでいたのですが来年は少し考えながら本を選んでみようかと思いました。本のところでも書いたのですが具体的には以下のような本を意識的に読んでみようかと。

  • ネットワーク、ディスク、Linuxなどの下回りの知識を深める
  • Javaとは全く違う言語パラダイムを学ぶ

最後の資格に関しては、資格を取ることが目的というよりは資格を取る事でその分野に対する知識を深めることができるのではとは前々から思っていたので、やってみようかと。具体的には、LPICやCKA/CKADあたりに挑戦してみようかなと思ってます。

K6のシナリオを使ってみる

はじめに

K6では負荷試験のシナリオを設定することが可能です。このブログではその機能を簡単に試してみてざっくりとした使い方を把握したいと思います。 このブログではK6をDockerを用いて起動します。また、テスト対象のアプリは以前のK6について書いたブログで利用したものを利用します。helloの文字列を返すだけのAPIです。

やってみる

環境

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal



$ uname -srvmpio
Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux


$ docker version 
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:45:36 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:44:07 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

シナリオで何ができるか

ざっくり言うとK6のシナリオではVirtual Users(VUs)、イテレーション、スケジュールの詳細な設定を行なうことができます。
シナリオを使うメリットは主に以下のようなものがあります。

  • 同じスクリプトに対して複数のシナリオを設定することができる。それぞれは独立したJavaScriptの関数として実行されることによってテストをより整理しやすくフレキシブルに作成することができる。
  • それぞれのシナリオがVUや、イテレーション、スケジューリングパターンを独立して利用することができる(この機能はExecutorsによって実装される)。この機能を活用することによってより高度な現実に近いリクエストを送ることができる
  • それぞれのシナリオは直列でも並列でも実行可能で、また、それらを混ぜたような実行も可能である
  • シナリオごとに違った環境変数やメトリクスのタグをりようすることができる

Executorsって何ぞ?

ExecutorsはK6のエンジンのワークホースです。それぞれのワークホースがVUsをスケジューリングし、それぞれが別で反復実行されます。

Executorsには以下のようなものがあります。

Executor名 説明
Shared iterations shared-iterations 固定値分だけ反復実行するExecutor。この固定値は複数のVUsの間でシェアされる
Per VU iterations per-vu-iterations 固定値の値の回数それぞれのVUsがスクリプトを反復実行するExecutor。この固定値は複数VUs間でシェアされない。
Constant VUs constant-vus 設定された時間内で、できるだけ反復を実行するVUsの固定値
Ramping VUs ramping-vus 設定された時間内で、できるだけ反復を実行するVUsの数の変数。開始時点でのVUs数や終了時点のVUs数等を設定できる。グローバルのstagesと同等のことができる
Constant arrival rate constant-arrival-rate 固定値の数のイテレーションを設定された時間内で実行するExecutor。VUsの数を変動させることでイテレーションレートを保証しようとする
Ramping Arrival Rate ramping-arrival-rate 区切られた時間の中で、変動するイテレーション回数実行するExecutor。Ramping VUsに似ているが、イテレーションのためのもので、VUsの数は動的に変動する
Externally Controlled externally-controlled externally-controlled|K6REST APICLIを通して、実行時に設定を変更することができるExecutor

すべてのシナリオで共通の一般的な設定項目

K6のシナリオではExecutor固有の設定項目とは別に、共有の設定項目がいくつかあります。
その設定項目を以下にまとめます。

説明 デフォルト値
executor* string ユニークなExecutorの名前 -
startTime string シナリオの実行が開始されるべき時間 0s
gracefulStop string イテレーションが強制的に終了される前の待ち時間 30s
exec string 実行されるJSの関数名 defualt
env object シナリオで利用される環境変数 {}
tags object シナリオのタグ {}

実際にシナリオを書いていく

基本的な書き方

シナリオの基本的な書き方は以下のようになります。

export let options = {
  scenarios: {
    example_scenario: {
      // 利用するExecutorの名前
      executor: 'shared-iterations',

      // すべてのシナリオで共通の一般的な設定項目
      startTime: '10s',
      gracefulStop: '5s',
      env: { EXAMPLEVAR: 'testing' },
      tags: { example_tag: 'testing' },

      // エクセキューターごとの設定項目  
      vus: 10,
      iterations: 200,
      maxDuration: '10s',
    },
    //二個目のExecutor  
    another_scenario: { ... }
  }
}

Executorを設定する際はscenariosにobjectを複数記述することが可能なようです。それぞれのシナリオのKey(example_scenarioanother_scenario) などは任意の値を利用することが可能ですが、scenariosセクションの中でユニークな値である必要があります。

ユーザが上昇するシナリオを書く

シナリオについてある程度まとめたところで実際に一つ書いてみたいと思います。
お題は以下のように

  • 開始10秒はリクエストを送らない
  • 10秒経過あとは時間経過とともにユーザが上昇する
    • 開始10秒で一秒間ウェイトを置いてリクエストを送るVUsが10から20まで段階的に上昇
    • 開始20秒から30秒でVUsが20から60まで段階的に上昇
  • 開始30秒間から10秒かけて段階的にVUsが減少

また、スクリプトは以下のものを利用します。

import http from 'k6/http';
import { sleep } from 'k6';

export default function () {
  
  http.get('http://{$APP_IP}:8080/hello');
  sleep(1);
}

それでは早速書いていきたいと思います。
今回利用するExecutorは Ramping VUsです。
お題のストーリーを記述すると以下のようになります。

import http from 'k6/http';
import { sleep } from 'k6';

export let options = {
  scenarios: {
    ramping_up_scenario: {
      executor: 'ramping-vus',

      startVUs:10,
      stages: [
        {duration: '10s', target: 20},
        {duration: '10s', target: 60},
        {duration: '10s', target: 0},
      ]
    }

  }
};

export default function () {

  http.get('http://192.168.10.8:8080/hello');
  sleep(1);
}

上記の設定でスクリプトを実行すると以下のようになります。

$ docker run --rm --network k6net -i loadimpact/k6 run   - < k6script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: -
     output: -

  scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop):
           * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)


running (0m00.8s), 10/60 VUs, 0 complete and 0 interrupted iterations
ramping_up_scenario   [   3% ] 10/60 VUs  00.8s/30.0s

running (0m01.8s), 11/60 VUs, 10 complete and 0 interrupted iterations
ramping_up_scenario   [   6% ] 11/60 VUs  01.8s/30.0s

running (0m02.8s), 12/60 VUs, 21 complete and 0 interrupted iterations
ramping_up_scenario   [   9% ] 12/60 VUs  02.8s/30.0s


(中略)


running (0m08.8s), 18/60 VUs, 108 complete and 0 interrupted iterations
ramping_up_scenario   [  29% ] 18/60 VUs  08.8s/30.0s

running (0m09.8s), 19/60 VUs, 126 complete and 0 interrupted iterations
ramping_up_scenario   [  33% ] 19/60 VUs  09.8s/30.0s

running (0m10.8s), 23/60 VUs, 145 complete and 0 interrupted iterations
ramping_up_scenario   [  36% ] 23/60 VUs  10.8s/30.0s

(中略)

running (0m17.8s), 51/60 VUs, 389 complete and 0 interrupted iterations
ramping_up_scenario   [  59% ] 51/60 VUs  17.8s/30.0s

running (0m18.8s), 55/60 VUs, 440 complete and 0 interrupted iterations
ramping_up_scenario   [  63% ] 55/60 VUs  18.8s/30.0s

running (0m19.8s), 59/60 VUs, 494 complete and 0 interrupted iterations
ramping_up_scenario   [  66% ] 59/60 VUs  19.8s/30.0s

running (0m20.8s), 58/60 VUs, 553 complete and 0 interrupted iterations
ramping_up_scenario   [  69% ] 58/60 VUs  20.8s/30.0s

(中略)

running (0m28.8s), 12/60 VUs, 853 complete and 0 interrupted iterations
ramping_up_scenario   [  96% ] 12/60 VUs  28.8s/30.0s

running (0m29.8s), 06/60 VUs, 865 complete and 0 interrupted iterations
ramping_up_scenario   [  99% ] 06/60 VUs  29.8s/30.0s

running (0m30.1s), 00/60 VUs, 871 complete and 0 interrupted iterations
ramping_up_scenario ✓ [ 100% ] 00/60 VUs  30s

    data_received..............: 103 kB 3.4 kB/s
    data_sent..................: 77 kB  2.5 kB/s
    http_req_blocked...........: avg=52.36µs min=1.48µs   med=8.59µs  max=12.99ms  p(90)=14.6µs   p(95)=267.27µs
    http_req_connecting........: avg=37.05µs min=0s       med=0s      max=12.96ms  p(90)=0s       p(95)=222.62µs
    http_req_duration..........: avg=1.49ms  min=257.57µs med=1.37ms  max=11.37ms  p(90)=2.59ms   p(95)=3.11ms  
    http_req_receiving.........: avg=87.02µs min=11.56µs  med=83.98µs max=654.79µs p(90)=144.41µs p(95)=163.26µs
    http_req_sending...........: avg=41.95µs min=6.09µs   med=36.36µs max=719.74µs p(90)=67.04µs  p(95)=106.4µs 
    http_req_tls_handshaking...: avg=0s      min=0s       med=0s      max=0s       p(90)=0s       p(95)=0s      
    http_req_waiting...........: avg=1.36ms  min=229.33µs med=1.22ms  max=11.21ms  p(90)=2.42ms   p(95)=2.95ms  
    http_reqs..................: 871    28.917444/s
    iteration_duration.........: avg=1s      min=1s       med=1s      max=1.01s    p(90)=1s       p(95)=1s      
    iterations.................: 871    28.917444/s
    vus........................: 6      min=6  max=59
    vus_max....................: 60     min=60 max=60

最初に注目するのは以下の出力です。

  scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop):
           * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)

シナリオが読み込まれ、そのマックスVUsが60であることがわかります。
また、3つのステージに分けてVUsが上昇していくことも出力されています。実際に最後の10秒は減少ですがK6の出力ではそこは読み取れないみたいですね。

その後各リクエストの結果を読み込んでみると、最初の10秒程度でVUsがユーザが20まで上昇し、次の10秒で20から60まで上昇して(増加数が20以降から変わってます)そして、最後の10秒でユーザが0まで減少しています。
お題はクリアできたみたいですね。

参考資料

K6の負荷テスト結果をGrafana+InfluxDBで可視化する

はじめに

K6は実行結果を様々なプラットフォームに送信して可視化することが可能です。
その中の選択肢としてGrafana+InfluxDBを利用することが可能なようなので、試してみたいと思います。
このブログでは、それぞれのツールはDockerを利用して立ち上げることを想定しています。公式のdocker-composeを用意されていますが今回は利用せずにやってみようと思います。
また、今回使うK6スクリプト以前のブログで最後に作成したものを利用しようと思います。簡単にまとめると10のVUsで一秒間の1回10秒リクエストを送るスクリプトを使います。また、テスト対象のアプリも同様に以前のブログで利用したものを利用します。helloの文字列を返すだけのAPIです。

やってみる

動作環境

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

$ uname -srvmpio
Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

$ docker version
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:45:36 2020
 OS/Arch:           linux/amd64
 Experimental:      false

Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:44:07 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

InfluxDB を立ち上げる

最初に、諸々適当なDockerのネットワークを作成しておきます。

$ docker network create k6net

まずは、InfluxDBを立ち上げます。 InfluxDBは公式のイメージを利用して立ち上げます。
データの永続化の問題もありますが、今回は試したいだけなので、ボリュームを当てたりはしません。

$ docker run --network k6net --name influxdb -d  influxdb

K6の実行結果をInfluxDBに保存する

InfluxDBが立ち上がったところで早速テストを実行していきたいと思います。
あたらためて書くと今回利用するスクリプトとアプリは以前のブログで利用したものを再利用します。

K6でテストを行ってその実行結果をInfluxDBに送信するには次のコマンドを実行します。

$ docker run --rm --network k6net -i loadimpact/k6 run --out influxdb=http://influxdb:8086/k6db  - < k6script.js

ここで重要なのは--out influxdb=http://influxdb:8086/k6dbのところです。K6では様々なOUTPUTのプラグイン が用意されており、--outの引数に指定するだけで簡単に測定結果を送信することができます。
スクリプトの実行が完了されると、InfluxDBにk6dbという名前でDBが作成されます。

Grafanaでデータを可視化する

ここまででInfluxDBにテスト結果の格納することまでができたので、最後にその可視化を行っていきます。
まずは、Grafanaを立ち上げます以下のコマンドを叩きます。

docker run --network=k6net -d -p 3000:3000 grafana/grafana

起動するとlocalhost:3000でアクセスすることができます。

f:id:yuya_hirooka:20201104224941p:plain

初回、ユーザ/パスワードはadmin/adminです。

次にInfluxDBを起動します。

$ docker run --name influxdb quay.io/influxdb/influxdb:v2.0.3

Grafanaがたち上がったら、InfluxDBと連携させます。
Grafanaのサイドウィンドウから、[歯車]⇨[Add Data Source]を選択し、サーチボックスにInfluxDBと入力してInfluxDBの項目を選択します。

f:id:yuya_hirooka:20201217201320p:plain

設定は以下のように入力します。

f:id:yuya_hirooka:20201218114951p:plain

Databaseの項目はK6の実行の際に作成されたものを利用します。
これで連携は完了です。

あとはいい感じにダッシュボードを作成すればよいです。
(ちなみに以下の画像はk6 Load Testing Resultsのテンプレートを利用してます)

f:id:yuya_hirooka:20201218120543p:plain

InfluxDBのオプション

スクリプト実行時のInfluxDBに対するオプションがいくつかあるので以下にまとめます。   オプションは実行時の環境変数として設定することで利用することができます。

環境変数 説明
K6_INFLUXDB_USERNAME InfluxDBのユーザネーム
K6_INFLUXDB_PASSWORD InfluxDBのパスワード
K6_INFLUXDB_INSECURE tureになってる場合httpsの証明書検証をスキップする
K6_INFLUXDB_TAGS_AS_FIELDS K6のメトリクスの中で、インデックスできないフィールドをコンマ区切りで指定する。オプションでインデックスのvu:intのように型を指定できる。方はintboolfloatstringがある

参考資料