Jaegerでk8s+Istio上のアプリ(Quarkus、Spring)を分散トレーシングする

はじめに

分散トレーシングをやる際にJaegerというツールがあって、試してみたいと思って試せていなかったのやってみようと思います。今回はMinikubeで作ったクラスターにIstioをデプロイして、 その環境でのトレーシングを行ってみようと思います。

Istioにおけるトレーシングについて

どんなふうに実現されるか

Istioにおける分散トレーシングがどのように実現されているのかは、ドキュメントにいろいろ書かれてました。

まず、IstioはEnvoyベースのトレーシングを行います。その際にアプリケーションはB3 trace headersなどのヘッダーを転送していく必要があるようです。具体的には以下のようなヘッダーを転送して行くみたいです。

  • x-request-id
  • x-b3-traceid
  • x-b3-spanid
  • x-b3-parentspanid
  • x-b3-sampled
  • x-b3-flags
  • b3

さらに、Lightstepを利用する場合はx-ot-span-contextもつける必要があるみたいです。
これらのヘッダーはマニュアルで転送していくことも可能ですが、ZipkinJaegerのクライアントを使うことで自動的に拡散することも可能なようです。
ちなみに、なぜ、Istio自身がこれらのヘッダーをフォワーディングできないのかについてですが、アプリケーションのアウトバウンドリクエストがどのインバウンドリクエストによって発生したものかを特定するすべが、Istio側には存在しないからです。

また、Envoyベースのトレーシングに置いてEnvoyは以下のようなことを行ってくれます。

  • リクエストIDとトレーシングヘッダー(B3 Header等)を生成し送信する
  • リクエストとレスポンスのメタデータからTrace Spanを生成する
  • トレーシングバックエンドにSpanを送信する
  • プロキシ先のアプリケーションにヘッダーを送信する

Jaegerとは

Jaeger JaergerはDapper、OpenZipkinにインスパイヤーされた分散トレーシングシステムです。マイクロサービスに置いて以下のような用途で用いられます。

  • 分散トレーシングのモニタリング
  • 根本原因解析
  • サービスの依存解析
  • パフォーマンス、レイテンシの最適化

また、以下のようなコンポーネントで構成されます。

  • Goで作られたバックエンドコンポーネント
  • React UI
  • ストレージ
    • Cassandra 3.4+
    • Elasticserch 5.x, 6.x, 7.x
    • Kafka
    • メモリ

やってみる

環境

今回はKubernetesクラスターはMinikube(driver=none)を用います。

$ minikube version 
minikube version: v1.16.0
commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1

$ kubectl version -o yaml
clientVersion:
  buildDate: "2020-10-15T01:52:24Z"
  compiler: gc
  gitCommit: 62876fc6d93e891aa7fbe19771e6a6c03773b0f7
  gitTreeState: clean
  gitVersion: v1.18.10
  goVersion: go1.13.15
  major: "1"
  minor: "18"
  platform: linux/amd64
serverVersion:
  buildDate: "2020-10-15T01:43:56Z"
  compiler: gc
  gitCommit: 62876fc6d93e891aa7fbe19771e6a6c03773b0f7
  gitTreeState: clean
  gitVersion: v1.18.10
  goVersion: go1.13.15
  major: "1"
  minor: "18"

$ istioctl version
client version: 1.8.2
control plane version: 1.8.2
data plane version: 1.8.2 (1 proxies)

$ uname -srvmpio
Linux 5.4.0-64-generic #72-Ubuntu SMP Fri Jan 15 10:27:54 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ docker version 
(Client略)

Server: Docker Engine - Community
 Engine:
  Version:          20.10.2
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       8891c58
  Built:            Mon Dec 28 16:15:19 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0


$ java --version
openjdk 11.0.10 2021-01-19
OpenJDK Runtime Environment 18.9 (build 11.0.10+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.10+9, mixed mode)

$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 11.0.10, vendor: Oracle Corporation, runtime: /home/someone/.sdkman/candidates/java/11.0.10-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-65-generic", arch: "amd64", family: "unix"

もろもろを構築しておく

今回は以下のような環境を構築して分散トレーシングをやってみようと思います。

f:id:yuya_hirooka:20210131213754p:plain

Jaegerでトレーシングするのは以下の3つになります。

利用するアプリはSpringとQuarkusと利用していますが、この記事に置いては深い意味は無く、別で試したことがあったので採用しました。
アプリのロジックに関しても複雑なことは市内想定で、QuarkusアプリがSpringアプリに対して、Hello, Tracingの文字列を取得してそのままフロント側に返すようにしようと思います。

Sidecar InjectionをTrueにしてネームスペースを作成

まずは、IstioのインジェクションをTrueにしておきます。 今回は余計な複雑さをなくすために、新たにネームスペースを作成はせずにクラスタdefaultに対してインジェクションをTrueにします。
以下のコマンドを実行します。

$ kubectl --context=minikube label namespace default istio-injection=enabled
namespace/default labeled

minikubeのコンテクストでイメージをビルドするように設定

今回はローカルでビルドしたしたイメージを使うようにしておきます。
いかのコマンドでminikubeのコンテクストでイメージをビルドするように設定します。

$ eval $(minikube docker-env)

サンプルアプリケーションを作成しコンテナ化

Quarkusのアプリを作成

まずはQuarkusの方を作っていこうと思います。
プロジェクトはQuarkus - Start coding with code.quarkus.ioを使って作成します。
プロジェクトの設定は以下の通り。

f:id:yuya_hirooka:20210131164559p:plain

依存はRESTEasyRest Clientだけ追加しておきます。
プロジェクトが作成できたら、まずはHTTPクライアントを作成します。

@RegisterRestClient
@RegisterClientHeaders
public interface GreetingClient {

    @GET
    @Path("/hello")
    String fetchHello();
}

基本的には、なんの変哲の無いRestClientですがひとつだけポイントがあります。
前述の通り、Istioを使った分散トレーシングではヘッダーを転送していく必要があります。そのフォワーディングを行なうために、@RegisterClientHeadersを利用しています。このヘッダーはデフォルトで、指定されたJAX-RSのインバウンドリクエストヘッダーをアウトバウンドのリクエスト時に付与することができます。

