atコマンドで指定時間にJobを単発実行させる
はじめに
最近、CrontabみたいなJobを定期的にスケジュールするのでは無くて、指定時間に単発で実行したいみたいな要件にぶち当たって、ちょっと調べていたらUnix系のOSにat
というコマンドがあることに気がついたので機能を試して見ようかと思います。
at コマンドとは
標準出力、もしくはファイルからコマンドを受け取って、/bin/sh
を使って指定時間にシェルを起動するコマンドです。Corntabなどは日時の指定のみで、最長一年に一回は実行されるのに対してatコマンドは年の指定まで行なうことが可能で、指定した時間に一回のみといった実行が行えます。
実行時の環境変数(BASH_VERSINFO
、 DISPLAY
、 EUID
, GROUPS
、 SHELLOPTS
、 TERM
、 UID
以外)、ワークディレクトリ、umaskなどはコマンド実行時の状態でキープされます。
また、at
はsetuid
プログラムで、LD_LIBRARY_PATH
やLD_PRELOAD
などの環境変数も読み込まれないようです。
使ってみる
環境
コマンドの実行環境は以下の通り、
$ uname -srvmpio Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal
基本的な操作方法
プロンプトでコマンドを設定する。
atは以下のような形式で、実行できます。
$ at 07:55 01/31/21 warning: commands will be executed using /bin/sh at>
実行すると、>at
という表示が現れプロンプトが起動します。
ここにコマンドを打ち込んでctrl+d
を押すと設定が完了です。
$ at 10:46 01/22/21 warning: commands will be executed using /bin/sh at> java --version > test.txt at> <EOT> job 6 at Fri Jan 22 10:46:00 2021
実施時間をすぎると以下のようにテキストが出力されているのが確認できます。
$ cat test.txt openjdk 15.0.1 2020-10-20 OpenJDK Runtime Environment AdoptOpenJDK (build 15.0.1+9) Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.23.0, JRE 15 Linux amd64-64-Bit Compressed References 20201022_81 (JIT enabled, AOT enabled) OpenJ9 - 0394ef754 OMR - 582366ae5 JCL - ad583de3b5 based on jdk-15.0.1+9)
ファイルを読み込んで実行する
atコマンドはファイルを読み込んで実行することもできます。
まずは、以下のようなシェルスクリプトを用意します。
hello.sh
#!/bin/sh echo "hello, at command" > test.txt
このシェルスクリプトをat
コマンドで実行するには-f
オプションで実行するファイルを指定します。
$ at 21:05 01/22/21 -f hello.sh warning: commands will be executed using /bin/sh job 8 at Fri Jan 22 21:05:00 2021 $ cat test.txt hello, at command
日付の指定の形式について
at
はPOSIX.2 standardを拡張した日付の指定が可能なようです。
HH:MM
- 時間と分を指定できます。この場合、今日の時間にJobが実行され、もしその時間が今日すでに過去である場合は次の日の指定された時間にJobが実行されます。
MMDD[CC]YY
,MM/DD/[CC]YY
,DD.MM.[CC]YY
or[CC]YY-MM-DD
- オプションとして、上記のような形式で年月日を指定することができます。
midnight
、noon
、teatime
- それぞれの時間に実行されます。
today
、tomorrow
- サーフィクスとして上記の表記をつけるとそれぞれ今日と明日を示すことができます。
その他にも結構自由な時間の指定ができるようで、今日から3日後の4pmとかを指定する場合はat 4pm + 3 days
、6月31日の10時AMを指定したい場合はat 10am Jul 31
と行ったような感じで指定することができます。
他にできそうなこと
マニュアルでatコマンドを調べてみると、このコマンドには以下のような、仲間がいるようです。
atq
atrm
- job numberにとって特定されるジョブを削除します。
-r
オプションがエイリアスになっています。
- job numberにとって特定されるジョブを削除します。
batch
- システムのロードアベレージが1.5(かもしくは
atd
コマンドで指定された値)以下になった際にJobを実行する。
- システムのロードアベレージが1.5(かもしくは
他にも-m
使うなどするとジョブの実行時にユーザにメールを送ることができるようです。
MinikubeにIstioをインストールする
はじめに
業務でKubernetesやIstioの機能の一部を利用することがあるですが、勉強のために試せる環境がほしかったのでローカルのMinikubeにインストールして見ようかと思います。
やってみる
動作環境
$ uname -srvmpio Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ docker version Client: Docker Engine - Community Version: 20.10.2 API version: 1.41 Go version: go1.13.15 Git commit: 2291f61 Built: Mon Dec 28 16:17:43 2020 OS/Arch: linux/amd64 Context: default Experimental: true Server: Docker Engine - Community Engine: Version: 20.10.2 API version: 1.41 (minimum version 1.12) Go version: go1.13.15 Git commit: 8891c58 Built: Mon Dec 28 16:15:19 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.4.3 GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b runc: Version: 1.0.0-rc92 GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff docker-init: Version: 0.19.0 GitCommit: de40ad0
minikubeのインストール
ドキュメントに寄ると、いくつかインストール方法があるようですが、今回はバイナリのインストールを行おうと思います。
以下のコマンドを実行します。
$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 \ && chmod +x minikube $ sudo mkdir -p /usr/local/bin/ $ sudo install minikube /usr/local/bin/ $ minikube version minikube version: v1.16.0 commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1
インストールが終わったのでMinikubeを起動します。
この際、MinikubeのドライバーはDockerを使い、メモリのCPUの設定は公式ドキュメントにしたがって、起動したいと思います。
$ minikube start --memory=16384 --cpus=4 --kubernetes-version=v1.18.10 $ minikube status minikube type: Control Plane host: Running kubelet: Running apiserver: Running kubeconfig: Configured timeToStop: Nonexistent
istioをインストール
PodとServiceに関する注意点
Istioを利用する上でPodとServiceには求められる条件があります。
Podの必須条件は以下のとおりです。
- Podは最低一つのServiceに紐付いている必要がある
- PodはアプリケーションをUIDが1337で実行していないこと
- pod security policiesがクラスターに設定されているかつIstio CNI Pluginを利用していない場合。Podに対して
NET_ADMIN
とNET_RAW
の機能が利用できる必要がある(詳細はこちら)
また、以下のような推奨される条件があるようです。
- Istioがメトリクスを収集する際に付加情報をつけることができるので、Podに
app
ラベルと、version
ラベルを付与しておくのがよい。 - Serviceの
spec.ports.name
にプロトコルを指定して置くのが良い(指定できるプロトコルおよび詳細はこちら)
Istioが利用するポート
別の注意点としてIstioが以下のようなポートを利用することを留意しておく必要があります。
Istioのサイドカープロキシ(Envoy)によって利用されるポートの一覧
ポート | プロトコル | 説明 |
---|---|---|
15000 | TCP | Envoyのadmin ポート。ポッド内のみ |
15001 | TCP | Envoyのアウトバウンドポート |
15006 | TCP | Envoyのインバウンドポート |
15008 | TCP | Envoyトンネルポート(インバウンド) |
15020 | HTTP | アプリケーションやIstio agent、Envoyのテレメトリーのポイント |
15021 | HTTP | ヘルスチェックのポイント |
15090 | Envoy Prometheusのテレメトリーポイント |
Istioのコントロールプレーンによって利用されるポートの一覧
ポート | プロトコル | 説明 |
---|---|---|
15010 | GRPC | XDSとCAサービス(プレーンテキスト) |
15012 | GRPC | XDSとCAサービス(TLS)プロダクションで利用することが推奨される |
8080 | HTTP | Debugインタフェース(非推奨) |
443 | HTTPS | Webhook |
15014 | HTTP | コントロールプレーンのモニタリング |
Server First Protocols
サーバが最初のバイトを送信するようなServer First Protocolを利用する際は
PREMISSIVE mTLSやプロトコルの自動検出等など最初のバイトを利用する機能に影響を与える可能性があります。なので、明示的にServiceのプロトコルを選択肢てやる必要があります。
ただし、以下のようなよく知られたポートを使うプロトコルやサーバに関しては自動的にTCPが選択されるようです。
その他、アウトバウンドのアクセスでHostヘッダーを利用する場合などにも注意が必要なようです。詳細はこちらを確認ください
インストール
istioをインストールする方法は以下のような方法があるようです。
- Istiocltを使ったマニュアルイントール
- Istio Operatorを使ったインストール
あとは、マルチクラスターへのインストールやVMを使う方法(こいつはちょっとよくわからなかった)などがあるようです。
今回はIstiocltを使ったマニュアルイントールをやってみようと思います。
どちらにせよまずはIstioctlをインストールする必要があるようなのでインストールします。
istioctlのバージョンはIstioのコントロールプレーンのバージョンと合わせることが推奨されています。今回は最新のリリースのものをインストールします。
$ curl -sL https://istio.io/downloadIstioctl | sh - $ istioctl version no running Istio pods in "istio-system" 1.8.2
何度も実行するのが面倒なので、export PATH=$PATH:$HOME/.istioctl/bin
は.bash_profile
に登録しておきます。
次にIstioをクラスターにインストールしていくのですが、まずIstioのインストールには[Profile]という概念があるみたいです。簡単にまとめるとプロファイルはIstioのコントローププレーンやIstioサイドカーに対するカスタマイズ方法を提供しています。Profileでできることの詳細に関してはこちらのページをご確認ください。
なにも設定しない場合defualt
のプロファイルが利用されることになります。
今回は素直にインストールするので特にプロファイルは指定しません。
# プロファイルを指定したい場合は`--set profile=demo`などを指定する $ istioctl install --context minikube
コマンドの実行が終わったら次にきちんとインストールが行われたか確認します。
verify-install
コマンドを使ってこの確認を行なうことができます。
まずは、インストール時のさいに利用されたマニフェストを取得する必要があります。
事前に取得していない場合はistioctl manifest generate <インストール時に使用したオプション>
で出力することができます。
今回の場合は以下のように実行します。
$ istioctl manifest generate > generated-manifest.yaml $ istioctl verify-install -f generated-manifest.yaml CustomResourceDefinition: authorizationpolicies.security.istio.io.default checked successfully CustomResourceDefinition: destinationrules.networking.istio.io.default checked successfully CustomResourceDefinition: envoyfilters.networking.istio.io.default checked successfully CustomResourceDefinition: gateways.networking.istio.io.default checked successfully CustomResourceDefinition: istiooperators.install.istio.io.default checked successfully CustomResourceDefinition: peerauthentications.security.istio.io.default checked successfully CustomResourceDefinition: requestauthentications.security.istio.io.default checked successfully CustomResourceDefinition: serviceentries.networking.istio.io.default checked successfully CustomResourceDefinition: sidecars.networking.istio.io.default checked successfully CustomResourceDefinition: virtualservices.networking.istio.io.default checked successfully CustomResourceDefinition: workloadentries.networking.istio.io.default checked successfully CustomResourceDefinition: workloadgroups.networking.istio.io.default checked successfully ServiceAccount: istio-ingressgateway-service-account.istio-system checked successfully ServiceAccount: istio-reader-service-account.istio-system checked successfully ServiceAccount: istiod-service-account.istio-system checked successfully ClusterRole: istio-reader-istio-system.default checked successfully ClusterRole: istiod-istio-system.default checked successfully ClusterRoleBinding: istio-reader-istio-system.default checked successfully ClusterRoleBinding: istiod-istio-system.default checked successfully ValidatingWebhookConfiguration: istiod-istio-system.default checked successfully EnvoyFilter: metadata-exchange-1.6.istio-system checked successfully EnvoyFilter: metadata-exchange-1.7.istio-system checked successfully EnvoyFilter: metadata-exchange-1.8.istio-system checked successfully EnvoyFilter: stats-filter-1.6.istio-system checked successfully EnvoyFilter: stats-filter-1.7.istio-system checked successfully EnvoyFilter: stats-filter-1.8.istio-system checked successfully EnvoyFilter: tcp-metadata-exchange-1.6.istio-system checked successfully EnvoyFilter: tcp-metadata-exchange-1.7.istio-system checked successfully EnvoyFilter: tcp-metadata-exchange-1.8.istio-system checked successfully EnvoyFilter: tcp-stats-filter-1.6.istio-system checked successfully EnvoyFilter: tcp-stats-filter-1.7.istio-system checked successfully EnvoyFilter: tcp-stats-filter-1.8.istio-system checked successfully ConfigMap: istio.istio-system checked successfully ConfigMap: istio-sidecar-injector.istio-system checked successfully MutatingWebhookConfiguration: istio-sidecar-injector.default checked successfully Deployment: istio-ingressgateway.istio-system checked successfully Deployment: istiod.istio-system checked successfully PodDisruptionBudget: istio-ingressgateway.istio-system checked successfully PodDisruptionBudget: istiod.istio-system checked successfully Role: istio-ingressgateway-sds.istio-system checked successfully Role: istiod-istio-system.istio-system checked successfully RoleBinding: istio-ingressgateway-sds.istio-system checked successfully RoleBinding: istiod-istio-system.istio-system checked successfully HorizontalPodAutoscaler: istio-ingressgateway.istio-system checked successfully HorizontalPodAutoscaler: istiod.istio-system checked successfully Service: istio-ingressgateway.istio-system checked successfully Service: istiod.istio-system checked successfully Checked 12 custom resource definitions Checked 1 Istio Deployments Istio is installed successfully
最後の出力をみるとインストールはうまく行っているようです。
アーキテクチャについてちょっと見てみる
最後にほんの少しだけ、Istioのアーキテクチャについて見てみたいと思います。
istioのコンポーネントは論理的には以下の2つに分けられるようです。
data plane
- サイドカーとしてデプロイされるEnvoyプロキシの集まり。マイクロサービス間のすべてネットワークコミュニケーションをプロキシする。また、メトリクスの収集なども行なう
controle plane
- プロキシのルーティングトラフィックを設定、マネージする
(公式ドキュメントより)
Envoy
Istioは拡張したEnvoy Proxyを利用します。
サービス間のインバウンドとアウトバウンドのやり取りをすべてプロキシする役割を持っており、サイドカーとしてPodにデプロイされます。
Istiod
サービスディスカバリ機能、設定、証明書マネジメントを提供します。
IstiodはハイレベルなルーティングルールをEnvoyの設定に変換し、ランタイムのサイドカーに拡散します。
以下のようなコンポーネントで構成されます。
- Pilot
- Envoyの構成管理を行なう
- Citadel
- CA。鍵の管理や証明書の発行を行なう
- galley
- ユーザ設定のマネジメントを行なう
参考資料
Reactor Publisherにおけるエラーハンドリング
はじめに
前回に引き続き、Reactive Springのを読んでいて、Publihsherのエラーハンドリングを行なうためのメソッド群が紹介されていたので、実際に動かして試してみようと思います。(書籍で紹介されていないものも結構あったので)
実際にやってみる
環境は以下の通り、
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書いていく
エラーハンドリング
まずはエラーハンドリングについて、FluxとMonoではonError**と言ったようなエラー時にフックされるようなメソッドがいくつか生えています。
onErrorResume()
onErrorResume()ですが、こいつは例外発生時のFallback処理を書くためのメソッドです。
以下のコードはintの3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorResume(e -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
特定の例外に対して、Fallback処理を書きたい場合は以下のように第一引数に例外クラスを書いてやるようにします。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorResume(IllegalArgumentException.class, v -> Flux.just(10)) .onErrorResume(RuntimeException.class, v -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
また、第一引数は例外クラスの代わりにPredicate<? super Throwable> predicate
を受け取ることができるので以下のように特定条件の例外にマッチした際のハンドリングを記述することもできます。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorResume(e -> e.getMessage().equals("Auchi"), v -> Flux.just(10)) .onErrorResume(e -> e.getMessage().equals("Oops"), v -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
onErrorReturn
onErrorReturn()は例外発生時に単一の値をエミットする関数です。
以下のコードは先程と同じく、3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。
@Test void testOnErrorReturn() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorReturn(4); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
特定の例外クラスにマッチした場合や、特定条件の例外にマッチした場合のハンドリングの書き方はonErrorResume
と同様の方法でできます。
すべてを動かしてみたわけでは無いですが、 onErro**系は大体同じような操作ができるっぽいです。
doOnError
まず、Flux(or Mono)でのストリーム処理時に発生したエラーをハンドリングして、ログなどの副作用の伴う処理の実行を行なう場合はdoOnError()を利用します。
@Test void testDoOnError() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .map(v -> { if (v.equals(3)) throw new RuntimeException("Oops"); return v; }) .doOnError(RuntimeException.class, v -> System.out.println("message = " + v.getMessage())); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectError(RuntimeException.class) .verify(); }
標準出力への出力は以下の通り。
message = Oops
onErrorMap
処理時の例外をハンドリングして、他の例外に変換してダウンストリームに返す場合は、onErrorMap()を利用します。
@Test void testOnErrorMap() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorMap(RuntimeException.class, e -> new IllegalArgumentException(e.getMessage())); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectError(IllegalArgumentException.class) .verify(); } }
onErrorContinue
例外発生時に、問題があるエミットを落として処理を続ける場合は、onErrorContinue()を利用します。
@Test void testOnErrorContinue() { Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorContinue((e, v) -> System.out.printf("dropped value = %s", v)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); } }
実行すると、標準出力に以下の文字列が出力されます。
dropped value = 3
非Reactiveな非同期処理をFluxで扱う
はじめに
Reactive Springを読んでいて、Flux.create()
を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。
Flux.create()?
Flux.create(Consumer<? super FluxSink
実際にやってみる
それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書く
準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。
public class FluxCreateTest { private final ExecutorService es = Executors.newFixedThreadPool(1); @Test void exec() { System.out.println("Thread Name = " + Thread.currentThread().getName()); Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5)); StepVerifier .create(values.doFinally(signalType -> this.es.shutdown())) .expectNext("exec number: 1") .expectNext("exec number: 2") .expectNext("exec number: 3") .expectNext("exec number: 4") .expectNext("exec number: 5") .verifyComplete(); } private void execTask(FluxSink<String> stringFluxSink, int maxNum) { this.es.submit(() -> { System.out.println("Thread Name = " + Thread.currentThread().getName()); AtomicInteger num = new AtomicInteger(); while (num.get() < maxNum) { String emitValue = "exec number: " + num.incrementAndGet(); System.out.println("emitValue = " + emitValue); stringFluxSink.next(emitValue); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } stringFluxSink.complete(); }); } }
実行するとテストが成功しコンソールに以下の出力がされます。
Thread Name = main Thread Name = pool-1-thread-1 emitValue = exec number: 1 emitValue = exec number: 2 emitValue = exec number: 3 emitValue = exec number: 4 emitValue = exec number: 5
上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1
という名前のスレッドで、1秒に1回、exec number: *
というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。
ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFER
が設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyのEnumを渡してやります。
2020年振り返り
はじめに
なんとなくいろんな人の振り返りみて、自分も振り返ってみようかとふと思ったので書いてみようかと。
ざっくり今年やったこと、あったこと
今年やったとことあったことをまとめると、以下のようになるかと思います。
- 転職して一年
- ブログを平均して週一ぐらいでは書く
- 本を読む
転職して一年
先月の11月で今の会社に来て一年が経ちました。もともとは、転職ドラフトというサイトで声をかけていただいて今に至りますが、気がつけば一年たってました。
前職とは働き方や開発のやり方が全く違い、いろいろ学びが多かったと思います。
今の組織は、アジャイル開発を行う上で、XPのプラクティスを大切にしています。その中で自分の中の今まで知識でしかなかった部分が実践できたり、知らなかったことをたくさん学ぶことができました。
その他の面で、コミュニケーション面に置いて自分の中で1つのブレイクスルーみたいなのがあった気がします。作業が基本ペア作業で行うので一般的な組織に比べてコミュニケーションを撮る機会がかなり多いと思います。その中であまり上手くコミュニケーションを取れない事があると、感じていたのですが、いろいろ考える中で気づきみたいなものがあり少しだけコミュニケーション力がマシになった気がします。
とはいえ、まだまだ上手くできない面がかなりおおく、特に、ビジネスサイド方々などのコンテキストが共有されてない相手とのコミュニケーションに課題を感じているので、来年以降その力をつけていかないとなぁと思っています。
(ただ、自分の強みとして伸ばしたい方向性とは少し違う気もしているのですが…)
ブログを平均して週一ぐらいでは書く
前職の先輩でかなりのブログを書かれる人がいます。その人に触発されてなんとなく、最低週一ぐらいでは書いてみようと思ったのが、今年の3月です。こから42記事書いてるので目的は達成したかなと。
まぁ、今後も週一でゆるゆると書いていこうかなぁとは思います。
やってみて思ったことは以下のことです。
- ブログをネタを探すようになるので、発見が多くなる
- 文章にする事で理解度が上がる
前者に関して、ブログを書くことを意識してるとより多くのことを学べるなぁと感じました。
後者に関しては、ドキュメントを読んでなんとなく理解した気になっても書いてみると結構書けないみたいなのがあったのがたくさんありました。その箇所を自分の言葉で文に起こすことで理解度が断然上がるのを感じました。
本を読む
本を読むのは、新人の頃からずっと続けているのですが今年は少し読む量が減った気がします。 思い出せるだけ羅列してみようと思います。
- Clean Agile
- Amazon Web Services負荷試験入門
- Netty In Action
- Java並行プログラミング
- Real World HTTP 第2版
- イラストでわかるDockerとKubernetes
- マイクロサービスパターン
- 監視入門
- オブジェクト指向UIデザイン
- Linuxで動かしながら学ぶネットワーク入門
- ドメイン駆動設計入門
- みんなのJava
- 新世代Javaプログラミングガイド
- みんなのKotlin
- 7つの習慣
- kubernetesで実践するクラウドネイティブdevops
本屋で買ったのとかで覚えてないの合わせると、多分もうちょいある気がしますが、Amazonの購入履歴やらなんやらで思い出せるのはこんだけです。
こうして眺めてみると、Javaに結構寄ってるのがわかりますね。
来年はもう少し、Linuxやネットワーク、ディスクなどの足回りの知識をもう少し固めたいなーと思いました。
後は、Java以外の言語をもう少しやって、そのパラダイムを学びたいなぁと考えてます。
特に関数型に関しては自分はかなり弱い感じがするので「すごいHaskellたのしく学ぼう!」は読み直したいなと思います。
あとは、Rustなんかも学びが多そうだなぁと思ってます。
来年やりたいこと
来年は、以下のようなことをやろうかなと。
- 継続してブログを書く
- テーマを考えて読む本を選んでみる
- 資格を何か取る
ブログを書くことで、自分の技術力が上がるのは実感できたので今後も続けていたいとおもいます。
本に関しては、振り返りでも思ったのですが今までは興味の向くままに本を選んでいたのですが来年は少し考えながら本を選んでみようかと思いました。本のところでも書いたのですが具体的には以下のような本を意識的に読んでみようかと。
最後の資格に関しては、資格を取ることが目的というよりは資格を取る事でその分野に対する知識を深めることができるのではとは前々から思っていたので、やってみようかと。具体的には、LPICやCKA/CKADあたりに挑戦してみようかなと思ってます。
K6のシナリオを使ってみる
はじめに
K6では負荷試験のシナリオを設定することが可能です。このブログではその機能を簡単に試してみてざっくりとした使い方を把握したいと思います。 このブログではK6をDockerを用いて起動します。また、テスト対象のアプリは以前のK6について書いたブログで利用したものを利用します。helloの文字列を返すだけのAPIです。
やってみる
環境
$ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ uname -srvmpio Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux $ docker version Client: Docker Engine - Community Version: 19.03.12 API version: 1.40 Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:45:36 2020 OS/Arch: linux/amd64 Experimental: false Server: Docker Engine - Community Engine: Version: 19.03.12 API version: 1.40 (minimum version 1.12) Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:44:07 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 runc: Version: 1.0.0-rc10 GitCommit: dc9208a3303feef5b3839f4323d9beb36df0a9dd docker-init: Version: 0.18.0 GitCommit: fec3683
シナリオで何ができるか
ざっくり言うとK6のシナリオではVirtual Users(VUs)、イテレーション、スケジュールの詳細な設定を行なうことができます。
シナリオを使うメリットは主に以下のようなものがあります。
- 同じスクリプトに対して複数のシナリオを設定することができる。それぞれは独立したJavaScriptの関数として実行されることによってテストをより整理しやすくフレキシブルに作成することができる。
- それぞれのシナリオがVUや、イテレーション、スケジューリングパターンを独立して利用することができる(この機能はExecutorsによって実装される)。この機能を活用することによってより高度な現実に近いリクエストを送ることができる
- それぞれのシナリオは直列でも並列でも実行可能で、また、それらを混ぜたような実行も可能である
- シナリオごとに違った環境変数やメトリクスのタグをりようすることができる
Executorsって何ぞ?
ExecutorsはK6のエンジンのワークホースです。それぞれのワークホースがVUsをスケジューリングし、それぞれが別で反復実行されます。
Executorsには以下のようなものがあります。
Executor名 | 値 | 説明 |
---|---|---|
Shared iterations | shared-iterations | 固定値分だけ反復実行するExecutor。この固定値は複数のVUsの間でシェアされる |
Per VU iterations | per-vu-iterations | 固定値の値の回数それぞれのVUsがスクリプトを反復実行するExecutor。この固定値は複数VUs間でシェアされない。 |
Constant VUs | constant-vus | 設定された時間内で、できるだけ反復を実行するVUsの固定値 |
Ramping VUs | ramping-vus | 設定された時間内で、できるだけ反復を実行するVUsの数の変数。開始時点でのVUs数や終了時点のVUs数等を設定できる。グローバルのstages と同等のことができる |
Constant arrival rate | constant-arrival-rate | 固定値の数のイテレーションを設定された時間内で実行するExecutor。VUsの数を変動させることでイテレーションレートを保証しようとする |
Ramping Arrival Rate | ramping-arrival-rate | 区切られた時間の中で、変動するイテレーション回数実行するExecutor。Ramping VUsに似ているが、イテレーションのためのもので、VUsの数は動的に変動する |
Externally Controlled | externally-controlled | externally-controlled|K6のREST APIやCLIを通して、実行時に設定を変更することができるExecutor |
すべてのシナリオで共通の一般的な設定項目
K6のシナリオではExecutor固有の設定項目とは別に、共有の設定項目がいくつかあります。
その設定項目を以下にまとめます。
値 | 型 | 説明 | デフォルト値 |
---|---|---|---|
executor* | string | ユニークなExecutorの名前 | - |
startTime | string | シナリオの実行が開始されるべき時間 | 0s |
gracefulStop | string | イテレーションが強制的に終了される前の待ち時間 | 30s |
exec | string | 実行されるJSの関数名 | defualt |
env | object | シナリオで利用される環境変数 | {} |
tags | object | シナリオのタグ | {} |
実際にシナリオを書いていく
基本的な書き方
シナリオの基本的な書き方は以下のようになります。
export let options = { scenarios: { example_scenario: { // 利用するExecutorの名前 executor: 'shared-iterations', // すべてのシナリオで共通の一般的な設定項目 startTime: '10s', gracefulStop: '5s', env: { EXAMPLEVAR: 'testing' }, tags: { example_tag: 'testing' }, // エクセキューターごとの設定項目 vus: 10, iterations: 200, maxDuration: '10s', }, //二個目のExecutor another_scenario: { ... } } }
Executorを設定する際はscenarios
にobjectを複数記述することが可能なようです。それぞれのシナリオのKey(example_scenario
、another_scenario
) などは任意の値を利用することが可能ですが、scenariosセクションの中でユニークな値である必要があります。
ユーザが上昇するシナリオを書く
シナリオについてある程度まとめたところで実際に一つ書いてみたいと思います。
お題は以下のように
- 開始10秒はリクエストを送らない
- 10秒経過あとは時間経過とともにユーザが上昇する
- 開始10秒で一秒間ウェイトを置いてリクエストを送るVUsが10から20まで段階的に上昇
- 開始20秒から30秒でVUsが20から60まで段階的に上昇
- 開始30秒間から10秒かけて段階的にVUsが減少
また、スクリプトは以下のものを利用します。
import http from 'k6/http'; import { sleep } from 'k6'; export default function () { http.get('http://{$APP_IP}:8080/hello'); sleep(1); }
それでは早速書いていきたいと思います。
今回利用するExecutorは Ramping VUs
です。
お題のストーリーを記述すると以下のようになります。
import http from 'k6/http'; import { sleep } from 'k6'; export let options = { scenarios: { ramping_up_scenario: { executor: 'ramping-vus', startVUs:10, stages: [ {duration: '10s', target: 20}, {duration: '10s', target: 60}, {duration: '10s', target: 0}, ] } } }; export default function () { http.get('http://192.168.10.8:8080/hello'); sleep(1); }
上記の設定でスクリプトを実行すると以下のようになります。
$ docker run --rm --network k6net -i loadimpact/k6 run - < k6script.js /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io execution: local script: - output: - scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop): * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s) running (0m00.8s), 10/60 VUs, 0 complete and 0 interrupted iterations ramping_up_scenario [ 3% ] 10/60 VUs 00.8s/30.0s running (0m01.8s), 11/60 VUs, 10 complete and 0 interrupted iterations ramping_up_scenario [ 6% ] 11/60 VUs 01.8s/30.0s running (0m02.8s), 12/60 VUs, 21 complete and 0 interrupted iterations ramping_up_scenario [ 9% ] 12/60 VUs 02.8s/30.0s (中略) running (0m08.8s), 18/60 VUs, 108 complete and 0 interrupted iterations ramping_up_scenario [ 29% ] 18/60 VUs 08.8s/30.0s running (0m09.8s), 19/60 VUs, 126 complete and 0 interrupted iterations ramping_up_scenario [ 33% ] 19/60 VUs 09.8s/30.0s running (0m10.8s), 23/60 VUs, 145 complete and 0 interrupted iterations ramping_up_scenario [ 36% ] 23/60 VUs 10.8s/30.0s (中略) running (0m17.8s), 51/60 VUs, 389 complete and 0 interrupted iterations ramping_up_scenario [ 59% ] 51/60 VUs 17.8s/30.0s running (0m18.8s), 55/60 VUs, 440 complete and 0 interrupted iterations ramping_up_scenario [ 63% ] 55/60 VUs 18.8s/30.0s running (0m19.8s), 59/60 VUs, 494 complete and 0 interrupted iterations ramping_up_scenario [ 66% ] 59/60 VUs 19.8s/30.0s running (0m20.8s), 58/60 VUs, 553 complete and 0 interrupted iterations ramping_up_scenario [ 69% ] 58/60 VUs 20.8s/30.0s (中略) running (0m28.8s), 12/60 VUs, 853 complete and 0 interrupted iterations ramping_up_scenario [ 96% ] 12/60 VUs 28.8s/30.0s running (0m29.8s), 06/60 VUs, 865 complete and 0 interrupted iterations ramping_up_scenario [ 99% ] 06/60 VUs 29.8s/30.0s running (0m30.1s), 00/60 VUs, 871 complete and 0 interrupted iterations ramping_up_scenario ✓ [ 100% ] 00/60 VUs 30s data_received..............: 103 kB 3.4 kB/s data_sent..................: 77 kB 2.5 kB/s http_req_blocked...........: avg=52.36µs min=1.48µs med=8.59µs max=12.99ms p(90)=14.6µs p(95)=267.27µs http_req_connecting........: avg=37.05µs min=0s med=0s max=12.96ms p(90)=0s p(95)=222.62µs http_req_duration..........: avg=1.49ms min=257.57µs med=1.37ms max=11.37ms p(90)=2.59ms p(95)=3.11ms http_req_receiving.........: avg=87.02µs min=11.56µs med=83.98µs max=654.79µs p(90)=144.41µs p(95)=163.26µs http_req_sending...........: avg=41.95µs min=6.09µs med=36.36µs max=719.74µs p(90)=67.04µs p(95)=106.4µs http_req_tls_handshaking...: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s http_req_waiting...........: avg=1.36ms min=229.33µs med=1.22ms max=11.21ms p(90)=2.42ms p(95)=2.95ms http_reqs..................: 871 28.917444/s iteration_duration.........: avg=1s min=1s med=1s max=1.01s p(90)=1s p(95)=1s iterations.................: 871 28.917444/s vus........................: 6 min=6 max=59 vus_max....................: 60 min=60 max=60
最初に注目するのは以下の出力です。
scenarios: (100.00%) 1 scenario, 60 max VUs, 1m0s max duration (incl. graceful stop): * ramping_up_scenario: Up to 60 looping VUs for 30s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)
シナリオが読み込まれ、そのマックスVUsが60であることがわかります。
また、3つのステージに分けてVUsが上昇していくことも出力されています。実際に最後の10秒は減少ですがK6の出力ではそこは読み取れないみたいですね。
その後各リクエストの結果を読み込んでみると、最初の10秒程度でVUsがユーザが20まで上昇し、次の10秒で20から60まで上昇して(増加数が20以降から変わってます)そして、最後の10秒でユーザが0まで減少しています。
お題はクリアできたみたいですね。
参考資料
K6の負荷テスト結果をGrafana+InfluxDBで可視化する
はじめに
K6は実行結果を様々なプラットフォームに送信して可視化することが可能です。
その中の選択肢としてGrafana+InfluxDBを利用することが可能なようなので、試してみたいと思います。
このブログでは、それぞれのツールはDockerを利用して立ち上げることを想定しています。公式のdocker-composeを用意されていますが今回は利用せずにやってみようと思います。
また、今回使うK6のスクリプトは以前のブログで最後に作成したものを利用しようと思います。簡単にまとめると10のVUsで一秒間の1回10秒リクエストを送るスクリプトを使います。また、テスト対象のアプリも同様に以前のブログで利用したものを利用します。hello
の文字列を返すだけのAPIです。
やってみる
動作環境
$ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ uname -srvmpio Linux 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux $ docker version Client: Docker Engine - Community Version: 19.03.12 API version: 1.40 Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:45:36 2020 OS/Arch: linux/amd64 Experimental: false Server: Docker Engine - Community Engine: Version: 19.03.12 API version: 1.40 (minimum version 1.12) Go version: go1.13.10 Git commit: 48a66213fe Built: Mon Jun 22 15:44:07 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.2.13 GitCommit: 7ad184331fa3e55e52b890ea95e65ba581ae3429 runc: Version: 1.0.0-rc10 GitCommit: dc9208a3303feef5b3839f4323d9beb36df0a9dd docker-init: Version: 0.18.0 GitCommit: fec3683
InfluxDB を立ち上げる
最初に、諸々適当なDockerのネットワークを作成しておきます。
$ docker network create k6net
まずは、InfluxDBを立ち上げます。
InfluxDBは公式のイメージを利用して立ち上げます。
データの永続化の問題もありますが、今回は試したいだけなので、ボリュームを当てたりはしません。
$ docker run --network k6net --name influxdb -d influxdb
K6の実行結果をInfluxDBに保存する
InfluxDBが立ち上がったところで早速テストを実行していきたいと思います。
あたらためて書くと今回利用するスクリプトとアプリは以前のブログで利用したものを再利用します。
K6でテストを行ってその実行結果をInfluxDBに送信するには次のコマンドを実行します。
$ docker run --rm --network k6net -i loadimpact/k6 run --out influxdb=http://influxdb:8086/k6db - < k6script.js
ここで重要なのは--out influxdb=http://influxdb:8086/k6db
のところです。K6では様々なOUTPUTのプラグイン が用意されており、--out
の引数に指定するだけで簡単に測定結果を送信することができます。
スクリプトの実行が完了されると、InfluxDBにk6db
という名前でDBが作成されます。
Grafanaでデータを可視化する
ここまででInfluxDBにテスト結果の格納することまでができたので、最後にその可視化を行っていきます。
まずは、Grafanaを立ち上げます以下のコマンドを叩きます。
docker run --network=k6net -d -p 3000:3000 grafana/grafana
起動するとlocalhost:3000
でアクセスすることができます。
初回、ユーザ/パスワードはadmin/admin
です。
次にInfluxDBを起動します。
$ docker run --name influxdb quay.io/influxdb/influxdb:v2.0.3
Grafanaがたち上がったら、InfluxDBと連携させます。
Grafanaのサイドウィンドウから、[歯車]⇨[Add Data Source]を選択し、サーチボックスにInfluxDB
と入力してInfluxDBの項目を選択します。
設定は以下のように入力します。
Database
の項目はK6の実行の際に作成されたものを利用します。
これで連携は完了です。
あとはいい感じにダッシュボードを作成すればよいです。
(ちなみに以下の画像はk6 Load Testing Resultsのテンプレートを利用してます)
InfluxDBのオプション
スクリプト実行時のInfluxDBに対するオプションがいくつかあるので以下にまとめます。 オプションは実行時の環境変数として設定することで利用することができます。
環境変数 | 説明 |
---|---|
K6_INFLUXDB_USERNAME | InfluxDBのユーザネーム |
K6_INFLUXDB_PASSWORD | InfluxDBのパスワード |
K6_INFLUXDB_INSECURE | tureになってる場合httpsの証明書検証をスキップする |
K6_INFLUXDB_TAGS_AS_FIELDS | K6のメトリクスの中で、インデックスできないフィールドをコンマ区切りで指定する。オプションでインデックスのvu:int のように型を指定できる。方はint 、bool 、float 、string がある |