Jaegerでk8s+Istio上のアプリ(Quarkus、Spring)を分散トレーシングする
はじめに
分散トレーシングをやる際にJaegerというツールがあって、試してみたいと思って試せていなかったのやってみようと思います。今回はMinikubeで作ったクラスターにIstioをデプロイして、 その環境でのトレーシングを行ってみようと思います。
Istioにおけるトレーシングについて
どんなふうに実現されるか
Istioにおける分散トレーシングがどのように実現されているのかは、ドキュメントにいろいろ書かれてました。
まず、IstioはEnvoyベースのトレーシングを行います。その際にアプリケーションはB3 trace headersなどのヘッダーを転送していく必要があるようです。具体的には以下のようなヘッダーを転送して行くみたいです。
- x-request-id
- x-b3-traceid
- x-b3-spanid
- x-b3-parentspanid
- x-b3-sampled
- x-b3-flags
- b3
さらに、Lightstepを利用する場合はx-ot-span-context
もつける必要があるみたいです。
これらのヘッダーはマニュアルで転送していくことも可能ですが、ZipkinやJaegerのクライアントを使うことで自動的に拡散することも可能なようです。
ちなみに、なぜ、Istio自身がこれらのヘッダーをフォワーディングできないのかについてですが、アプリケーションのアウトバウンドリクエストがどのインバウンドリクエストによって発生したものかを特定するすべが、Istio側には存在しないからです。
また、Envoyベースのトレーシングに置いてEnvoyは以下のようなことを行ってくれます。
- リクエストIDとトレーシングヘッダー(B3 Header等)を生成し送信する
- リクエストとレスポンスのメタデータからTrace Spanを生成する
- トレーシングバックエンドにSpanを送信する
- プロキシ先のアプリケーションにヘッダーを送信する
Jaegerとは
Jaeger JaergerはDapper、OpenZipkinにインスパイヤーされた分散トレーシングシステムです。マイクロサービスに置いて以下のような用途で用いられます。
- 分散トレーシングのモニタリング
- 根本原因解析
- サービスの依存解析
- パフォーマンス、レイテンシの最適化
また、以下のようなコンポーネントで構成されます。
- Goで作られたバックエンドコンポーネント
- React UI
- ストレージ
- Cassandra 3.4+
- Elasticserch 5.x, 6.x, 7.x
- Kafka
- メモリ
やってみる
環境
今回はKubernetesのクラスターはMinikube(driver=none)を用います。
$ minikube version minikube version: v1.16.0 commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1 $ kubectl version -o yaml clientVersion: buildDate: "2020-10-15T01:52:24Z" compiler: gc gitCommit: 62876fc6d93e891aa7fbe19771e6a6c03773b0f7 gitTreeState: clean gitVersion: v1.18.10 goVersion: go1.13.15 major: "1" minor: "18" platform: linux/amd64 serverVersion: buildDate: "2020-10-15T01:43:56Z" compiler: gc gitCommit: 62876fc6d93e891aa7fbe19771e6a6c03773b0f7 gitTreeState: clean gitVersion: v1.18.10 goVersion: go1.13.15 major: "1" minor: "18" $ istioctl version client version: 1.8.2 control plane version: 1.8.2 data plane version: 1.8.2 (1 proxies) $ uname -srvmpio Linux 5.4.0-64-generic #72-Ubuntu SMP Fri Jan 15 10:27:54 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ docker version (Client略) Server: Docker Engine - Community Engine: Version: 20.10.2 API version: 1.41 (minimum version 1.12) Go version: go1.13.15 Git commit: 8891c58 Built: Mon Dec 28 16:15:19 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.4.3 GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b runc: Version: 1.0.0-rc92 GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff docker-init: Version: 0.19.0 GitCommit: de40ad0 $ java --version openjdk 11.0.10 2021-01-19 OpenJDK Runtime Environment 18.9 (build 11.0.10+9) OpenJDK 64-Bit Server VM 18.9 (build 11.0.10+9, mixed mode) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 11.0.10, vendor: Oracle Corporation, runtime: /home/someone/.sdkman/candidates/java/11.0.10-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-65-generic", arch: "amd64", family: "unix"
もろもろを構築しておく
今回は以下のような環境を構築して分散トレーシングをやってみようと思います。
Jaegerでトレーシングするのは以下の3つになります。
利用するアプリはSpringとQuarkusと利用していますが、この記事に置いては深い意味は無く、別で試したことがあったので採用しました。
アプリのロジックに関しても複雑なことは市内想定で、QuarkusアプリがSpringアプリに対して、Hello, Tracing
の文字列を取得してそのままフロント側に返すようにしようと思います。
Sidecar InjectionをTrueにしてネームスペースを作成
まずは、IstioのインジェクションをTrueにしておきます。
今回は余計な複雑さをなくすために、新たにネームスペースを作成はせずにクラスタのdefault
に対してインジェクションをTrueにします。
以下のコマンドを実行します。
$ kubectl --context=minikube label namespace default istio-injection=enabled namespace/default labeled
minikubeのコンテクストでイメージをビルドするように設定
今回はローカルでビルドしたしたイメージを使うようにしておきます。
いかのコマンドでminikubeのコンテクストでイメージをビルドするように設定します。
$ eval $(minikube docker-env)
サンプルアプリケーションを作成しコンテナ化
Quarkusのアプリを作成
まずはQuarkusの方を作っていこうと思います。
プロジェクトはQuarkus - Start coding with code.quarkus.ioを使って作成します。
プロジェクトの設定は以下の通り。
依存はRESTEasy
とRest Client
だけ追加しておきます。
プロジェクトが作成できたら、まずはHTTPクライアントを作成します。
@RegisterRestClient @RegisterClientHeaders public interface GreetingClient { @GET @Path("/hello") String fetchHello(); }
基本的には、なんの変哲の無いRestClientですがひとつだけポイントがあります。
前述の通り、Istioを使った分散トレーシングではヘッダーを転送していく必要があります。そのフォワーディングを行なうために、@RegisterClientHeadersを利用しています。このヘッダーはデフォルトで、指定されたJAX-RSのインバウンドリクエストヘッダーをアウトバウンドのリクエスト時に付与することができます。
RestClientの設定と転送するヘッダー設定を以下のようにappllication.properties
に記述しておきます。
quarkus.http.port=8081 dev.hirooka.GreetingClient/mp-rest/url=http://${SPRING_SERVICE:localhost:8082} dev.hirooka.GreetingClient/mp-rest/scope=javax.inject.Singleton org.eclipse.microprofile.rest.client.propagateHeaders=x-request-id,x-b3-traceid,x-b3-spanid,x-b3-parentspanid,x-b3-sampled,x-b3-flags,b3,x-ot-span-context
設定まで記述できたら、次はハンドラーを記述します。今回はHTTPクライアントをハンドラーから直接利用するようにします。
@Path("/hello") public class GreetingResource { @Inject @RestClient GreetingClient greetingClient; static final Logger logger = Logger.getLogger(GreetingResource.class); @GET @Produces(MediaType.TEXT_PLAIN) public String hello(@Context HttpHeaders headers) { logger.info(headers.getRequestHeaders()); return greetingClient.fetchHello(); } }
これでQuarkusの方のアプリは完成しました。
最後に作ったアプリをDockerイメージ化しておきます。
$ ./mvnw package $ docker build -f src/main/docker/Dockerfile.jvm -t quarkus/open-tracing-jvm . $ docker images | grep quarkus quarkus/open-tracing latest f2c29c0c8e21 2 minutes ago 385MB
Springのアプリを作成
次にSpringのアプリを作成します。
プロジェクトはSpring Initializrを使って作成します。
設定は以下のように。
今回の構成だと、Springのアプリは外部アクセスを行わないため、ヘッダーをフォワーディングする必要が無いので、Spring Web
だけで良いのですが、一応フォワーディングのやり方を示すために依存にSleuth
とZipkin Client
を追加してます。
なにはともあれ、まずはコントローラーを作成します。
@RestController public class GreetingController { private final Logger logger = LoggerFactory.getLogger(GreetingController.class); @GetMapping("/hello") String greeting(@RequestHeader Map<String, String> header) { logger.info(header.toString()); return "Hello, Tracing"; } }
今回必要な部分はこれだけです。
設定は以下の通り
server.port=8082 spring.zipkin.enabled=false
Spring Sleuth
はデフォルトでlocalhost:9411
にメトリクスを送信してしまうので、今回は無効化して置きます。
Springも外部へのHTTPコールを行いヘッダーを転送したい場合はspring.sleuth.propagation-keys
を以下のように設定すれば良さそうです。
spring.sleuth.propagation-keys=x-request-id,x-b3-traceid,x-b3-spanid,x-b3-parentspanid,x-b3-sampled,x-b3-flags,b3,x-ot-span-context
x-b3-traceid
などのB3のヘッダーなどはこの値をセットしていない場合は、Spring Sleuth
が自分で生成した値をヘッダーで利用してしまうみたいです。(もしかしたらもっといい方法があるかも...)
まぁ、今回はspring.sleuth.propagation-keys
に関しては置いておいて、できたアプリをDockerイメージ化します。
$ mvn spring-boot:build-image -Dspring-boot.build-image.imageName=spring/open-tracing $ docker images | grep spring/open-tracing spring/open-tracing latest 0dbe73ec4359 41 years ago 268MB
イメージが作成されました。
DeploymentとServiceを作成してmimikubeにデプロイする
それではDeploymentとServiceを作成してminikubeにデプロイしておきます。
まずはベースとなるdeployment.yamlを作成します。
$ kubectl create deployment quarkus-app --image=quarkus/open-tracing --dry-run=client -o yaml > deployment.yaml echo --- >> deployment.yaml $ kubectl create deployment spring-app --image=spring/open-tracing --dry-run=client -o yaml >> deployment.yaml
できた、deplyment.yamlのそれぞれのDeploymentにimagePullPolicy: IfNotPresent
(ローカルイメージを使用するようにするため)とquarkus-app
には環境変数SPRING_SERVICE
を記述しておきます。
apiVersion: apps/v1 kind: Deployment metadata: creationTimestamp: null labels: app: quarkus-app name: quarkus-app spec: replicas: 1 selector: matchLabels: app: quarkus-app strategy: {} template: metadata: creationTimestamp: null labels: app: quarkus-app spec: containers: - image: quarkus/open-tracing name: open-tracing imagePullPolicy: IfNotPresent env: - name: SPRING_SERVICE value: spring-app:8082 resources: {} status: {} --- apiVersion: apps/v1 kind: Deployment metadata: creationTimestamp: null labels: app: spring-app name: spring-app spec: replicas: 1 selector: matchLabels: app: spring-app strategy: {} template: metadata: creationTimestamp: null labels: app: spring-app spec: containers: - image: spring/open-tracing name: open-tracing imagePullPolicy: IfNotPresent resources: {} status: {}
次にサービスを作成します。
$ kubectl create service clusterip quarkus-app --tcp=8081:8081 --dry-run=client -o yaml > service.yaml $ echo --- >> service.yaml $ kubectl create service clusterip spring-app --tcp=8082:8082 --dry-run=client -o yaml > service.yaml
Serviceは特にいじることは無いので、作ったマニフェストをapplyしていきます。
$ kubectl --context=minikube apply -f deplyment.yaml $ kubectl --context=minikube apply -f service.yaml
GatawayとVirtualSerciceを作成して疎通確認する
Gataway
とVirtualService
を作っておきます。
gateway.yaml
apiVersion: networking.istio.io/v1alpha3 kind: Gateway metadata: name: quarkus-app-gateway spec: selector: istio: ingressgateway servers: - port: number: 80 name: http protocol: HTTP hosts: - "*"
virtual-service.yaml
apiVersion: networking.istio.io/v1alpha3 kind: VirtualService metadata: name: quarkus-app-vs spec: hosts: - "*" gateways: - quarkus-app-gateway http: - match: - uri: prefix: /hello route: - destination: port: number: 8081 host: quarkus-app
上記のyamlをapplyしておきます。
$ kubectl --context=minikube apply -f gateway.yaml $ kubectl --context=minikube apply -f virtual-service.yaml
cUrlを使って疎通確認を行います。
$ export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}' $ export INGRESS_HOST=$(minikube ip) $ curl ${INGRESS_HOST}:${INGRESS_PORT}/hello -v * Trying 192.168.49.2:30019... * TCP_NODELAY set * Connected to 192.168.49.2 (192.168.49.2) port 30019 (#0) > GET /hello HTTP/1.1 > Host: 192.168.49.2:30019 > User-Agent: curl/7.68.0 > Accept: */* > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK < content-length: 14 < content-type: text/plain;charset=UTF-8 < x-envoy-upstream-service-time: 23 < date: Sun, 31 Jan 2021 11:28:48 GMT < server: istio-envoy < * Connection #0 to host 192.168.49.2 left intact Hello, Tracing
ここまででようやく準備完了です。
Jaergerを動かす
さて、ようやくですが。
Jaegerをローカルで動かしておきます。
今回はMinikubeのクラスターにデプロイして動かします。
外部のJaergerにメトリクスを送信する場合は、--set values.global.tracer.zipkin.address=<jaeger-collector-address>:9411
をIstioのインストール時に設定しておけば、任意のJaergerにデータを送信することができます。
それでは、Jargerをデプロイします。
$ kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.8/samples/addons/jaeger.yaml
次のコマンドで、UIを動かします。
$ istioctl dashboard jaeger http://localhost:16686
出力される。アドレスにアクセスすると以下の様なUIが表示されます。
実はローカルで試している際に何度かリクエストを送ってしまったので、すでにデータが存在してしまっていますが、最初はなにも出力されません。
これは、デフォルトではIstioは1%のリクエストのデータをJaergerに送るためです。
以下のコマンドを何度か叩いて、Jaergerにデータを送るようにしておきます。
$ for i in $(seq 1 100); do curl -s -o /dev/null "http://${INGRESS_HOST}:${INGRESS_PORT}/hello"; done
そうするとそれっぽいデータが見れるようになります。
ちょっとUIを見てみる
ヘッダーのSearch
タブを選択すると以下のような検索用のボックスが表示されていると思います。
Service
をistio-ingressgateway
を選択してFind Traces
を押すと右側にトレースされたリクエストが表示されます。
僕の環境では4回のサンプリングされたリクエストの情報が表示されます。
グラフのすぐ下のリクエスト(7ff3cb9
)をクリックするとリクエストに関する情報がより詳細にみることができます。
ヘッダーのSystem Architecture
タブを選択するとサービスの依存関係やそれぞれに難解リクエストが送られたかを確認することができます。
とりあえず動かすところまでできました。
KongのKubernetes Ingress Controllerを試す
はじめに
久しぶりにKongをちょっとお勉強したい気になってきたので、ドキュメントを眺めていたらKubernetes Ingress Controllerなるものを見つけました。面白そうだったので、とりあえず、動かすまでやってみようかと思います。
Kongの基本的なところとプラグインの書き方も以前まとめたので興味がある人はよかったら見てみてください。
KongのIngress Controllerについて
KubernetesのIngress Controllerでクラスター内でIngressリソースとして動くKongに対して設定と管理を行います。クラスター内のスケーリング、設定の変更、エラーなどのイベントによってKongをアップデートしてくれます。
以下の2つのコンポーネントからなります。
- Kong本体
- Controller、Kongの設定を同期する
プラグインを反映させることももちろん可能で、Kongができることは基本的になんでもできるようです。
カスタムリソースについて
いくつかのカスタムリソースが用意されおり、Kongの宣言的な設定を用いてKongの機能を利用することができます。
- KongPlugin: KongのPluginエンティティ相当の設定を行なうリソース
- KongIngress: ルーティング、ロードバランシング、ヘルスチェックなど細かなルーティングの設定等を行なうためのリソース
- KongConsumer: KongのCunsumerエンティティへのマッピング
- TCPIngress: TCPベースのルーティングを行なうためのリソース。non-HTTPベースのサービスに対して利用可能
使ってみる
今回はMinikube(driver none)にIngressControllerをインストールして、サンプルアプリにプロキシしてみたいと思います。
環境
$ minikube version minikube version: v1.16.0 commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1 $ kubectl version -o yaml clientVersion: buildDate: "2021-01-13T13:28:09Z" compiler: gc gitCommit: faecb196815e248d3ecfb03c680a4507229c2a56 gitTreeState: clean gitVersion: v1.20.2 goVersion: go1.15.5 major: "1" minor: "20" platform: linux/amd64 serverVersion: buildDate: "2020-12-08T17:51:19Z" compiler: gc gitCommit: af46c47ce925f4c4ad5cc8d1fca46c7b77d13b38 gitTreeState: clean gitVersion: v1.20.0 goVersion: go1.15.5 major: "1" minor: "20" platform: linux/amd64 $ docker version (クライアント略) Server: Docker Engine - Community Engine: Version: 20.10.2 API version: 1.41 (minimum version 1.12) Go version: go1.13.15 Git commit: 8891c58 Built: Mon Dec 28 16:15:19 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.4.3 GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b runc: Version: 1.0.0-rc92 GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff docker-init: Version: 0.19.0 GitCommit: de40ad0 $ uname -srvmpio Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal
アップストリームを作っておく
下準備として、Kongがプロキシすする先のアップストリームを作って置きます。
アップストリームはNginxを用いて2つ作成し、それぞれが自分自身のhostname
を返すようにしておきます。
--dry-run
オプションと使ってリソースを作ります。
# deployment.yamlの作成 $ kubectl create deployment nginx-first --image=nginx:1.19.6 --dry-run=client -o yaml > deployment.yaml $ echo --- >> deployment.yaml $ kubectl create deployment nginx-second --image=nginx:1.19.6 --dry-run=client -o yaml >> deployment.yaml # service.yamlの作成 $ kubectl create service clusterip nginx-first --tcp=8081:80 --dry-run=client -o yaml > service.yaml $ echo --- >> service.yaml $ kubectl create service clusterip nginx-second --tcp=8082:80 --dry-run -o yaml > service.yaml
それぞれできたリソースはここに置いておくので興味があれば確認してみてください。
作成したリソースをApplyしてします。
$ kubectl --context=minikube apply -f deployment.yaml $ kubectl --context=minikube get po NAME READY STATUS RESTARTS AGE nginx-first-d6db6c668-wtmwh 1/1 Running 0 84s nginx-second-6b8d5c9696-2cj25 1/1 Running 0 84s $ kubectl --context=minikube apply -f service.yaml service/nginx-first created service/nginx-second created $ kubectl get svc NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 92m nginx-first ClusterIP 10.109.148.241 <none> 8081/TCP 44s nginx-second ClusterIP 10.105.0.182 <none> 8082/TCP 44s
最後に、それぞれが自分自身のhostname
を返すようにしておきます。
$ kubectl --context=minikube exec -it nginx-first-d6db6c668-wtmwh -- cp /etc/hostname /usr/share/nginx/html/index.html $ kubectl --context=minikube exec -it nginx-second-6b8d5c9696-2cj25 -- cp /etc/hostname /usr/share/nginx/html/index.html
テストのためにポートフォワードしてそれぞれのNginxにつないで見ます。
# First $ kubectl --context=minikube port-forward service/nginx-first 8081:8081 $ curl localhost:8081 nginx-first-d6db6c668-wtmwh # Second $ kubectl --context=minikube port-forward service/nginx-second 8082:8082 $ curl localhost:8082 nginx-second-6b8d5c9696-2cj25
無事、それぞれのNginxがhostname
を返してくれていますね。
これで準備完了です。
Kubernetes Ingress Controllerのインストール
Minikubeで作ったクラスタにKongのKubernetes Ingress Controllerをインストールします。
ドキュメントに寄るとHelmを使った方法やここからダウンロードしたYamlファイルをアプライスル方法があるみたいです。
今回は後者の方で行こうと思います。
$ kubectl create -f https://bit.ly/k4k8s namespace/kong created Warning: apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition customresourcedefinition.apiextensions.k8s.io/kongclusterplugins.configuration.konghq.com created customresourcedefinition.apiextensions.k8s.io/kongconsumers.configuration.konghq.com created customresourcedefinition.apiextensions.k8s.io/kongingresses.configuration.konghq.com created customresourcedefinition.apiextensions.k8s.io/kongplugins.configuration.konghq.com created customresourcedefinition.apiextensions.k8s.io/tcpingresses.configuration.konghq.com created serviceaccount/kong-serviceaccount created Warning: rbac.authorization.k8s.io/v1beta1 ClusterRole is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 ClusterRole clusterrole.rbac.authorization.k8s.io/kong-ingress-clusterrole created Warning: rbac.authorization.k8s.io/v1beta1 ClusterRoleBinding is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 ClusterRoleBinding clusterrolebinding.rbac.authorization.k8s.io/kong-ingress-clusterrole-nisa-binding created service/kong-proxy created service/kong-validation-webhook created deployment.apps/ingress-kong created
むー、Kubernetesのバージョン1.20.xを使ってると、betaじゃなくなったリソースがいくつかあって警告がいくつか出てしまうみたいですね。
まぁ、この辺は一旦気にせずに先に進めようと思います。
デプロイされたコングにアクセスするためのIPを取得して環境変数(KONG_PROXY_IP
)にセットしておきます。
この、今後この環境変数を使ってKongにアクセスします。
export KONG_PROXY_IP=$(minikube service -n kong kong-proxy --url | head -1)
デプロイされたKongにアクセスしてみます。
$ curl -i $PROXY_IP HTTP/1.1 404 Not Found Date: Sun, 24 Jan 2021 06:20:54 GMT Content-Type: application/json; charset=utf-8 Connection: keep-alive Content-Length: 48 X-Kong-Response-Latency: 0 Server: kong/2.2.1 {"message":"no Route matched with those values"}
Kongから無事レスポンスが返ってきました。
まだなにもKongの設定を行っていないので、NotFoundを返してきますね。
プロキシの設定を記述する
プロキシの設定を記述するには、普通の(?)Ingressのリソースを記述すれば良さそうです。
以下のようなYamlを記述します。
ingress.yaml
apiVersion: extensions/v1beta1 kind: Ingress metadata: name: demo annotations: kubernetes.io/ingress.class: kong konghq.com/strip-path: "true" spec: rules: - http: paths: - path: /first backend: serviceName: nginx-first servicePort: 8081 - path: /second backend: serviceName: nginx-second servicePort: 8082
今回ちょっとだけトリッキーなのはnginxのコンテキストルートはも問題で、/first
などのパスがそのままアップストリームにプロキシされると困るのでkonghq.com/strip-path: "true"
を付与しました。
早速、リソースをApplyしてアクセスしてみます。
$ kubectl apply -f ingress.yaml $ curl -i $PROXY_IP/first HTTP/1.1 200 OK Content-Type: text/html; charset=UTF-8 Content-Length: 28 Connection: keep-alive Server: nginx/1.19.6 Date: Sun, 24 Jan 2021 07:11:05 GMT Last-Modified: Sun, 24 Jan 2021 05:57:51 GMT ETag: "600d0c5f-1c" Accept-Ranges: bytes X-Kong-Upstream-Latency: 1 X-Kong-Proxy-Latency: 0 Via: kong/2.2.1 nginx-first-d6db6c668-wtmwh $ curl -i $PROXY_IP/second HTTP/1.1 200 OK Content-Type: text/html; charset=UTF-8 Content-Length: 30 Connection: keep-alive Server: nginx/1.19.6 Date: Sun, 24 Jan 2021 07:11:06 GMT Last-Modified: Sun, 24 Jan 2021 05:59:05 GMT ETag: "600d0ca9-1e" Accept-Ranges: bytes X-Kong-Upstream-Latency: 1 X-Kong-Proxy-Latency: 0 Via: kong/2.2.1 nginx-second-6b8d5c9696-2cj25
それぞれ、きちんとプロキシされているようですね。
atコマンドで指定時間にJobを単発実行させる
はじめに
最近、CrontabみたいなJobを定期的にスケジュールするのでは無くて、指定時間に単発で実行したいみたいな要件にぶち当たって、ちょっと調べていたらUnix系のOSにat
というコマンドがあることに気がついたので機能を試して見ようかと思います。
at コマンドとは
標準出力、もしくはファイルからコマンドを受け取って、/bin/sh
を使って指定時間にシェルを起動するコマンドです。Corntabなどは日時の指定のみで、最長一年に一回は実行されるのに対してatコマンドは年の指定まで行なうことが可能で、指定した時間に一回のみといった実行が行えます。
実行時の環境変数(BASH_VERSINFO
、 DISPLAY
、 EUID
, GROUPS
、 SHELLOPTS
、 TERM
、 UID
以外)、ワークディレクトリ、umaskなどはコマンド実行時の状態でキープされます。
また、at
はsetuid
プログラムで、LD_LIBRARY_PATH
やLD_PRELOAD
などの環境変数も読み込まれないようです。
使ってみる
環境
コマンドの実行環境は以下の通り、
$ uname -srvmpio Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal
基本的な操作方法
プロンプトでコマンドを設定する。
atは以下のような形式で、実行できます。
$ at 07:55 01/31/21 warning: commands will be executed using /bin/sh at>
実行すると、>at
という表示が現れプロンプトが起動します。
ここにコマンドを打ち込んでctrl+d
を押すと設定が完了です。
$ at 10:46 01/22/21 warning: commands will be executed using /bin/sh at> java --version > test.txt at> <EOT> job 6 at Fri Jan 22 10:46:00 2021
実施時間をすぎると以下のようにテキストが出力されているのが確認できます。
$ cat test.txt openjdk 15.0.1 2020-10-20 OpenJDK Runtime Environment AdoptOpenJDK (build 15.0.1+9) Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.23.0, JRE 15 Linux amd64-64-Bit Compressed References 20201022_81 (JIT enabled, AOT enabled) OpenJ9 - 0394ef754 OMR - 582366ae5 JCL - ad583de3b5 based on jdk-15.0.1+9)
ファイルを読み込んで実行する
atコマンドはファイルを読み込んで実行することもできます。
まずは、以下のようなシェルスクリプトを用意します。
hello.sh
#!/bin/sh echo "hello, at command" > test.txt
このシェルスクリプトをat
コマンドで実行するには-f
オプションで実行するファイルを指定します。
$ at 21:05 01/22/21 -f hello.sh warning: commands will be executed using /bin/sh job 8 at Fri Jan 22 21:05:00 2021 $ cat test.txt hello, at command
日付の指定の形式について
at
はPOSIX.2 standardを拡張した日付の指定が可能なようです。
HH:MM
- 時間と分を指定できます。この場合、今日の時間にJobが実行され、もしその時間が今日すでに過去である場合は次の日の指定された時間にJobが実行されます。
MMDD[CC]YY
,MM/DD/[CC]YY
,DD.MM.[CC]YY
or[CC]YY-MM-DD
- オプションとして、上記のような形式で年月日を指定することができます。
midnight
、noon
、teatime
- それぞれの時間に実行されます。
today
、tomorrow
- サーフィクスとして上記の表記をつけるとそれぞれ今日と明日を示すことができます。
その他にも結構自由な時間の指定ができるようで、今日から3日後の4pmとかを指定する場合はat 4pm + 3 days
、6月31日の10時AMを指定したい場合はat 10am Jul 31
と行ったような感じで指定することができます。
他にできそうなこと
マニュアルでatコマンドを調べてみると、このコマンドには以下のような、仲間がいるようです。
atq
atrm
- job numberにとって特定されるジョブを削除します。
-r
オプションがエイリアスになっています。
- job numberにとって特定されるジョブを削除します。
batch
- システムのロードアベレージが1.5(かもしくは
atd
コマンドで指定された値)以下になった際にJobを実行する。
- システムのロードアベレージが1.5(かもしくは
他にも-m
使うなどするとジョブの実行時にユーザにメールを送ることができるようです。
MinikubeにIstioをインストールする
はじめに
業務でKubernetesやIstioの機能の一部を利用することがあるですが、勉強のために試せる環境がほしかったのでローカルのMinikubeにインストールして見ようかと思います。
やってみる
動作環境
$ uname -srvmpio Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.1 LTS Release: 20.04 Codename: focal $ docker version Client: Docker Engine - Community Version: 20.10.2 API version: 1.41 Go version: go1.13.15 Git commit: 2291f61 Built: Mon Dec 28 16:17:43 2020 OS/Arch: linux/amd64 Context: default Experimental: true Server: Docker Engine - Community Engine: Version: 20.10.2 API version: 1.41 (minimum version 1.12) Go version: go1.13.15 Git commit: 8891c58 Built: Mon Dec 28 16:15:19 2020 OS/Arch: linux/amd64 Experimental: true containerd: Version: 1.4.3 GitCommit: 269548fa27e0089a8b8278fc4fc781d7f65a939b runc: Version: 1.0.0-rc92 GitCommit: ff819c7e9184c13b7c2607fe6c30ae19403a7aff docker-init: Version: 0.19.0 GitCommit: de40ad0
minikubeのインストール
ドキュメントに寄ると、いくつかインストール方法があるようですが、今回はバイナリのインストールを行おうと思います。
以下のコマンドを実行します。
$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 \ && chmod +x minikube $ sudo mkdir -p /usr/local/bin/ $ sudo install minikube /usr/local/bin/ $ minikube version minikube version: v1.16.0 commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1
インストールが終わったのでMinikubeを起動します。
この際、MinikubeのドライバーはDockerを使い、メモリのCPUの設定は公式ドキュメントにしたがって、起動したいと思います。
$ minikube start --memory=16384 --cpus=4 --kubernetes-version=v1.18.10 $ minikube status minikube type: Control Plane host: Running kubelet: Running apiserver: Running kubeconfig: Configured timeToStop: Nonexistent
istioをインストール
PodとServiceに関する注意点
Istioを利用する上でPodとServiceには求められる条件があります。
Podの必須条件は以下のとおりです。
- Podは最低一つのServiceに紐付いている必要がある
- PodはアプリケーションをUIDが1337で実行していないこと
- pod security policiesがクラスターに設定されているかつIstio CNI Pluginを利用していない場合。Podに対して
NET_ADMIN
とNET_RAW
の機能が利用できる必要がある(詳細はこちら)
また、以下のような推奨される条件があるようです。
- Istioがメトリクスを収集する際に付加情報をつけることができるので、Podに
app
ラベルと、version
ラベルを付与しておくのがよい。 - Serviceの
spec.ports.name
にプロトコルを指定して置くのが良い(指定できるプロトコルおよび詳細はこちら)
Istioが利用するポート
別の注意点としてIstioが以下のようなポートを利用することを留意しておく必要があります。
Istioのサイドカープロキシ(Envoy)によって利用されるポートの一覧
ポート | プロトコル | 説明 |
---|---|---|
15000 | TCP | Envoyのadmin ポート。ポッド内のみ |
15001 | TCP | Envoyのアウトバウンドポート |
15006 | TCP | Envoyのインバウンドポート |
15008 | TCP | Envoyトンネルポート(インバウンド) |
15020 | HTTP | アプリケーションやIstio agent、Envoyのテレメトリーのポイント |
15021 | HTTP | ヘルスチェックのポイント |
15090 | Envoy Prometheusのテレメトリーポイント |
Istioのコントロールプレーンによって利用されるポートの一覧
ポート | プロトコル | 説明 |
---|---|---|
15010 | GRPC | XDSとCAサービス(プレーンテキスト) |
15012 | GRPC | XDSとCAサービス(TLS)プロダクションで利用することが推奨される |
8080 | HTTP | Debugインタフェース(非推奨) |
443 | HTTPS | Webhook |
15014 | HTTP | コントロールプレーンのモニタリング |
Server First Protocols
サーバが最初のバイトを送信するようなServer First Protocolを利用する際は
PREMISSIVE mTLSやプロトコルの自動検出等など最初のバイトを利用する機能に影響を与える可能性があります。なので、明示的にServiceのプロトコルを選択肢てやる必要があります。
ただし、以下のようなよく知られたポートを使うプロトコルやサーバに関しては自動的にTCPが選択されるようです。
その他、アウトバウンドのアクセスでHostヘッダーを利用する場合などにも注意が必要なようです。詳細はこちらを確認ください
インストール
istioをインストールする方法は以下のような方法があるようです。
- Istiocltを使ったマニュアルイントール
- Istio Operatorを使ったインストール
あとは、マルチクラスターへのインストールやVMを使う方法(こいつはちょっとよくわからなかった)などがあるようです。
今回はIstiocltを使ったマニュアルイントールをやってみようと思います。
どちらにせよまずはIstioctlをインストールする必要があるようなのでインストールします。
istioctlのバージョンはIstioのコントロールプレーンのバージョンと合わせることが推奨されています。今回は最新のリリースのものをインストールします。
$ curl -sL https://istio.io/downloadIstioctl | sh - $ istioctl version no running Istio pods in "istio-system" 1.8.2
何度も実行するのが面倒なので、export PATH=$PATH:$HOME/.istioctl/bin
は.bash_profile
に登録しておきます。
次にIstioをクラスターにインストールしていくのですが、まずIstioのインストールには[Profile]という概念があるみたいです。簡単にまとめるとプロファイルはIstioのコントローププレーンやIstioサイドカーに対するカスタマイズ方法を提供しています。Profileでできることの詳細に関してはこちらのページをご確認ください。
なにも設定しない場合defualt
のプロファイルが利用されることになります。
今回は素直にインストールするので特にプロファイルは指定しません。
# プロファイルを指定したい場合は`--set profile=demo`などを指定する $ istioctl install --context minikube
コマンドの実行が終わったら次にきちんとインストールが行われたか確認します。
verify-install
コマンドを使ってこの確認を行なうことができます。
まずは、インストール時のさいに利用されたマニフェストを取得する必要があります。
事前に取得していない場合はistioctl manifest generate <インストール時に使用したオプション>
で出力することができます。
今回の場合は以下のように実行します。
$ istioctl manifest generate > generated-manifest.yaml $ istioctl verify-install -f generated-manifest.yaml CustomResourceDefinition: authorizationpolicies.security.istio.io.default checked successfully CustomResourceDefinition: destinationrules.networking.istio.io.default checked successfully CustomResourceDefinition: envoyfilters.networking.istio.io.default checked successfully CustomResourceDefinition: gateways.networking.istio.io.default checked successfully CustomResourceDefinition: istiooperators.install.istio.io.default checked successfully CustomResourceDefinition: peerauthentications.security.istio.io.default checked successfully CustomResourceDefinition: requestauthentications.security.istio.io.default checked successfully CustomResourceDefinition: serviceentries.networking.istio.io.default checked successfully CustomResourceDefinition: sidecars.networking.istio.io.default checked successfully CustomResourceDefinition: virtualservices.networking.istio.io.default checked successfully CustomResourceDefinition: workloadentries.networking.istio.io.default checked successfully CustomResourceDefinition: workloadgroups.networking.istio.io.default checked successfully ServiceAccount: istio-ingressgateway-service-account.istio-system checked successfully ServiceAccount: istio-reader-service-account.istio-system checked successfully ServiceAccount: istiod-service-account.istio-system checked successfully ClusterRole: istio-reader-istio-system.default checked successfully ClusterRole: istiod-istio-system.default checked successfully ClusterRoleBinding: istio-reader-istio-system.default checked successfully ClusterRoleBinding: istiod-istio-system.default checked successfully ValidatingWebhookConfiguration: istiod-istio-system.default checked successfully EnvoyFilter: metadata-exchange-1.6.istio-system checked successfully EnvoyFilter: metadata-exchange-1.7.istio-system checked successfully EnvoyFilter: metadata-exchange-1.8.istio-system checked successfully EnvoyFilter: stats-filter-1.6.istio-system checked successfully EnvoyFilter: stats-filter-1.7.istio-system checked successfully EnvoyFilter: stats-filter-1.8.istio-system checked successfully EnvoyFilter: tcp-metadata-exchange-1.6.istio-system checked successfully EnvoyFilter: tcp-metadata-exchange-1.7.istio-system checked successfully EnvoyFilter: tcp-metadata-exchange-1.8.istio-system checked successfully EnvoyFilter: tcp-stats-filter-1.6.istio-system checked successfully EnvoyFilter: tcp-stats-filter-1.7.istio-system checked successfully EnvoyFilter: tcp-stats-filter-1.8.istio-system checked successfully ConfigMap: istio.istio-system checked successfully ConfigMap: istio-sidecar-injector.istio-system checked successfully MutatingWebhookConfiguration: istio-sidecar-injector.default checked successfully Deployment: istio-ingressgateway.istio-system checked successfully Deployment: istiod.istio-system checked successfully PodDisruptionBudget: istio-ingressgateway.istio-system checked successfully PodDisruptionBudget: istiod.istio-system checked successfully Role: istio-ingressgateway-sds.istio-system checked successfully Role: istiod-istio-system.istio-system checked successfully RoleBinding: istio-ingressgateway-sds.istio-system checked successfully RoleBinding: istiod-istio-system.istio-system checked successfully HorizontalPodAutoscaler: istio-ingressgateway.istio-system checked successfully HorizontalPodAutoscaler: istiod.istio-system checked successfully Service: istio-ingressgateway.istio-system checked successfully Service: istiod.istio-system checked successfully Checked 12 custom resource definitions Checked 1 Istio Deployments Istio is installed successfully
最後の出力をみるとインストールはうまく行っているようです。
アーキテクチャについてちょっと見てみる
最後にほんの少しだけ、Istioのアーキテクチャについて見てみたいと思います。
istioのコンポーネントは論理的には以下の2つに分けられるようです。
data plane
- サイドカーとしてデプロイされるEnvoyプロキシの集まり。マイクロサービス間のすべてネットワークコミュニケーションをプロキシする。また、メトリクスの収集なども行なう
controle plane
- プロキシのルーティングトラフィックを設定、マネージする
(公式ドキュメントより)
Envoy
Istioは拡張したEnvoy Proxyを利用します。
サービス間のインバウンドとアウトバウンドのやり取りをすべてプロキシする役割を持っており、サイドカーとしてPodにデプロイされます。
Istiod
サービスディスカバリ機能、設定、証明書マネジメントを提供します。
IstiodはハイレベルなルーティングルールをEnvoyの設定に変換し、ランタイムのサイドカーに拡散します。
以下のようなコンポーネントで構成されます。
- Pilot
- Envoyの構成管理を行なう
- Citadel
- CA。鍵の管理や証明書の発行を行なう
- galley
- ユーザ設定のマネジメントを行なう
参考資料
Reactor Publisherにおけるエラーハンドリング
はじめに
前回に引き続き、Reactive Springのを読んでいて、Publihsherのエラーハンドリングを行なうためのメソッド群が紹介されていたので、実際に動かして試してみようと思います。(書籍で紹介されていないものも結構あったので)
実際にやってみる
環境は以下の通り、
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書いていく
エラーハンドリング
まずはエラーハンドリングについて、FluxとMonoではonError**と言ったようなエラー時にフックされるようなメソッドがいくつか生えています。
onErrorResume()
onErrorResume()ですが、こいつは例外発生時のFallback処理を書くためのメソッドです。
以下のコードはintの3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorResume(e -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
特定の例外に対して、Fallback処理を書きたい場合は以下のように第一引数に例外クラスを書いてやるようにします。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorResume(IllegalArgumentException.class, v -> Flux.just(10)) .onErrorResume(RuntimeException.class, v -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
また、第一引数は例外クラスの代わりにPredicate<? super Throwable> predicate
を受け取ることができるので以下のように特定条件の例外にマッチした際のハンドリングを記述することもできます。
@Test void testOnErrorResume() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorResume(e -> e.getMessage().equals("Auchi"), v -> Flux.just(10)) .onErrorResume(e -> e.getMessage().equals("Oops"), v -> Flux.just(4)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
onErrorReturn
onErrorReturn()は例外発生時に単一の値をエミットする関数です。
以下のコードは先程と同じく、3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。
@Test void testOnErrorReturn() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException()); return Flux.just(integer); }) .onErrorReturn(4); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); }
特定の例外クラスにマッチした場合や、特定条件の例外にマッチした場合のハンドリングの書き方はonErrorResume
と同様の方法でできます。
すべてを動かしてみたわけでは無いですが、 onErro**系は大体同じような操作ができるっぽいです。
doOnError
まず、Flux(or Mono)でのストリーム処理時に発生したエラーをハンドリングして、ログなどの副作用の伴う処理の実行を行なう場合はdoOnError()を利用します。
@Test void testDoOnError() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .map(v -> { if (v.equals(3)) throw new RuntimeException("Oops"); return v; }) .doOnError(RuntimeException.class, v -> System.out.println("message = " + v.getMessage())); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectError(RuntimeException.class) .verify(); }
標準出力への出力は以下の通り。
message = Oops
onErrorMap
処理時の例外をハンドリングして、他の例外に変換してダウンストリームに返す場合は、onErrorMap()を利用します。
@Test void testOnErrorMap() { Flux<Integer> integerFlux = Flux.just(1, 2, 3) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorMap(RuntimeException.class, e -> new IllegalArgumentException(e.getMessage())); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectError(IllegalArgumentException.class) .verify(); } }
onErrorContinue
例外発生時に、問題があるエミットを落として処理を続ける場合は、onErrorContinue()を利用します。
@Test void testOnErrorContinue() { Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4) .flatMap(integer -> { if (integer.equals(3)) return Flux.error(new RuntimeException("Oops")); return Flux.just(integer); }) .onErrorContinue((e, v) -> System.out.printf("dropped value = %s", v)); StepVerifier.create(integerFlux) .expectNext(1) .expectNext(2) .expectNext(4) .verifyComplete(); } }
実行すると、標準出力に以下の文字列が出力されます。
dropped value = 3
非Reactiveな非同期処理をFluxで扱う
はじめに
Reactive Springを読んでいて、Flux.create()
を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。
Flux.create()?
Flux.create(Consumer<? super FluxSink
実際にやってみる
それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。
環境
$ java --version openjdk version "15" 2020-09-15 OpenJDK Runtime Environment (build 15+36-1562) OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing) $ mvn --version Apache Maven 3.6.3 Maven home: /usr/share/maven Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"
Pom
今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.7.0</version> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2020.0.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <artifactId>maven-surefire-plugin</artifactId> <version>2.22.2</version> </plugin> <plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>2.22.2</version> </plugin> </plugins> </build>
コードを書く
準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。
public class FluxCreateTest { private final ExecutorService es = Executors.newFixedThreadPool(1); @Test void exec() { System.out.println("Thread Name = " + Thread.currentThread().getName()); Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5)); StepVerifier .create(values.doFinally(signalType -> this.es.shutdown())) .expectNext("exec number: 1") .expectNext("exec number: 2") .expectNext("exec number: 3") .expectNext("exec number: 4") .expectNext("exec number: 5") .verifyComplete(); } private void execTask(FluxSink<String> stringFluxSink, int maxNum) { this.es.submit(() -> { System.out.println("Thread Name = " + Thread.currentThread().getName()); AtomicInteger num = new AtomicInteger(); while (num.get() < maxNum) { String emitValue = "exec number: " + num.incrementAndGet(); System.out.println("emitValue = " + emitValue); stringFluxSink.next(emitValue); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } stringFluxSink.complete(); }); } }
実行するとテストが成功しコンソールに以下の出力がされます。
Thread Name = main Thread Name = pool-1-thread-1 emitValue = exec number: 1 emitValue = exec number: 2 emitValue = exec number: 3 emitValue = exec number: 4 emitValue = exec number: 5
上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1
という名前のスレッドで、1秒に1回、exec number: *
というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。
ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFER
が設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyのEnumを渡してやります。
2020年振り返り
はじめに
なんとなくいろんな人の振り返りみて、自分も振り返ってみようかとふと思ったので書いてみようかと。
ざっくり今年やったこと、あったこと
今年やったとことあったことをまとめると、以下のようになるかと思います。
- 転職して一年
- ブログを平均して週一ぐらいでは書く
- 本を読む
転職して一年
先月の11月で今の会社に来て一年が経ちました。もともとは、転職ドラフトというサイトで声をかけていただいて今に至りますが、気がつけば一年たってました。
前職とは働き方や開発のやり方が全く違い、いろいろ学びが多かったと思います。
今の組織は、アジャイル開発を行う上で、XPのプラクティスを大切にしています。その中で自分の中の今まで知識でしかなかった部分が実践できたり、知らなかったことをたくさん学ぶことができました。
その他の面で、コミュニケーション面に置いて自分の中で1つのブレイクスルーみたいなのがあった気がします。作業が基本ペア作業で行うので一般的な組織に比べてコミュニケーションを撮る機会がかなり多いと思います。その中であまり上手くコミュニケーションを取れない事があると、感じていたのですが、いろいろ考える中で気づきみたいなものがあり少しだけコミュニケーション力がマシになった気がします。
とはいえ、まだまだ上手くできない面がかなりおおく、特に、ビジネスサイド方々などのコンテキストが共有されてない相手とのコミュニケーションに課題を感じているので、来年以降その力をつけていかないとなぁと思っています。
(ただ、自分の強みとして伸ばしたい方向性とは少し違う気もしているのですが…)
ブログを平均して週一ぐらいでは書く
前職の先輩でかなりのブログを書かれる人がいます。その人に触発されてなんとなく、最低週一ぐらいでは書いてみようと思ったのが、今年の3月です。こから42記事書いてるので目的は達成したかなと。
まぁ、今後も週一でゆるゆると書いていこうかなぁとは思います。
やってみて思ったことは以下のことです。
- ブログをネタを探すようになるので、発見が多くなる
- 文章にする事で理解度が上がる
前者に関して、ブログを書くことを意識してるとより多くのことを学べるなぁと感じました。
後者に関しては、ドキュメントを読んでなんとなく理解した気になっても書いてみると結構書けないみたいなのがあったのがたくさんありました。その箇所を自分の言葉で文に起こすことで理解度が断然上がるのを感じました。
本を読む
本を読むのは、新人の頃からずっと続けているのですが今年は少し読む量が減った気がします。 思い出せるだけ羅列してみようと思います。
- Clean Agile
- Amazon Web Services負荷試験入門
- Netty In Action
- Java並行プログラミング
- Real World HTTP 第2版
- イラストでわかるDockerとKubernetes
- マイクロサービスパターン
- 監視入門
- オブジェクト指向UIデザイン
- Linuxで動かしながら学ぶネットワーク入門
- ドメイン駆動設計入門
- みんなのJava
- 新世代Javaプログラミングガイド
- みんなのKotlin
- 7つの習慣
- kubernetesで実践するクラウドネイティブdevops
本屋で買ったのとかで覚えてないの合わせると、多分もうちょいある気がしますが、Amazonの購入履歴やらなんやらで思い出せるのはこんだけです。
こうして眺めてみると、Javaに結構寄ってるのがわかりますね。
来年はもう少し、Linuxやネットワーク、ディスクなどの足回りの知識をもう少し固めたいなーと思いました。
後は、Java以外の言語をもう少しやって、そのパラダイムを学びたいなぁと考えてます。
特に関数型に関しては自分はかなり弱い感じがするので「すごいHaskellたのしく学ぼう!」は読み直したいなと思います。
あとは、Rustなんかも学びが多そうだなぁと思ってます。
来年やりたいこと
来年は、以下のようなことをやろうかなと。
- 継続してブログを書く
- テーマを考えて読む本を選んでみる
- 資格を何か取る
ブログを書くことで、自分の技術力が上がるのは実感できたので今後も続けていたいとおもいます。
本に関しては、振り返りでも思ったのですが今までは興味の向くままに本を選んでいたのですが来年は少し考えながら本を選んでみようかと思いました。本のところでも書いたのですが具体的には以下のような本を意識的に読んでみようかと。
最後の資格に関しては、資格を取ることが目的というよりは資格を取る事でその分野に対する知識を深めることができるのではとは前々から思っていたので、やってみようかと。具体的には、LPICやCKA/CKADあたりに挑戦してみようかなと思ってます。