RestClientの設定と転送するヘッダー設定を以下のようにappllication.propertiesに記述しておきます。

quarkus.http.port=8081
dev.hirooka.GreetingClient/mp-rest/url=http://${SPRING_SERVICE:localhost:8082}
dev.hirooka.GreetingClient/mp-rest/scope=javax.inject.Singleton
org.eclipse.microprofile.rest.client.propagateHeaders=x-request-id,x-b3-traceid,x-b3-spanid,x-b3-parentspanid,x-b3-sampled,x-b3-flags,b3,x-ot-span-context

設定まで記述できたら、次はハンドラーを記述します。今回はHTTPクライアントをハンドラーから直接利用するようにします。

@Path("/hello")
public class GreetingResource {

    @Inject
    @RestClient
    GreetingClient greetingClient;

    static final Logger logger = Logger.getLogger(GreetingResource.class);

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello(@Context HttpHeaders headers) {
        logger.info(headers.getRequestHeaders());
        return greetingClient.fetchHello();
    }
}

これでQuarkusの方のアプリは完成しました。
最後に作ったアプリをDockerイメージ化しておきます。

$ ./mvnw package
$ docker build -f src/main/docker/Dockerfile.jvm -t quarkus/open-tracing-jvm .
$ docker images | grep quarkus
quarkus/open-tracing                                           latest                  f2c29c0c8e21   2 minutes ago   385MB

Springのアプリを作成

次にSpringのアプリを作成します。
プロジェクトはSpring Initializrを使って作成します。
設定は以下のように。

f:id:yuya_hirooka:20210131173540p:plain

今回の構成だと、Springのアプリは外部アクセスを行わないため、ヘッダーをフォワーディングする必要が無いので、Spring Webだけで良いのですが、一応フォワーディングのやり方を示すために依存にSleuthZipkin Clientを追加してます。

なにはともあれ、まずはコントローラーを作成します。

@RestController
public class GreetingController {

    private final Logger logger = LoggerFactory.getLogger(GreetingController.class);

    @GetMapping("/hello")
    String greeting(@RequestHeader Map<String, String> header) {
        logger.info(header.toString());
        return "Hello, Tracing";
    }
}

今回必要な部分はこれだけです。
設定は以下の通り

server.port=8082
spring.zipkin.enabled=false

Spring Sleuthはデフォルトでlocalhost:9411にメトリクスを送信してしまうので、今回は無効化して置きます。

Springも外部へのHTTPコールを行いヘッダーを転送したい場合はspring.sleuth.propagation-keysを以下のように設定すれば良さそうです。

spring.sleuth.propagation-keys=x-request-id,x-b3-traceid,x-b3-spanid,x-b3-parentspanid,x-b3-sampled,x-b3-flags,b3,x-ot-span-context

x-b3-traceidなどのB3のヘッダーなどはこの値をセットしていない場合は、Spring Sleuthが自分で生成した値をヘッダーで利用してしまうみたいです。(もしかしたらもっといい方法があるかも...)

まぁ、今回はspring.sleuth.propagation-keysに関しては置いておいて、できたアプリをDockerイメージ化します。

$ mvn spring-boot:build-image -Dspring-boot.build-image.imageName=spring/open-tracing

$ docker images | grep spring/open-tracing
spring/open-tracing                                                latest                  0dbe73ec4359   41 years ago        268MB

イメージが作成されました。

DeploymentとServiceを作成してmimikubeにデプロイする

それではDeploymentとServiceを作成してminikubeにデプロイしておきます。
まずはベースとなるdeployment.yamlを作成します。

$ kubectl create deployment quarkus-app --image=quarkus/open-tracing --dry-run=client -o yaml > deployment.yaml
echo --- >> deployment.yaml
$ kubectl create deployment spring-app --image=spring/open-tracing --dry-run=client -o yaml >> deployment.yaml

できた、deplyment.yamlのそれぞれのDeploymentにimagePullPolicy: IfNotPresent(ローカルイメージを使用するようにするため)とquarkus-appには環境変数SPRING_SERVICEを記述しておきます。

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: quarkus-app
  name: quarkus-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: quarkus-app
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: quarkus-app
    spec:
      containers:
      - image: quarkus/open-tracing
        name: open-tracing
        imagePullPolicy: IfNotPresent
        env:
          - name: SPRING_SERVICE
            value: spring-app:8082
        resources: {}
status: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: spring-app
  name: spring-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: spring-app
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: spring-app
    spec:
      containers:
      - image: spring/open-tracing
        name: open-tracing
        imagePullPolicy: IfNotPresent
        resources: {}
status: {}

次にサービスを作成します。

$ kubectl create service clusterip quarkus-app --tcp=8081:8081 --dry-run=client -o yaml > service.yaml
$ echo --- >> service.yaml
$ kubectl create service clusterip spring-app --tcp=8082:8082 --dry-run=client -o yaml > service.yaml

Serviceは特にいじることは無いので、作ったマニフェストをapplyしていきます。

$ kubectl --context=minikube apply -f deplyment.yaml
$ kubectl --context=minikube apply -f service.yaml

GatawayとVirtualSerciceを作成して疎通確認する

GatawayVirtualServiceを作っておきます。

gateway.yaml

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: quarkus-app-gateway
spec:
  selector:
    istio: ingressgateway 
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"

virtual-service.yaml

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: quarkus-app-vs
spec:
  hosts:
  - "*"
  gateways:
  - quarkus-app-gateway
  http:
  - match:
    - uri:
        prefix: /hello
    route:
    - destination:
        port:
          number: 8081
        host: quarkus-app

上記のyamlをapplyしておきます。

$ kubectl --context=minikube apply -f gateway.yaml
$ kubectl --context=minikube apply -f virtual-service.yaml

cUrlを使って疎通確認を行います。

$ export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}'

$ export INGRESS_HOST=$(minikube ip)

$ curl ${INGRESS_HOST}:${INGRESS_PORT}/hello -v
*   Trying 192.168.49.2:30019...
* TCP_NODELAY set
* Connected to 192.168.49.2 (192.168.49.2) port 30019 (#0)
> GET /hello HTTP/1.1
> Host: 192.168.49.2:30019
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 14
< content-type: text/plain;charset=UTF-8
< x-envoy-upstream-service-time: 23
< date: Sun, 31 Jan 2021 11:28:48 GMT
< server: istio-envoy
< 
* Connection #0 to host 192.168.49.2 left intact
Hello, Tracing

ここまででようやく準備完了です。

Jaergerを動かす

さて、ようやくですが。 Jaegerをローカルで動かしておきます。
今回はMinikubeのクラスターにデプロイして動かします。
外部のJaergerにメトリクスを送信する場合は、--set values.global.tracer.zipkin.address=<jaeger-collector-address>:9411をIstioのインストール時に設定しておけば、任意のJaergerにデータを送信することができます。
それでは、Jargerをデプロイします。

$ kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.8/samples/addons/jaeger.yaml

次のコマンドで、UIを動かします。

$  istioctl dashboard jaeger
http://localhost:16686

出力される。アドレスにアクセスすると以下の様なUIが表示されます。

f:id:yuya_hirooka:20210131210234p:plain

実はローカルで試している際に何度かリクエストを送ってしまったので、すでにデータが存在してしまっていますが、最初はなにも出力されません。
これは、デフォルトではIstioは1%のリクエストのデータをJaergerに送るためです。
以下のコマンドを何度か叩いて、Jaergerにデータを送るようにしておきます。

$ for i in $(seq 1 100); do curl -s -o /dev/null "http://${INGRESS_HOST}:${INGRESS_PORT}/hello"; done

そうするとそれっぽいデータが見れるようになります。

f:id:yuya_hirooka:20210131210628p:plain

ちょっとUIを見てみる

ヘッダーのSearchタブを選択すると以下のような検索用のボックスが表示されていると思います。

f:id:yuya_hirooka:20210131211740p:plain

Serviceistio-ingressgatewayを選択してFind Tracesを押すと右側にトレースされたリクエストが表示されます。

f:id:yuya_hirooka:20210131212213p:plain

僕の環境では4回のサンプリングされたリクエストの情報が表示されます。
グラフのすぐ下のリクエスト(7ff3cb9)をクリックするとリクエストに関する情報がより詳細にみることができます。

f:id:yuya_hirooka:20210131212649p:plain

ヘッダーのSystem Architectureタブを選択するとサービスの依存関係やそれぞれに難解リクエストが送られたかを確認することができます。

f:id:yuya_hirooka:20210131213032p:plain

f:id:yuya_hirooka:20210131213114p:plain

とりあえず動かすところまでできました。

KongのKubernetes Ingress Controllerを試す

はじめに

久しぶりにKongをちょっとお勉強したい気になってきたので、ドキュメントを眺めていたらKubernetes Ingress Controllerなるものを見つけました。面白そうだったので、とりあえず、動かすまでやってみようかと思います。
Kongの基本的なところプラグインの書き方も以前まとめたので興味がある人はよかったら見てみてください。

KongのIngress Controllerについて

KubernetesIngress Controllerでクラスター内でIngressリソースとして動くKongに対して設定と管理を行います。クラスター内のスケーリング、設定の変更、エラーなどのイベントによってKongをアップデートしてくれます。
以下の2つのコンポーネントからなります。

  • Kong本体
  • Controller、Kongの設定を同期する

プラグインを反映させることももちろん可能で、Kongができることは基本的になんでもできるようです。

カスタムリソースについて

いくつかのカスタムリソースが用意されおり、Kongの宣言的な設定を用いてKongの機能を利用することができます。

  • KongPlugin: KongのPluginエンティティ相当の設定を行なうリソース
  • KongIngress: ルーティング、ロードバランシング、ヘルスチェックなど細かなルーティングの設定等を行なうためのリソース
  • KongConsumer: KongのCunsumerエンティティへのマッピング
  • TCPIngress: TCPベースのルーティングを行なうためのリソース。non-HTTPベースのサービスに対して利用可能

使ってみる

今回はMinikube(driver none)にIngressControllerをインストールして、サンプルアプリにプロキシしてみたいと思います。

環境

$ minikube version
minikube version: v1.16.0
commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1

$ kubectl version -o yaml
clientVersion:
  buildDate: "2021-01-13T13:28:09Z"
  compiler: gc
  gitCommit: faecb196815e248d3ecfb03c680a4507229c2a56
  gitTreeState: clean
  gitVersion: v1.20.2
  goVersion: go1.15.5
  major: "1"
  minor: "20"
  platform: linux/amd64
serverVersion:
  buildDate: "2020-12-08T17:51:19Z"
  compiler: gc
  gitCommit: af46c47ce925f4c4ad5cc8d1fca46c7b77d13b38
  gitTreeState: clean
  gitVersion: v1.20.0
  goVersion: go1.15.5
  major: "1"
  minor: "20"
  platform: linux/amd64

$ docker version
(クライアント略)

Server: Docker Engine - Community
 Engine:
  Version:          20.10.2
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       8891c58
  Built:            Mon Dec 28 16:15:19 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

$ uname -srvmpio
Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

アップストリームを作っておく

下準備として、Kongがプロキシすする先のアップストリームを作って置きます。
アップストリームはNginxを用いて2つ作成し、それぞれが自分自身のhostnameを返すようにしておきます。

--dry-runオプションと使ってリソースを作ります。

# deployment.yamlの作成
$ kubectl create deployment nginx-first --image=nginx:1.19.6 --dry-run=client -o yaml > deployment.yaml

$ echo --- >> deployment.yaml 

$ kubectl create deployment nginx-second --image=nginx:1.19.6 --dry-run=client -o yaml >> deployment.yaml


# service.yamlの作成
$ kubectl create service clusterip nginx-first --tcp=8081:80 --dry-run=client -o yaml > service.yaml

$ echo --- >> service.yaml 

$ kubectl create service clusterip nginx-second --tcp=8082:80 --dry-run -o yaml > service.yaml

それぞれできたリソースはここに置いておくので興味があれば確認してみてください。
作成したリソースをApplyしてします。

$ kubectl --context=minikube apply -f deployment.yaml

$ kubectl --context=minikube get po 
NAME                            READY   STATUS    RESTARTS   AGE
nginx-first-d6db6c668-wtmwh     1/1     Running   0          84s
nginx-second-6b8d5c9696-2cj25   1/1     Running   0          84s


$ kubectl --context=minikube apply -f service.yaml
service/nginx-first created
service/nginx-second created

$ kubectl get svc
NAME           TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
kubernetes     ClusterIP   10.96.0.1        <none>        443/TCP    92m
nginx-first    ClusterIP   10.109.148.241   <none>        8081/TCP   44s
nginx-second   ClusterIP   10.105.0.182     <none>        8082/TCP   44s

最後に、それぞれが自分自身のhostnameを返すようにしておきます。

$ kubectl --context=minikube exec -it nginx-first-d6db6c668-wtmwh -- cp /etc/hostname /usr/share/nginx/html/index.html

$ kubectl --context=minikube exec -it nginx-second-6b8d5c9696-2cj25 -- cp /etc/hostname /usr/share/nginx/html/index.html

テストのためにポートフォワードしてそれぞれのNginxにつないで見ます。

# First
$ kubectl --context=minikube port-forward service/nginx-first 8081:8081

$ curl localhost:8081
nginx-first-d6db6c668-wtmwh

# Second
$ kubectl --context=minikube port-forward service/nginx-second 8082:8082

$ curl localhost:8082
nginx-second-6b8d5c9696-2cj25

無事、それぞれのNginxがhostnameを返してくれていますね。
これで準備完了です。

Kubernetes Ingress Controllerのインストール

Minikubeで作ったクラスタにKongのKubernetes Ingress Controllerをインストールします。
ドキュメントに寄るとHelmを使った方法やここからダウンロードしたYamlファイルをアプライスル方法があるみたいです。
今回は後者の方で行こうと思います。

$ kubectl create -f https://bit.ly/k4k8s
namespace/kong created
Warning: apiextensions.k8s.io/v1beta1 CustomResourceDefinition is deprecated in v1.16+, unavailable in v1.22+; use apiextensions.k8s.io/v1 CustomResourceDefinition
customresourcedefinition.apiextensions.k8s.io/kongclusterplugins.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/kongconsumers.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/kongingresses.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/kongplugins.configuration.konghq.com created
customresourcedefinition.apiextensions.k8s.io/tcpingresses.configuration.konghq.com created
serviceaccount/kong-serviceaccount created
Warning: rbac.authorization.k8s.io/v1beta1 ClusterRole is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 ClusterRole
clusterrole.rbac.authorization.k8s.io/kong-ingress-clusterrole created
Warning: rbac.authorization.k8s.io/v1beta1 ClusterRoleBinding is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 ClusterRoleBinding
clusterrolebinding.rbac.authorization.k8s.io/kong-ingress-clusterrole-nisa-binding created
service/kong-proxy created
service/kong-validation-webhook created
deployment.apps/ingress-kong created

むー、Kubernetesのバージョン1.20.xを使ってると、betaじゃなくなったリソースがいくつかあって警告がいくつか出てしまうみたいですね。
まぁ、この辺は一旦気にせずに先に進めようと思います。

デプロイされたコングにアクセスするためのIPを取得して環境変数(KONG_PROXY_IP)にセットしておきます。
この、今後この環境変数を使ってKongにアクセスします。

export KONG_PROXY_IP=$(minikube service -n kong kong-proxy --url | head -1)

デプロイされたKongにアクセスしてみます。

$ curl -i $PROXY_IP
HTTP/1.1 404 Not Found
Date: Sun, 24 Jan 2021 06:20:54 GMT
Content-Type: application/json; charset=utf-8
Connection: keep-alive
Content-Length: 48
X-Kong-Response-Latency: 0
Server: kong/2.2.1

{"message":"no Route matched with those values"}

Kongから無事レスポンスが返ってきました。
まだなにもKongの設定を行っていないので、NotFoundを返してきますね。

プロキシの設定を記述する

プロキシの設定を記述するには、普通の(?)Ingressのリソースを記述すれば良さそうです。
以下のようなYamlを記述します。

ingress.yaml

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: demo
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/strip-path: "true"
spec:
  rules:
  - http:
      paths:
      - path: /first
        backend:
          serviceName: nginx-first
          servicePort: 8081
      - path: /second
        backend:
          serviceName: nginx-second
          servicePort: 8082

今回ちょっとだけトリッキーなのはnginxのコンテキストルートはも問題で、/firstなどのパスがそのままアップストリームにプロキシされると困るのでkonghq.com/strip-path: "true"を付与しました。

早速、リソースをApplyしてアクセスしてみます。

$ kubectl apply -f ingress.yaml 

$ curl -i $PROXY_IP/first
HTTP/1.1 200 OK
Content-Type: text/html; charset=UTF-8
Content-Length: 28
Connection: keep-alive
Server: nginx/1.19.6
Date: Sun, 24 Jan 2021 07:11:05 GMT
Last-Modified: Sun, 24 Jan 2021 05:57:51 GMT
ETag: "600d0c5f-1c"
Accept-Ranges: bytes
X-Kong-Upstream-Latency: 1
X-Kong-Proxy-Latency: 0
Via: kong/2.2.1

nginx-first-d6db6c668-wtmwh

$ curl -i $PROXY_IP/second
HTTP/1.1 200 OK
Content-Type: text/html; charset=UTF-8
Content-Length: 30
Connection: keep-alive
Server: nginx/1.19.6
Date: Sun, 24 Jan 2021 07:11:06 GMT
Last-Modified: Sun, 24 Jan 2021 05:59:05 GMT
ETag: "600d0ca9-1e"
Accept-Ranges: bytes
X-Kong-Upstream-Latency: 1
X-Kong-Proxy-Latency: 0
Via: kong/2.2.1

nginx-second-6b8d5c9696-2cj25

それぞれ、きちんとプロキシされているようですね。

atコマンドで指定時間にJobを単発実行させる

はじめに

最近、CrontabみたいなJobを定期的にスケジュールするのでは無くて、指定時間に単発で実行したいみたいな要件にぶち当たって、ちょっと調べていたらUnix系のOSにatというコマンドがあることに気がついたので機能を試して見ようかと思います。

at コマンドとは

標準出力、もしくはファイルからコマンドを受け取って、/bin/shを使って指定時間にシェルを起動するコマンドです。Corntabなどは日時の指定のみで、最長一年に一回は実行されるのに対してatコマンドは年の指定まで行なうことが可能で、指定した時間に一回のみといった実行が行えます。
実行時の環境変数BASH_VERSINFODISPLAYEUID, GROUPSSHELLOPTSTERMUID以外)、ワークディレクトリ、umaskなどはコマンド実行時の状態でキープされます。
また、atsetuidプログラムで、LD_LIBRARY_PATHLD_PRELOADなどの環境変数も読み込まれないようです。

使ってみる

環境

コマンドの実行環境は以下の通り、

$ uname -srvmpio
Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

基本的な操作方法

プロンプトでコマンドを設定する。

atは以下のような形式で、実行できます。

$ at 07:55 01/31/21
warning: commands will be executed using /bin/sh
at> 

実行すると、>atという表示が現れプロンプトが起動します。
ここにコマンドを打ち込んでctrl+dを押すと設定が完了です。

$ at 10:46 01/22/21
warning: commands will be executed using /bin/sh
at> java --version > test.txt
at> <EOT>
job 6 at Fri Jan 22 10:46:00 2021

実施時間をすぎると以下のようにテキストが出力されているのが確認できます。

$ cat test.txt 
openjdk 15.0.1 2020-10-20
OpenJDK Runtime Environment AdoptOpenJDK (build 15.0.1+9)
Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.23.0, JRE 15 Linux amd64-64-Bit Compressed References 20201022_81 (JIT enabled, AOT enabled)
OpenJ9   - 0394ef754
OMR      - 582366ae5
JCL      - ad583de3b5 based on jdk-15.0.1+9)

ファイルを読み込んで実行する

atコマンドはファイルを読み込んで実行することもできます。
まずは、以下のようなシェルスクリプトを用意します。

hello.sh

#!/bin/sh

echo "hello, at command" > test.txt

このシェルスクリプトatコマンドで実行するには-fオプションで実行するファイルを指定します。

$ at 21:05 01/22/21 -f hello.sh 
warning: commands will be executed using /bin/sh
job 8 at Fri Jan 22 21:05:00 2021

$ cat test.txt
hello, at command

日付の指定の形式について

atPOSIX.2 standardを拡張した日付の指定が可能なようです。

  • HH:MM
    • 時間と分を指定できます。この場合、今日の時間にJobが実行され、もしその時間が今日すでに過去である場合は次の日の指定された時間にJobが実行されます。
  • MMDD[CC]YY, MM/DD/[CC]YY, DD.MM.[CC]YY or [CC]YY-MM-DD
    • オプションとして、上記のような形式で年月日を指定することができます。
  • midnightnoonteatime
    • それぞれの時間に実行されます。
  • todaytomorrow
    • サーフィクスとして上記の表記をつけるとそれぞれ今日と明日を示すことができます。

その他にも結構自由な時間の指定ができるようで、今日から3日後の4pmとかを指定する場合はat 4pm + 3 days、6月31日の10時AMを指定したい場合はat 10am Jul 31と行ったような感じで指定することができます。

他にできそうなこと

マニュアルでatコマンドを調べてみると、このコマンドには以下のような、仲間がいるようです。

  • atq

    • ペンディングされているUserのJobの一覧を取得します。superuserであるばあいはすべてのJobを出力します。-lオプションがエイリアスになっています。
  • atrm

    • job numberにとって特定されるジョブを削除します。-rオプションがエイリアスになっています。
  • batch

    • システムのロードアベレージが1.5(かもしくはatdコマンドで指定された値)以下になった際にJobを実行する。

他にも-m使うなどするとジョブの実行時にユーザにメールを送ることができるようです。

MinikubeにIstioをインストールする

はじめに

業務でKubernetesやIstioの機能の一部を利用することがあるですが、勉強のために試せる環境がほしかったのでローカルのMinikubeにインストールして見ようかと思います。

やってみる

動作環境

$ uname -srvmpio
Linux 5.4.0-62-generic #70-Ubuntu SMP Tue Jan 12 12:45:47 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:    20.04
Codename:   focal

$ docker version
Client: Docker Engine - Community
 Version:           20.10.2
 API version:       1.41
 Go version:        go1.13.15
 Git commit:        2291f61
 Built:             Mon Dec 28 16:17:43 2020
 OS/Arch:           linux/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.2
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       8891c58
  Built:            Mon Dec 28 16:15:19 2020
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0

minikubeのインストール

ドキュメントに寄ると、いくつかインストール方法があるようですが、今回はバイナリのインストールを行おうと思います。
以下のコマンドを実行します。

$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 \
  && chmod +x minikube

$ sudo mkdir -p /usr/local/bin/
$ sudo install minikube /usr/local/bin/

$ minikube version
minikube version: v1.16.0
commit: 9f1e482427589ff8451c4723b6ba53bb9742fbb1

インストールが終わったのでMinikubeを起動します。
この際、MinikubeのドライバーはDockerを使い、メモリのCPUの設定は公式ドキュメントにしたがって、起動したいと思います。

$ minikube start --memory=16384 --cpus=4 --kubernetes-version=v1.18.10

$ minikube status
minikube
type: Control Plane
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured
timeToStop: Nonexistent

istioをインストール

PodとServiceに関する注意点

Istioを利用する上でPodとServiceには求められる条件があります。

Podの必須条件は以下のとおりです。

  • Podは最低一つのServiceに紐付いている必要がある
  • PodはアプリケーションをUIDが1337で実行していないこと
  • pod security policiesクラスターに設定されているかつIstio CNI Pluginを利用していない場合。Podに対してNET_ADMINNET_RAWの機能が利用できる必要がある(詳細はこちら)

また、以下のような推奨される条件があるようです。

  • Istioがメトリクスを収集する際に付加情報をつけることができるので、Podにappラベルと、versionラベルを付与しておくのがよい。
  • Serviceのspec.ports.nameプロトコルを指定して置くのが良い(指定できるプロトコルおよび詳細はこちら

Istioが利用するポート

別の注意点としてIstioが以下のようなポートを利用することを留意しておく必要があります。

Istioのサイドカープロキシ(Envoy)によって利用されるポートの一覧

ポート プロトコル 説明
15000 TCP Envoyのadmin ポート。ポッド内のみ
15001 TCP Envoyのアウトバウンドポート
15006 TCP Envoyのインバウンドポート
15008 TCP Envoyトンネルポート(インバウンド)
15020 HTTP アプリケーションやIstio agent、Envoyのテレメトリーのポイント
15021 HTTP ヘルスチェックのポイント
15090 Envoy Prometheusのテレメトリーポイント

Istioのコントロールプレーンによって利用されるポートの一覧

ポート プロトコル 説明
15010 GRPC XDSとCAサービス(プレーンテキスト)
15012 GRPC XDSとCAサービス(TLS)プロダクションで利用することが推奨される
8080 HTTP Debugインタフェース(非推奨)
443 HTTPS Webhook
15014 HTTP コントロールプレーンのモニタリング

Server First Protocols

サーバが最初のバイトを送信するようなServer First Protocolを利用する際は PREMISSIVE mTLSプロトコルの自動検出等など最初のバイトを利用する機能に影響を与える可能性があります。なので、明示的にServiceのプロトコルを選択肢てやる必要があります。
ただし、以下のようなよく知られたポートを使うプロトコルやサーバに関しては自動的にTCPが選択されるようです。

その他、アウトバウンドのアクセスでHostヘッダーを利用する場合などにも注意が必要なようです。詳細はこちらを確認ください

インストール

istioをインストールする方法は以下のような方法があるようです。

  • Istiocltを使ったマニュアルイントール
  • Istio Operatorを使ったインストール

あとは、マルチクラスターへのインストールやVMを使う方法(こいつはちょっとよくわからなかった)などがあるようです。
今回はIstiocltを使ったマニュアルイントールをやってみようと思います。

どちらにせよまずはIstioctlをインストールする必要があるようなのでインストールします。
istioctlのバージョンはIstioのコントロールプレーンのバージョンと合わせることが推奨されています。今回は最新のリリースのものをインストールします。

$ curl -sL https://istio.io/downloadIstioctl | sh -

$ istioctl version
no running Istio pods in "istio-system"
1.8.2

何度も実行するのが面倒なので、export PATH=$PATH:$HOME/.istioctl/bin.bash_profileに登録しておきます。

次にIstioをクラスターにインストールしていくのですが、まずIstioのインストールには[Profile]という概念があるみたいです。簡単にまとめるとプロファイルはIstioのコントローププレーンやIstioサイドカーに対するカスタマイズ方法を提供しています。Profileでできることの詳細に関してはこちらのページをご確認ください。
なにも設定しない場合defualtのプロファイルが利用されることになります。
今回は素直にインストールするので特にプロファイルは指定しません。

# プロファイルを指定したい場合は`--set profile=demo`などを指定する
$ istioctl install --context minikube

コマンドの実行が終わったら次にきちんとインストールが行われたか確認します。
verify-installコマンドを使ってこの確認を行なうことができます。
まずは、インストール時のさいに利用されたマニフェストを取得する必要があります。
事前に取得していない場合はistioctl manifest generate <インストール時に使用したオプション>で出力することができます。
今回の場合は以下のように実行します。

$ istioctl manifest generate >  generated-manifest.yaml

$ istioctl verify-install -f generated-manifest.yaml 
CustomResourceDefinition: authorizationpolicies.security.istio.io.default checked successfully
CustomResourceDefinition: destinationrules.networking.istio.io.default checked successfully
CustomResourceDefinition: envoyfilters.networking.istio.io.default checked successfully
CustomResourceDefinition: gateways.networking.istio.io.default checked successfully
CustomResourceDefinition: istiooperators.install.istio.io.default checked successfully
CustomResourceDefinition: peerauthentications.security.istio.io.default checked successfully
CustomResourceDefinition: requestauthentications.security.istio.io.default checked successfully
CustomResourceDefinition: serviceentries.networking.istio.io.default checked successfully
CustomResourceDefinition: sidecars.networking.istio.io.default checked successfully
CustomResourceDefinition: virtualservices.networking.istio.io.default checked successfully
CustomResourceDefinition: workloadentries.networking.istio.io.default checked successfully
CustomResourceDefinition: workloadgroups.networking.istio.io.default checked successfully
ServiceAccount: istio-ingressgateway-service-account.istio-system checked successfully
ServiceAccount: istio-reader-service-account.istio-system checked successfully
ServiceAccount: istiod-service-account.istio-system checked successfully
ClusterRole: istio-reader-istio-system.default checked successfully
ClusterRole: istiod-istio-system.default checked successfully
ClusterRoleBinding: istio-reader-istio-system.default checked successfully
ClusterRoleBinding: istiod-istio-system.default checked successfully
ValidatingWebhookConfiguration: istiod-istio-system.default checked successfully
EnvoyFilter: metadata-exchange-1.6.istio-system checked successfully
EnvoyFilter: metadata-exchange-1.7.istio-system checked successfully
EnvoyFilter: metadata-exchange-1.8.istio-system checked successfully
EnvoyFilter: stats-filter-1.6.istio-system checked successfully
EnvoyFilter: stats-filter-1.7.istio-system checked successfully
EnvoyFilter: stats-filter-1.8.istio-system checked successfully
EnvoyFilter: tcp-metadata-exchange-1.6.istio-system checked successfully
EnvoyFilter: tcp-metadata-exchange-1.7.istio-system checked successfully
EnvoyFilter: tcp-metadata-exchange-1.8.istio-system checked successfully
EnvoyFilter: tcp-stats-filter-1.6.istio-system checked successfully
EnvoyFilter: tcp-stats-filter-1.7.istio-system checked successfully
EnvoyFilter: tcp-stats-filter-1.8.istio-system checked successfully
ConfigMap: istio.istio-system checked successfully
ConfigMap: istio-sidecar-injector.istio-system checked successfully
MutatingWebhookConfiguration: istio-sidecar-injector.default checked successfully
Deployment: istio-ingressgateway.istio-system checked successfully
Deployment: istiod.istio-system checked successfully
PodDisruptionBudget: istio-ingressgateway.istio-system checked successfully
PodDisruptionBudget: istiod.istio-system checked successfully
Role: istio-ingressgateway-sds.istio-system checked successfully
Role: istiod-istio-system.istio-system checked successfully
RoleBinding: istio-ingressgateway-sds.istio-system checked successfully
RoleBinding: istiod-istio-system.istio-system checked successfully
HorizontalPodAutoscaler: istio-ingressgateway.istio-system checked successfully
HorizontalPodAutoscaler: istiod.istio-system checked successfully
Service: istio-ingressgateway.istio-system checked successfully
Service: istiod.istio-system checked successfully
Checked 12 custom resource definitions
Checked 1 Istio Deployments
Istio is installed successfully

最後の出力をみるとインストールはうまく行っているようです。

アーキテクチャについてちょっと見てみる

最後にほんの少しだけ、Istioのアーキテクチャについて見てみたいと思います。
istioのコンポーネントは論理的には以下の2つに分けられるようです。

  • data plane

    • サイドカーとしてデプロイされるEnvoyプロキシの集まり。マイクロサービス間のすべてネットワークコミュニケーションをプロキシする。また、メトリクスの収集なども行なう
  • controle plane

(公式ドキュメントより)

Envoy

Istioは拡張したEnvoy Proxyを利用します。
サービス間のインバウンドとアウトバウンドのやり取りをすべてプロキシする役割を持っており、サイドカーとしてPodにデプロイされます。

Istiod

サービスディスカバリ機能、設定、証明書マネジメントを提供します。
IstiodはハイレベルなルーティングルールをEnvoyの設定に変換し、ランタイムのサイドカーに拡散します。
以下のようなコンポーネントで構成されます。

  • Pilot
    • Envoyの構成管理を行なう
  • Citadel
    • CA。鍵の管理や証明書の発行を行なう
  • galley
    • ユーザ設定のマネジメントを行なう

参考資料

Reactor Publisherにおけるエラーハンドリング

はじめに

前回に引き続き、Reactive Springのを読んでいて、Publihsherのエラーハンドリングを行なうためのメソッド群が紹介されていたので、実際に動かして試してみようと思います。(書籍で紹介されていないものも結構あったので)

実際にやってみる

環境は以下の通り、

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書いていく

エラーハンドリング

まずはエラーハンドリングについて、FluxとMonoではonError**と言ったようなエラー時にフックされるようなメソッドがいくつか生えています。

onErrorResume()

onErrorResume()ですが、こいつは例外発生時のFallback処理を書くためのメソッドです。
以下のコードはintの3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorResume(e -> Flux.just(4));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

特定の例外に対して、Fallback処理を書きたい場合は以下のように第一引数に例外クラスを書いてやるようにします。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorResume(IllegalArgumentException.class, v -> Flux.just(10))
                .onErrorResume(RuntimeException.class, v -> Flux.just(4));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

また、第一引数は例外クラスの代わりにPredicate<? super Throwable> predicateを受け取ることができるので以下のように特定条件の例外にマッチした際のハンドリングを記述することもできます。

    @Test
    void testOnErrorResume() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorResume(e -> e.getMessage().equals("Auchi"), v -> Flux.just(10))
                .onErrorResume(e -> e.getMessage().equals("Oops"), v -> Flux.just(4));


        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

onErrorReturn

onErrorReturn()は例外発生時に単一の値をエミットする関数です。
以下のコードは先程と同じく、3がPublishされた際にRutimeExceptionをスローされる処理に対して、エラー発生時はFallbackとして4をPublishします。

    @Test
    void testOnErrorReturn() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException());
                    return Flux.just(integer);
                })
                .onErrorReturn(4);

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }

特定の例外クラスにマッチした場合や、特定条件の例外にマッチした場合のハンドリングの書き方はonErrorResumeと同様の方法でできます。
すべてを動かしてみたわけでは無いですが、 onErro**系は大体同じような操作ができるっぽいです。

doOnError

まず、Flux(or Mono)でのストリーム処理時に発生したエラーをハンドリングして、ログなどの副作用の伴う処理の実行を行なう場合はdoOnError()を利用します。

    @Test
    void testDoOnError() {
        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .map(v -> {
                    if (v.equals(3)) throw new RuntimeException("Oops");
                    return v;
                })
                .doOnError(RuntimeException.class, v -> System.out.println("message = " + v.getMessage()));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectError(RuntimeException.class)
                .verify();
    }

標準出力への出力は以下の通り。

message = Oops

onErrorMap

処理時の例外をハンドリングして、他の例外に変換してダウンストリームに返す場合は、onErrorMap()を利用します。

    @Test
    void testOnErrorMap() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorMap(RuntimeException.class, e -> new IllegalArgumentException(e.getMessage()));

        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectError(IllegalArgumentException.class)
                .verify();
    }
}

onErrorContinue

例外発生時に、問題があるエミットを落として処理を続ける場合は、onErrorContinue()を利用します。

    @Test
    void testOnErrorContinue() {

        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4)
                .flatMap(integer -> {
                    if (integer.equals(3)) return Flux.error(new RuntimeException("Oops"));
                    return Flux.just(integer);
                })
                .onErrorContinue((e, v) -> System.out.printf("dropped value  = %s", v));


        StepVerifier.create(integerFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(4)
                .verifyComplete();
    }
}

実行すると、標準出力に以下の文字列が出力されます。

dropped value  = 3

非Reactiveな非同期処理をFluxで扱う

はじめに

Reactive Springを読んでいて、Flux.create()を使えば非Reactiveな非同期のイベントをReactiveのProducerにアダプトすることができるみたいなことが書かれていたので、実際に使って見て理解を深めたいと思います。

Flux.create()?

Flux.create(Consumer<? super FluxSink> emitter)プログラマティックにFluxを生成するためのFactory で、非同期や同期で生成した値をFluxSink APIを通してエミットすることができます。このFactoryを使えばJMS Brockerのイベントやマルチスレッドアプリケーションで生成した値をエレメントとしてエミットすることができます。

実際にやってみる

それでは早速やっていきたいと思います。
タイトルの通り今回はTreadを作って非同期に実行されるスレッドからエレメントをエミットしたいと思います。

環境

$ java --version
openjdk version "15" 2020-09-15
OpenJDK Runtime Environment (build 15+36-1562)
OpenJDK 64-Bit Server VM (build 15+36-1562, mixed mode, sharing)


$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 15, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/15-open
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-59-generic", arch: "amd64", family: "unix"

Pom

今回は、ReactorとJunit5を利用したいので、Pomには以下の設定を追加しました。

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.7.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2020.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.22.2</version>
            </plugin>
        </plugins>
    </build>

コードを書く

準備が整ったところで早速コードを書いていきます。
実際のコードは以下のとおりとなります。

public class FluxCreateTest {

    private final ExecutorService es = Executors.newFixedThreadPool(1);

    @Test
    void exec() {
        System.out.println("Thread Name = " + Thread.currentThread().getName());
        Flux<String> values = Flux.create(emitter -> this.execTask(emitter, 5));

        StepVerifier
                .create(values.doFinally(signalType -> this.es.shutdown()))
                .expectNext("exec number: 1")
                .expectNext("exec number: 2")
                .expectNext("exec number: 3")
                .expectNext("exec number: 4")
                .expectNext("exec number: 5")
                .verifyComplete();
    }

    private void execTask(FluxSink<String> stringFluxSink, int maxNum) {
        this.es.submit(() -> {
            System.out.println("Thread Name = " + Thread.currentThread().getName());

            AtomicInteger num = new AtomicInteger();

            while (num.get() < maxNum) {
                String emitValue = "exec number: " + num.incrementAndGet();
                System.out.println("emitValue = " + emitValue);
                stringFluxSink.next(emitValue);

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            stringFluxSink.complete();
        });
    }
}

実行するとテストが成功しコンソールに以下の出力がされます。

Thread Name = main
Thread Name = pool-1-thread-1
emitValue = exec number: 1
emitValue = exec number: 2
emitValue = exec number: 3
emitValue = exec number: 4
emitValue = exec number: 5

上記のコードでは、ExecutorServiceによって作成されたpool-1-tread-1という名前のスレッドで、1秒に1回、exec number: *というStringをエミットしています。
Flux.create()は引数としてFluxSinkを受け取るConsumerを登録することができ、Signalは受け取ったFluxSink APIを通して発信することができます。

ここで、ダウンストリームのConsumerがオーバーフローした際のハンドリングはOverflowStrategy.BUFFERが設定されており、デフォルトではバッファする挙動となります。
この、ストラテジーは変更することが可能で、その場合は、createの第二引数にOverflowStrategyEnumを渡してやります。

2020年振り返り

はじめに

なんとなくいろんな人の振り返りみて、自分も振り返ってみようかとふと思ったので書いてみようかと。

ざっくり今年やったこと、あったこと

今年やったとことあったことをまとめると、以下のようになるかと思います。

  • 転職して一年
  • ブログを平均して週一ぐらいでは書く
  • 本を読む

転職して一年

先月の11月で今の会社に来て一年が経ちました。もともとは、転職ドラフトというサイトで声をかけていただいて今に至りますが、気がつけば一年たってました。
前職とは働き方や開発のやり方が全く違い、いろいろ学びが多かったと思います。
今の組織は、アジャイル開発を行う上で、XPのプラクティスを大切にしています。その中で自分の中の今まで知識でしかなかった部分が実践できたり、知らなかったことをたくさん学ぶことができました。

その他の面で、コミュニケーション面に置いて自分の中で1つのブレイクスルーみたいなのがあった気がします。作業が基本ペア作業で行うので一般的な組織に比べてコミュニケーションを撮る機会がかなり多いと思います。その中であまり上手くコミュニケーションを取れない事があると、感じていたのですが、いろいろ考える中で気づきみたいなものがあり少しだけコミュニケーション力がマシになった気がします。
とはいえ、まだまだ上手くできない面がかなりおおく、特に、ビジネスサイド方々などのコンテキストが共有されてない相手とのコミュニケーションに課題を感じているので、来年以降その力をつけていかないとなぁと思っています。
(ただ、自分の強みとして伸ばしたい方向性とは少し違う気もしているのですが…)

ブログを平均して週一ぐらいでは書く

前職の先輩でかなりのブログを書かれる人がいます。その人に触発されてなんとなく、最低週一ぐらいでは書いてみようと思ったのが、今年の3月です。こから42記事書いてるので目的は達成したかなと。
まぁ、今後も週一でゆるゆると書いていこうかなぁとは思います。
やってみて思ったことは以下のことです。

  • ブログをネタを探すようになるので、発見が多くなる
  • 文章にする事で理解度が上がる

前者に関して、ブログを書くことを意識してるとより多くのことを学べるなぁと感じました。
後者に関しては、ドキュメントを読んでなんとなく理解した気になっても書いてみると結構書けないみたいなのがあったのがたくさんありました。その箇所を自分の言葉で文に起こすことで理解度が断然上がるのを感じました。

本を読む

本を読むのは、新人の頃からずっと続けているのですが今年は少し読む量が減った気がします。 思い出せるだけ羅列してみようと思います。

本屋で買ったのとかで覚えてないの合わせると、多分もうちょいある気がしますが、Amazonの購入履歴やらなんやらで思い出せるのはこんだけです。
こうして眺めてみると、Javaに結構寄ってるのがわかりますね。
来年はもう少し、Linuxやネットワーク、ディスクなどの足回りの知識をもう少し固めたいなーと思いました。
後は、Java以外の言語をもう少しやって、そのパラダイムを学びたいなぁと考えてます。
特に関数型に関しては自分はかなり弱い感じがするので「すごいHaskellたのしく学ぼう!」は読み直したいなと思います。
あとは、Rustなんかも学びが多そうだなぁと思ってます。

来年やりたいこと

来年は、以下のようなことをやろうかなと。

  • 継続してブログを書く
  • テーマを考えて読む本を選んでみる
  • 資格を何か取る

ブログを書くことで、自分の技術力が上がるのは実感できたので今後も続けていたいとおもいます。

本に関しては、振り返りでも思ったのですが今までは興味の向くままに本を選んでいたのですが来年は少し考えながら本を選んでみようかと思いました。本のところでも書いたのですが具体的には以下のような本を意識的に読んでみようかと。

  • ネットワーク、ディスク、Linuxなどの下回りの知識を深める
  • Javaとは全く違う言語パラダイムを学ぶ

最後の資格に関しては、資格を取ることが目的というよりは資格を取る事でその分野に対する知識を深めることができるのではとは前々から思っていたので、やってみようかと。具体的には、LPICやCKA/CKADあたりに挑戦してみようかなと思ってます。