OperatorSDKでCustom ResourceとCustom Controllerを作る

はじめに

k8sではいくつかの拡張ポイントが用意されています。その中でも、APIをカスタムするためのCustom Resourceや調整ループによってリソースオブジェクトの状態管理を行い、宣言的なAPIを可能にするCustom Controllerがあります。
まえまえからこの2つには興味がありつつも触れることができてなかったのですが、先日k8sのブログに「Writing a Controller for Pod Labels」というのを見かけて、OperatorSDKを使えばいい感じにできそうであるということに気がついたので試してみたいと思います。

OperatorSDKとは

そもそもOperatorとはなんぞやという部分なのですが、 Custom Resourceを使ってアプリケーションとそのコンポーネントを管理するソフトウェアの拡張で、OperatorそのものはControllerパターンに則ってリソースオブジェクトを管理します。

OperatorSDKはその名の通りでOperatorを作る際のSDKです。
Operatorを作る際の高次元なAPIを提供していたり、テスト、ビルド、パッケージングのサポートをしていたりします。
また、Operatorやカスタムリソースを作成する際のベースとなるテンプレートプロジェクトの作成などもしてくれるようです。
OperatorSDKでは以下の方法でOperatorの作成を行えるようです。

今回はGoでOperator(Memcached Operator)を作る方法を試してみます。
基本的にはここをたどって気になったところを深堀りする感じでやっていこうと思います。

使ってみる

環境

動作環境は以下の通り

$ uname -srvmpio
Linux 5.4.0-77-generic #86-Ubuntu SMP Thu Jun 17 02:35:03 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal

$ go version
go version go1.16 linux/amd64


# インストールに必要っぽい
$ gpg --version
gpg (GnuPG) 2.2.19
libgcrypt 1.8.5
Copyright (C) 2019 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.
(省略)


$ minikube version
minikube version: v1.21.0
commit: 76d74191d82c47883dc7e1319ef7cebd3e00ee11

$ kubectl version -o yaml
clientVersion:
  buildDate: "2021-03-18T01:10:43Z"
  compiler: gc
  gitCommit: 6b1d87acf3c8253c123756b9e61dac642678305f
  gitTreeState: clean
  gitVersion: v1.20.5
  goVersion: go1.15.8
  major: "1"
  minor: "20"
  platform: linux/amd64
serverVersion:
  buildDate: "2021-01-13T13:20:00Z"
  compiler: gc
  gitCommit: faecb196815e248d3ecfb03c680a4507229c2a56
  gitTreeState: clean
  gitVersion: v1.20.2
  goVersion: go1.15.5
  major: "1"
  minor: "20"
  platform: linux/amd64

OperatorSDKのインストール

インストトールは以下の3つの方法があります。

今回はInstall from GitHub releaseでインストールしてみます。
インストールにはcurlgpgのversion 2.0以上が必要みたいです。
バイナリをダウンロードします。

$ echo $ARCH
amd64

$ export OS=$(uname | awk '{print tolower($0)}')
$ echo $OS
linux

$ curl -LO https://github.com/operator-framework/operator-sdk/releases/download/v1.9.0/operator-sdk_${OS}_${ARCH}

次にバイナリの検証を行います。

$  gpg --keyserver keyserver.ubuntu.com --recv-keys 052996E2A20B5C7E

$ curl -LO https://github.com/operator-framework/operator-sdk/releases/download/v1.9.0/checksums.txt

$ curl -LO https://github.com/operator-framework/operator-sdk/releases/download/v1.9.0/checksums.txt.asc

$  gpg -u "Operator SDK (release) <cncf-operator-sdk@cncf.io>" --verify checksums.txt.asc
gpg: 署名されたデータが'checksums.txt'にあると想定します
gpg: 2021年06月18日 08時21分40秒 JSTに施された署名
gpg:                RSA鍵8613DB87A5BA825EF3FD0EBE2A859D08BF9886DBを使用
gpg: "Operator SDK (release) <cncf-operator-sdk@cncf.io>"からの正しい署名 [不明の]
gpg: *警告*: この鍵は信用できる署名で証明されていません!
gpg:          この署名が所有者のものかどうかの検証手段がありません。
主鍵フィンガープリント: 3B2F 1481 D146 2380 80B3  46BB 0529 96E2 A20B 5C7E
     副鍵フィンガープリント: 8613 DB87 A5BA 825E F3FD  0EBE 2A85 9D08 BF98 86DB

公開鍵自体が信用できるかわからないという警告が出ていますね。 ただ、checksum.txtの検証自体はうまく言ってるみたいなのでここでは先に進もうと思います。

$ grep operator-sdk_${OS}_${ARCH} checksums.txt | sha256sum -c -
operator-sdk_linux_amd64: OK

チェックサムもOKみたいです。
実行権限を付与して、Pathがとおっているところに配置します。

$ chmod +x operator-sdk_${OS}_${ARCH} && sudo mv operator-sdk_${OS}_${ARCH} /usr/local/bin/operator-sdk

これでインストールは完了です。

$ operator-sdk version
operator-sdk version: "v1.9.0", commit: "205e0a0c2df0715d133fbe2741db382c9c75a341", kubernetes version: "1.20.2", go version: "go1.16.5", GOOS: "linux", GOARCH: "amd64"

プロジェクトを作成する

まずは、プロジェクトを作成します。

$ mkdir operatorsdk-sample
$ cd operatorsdk-sample/
$ operator-sdk init --domain hirooka.dev --repo github.com/samuraiball/settings

operator-sdk initコマンドではGo modulesベースのプロジェクトを作成します。
$GOPATH/src以外のところでプロジェクトをInitする場合は--repoフラグでリポジトリを指定する必要があるみたいです。
--domainフラグではDockerレジストリもしくは、Docker Hubのnamespace(ユーザ名)を指定します。
今回この最初の部分で色々しくってレジストリでもユーザ名でもないものを指定してしまったのですが、Makefileの記述を変えればうまく行くので一旦ここでは先に進みます。
さておきコマンドを実行すると諸々の設定がすんだプロジェクトが出来上がります。

ディレクトリ構造は以下のような感じ。

$ tree
.
├── Dockerfile
├── Makefile
├── PROJECT
├── config
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   └── manager_config_patch.yaml
│   ├── manager
│   │   ├── controller_manager_config.yaml
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── manifests
│   │   └── kustomization.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role_binding.yaml
│   │   └── service_account.yaml
│   └── scorecard
│       ├── bases
│       │   └── config.yaml
│       ├── kustomization.yaml
│       └── patches
│           ├── basic.config.yaml
│           └── olm.config.yaml
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

config/ディレクトリには起動時の設定が諸々用意されて、Kustomizeyamlファイルが入っています。

  • config/manager:コントローラーをPodとして起動する設定
  • config/rbac:作成するコントローラーを操作する際のパーミッションの設定

その他にもCRDやWebhookの設定も入っているようです。

また、main.goがOperatorのエントリーポイントとなるみたいです。

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))

    //+kubebuilder:scaffold:scheme
}

func main() {
    var metricsAddr string
    var enableLeaderElection bool
    var probeAddr string
    flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
    flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
    flag.BoolVar(&enableLeaderElection, "leader-elect", false,
        "Enable leader election for controller manager. "+
            "Enabling this will ensure there is only one active controller manager.")
    opts := zap.Options{
        Development: true,
    }
    opts.BindFlags(flag.CommandLine)
    flag.Parse()

    ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "6a59cba3.hirooka.dev",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }

    //+kubebuilder:scaffold:builder

    if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up health check")
        os.Exit(1)
    }
    if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up ready check")
        os.Exit(1)
    }

    setupLog.Info("starting manager")
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}

ここではControllerのセットアップ、実行のトラッキング、キャッシュの制御、CRDのスキーマ登録等を行ってくれるManagerを初期化する処理が書かれています。
ManagerではControllerがResorceを監視するネームスペースを制限することができます。

mgr, err := ctrl.NewManager(cfg, manager.Options{Namespace: namespace})

デフォルトではネームスペースはOperatorが動いているネームスペースのものを監視するようになります。
もしすべてのネームスペースを監視するようにしたい場合はNamespace: ""と空の文字列を入れる必要があるようです。
ここではなにも触らずに、デフォルトのままで先に進みます。

新しいCRDとControllerを追加する

足場となるコードを自動生成する

次に新しいCRDとControllerを追加します。
以下のコマンドを実行します。

$ operator-sdk create api --group cache --version v1alpha1 --kind Memcached --resource --controller 

これで、groupがcache、versionがv1alpha1、KindがMemcachedの足場が完成します。 今回深くはおいませんが、groupなどのそれぞれの意味はこちらをご確認ください。

実行してGitでどんな感じの変更が入っているかをみてみます。

$ git status
ブランチ master
Your branch is up to date with 'origin/master'.

コミット予定の変更点:
  (use "git restore --staged <file>..." to unstage)
    new file:   ../../docker/Dockerfile
    new file:   ../opentracing/deployment.yaml
    new file:   ../opentracing/gateway.yaml
    new file:   ../opentracing/service.yaml
    new file:   ../opentracing/tracing.yaml
    new file:   ../opentracing/virtual-service.yaml
    modified:   PROJECT
    new file:   api/v1alpha1/groupversion_info.go
    new file:   api/v1alpha1/memcached_types.go
    new file:   api/v1alpha1/zz_generated.deepcopy.go
    new file:   config/crd/kustomization.yaml
    new file:   config/crd/kustomizeconfig.yaml
    new file:   config/crd/patches/cainjection_in_memcacheds.yaml
    new file:   config/crd/patches/webhook_in_memcacheds.yaml
    new file:   config/rbac/memcached_editor_role.yaml
    new file:   config/rbac/memcached_viewer_role.yaml
    new file:   config/samples/cache_v1alpha1_memcached.yaml
    new file:   config/samples/kustomization.yaml
    new file:   controllers/memcached_controller.go
    new file:   controllers/suite_test.go
    modified:   go.mod
    modified:   main.go

新しいファイルがいくつかできているのとmain.goなどが書き換わってますね。

Custom Resourceを編集する

ここで、注目すべきはapi/v1alpha1/memcached_types.gocontrollers/memcached_controller.goでこれがCustom ResourceとCustom Controllerのベースのコードとなります。
まずはmemcached_types.goの方から修正していきます。
Memcachedとそれに関係する構造体が定義されているのがわかります。

type Memcached struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   MemcachedSpec   `json:"spec,omitempty"`
    Status MemcachedStatus `json:"status,omitempty"`
}

Memached構造体が持つMemcachedSpec.Sizeという、デプロイされるCustom Resourceの数を設定するフィールドと、MemcachedStatus.NodesではCustom Resourceで作られるPodの名前を保存するフィールドを追加します。

// MemcachedSpec defines the desired state of Memcached
type MemcachedSpec struct {
    //+kubebuilder:validation:Minimum=0
    // デプロイされるMemcachedの数
    Size int32 `json:"size"`
}

// MemcachedStatus defines the observed state of Memcached
type MemcachedStatus struct {
    // MemcachedのPodの名前を保存する
    Nodes []string `json:"nodes"`
}

次にMemached構造体に+kubebuilder:subresource:statusマーカーを追加します。
Status Subresourceを追加することによりControllerが他のCustom Resourceオブジェクトに変更を加えること無くCustom Resourceのステータスを更新できるようになるみたいです。

//+kubebuilder:subresource:status
type Memcached struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   MemcachedSpec   `json:"spec,omitempty"`
    Status MemcachedStatus `json:"status,omitempty"`
}

*_types.goは必ず以下のコマンドを実行して、Resource Typeの自動生成されるコードを更新する必要があります。

$ make generate

Makefileで定義されたこのコマンドはcontroller-genを実行して、api/v1alpha1/zz_generated.deepcopy.goをアップデートします。

また、 SpecやStatusフィースドにCRD validationマーカーが付いている場合、以下のコマンドでそのCRDのマニフェストを生成することができます。

$ make manifests

config/crd/bases/cache.example.com_memcacheds.yamlに以下のようなCRDのマニフェストが生成されます。

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.4.1
  creationTimestamp: null
  name: memcacheds.cache.hirooka.dev
spec:
  group: cache.hirooka.dev
  names:
    kind: Memcached
    listKind: MemcachedList
    plural: memcacheds
    singular: memcached
  scope: Namespaced
  versions:
  - name: v1alpha1
    schema:
      openAPIV3Schema:
        description: Memcached is the Schema for the memcacheds API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: MemcachedSpec defines the desired state of Memcached
            properties:
              size:
                description: デプロイされるMemcachedの数
                format: int32
                minimum: 0
                type: integer
            required:
            - size
            type: object
          status:
            description: MemcachedStatus defines the observed state of Memcached
            properties:
              nodes:
                description: MemcachedのPodの名前を保存する
                items:
                  type: string
                type: array
            required:
            - nodes
            type: object
        type: object
    served: true
    storage: true
    subresources:
      status: {}
status:
  acceptedNames:
    kind: ""
    plural: ""
  conditions: []
  storedVersions: []

コントローラーの実装

コントローラーの実装はここのものを一旦そのまま使います。
まず、SetUpWithManagerメソッドではManagerがセットアップされ、ControllerがどのようにCustom Resourceを管理するかを設定します。

func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&cachev1alpha1.Memcached{}).
        Owns(&appsv1.Deployment{}).
        Complete(r)
}

NewControllerManagedByはコントローラーのビルダーを提供しており、様々なコントローラーの設定を行なうことができます。
例えば上記の例では以下のような設定を行ってます。

  • For(&cachev1alpha1.Memcached{}
    • MecahedをプライマリーResourceとして指定しています。Add/Update/Deleteのそれぞれのイベントが発火されたタイミングで調整ループの中でRequestMemcachedオブジェクトを操作するために送られます。
  • Owns(&appsv1.Deployment{})
    • Deploymentをセカンダリなリソースとして管理することを定義しています。DeploymentAdd/Update/Deleteイベントのタイミングで調整のRequestがDeploymemtのオーナーの調整ループ(Reconcileメソッド)にマップされます。今回の場合はMemcachedオブジェクトになります。

Controllerを初期化する際の様々な設定が用意されています。詳細はこちらをご確認ください。
例えば以下のように設定すると、調整ループの最大の並行数を2に設定し特定条件のイベントを無視するようになります。

func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&cachev1alpha1.Memcached{}).
        Owns(&appsv1.Deployment{}).
                // WithOptionとWithEventFilerを追加
        WithOptions(controller.Options{MaxConcurrentReconciles: 2}).
        WithEventFilter(ignoreDeletionPredicate()).
        Complete(r)
}

func ignoreDeletionPredicate() predicate.Predicate {
    return predicate.Funcs{
        UpdateFunc: func(e event.UpdateEvent) bool {
            // メタデータが変更されていない場合はUpdateを無視する
            return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration()
        },
        DeleteFunc: func(e event.DeleteEvent) bool {
            // オブジェクトがDeleteとなっている場合はfalseで評価される
            return !e.DeleteStateUnknown
        },
    }
}

WithEventFilterPredicateを渡すことでイベントをフィルターすることができるようです。

次に、 Reconcileで調整ループの実装の部分をみていきます。

func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

    memcached := &cachev1alpha1.Memcached{}
    err := r.Get(ctx, req.NamespacedName, memcached)

       // 省略
}

Reconcile関数はCustom Resourceを実際のシステムで理想な状態にする役割を持ちます。
イベントが発火されるたびにReconcile関数が呼ばれ、調整が行われます。
Reconcile関数はRequestを引数として受け取り、RequestNamespace/Nameの鍵を持っており、リクエストに対応するオブジェクトをLookUpするのに利用されます。
上記の例では、調整リクエストに対応するMemcachedをLookUpしてます。

また、Reconcile関数は以下の戻り値を返すことが可能です。

  • return ctrl.Result{}, err:エラー
  • return ctrl.Result{Requeue: true}, nil:エラーなし
  • return ctrl.Result{}, nil:調整の停止
  • return ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())}, nil:調整をX時間後に再度行なう

最後にContorollerはRBACのパーミンションを持つ必要があります。
以下のようにRBACマーカーを付与します。

//+kubebuilder:rbac:groups=cache.hirooka.dev,resources=memcacheds,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=cache.hirooka.dev,resources=memcacheds/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=cache.hirooka.dev,resources=memcacheds/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
//省略
}

以下のコマンドを叩くとconfig/rbac/role.yamlに上記のマーカーからマニフェストを自動生成してくれます。

$ make manifests

OperatorのイメージをDocker HubにPushする

作成したOperatorのイメージをDockerレジストリにPushしておく必要があります。
Makefileの以下の部分を書き換えます。

-IMG ?= controller:latest
+IMG ?= $(IMAGE_TAG_BASE):$(VERSION)

IMAGE_TAG_BASEでInitのところで指定したDockerレジストリを取得できるみたいです。
しかし、前述の通り私はちょっとinitのところでしくってしまったのでこのブログでは以下のように書き換えます。

IMG ?= hirohiroyuya/sample-controller:latest

次のコマンドでイメージをビルドしてPushします。

$ make docker-build docker-push

クラスターにデプロイする

作ったCDRとControllerをクラスターにデプロイします。
デプロイにはいくつかの方法があるようですが今回はこの中で「Run as a Deployment inside the cluster」の方法を試してみます。
具体的には以下のコマンドを実行します。

$ make deploy

この際クラスターはカレントContextで指定されるものが利用されるみたいです(このブログではminikubeにしてます)。
コマンドの実行が成功するとデフォルトでは<project-name>-systemのネームスペースでOperatorが実行されています。

$ kubectl get deployment -n operatorsdk-sample-system 
NAME                                    READY   UP-TO-DATE   AVAILABLE   AGE
operatorsdk-sample-controller-manager   1/1     1            1           16m

いい感じに動いてくれてるみたいですね。

Memcached Recourceを作成する

Operatorのデプロイまでできたので、いよいよMemcached Resourceをクラスタにデプロイしてみます。
config/samples/cache_v1alpha1_memcached.yamlを以下のように書き換えます。’

apiVersion: cache.hirooka.dev/v1alpha1
kind: Memcached
metadata:
  name: memcached-sample
spec:
  size: 3

applyしてリソースが作成されていることを確認します。

$ kubectl apply -f config/samples/cache_v1alpha1_memcached.yaml
memcached.cache.hirooka.dev/memcached-sample created


$ kubectl get deployment
NAME               READY   UP-TO-DATE   AVAILABLE   AGE
memcached-sample   3/3     3            3           91s


$  kubectl get pods
NAME                                READY   STATUS    RESTARTS   AGE
memcached-sample-6c765df685-6mzjj   1/1     Running   0          2m12s
memcached-sample-6c765df685-l9jpr   1/1     Running   0          2m12s
memcached-sample-6c765df685-t2wkw   1/1     Running   0          2m12s

いい感じで動いてくれてそうですね!!

Isito、k8sでカナリアデプロイをする

はじめに

先月の5月18日にIstioが1.10のリリースがされたみたいです。
その中でStable Revision Labelsと言う機能を使ってカナリアップデートを行なう例が乗っていて、最初言葉尻を完全に読み間違えていてアプリのカナリアデプロイが容易になると思ったのですがそうでは無く、リビジョン付きの複数のコントロールプレーンをデプロイするサポートが入ったって感じっぽいですね。
まぁ、1.10は置いておいても、k8sやIstioを使ったカナリアリデプロイってやったことなかったので、ちょっと試してみようかと思います。
基本的にはここに乗っているのに基づいてやろうと思います。
かなり古いブログですが、これ以上に新しいものをみつけることができなかったので、もしもっと良いやり方がありそうであればコメントなどで教えてくれると嬉しいです。

やっていく

環境

今回の動作環境は以下のようになってます。

$ uname -srvmpio
Linux 5.4.0-74-generic #83-Ubuntu SMP Sat May 8 02:35:39 UTC 2021 x86_64 x86_64 x86_64 GNU/Linu

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal


$ minikube version
minikube version: v1.21.0
commit: 76d74191d82c47883dc7e1319ef7cebd3e00ee11


$ kubectl version  -o yaml
clientVersion:
  buildDate: "2021-03-18T01:10:43Z"
  compiler: gc
  gitCommit: 6b1d87acf3c8253c123756b9e61dac642678305f
  gitTreeState: clean
  gitVersion: v1.20.5
  goVersion: go1.15.8
  major: "1"
  minor: "20"
  platform: linux/amd64
serverVersion:
  buildDate: "2021-01-13T13:20:00Z"
  compiler: gc
  gitCommit: faecb196815e248d3ecfb03c680a4507229c2a56
  gitTreeState: clean
  gitVersion: v1.20.2
  goVersion: go1.15.5
  major: "1"
  minor: "20"
  platform: linux/amd64

$ istioctl version
client version: 1.10.1
control plane version: 1.10.1
data plane version: 1.10.1 (1 proxies)

また、このブログでは特に明記しない場合k8sのコンテキストはminikubeを使っています。

下準備

下準備として特定の文字列を返すnginxのコンテナを作成しておきます。
まずは、以下のコマンドを実行して、Dockerのコンテキストをminikubeのものに変えておきます。

$ eval $(minikube docker-env)

これでDockerクライアントはMinikubeのコンテキストで動くようになりました。  

次に以下の簡単なDokcerfileを用意します。

Dockerfile-v1

FROM nginx:1.19.6

RUN echo "Hello, Canary v1" > /usr/share/nginx/html/index.html

Dockerfile-v2

FROM nginx:1.19.6

RUN echo "Hello, Canary v2" > /usr/share/nginx/html/index.html

それぞれをビルドしておきます。

$ docker build ./ -t hello-nginx-v2 -f Dockerfile-v2
$ docker build ./ -t hello-nginx-v1 -f Dockerfile-v1

$ docker images | grep hello-nginx
hello-nginx-v2                            latest     9defac4edce6   2 minutes ago   133MB
hello-nginx-v1                            latest     abd78992aeba   8 minutes ago   133MB

最後にあとで使うのでMinikubeのIPを調べておきます。

$ minikube ip
192.168.49.2

これで下準備は完了です。

どんなことを行なうか

今回は、以下のパターンをやってみようかと思います。

Kubernetesのみでのカナリアデプロイをやってみる
・Istioを使ったカナリアリデプロイをやってみる

k8sを使ったカナリアデプロイ

k8s単体でもカナリアデプロイを行なうこと自体は可能です。 それを行なうには以下の2つの方法取るようです。

selectorを用いたカナリアデプロイ

selectorを用いてカナリアデプロイをやってみようと思います。
こいつの考え方としては、古いバージョンと新しいバージョンのPod数を変更することで、新しいVersionのPodへルーティングされる割合を調整します。
実際にやってみます。 まずは、2つのDeploymentを用意します。

$ kubectl create deployment hello-nginx --image=hello-nginx-v1 --dry-run=client -o yaml > deployment.yaml
$ echo --- >> deployment.yaml 
$ kubectl create deployment hello-nginx --image=hello-nginx-v2 --dry-run=client -o yaml >> deployment.yam

できたデプロイメントが以下のようになります。

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx
  name: hello-nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
    spec:
      containers:
      - image: hello-nginx-v1
        name: hello-nginx-v1
        resources: {}
status: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx
  name: hello-nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
    spec:
      containers:
      - image: hello-nginx-v2
        name: hello-nginx-v2
        resources: {}
status: {}

それぞれを見分けるためnamenginx-hello-v1nginx-hello-v2のに変え環境を示すラベル(track)追加します。
また、nginx-hello-v1の方はレプリカ数も変えておきます。

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx      
    track: stable               # 環境を示すラベルを追加
  name: hello-nginx-v1          # nameにVersionを追加
spec:
  replicas: 3                   # レプリカ数を3に
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
    spec:
      containers:
      - image: hello-nginx-v1
        name: hello-nginx-v1
        resources: {}
status: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx
    track: canary               # 環境を示すラベルを追加
  name: hello-nginx-v2          # nameにVersionを追加
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
    spec:
      containers:
      - image: hello-nginx-v2
        name: hello-nginx-v2
        resources: {}
status: {}

deploymentをapplyします。

$ kubectl apply -f deployment.yaml 
deployment.apps/hello-nginx-v1 created
deployment.apps/hello-nginx-v2 created

$ kubectl get po
NAME                              READY   STATUS    RESTARTS   AGE
hello-nginx-v1-7b8665d769-grsz5   1/1     Running   0          7s
hello-nginx-v1-7b8665d769-rdwg5   1/1     Running   0          7s
hello-nginx-v1-7b8665d769-zvlsg   1/1     Running   0          7s
hello-nginx-v2-86467b54f9-ghdnm   1/1     Running   0          7s

そして、これらのデプロイメントに対してapp: hello-nginxのラベルに対してルーティングを行なうServiceを記述してやります。   

$ kubectl create service nodeport hello-nginx --tcp=8081:80 --dry-run=client -o yaml > service.yaml

できたServiceが以下の通りになります。

apiVersion: v1
kind: Service
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx
  name: hello-nginx
spec:
  ports:
  - name: 8081-80
    port: 8081
    protocol: TCP
    targetPort: 80
  selector:
    app: hello-nginx
  type: NodePort
status:
  loadBalancer: {}

applyしてNodePortのポートをチェックしておきます。

$ kubectl  apply -f service.yaml

$ kubectl get svc/hello-nginx
NAME          TYPE       CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE
hello-nginx   NodePort   10.100.167.221   <none>        8081:32697/TCP   68m

これでおおよそ1/4程度のリクエストがv2にルーティングされるようになっているはずです。
ロードテストツールであるk6で以下のスクリプトを書いてチェックしてみます。

$ cat 100requests.js 
import http from 'k6/http';
import { sleep, check } from 'k6';

export let options = {
  vus: 10,
  duration: '10s',
};

export default function () {
  
  let res = http.get('http://192.168.49.2:32697');
  sleep(1);

  check(res, {
    'v1 responded': (r) => r.body == 'Hello, Canary v1\n',
    'v2 responded': (r) => r.body == 'Hello, Canary v2\n',
  });
}

ここでk6の使い方を詳細には説明しませんが、10並列で10秒ごとに計100回のリクエストを送り、そのレスポンスボディをチェックするスクリプトになっています。
k6に関しては過去にブログを書いているので、よろしければそちらもみてみてください)

$ docker run --network=host -i loadimpact/k6 run  - < 100requests.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: -
     output: -

  scenarios: (100.00%) 1 scenario, 10 max VUs, 40s max duration (incl. graceful stop):
           * default: 10 looping VUs for 10s (gracefulStop: 30s)


(省略)


     ✗ v1 responded
      ↳  80% — ✓ 80 / ✗ 20
     ✗ v2 responded
      ↳  20% — ✓ 20 / ✗ 80

(省略)

ここでは大体20%ぐらいのリクエストがV2に振られたみたいですね。
もう一度テストを実行すると70%、30%の割合になったので、おおよそ25%の割合で新しいVersionのPodにリクエストが割り振られているのがわかります。
Podが少ない状態である程度新しいバージョンの動作に確証が取れてきたらscaleコマンドでPodの数を調整します。

$ kubectl scale --replicas=2  deployment/hello-nginx-v2
deployment.apps/hello-nginx-v2 scaled
$ kubectl scale --replicas=2  deployment/hello-nginx-v1
deployment.apps/hello-nginx-v1 scaled

$ kubectl get po
NAME                              READY   STATUS    RESTARTS   AGE
hello-nginx-v1-7b8665d769-khzfp   1/1     Running   0          67m
hello-nginx-v1-7b8665d769-wjfxj   1/1     Running   0          67m
hello-nginx-v2-86467b54f9-ghdnm   1/1     Running   0          84m
hello-nginx-v2-86467b54f9-nsr99   1/1     Running   0          37s


$ docker run --network=host -i loadimpact/k6 run  - < 100requests.js

(省略)

     ✗ v1 responded
      ↳  50% — ✓ 50 / ✗ 50
     ✗ v2 responded
      ↳  50% — ✓ 50 / ✗ 50

(省略)

最終的にv1のスケールを0にしてv2を必要な数までスケールさせるとデプロイ完了です。
ロールバックしたい場合は、v2のスケールを0にして、v1のスケールを元の数まで戻せば良いです。

rolloutを用いたカナリアデプロイ

今度はk8sのrollout/pauseコマンドを用いてカナリアデプロイを行ってみようと思います。
考え方としては、selectorと似ており、Podの数を調整することで新しいVersionにルーティングされる割合を少しずつ増やしていき、カナリアデプロイを実現します。
具体的には、imageのアップデートが行われた際に実行されるrolloutを途中でpauseすることでより、比較的少ない新しいVersionのPodを起動して、少しずつデプロイして行きます。
selectorの例で用いていたDeploymentとServiceを削除して次のDeploymentを使用します。

deployment-rp.yaml

$ cat deployment-rp.yaml 
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx      
  name: hello-nginx
spec:
  replicas: 4
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
    spec:
      containers:
      - image: hello-nginx-v1
        name: hello-nginx-v1
        resources: {}
        imagePullPolicy: IfNotPresent
status: {}

applyしてPodが起動するのを確認します。

$ kubectl apply -f deployment-rp.yaml 
deployment.apps/hello-nginx create

$ kubectl get po
NAME                           READY   STATUS    RESTARTS   AGE
hello-nginx-59849bb7d7-cgkhw   1/1     Running   0          40s
hello-nginx-59849bb7d7-lz8bf   1/1     Running   0          40s
hello-nginx-59849bb7d7-mgrrb   1/1     Running   0          40s
hello-nginx-59849bb7d7-vl7lg   1/1     Running   0          40s

次にイメージをアップデートして、すぐにpauseコマンドを実行します。

$ kubectl rollout pause deployment/hello-nginx

$ kubectl set image deployment/hello-nginx hello-nginx=hello-nginx-v2
deployment.apps/hello-nginx image updated
$ kubectl rollout pause deployment/hello-nginx
deployment.apps/hello-nginx paused

$ kubectl get po
NAME                           READY   STATUS    RESTARTS   AGE
hello-nginx-59849bb7d7-cgkhw   1/1     Running   0          2m46s
hello-nginx-59849bb7d7-lz8bf   1/1     Running   0          2m46s
hello-nginx-59849bb7d7-mgrrb   1/1     Running   0          2m46s
hello-nginx-5d8dc5f978-4nx4g   1/1     Running   0          11s
hello-nginx-5d8dc5f978-d5p8s   1/1     Running   0          11s

今回の場合新しいPodが2つ起動されていますね。
この新しいPodはイメージが新しいものが使われているのがわかります。

$ kubectl describe po hello-nginx-5d8dc5f978-4nx4g 
Name:         hello-nginx-5d8dc5f978-4nx4g
Namespace:    default
Priority:     0
Node:         minikube/192.168.49.2
Start Time:   Sun, 20 Jun 2021 15:11:33 +0900
Labels:       app=hello-nginx
              pod-template-hash=5d8dc5f978
Annotations:  <none>
Status:       Running
IP:           172.17.0.8
IPs:
  IP:           172.17.0.8
Controlled By:  ReplicaSet/hello-nginx-5d8dc5f978
Containers:
  hello-nginx:
    Container ID:   docker://0988812d8fa7b3ccd1eb91c5fe48bbfed265ec0964407b9064e9f3dbb7801bc3
    Image:          hello-nginx-v2
    Image ID:       docker://sha256:9defac4edce66768e5023868544942142630a22e35b0770a222b973083fde7da
    Port:           <none>
    Host Port:      <none>
    State:          Running
      Started:      Sun, 20 Jun 2021 15:11:34 +0900
    Ready:          True
    Restart Count:  0
    Environment:    <none>
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-2zfxv (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  default-token-2zfxv:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-2zfxv
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  100s  default-scheduler  Successfully assigned default/hello-nginx-5d8dc5f978-4nx4g to minikube
  Normal  Pulled     100s  kubelet            Container image "hello-nginx-v2" already present on machine
  Normal  Created    100s  kubelet            Created container hello-nginx
  Normal  Started    100s  kubelet            Started container hello-nginx

もし、リリース中に問題があった場合は、rollbackコマンドを実行し、リビジョンを1つ前のものに戻します。

$ kubectl rollout resume deployment/hello-nginx
deployment.apps/hello-nginx resumed

$ kubectl rollout undo deployment/hello-nginx
deployment.apps/hello-nginx rolled back

Istioを使ったカナリアデプロイ

k8sを使ったカナリアデプロイでは限定的で以下のようなカナリアリデプロイを行なう際には課題があります。

  • 全体の1%のリクエストだけ新しいVersionのPodにルーティングしたい場合(Podを100個起動する必要がある)
  • 特定のクライテリアを満たすリクエストを新しいVersionにルーティングしたい

Istioを用いることで、上記のような、より複雑な条件でのデプロイを簡単に行なうことができるようになります。
一度、rolloutで使ったdeploymentを削除して、

deployment-istio.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx      
  name: hello-nginx-v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
        version: v1
    spec:
      containers:
      - image: hello-nginx-v1
        name: hello-nginx-v1
        resources: {}
        imagePullPolicy: IfNotPresent
status: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: null
  labels:
    app: hello-nginx
  name: hello-nginx-v2
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hello-nginx
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: hello-nginx
        version: v2
    spec:
      containers:
      - image: hello-nginx-v2
        name: hello-nginx-v2
        resources: {}
        imagePullPolicy: IfNotPresent
status: {}

selectorのところで用いたdeploymentと少し似ていますが、Podに対して、versionラベルを追加しているのとPodの数を3つずつ、2つのDeploymentで計6個のPodを起動しています。

$ kubectl apply -f deployment-istio.yaml

$ kubectl get po
NAME                              READY   STATUS    RESTARTS   AGE
hello-nginx-v1-7b8665d769-bzgzg   1/1     Running   0          99s
hello-nginx-v1-7b8665d769-j569c   1/1     Running   0          99s
hello-nginx-v1-7b8665d769-wbthk   1/1     Running   0          99s
hello-nginx-v2-86467b54f9-d9txr   1/1     Running   0          8s
hello-nginx-v2-86467b54f9-hdsx9   1/1     Running   0          99s
hello-nginx-v2-86467b54f9-m8gmk   1/1     Running   0          8s

次に外部からアクセスを可能にするため、Gatewayリソースを作成します。

gateway.yaml

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: hello-nginx
spec:
  selector:
    istio: ingressgateway 
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"

そして、きもとなるVertualServiceとDestinationRouleを作成します。

destination-rule.yaml

apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: hello-nginx
spec:
  host: hello-nginx
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2

DestinationRouleでPodに付与したversionのラベルを指定することで、ルーティング先をsubsetとして定義しています。

virtual-service.yaml

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: hello-nginx
spec:
  hosts:
  - "*"
  gateways:
  - hello-nginx
  http:
  - route:
    - destination:
        host: hello-nginx
        subset: v1
      weight: 90
    - destination:
        host: hello-nginx
        subset: v2
      weight: 10

ここでは、Gatewayに対するどんなHostのリクエストに対してもsubsetのv1v2に対してルーティングを行なうように設定しています。
また、weightを設定し、リクエストの10%がv2へルーティングされるようにしています。

これで準備は完了です、k6からGateway経由でリクエストを送るようにするためにGatewayのport番号を取得してk6スクリプトを修正します。

$ kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}'
31424

取得したPortでリクエストを送るようにk6スクリプトを修正したら、テストを実行します。

$ docker run --network=host -i loadimpact/k6 run  - < 100requests.js

(省略)

     ✗ v1 responded
      ↳  90% — ✓ 90 / ✗ 10
     ✗ v2 responded
      ↳  10% — ✓ 10 / ✗ 90

(省略)

先ほどと違い、Podの数とは関係なくルーティングが行われていることが確認できました。
より小さなリクエストの数で、新しいVersionの動作確認が取れたらweightを修正して新しいVersionにのみリクエストが送られていくように変更していきます。
今回は、Podの数は固定で行いましたが、Podの数で割合を調整する必要がなくなったので、Deploymentのオートスケールの機能と合わせて利用することも可能です。

http4sのHTTP Serverを使ってみる

はじめに

FS2に依存するライブラリはいくつかありますが、http4sというHTTPサーバとクライアントを用意してくれているライブラリがあり、ちょっと興味が湧いたので触ってみようかと思います。
http4sはFS2(と、もちろんCats Effects)をベースにしているので、FunctinnalでStreamingでFunctionalというのが特徴みたいです。

やってみる

環境

$ scala --version
Scala code runner version 2.13.6 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.

$ sbt --version
[info] 1.2.7
sbt script version: 1.5.2

プロジェクトの作成

まずはベースとなるプロジェクトを作成します。
 

$ sbt new sbt/scala-seed.g8
[info] welcome to sbt 1.5.2 (Oracle Corporation Java 1.8.0_292)
[info] loading global plugins from /home/yuya-hirooka/.sbt/1.0/plugins
[info] set current project to new (in build file:/tmp/sbt_adb8af4/new/)

A minimal Scala project. 

name [Scala Seed Project]: http4s-example

作成されたプロジェクトのbuild.sbtに以下の依存を追加します。

libraryDependencies += "org.http4s" %% "http4s-dsl" % "0.21.23",
libraryDependencies += "org.http4s" %% "http4s-blaze-server" % "0.21.23",
libraryDependencies += "org.http4s" %% "http4s-blaze-client" % "0.21.23",

今回はhttp4sの現在のstableバージョンである0.21.x系を利用しようと思います。

名前を受け取って挨拶を返すサーバを作成する

最初にhttp4s-blaze-serverを使って、サーバーの方を作成します。
最終的には名前をJsonで受け取って挨拶を返すサーバを作成しようと思います。

helloの文字列を返す

まずは、単純なGETリクエストに対して"hello"という文字列を返すだけのサーバを作ります。
そのためにはRouterBlazeServerBuilderを使います。
Routerを使ってルートの定義を作成します。

import cats.effect._
import org.http4s._
import org.http4s.dsl.io._
import org.http4s.implicits.http4sKleisliResponseSyntaxOptionT
import org.http4s.server.Router

object SampleRouters {

  def helloHandler() = IO("hello")

  val helloRouter = HttpRoutes.of[IO] {
    case GET -> Root / "hello" => helloHandler().flatMap(Ok(_))
  }

  def createApp = Router("/" -> helloRouter).orNotFound

}

HttpRoutes http4s-dsl,を使ってルータを定義します。
みてすぐわかるかも知れませんが、パターンマッチングでGETと/helloのパスにマッチした場合helloHandr()を呼び出し、その戻り値であるIO("")をflatMapして取り出しOk()レスポンスで返すRouterを定義しています。
1番最後の行ではRouter objectを作成策定していますが、ここでは定義したルータのルートに当たるパスを指定することができます。

Routeの作成はできたので、次にアプリケーションのエントリーポイントとなるobjectを作ります。

import cats.effect.{ExitCode, IO, IOApp}
import org.http4s.server.blaze.BlazeServerBuilder

import scala.concurrent.ExecutionContext.Implicits.global


object AppStarter extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    BlazeServerBuilder[IO](global)
      .bindHttp(8081, "localhost")
      .withHttpApp(SampleRouters.createApp)
      .serve
      .compile
      .drain
      .as(ExitCode.Success)

}

http4sは様々なバックエンドをサポートしているようですが(サポートはここを確認してください。
BlazeServerBuilderを使って、 もし、IOApp以外の場所でBuilderを使いたい場合は以下のimplicitを定義する必要があります。

implicit val cs: ContextShift[IO] = IO.contextShift(global)
implicit val timer: Timer[IO] = IO.timer(global)

アプリケーションを実行するし、cURLでアプリケーションにリクエストを送ってみます。

$ curl localhost:8081/greeting -v
*   Trying 127.0.0.1:8081...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8081 (#0)
> GET /greeting HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< Date: Fri, 11 Jun 2021 14:20:49 GMT
< Content-Length: 5
< 
* Connection #0 to host localhost left intact
hello

うまく起動できてるみたいですね。

パスパラーメータで値を受け取る

それでは次に、パスパラメータ名前を受け取ってその名前に対して挨拶を返す用にしてみます。
新しく、Routerを作成します。

object SampleRouters {

  def helloHandler() = IO("hello")

  val helloRouter = HttpRoutes.of[IO] {
    case GET -> Root / "hello" => helloHandler().flatMap(Ok(_))
  }

  def greetingSomeone(name: String) = IO(s"Hello, $name")

  val greetingRouter = HttpRoutes.of[IO] {
    case GET -> Root / "greeting" / name => helloHandler().flatMap(Ok(_))
  }

  def createApp = Router("/" -> helloRouter, "/v1" -> greetingRouter).orNotFound

}

ここではgreetingRouterを新たに定義してます。
DSLのなかでパスの位置い変数を置くことで、パスパラメータを束縛することができます。
また、上記のようにRouter object作成時には複数のRouterを登録することができます。

cURLでリクエストを送ってみます。

$ curl localhost:8081/v1/greeting/Moheji -v
*   Trying 127.0.0.1:8081...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8081 (#0)
> GET /v1/greeting/Moheji HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.68.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< Date: Fri, 11 Jun 2021 14:46:01 GMT
< Content-Length: 13
< 
* Connection #0 to host localhost left intact
Hello, Moheji

Jsonで値を受け取る。Jsonの値を返す

それでは最後にJsonの値を扱うようなPathを追加してみようと思います。
まずですが、http4sでJsonを扱うためには以下の依存を追加する必要があります。

libraryDependencies += "org.http4s" %% "http4s-circe" % "0.21.23"
libraryDependencies += "io.circe" %% "circe-generic" % "0.13.0"

circe-genericの依存はオプショナルっぽいですが、JsonScalaのクラスとの変換を知れてくれるみたいなので追加しておきます。

それでは追加したhttp4s-circecirce-genericを使ってgreetingRouterに新たなRouteを書き加えてみます。

import io.circe.generic.auto._
import io.circe.syntax.EncoderOps
import org.http4s._
import org.http4s.circe.CirceEntityCodec._
//他のImportは省略


object SampleRouters {
  // 他の実証は諸略  

  case class Name(name: String)

  case class Greeting(hello: String)

  val greetingRouter = HttpRoutes.of[IO] {
    case req@POST -> Root / "greeting" => for {
      n <- req.as[Name]
      resp <- Created(Greeting(n.name).asJson)
    } yield resp
    case GET -> Root / "greeting" / name => greetingSomeone(name).flatMap(Ok(_))
  }

  def createApp = Router("/" -> helloRouter, "/v1" -> greetingRouter).orNotFound

}

POSTリクエストで名前を受け取るためにDSLを使います。
req@のように記述刷ることでreqにリクエストの情報をバインドできるみたいです。
リクエストを受け取るためのcase

// リクエスト用のcase class
case class Name(name: String)

// レスポンス用のcase class
case class Greeting(hello: String)

本来はこれらに対してエンコーダーデコーダーを作ってimplicitとして定義刷る必要があるようですが。 以下の2つをインポートしていればそれぞれが自動でimplicitされるみたいです。

import io.circe.generic.auto._
import org.http4s.circe.CirceEntityCodec._

それではリクエストを送ってみましょう。

$ curl localhost:8081/v1/greeting -d '{"name":"Moheji"}' -v
*   Trying 127.0.0.1:8081...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8081 (#0)
> POST /v1/greeting HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Length: 17
> Content-Type: application/x-www-form-urlencoded
> 
* upload completely sent off: 17 out of 17 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 201 Created
< Content-Type: application/json
< Date: Fri, 11 Jun 2021 15:51:56 GMT
< Content-Length: 18
< 
* Connection #0 to host localhost left intact
{"hello":"Moheji"}

これでちょっと触った程度ですが一応、サーバができましたね。

fs2を試す。

はじめに

ScalaでReactorみたいなリアクティブなプログラミングをするにはどうすれば良い?みたいなのをScalaに詳しい同僚に聞いてみたところ。
FS2というライブラリを教えて頂いたので試してみようかと思います。
ただ、この時期ではリアクティブのところまでは行かずにまずは基本的な使い方を確認します。

FS2とは

純粋な関数型の多形性を持つストリームプロセスをサポートするライブラリです。
I/O(netwrking, flies)のストリーム処理をリソースセーフに実行することを可能とします。
Scala 2.12、 2.13、3で利用できるみたいです。 また、FS2はCatsCats Effectを利用しているライブラリで、逆にhttp4sskunkdoobieに用いられるようです。
ちなみに、名前はFunctional Streams for ScalaでFS2もしくはFSSらしいです。

動かしてみる

環境

ソースコードの実行環境は以下の通り。

$ uname -srvmpio
Linux 5.4.0-72-generic #80-Ubuntu SMP Mon Apr 12 17:35:00 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal

$ sbt --version
[info] 1.2.7
sbt script version: 1.5.2


$ scala --version
Scala code runner version 3.0.0 -- Copyright 2002-2021, LAMP/EPFL

プロジェクトの作成

sbtを使ってプロジェクトを作成します。
以下のコマンドを実行します。

$ sbt new sbt/scala-seed.g8
[info] welcome to sbt 1.5.2 (Oracle Corporation Java 1.8.0_292)
[info] set current project to new (in build file:/tmp/sbt_c0223d2e/new/)

A minimal Scala project. 

name [Scala Seed Project]: fs2-example

出来上がったbuild.sbtに依存をつかします。

lazy val root = (project in file("."))
  .settings(
    name := "fs2",
    libraryDependencies += scalaTest % Test,
    // 以下を追加
    libraryDependencies += "co.fs2" %% "fs2-core" % "3.0.0",
    libraryDependencies += "co.fs2" %% "fs2-io" % "3.0.0",
    libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "3.0.0"
  )

Streamの作成と副作用を持つStreamの実行

FS2ではStream[F, 0]というクラスが基本のデータクラスっぽいです。
ここでF エフェクト型と呼ばで、Oはアウトプット型と呼ばれるようです。
Streamは以下のようにして作成します。

import cats.effect.{IO, IOApp}
import fs2.{INothing, Pure, Stream}

object Fs2Example {
  
  val streamEmpty: Stream[Pure, INothing] = Stream.empty
  val streamOne: Stream[Pure, Int] = Stream.emit(1)
  val streamThree: Stream[Pure, Int] = Stream(1, 2, 3)
  val streamList: Stream[Pure, List[Int]] = Stream(List(1, 2, 3))
}

例えば、上記のstreamOneStream[Pure, Int]を持っており、アウトプット型がIntでエフェクト型がPureとなります。
Pureは実行時に副作用が存在しないことを示すようです。

StreamはtoListtoVectorをそれぞれ呼び出すことで、ListVectorに変換できるようです。
またStreamはlist-likeな以下のようなメソッドを持っているようです。

val filteredStream: Stream[Pure, Int] = Stream(1, 2, 3, 4, 5, 6).filter(_ > 0)
val foldSum: List[Int] = Stream(1, 2, 3).fold(0)(_ + _).toList
val repeat: Stream[Pure, Int] = Stream(1, 2, 3).repeat.take(9)
val collect: Stream[Pure, Int] = Stream(None, Some(0), Some(1)).collect { case Some(i)=> 1}

副作用を含んたStreamの作成も行なうことができます。

val eff: Stream[IO, Int] = Stream.eval(IO {
  println("Hello! FS2"); 1 + 1
})

IOはエフェクト型で、この型を作ること自体は副作用を起こしません。
また、Stream.evalも作成されたタイミングでは何もしません。
この副採用を含んだストリームを実行するためにはまずcompileメソッドを呼び出してIOを取り出します。
compileは更に以下のようなメソッドを持ちます。

val vector: IO[Vector[Int]] = eff.compile.toVector
private val drain: IO[Unit] = eff.compile.drain
private val value: IO[Int] = eff.compile.fold(0)(_ + _)

toVectorはすべてのアウトプットを1つのベクターに入れ込み、drainはすべての実行結果を捨て、 foldは結果を畳み込みます。
compileを実行するだけではまだ、実行はおこなわれません。
これを実行するには以下のように

import cats.effect.unsafe.implicits.global

object Main extends App {

  val foldResult: IO[Int] = eff.compile.fold(0)(_ + _)
  vector.unsafeRunSync
}

import cats.effect.unsafe.implicits.globalに関してはまだ理解が甘いですが、今回利用するIORumtimeクラスの指定しているようです。
実行結果は以下の通り。

Hello! FS2

Streamの基本操作

Streamには便利なオペレーターが用意されています。
代表的なものには++mapflatMapbracketのようなものがあります。

object MainOperator extends App {

  val result = (Stream(Some(1), Some(2)) ++ Stream(Some(3), Some(4), None))
    .flatMap(i => Stream(i, i))
    .map {
      case Some(i) => i
      case _ => -1
    }.map(_ + 1)

  result.toList.foreach(println)
}

これの実行結果は以下のようになります。

2
2
3
3
4
4
5
5
0
0

エラーハンドリング

Streamでエラーを投げたい場合はStream.raseErrorを使います。
以下のような使い方ができるようです。

//
val err = Stream.raiseError[IO](new Exception("Oops! 1"))

val err2 = Stream(1, 2, 3) ++ (throw new Exception("Oops! 2"))

val err3 = Stream.eval(IO(throw new Exception("Oops! 3")))

このハンドリングは以下のように行えます。

  try err.compile.toList.unsafeRunSync() catch {
    case e: Exception => println(e)
  }

  try err2.toList catch {
    case e: Exception => println(e)
  }

  try err3.compile.drain.unsafeRunSync() catch {
    case e: Exception => println(e)
  }
}

実行結果は以下の通り

java.lang.Exception: Oops! 1
java.lang.Exception: Oops! 2
java.lang.Exception: Oops! 3

ScalaTestとScalaMockでテストを行なう

はじめに

今後Scalaを触ることになりそうになのでちょっと勉強しておこうかと思いまして、まずはテストのやり方を確認しようかと思いScalaTestScalaMockを使ってユニットテストを書いてみようと思います。

書いてみる

環境

今回の環境は以下の通り

$ scala --version
Scala code runner version 3.0.0 -- Copyright 2002-2021, LAMP/EPFL

$ sbt --version
sbt version in this project: 1.5.2
sbt script version: 1.5.2

セットアップ

まず、sbtのプロジェクトはIntelliJプラグインで作成しました。
ScalaTestのセットアップはこちらを参考に行います。

まずは、build.sbtに以下の依存を追加します。

libraryDependencies += "org.scalactic" %% "scalactic" % "3.2.9"
libraryDependencies += "org.scalamock" %% "scalamock" % "5.1.0" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.9" % "test"

sbtで依存を定義するための手っ取り早い方法はlibraryDependenciesに次のような構文で記述するようです。
libraryDependencies += groupID % artifactID % revision % configuration

ここでは3つ依存を定義しています。scalatestscalamockに関しては良いとしてscalaitcですが、ScalaTestの姉妹ライブラリーで==オペレーターなどのテストやプロダクションコードで使えるような諸々を提供してくれるみたいです。今回使うかはわかりませんが、ドキュメントで依存に追加することが推奨されていたので追加しておこうと思います。

ScalaTestでユニットテストを記述する

ScalaTestのスタイルについて

ScalaTestでは以下のようなテストスタイルをサポートしているようです。

  • FunSuiteスタイル
  • FlatSpecスタイル
  • FunSpecスタイル
  • FreeSpecスタイル

それぞれのテストの書き方はこちらを参考にしてください。 最初のステップとしてはFlatSpecスタイルがおすすめされているようなので、このブログではFlatSpecを用いてテストを記述しようと思います。

単純なテストを書いてみる

まずは以下のような足し算をするだけの簡単なクラスを用意します。

class Calc {
  def plus(a: Int, b: Int): Int = a + b
}

このクラスのメソッドplusをテストするコードは以下のようになります。

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should

class CalcTest extends AnyFlatSpec with should.Matchers {

  it should "足し算の結果を返す" in {
    val c = new Calc
    c.plus(1, 2) should be (3)
  }
}

AnyFlatSpecX should YA must Bのような形式でテストを記述するための構文を提供してくれます。   shouldmustcanと行ったような助動詞の記述が可能なようです。
また、should.Matchersをミクスインしていますが、これは result should be (expected)のようにアサーションを記述するための構文を提供してくれます。
should.Matchersは以下のような構文を提供します。

  • result should equal (expected)
    • 比較がカスタマイズ可能
  • result should === (expected)
    • 比較がカスタマイズ可能
    • 型の制約を強制する
  • result should be (expected)
    • 比較がカスタマイズができない代わりにコンパイルが早い
  • result shouldEqual expected
    • ()を必要としない
    • カスタマイズ可能な比較
  • result shouldBe 3
    • ()を必要としない
    • 比較がカスタマイズができない代わりにコンパイルが早い

should.Matchersの他にもAnyFlatSpecの上位のクラスでミクスインされているAssertionsトレイトではアサーションを行なうためのいくつかのマクロを定義されています。
このマクロには例えば以下のようなものがあります。

  • fail
    • テストを失敗させる
  • succeed
    • テストを成功させる

他にもさまざまなマクロがあります。詳細はこちらを確認ください。

それでは、記述したテストを実行してみます。
様々な実行方法がありますが、今回はsbtを使って実行したいと思います。
単純にすべてのテストを実行するためにはプロジェクトルートで以下のコマンドを実行します。

$ sbt

> test
[info] CalcSpec:
[info] - should 足し算の結果を返す
[info] CalcFunSuiteTest:
[info] - 足し算の結果を返す
[info] Run completed in 120 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 2, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 0 s, completed 2021/05/29 23:15:25

CalcFunSuiteTestはブログには記載してませんが、同じテストを別のスタイルで書いているだけです。ここではそんなに気にしなくても大丈夫です。
すべてのテストでは無く一部のテストを実行したい場合は以下のように実行します。

> testOnly CalcSpec
[info] CalcSpec:
[info] - should 足し算の結果を返す
[info] Run completed in 324 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 1 s, completed 2021/05/29 23:14:45

先程と違いCalcSpecだけが実行されたのがわかります。

例外をテストする

例外のテストを行なう場合もshould.Matchersで定義されているshould be thrownByを用います。
例えば先程のplusメソッドは正の数だけを受け取ることを想定しているメソッドで負の数を受け取った場合はIllegalArgumentExceptionを投げることを想定しているとします。
CalcSpecに以下のテストを追加します。

  it should "負の数が引数に渡された場合にIllegalArgumentException" in {
    val c = new Calc
    a[IllegalArgumentException] should be thrownBy {
      c.plus(1, -2)
    }
  }

このテストを実行すると以下の結果になります。

> testOnly CalcSpec
[info] CalcSpec:
[info] - should 足し算の結果を返す
[info] - should 負の数が引数に渡された場合 *** FAILED ***
[info]   Expected exception java.lang.IllegalArgumentException to be thrown, but no exception was thrown (CalcSpec.scala:13)
[info] Run completed in 118 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error]         CalcSpec
[error] (Test / testOnly) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 0 s, completed 2021/05/29 23:22:03

まだ実装をしてないので失敗しますね。
それでは実装を行ってもう一度テストを実行します。
実装を以下のように修正します。

class Calc {
  def plus(a: Int, b: Int): Int = {
    require(a > 0 && b > 0)
    a + b
  }
}

Scalaの引数チェックはrequireメソッドを用いて行なうことができるようです。
こいつは条件に合致しない場合、IllegalArgumentExceptionを投げます。 テストを実行します。  

> testOnly CalcSpec
[info] CalcSpec:
[info] - should 足し算の結果を返す
[info] - should 負の数が引数に渡された場合にIllegalArgumentException
[info] Run completed in 123 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 0 s, completed 2021/05/29 23:29:27

実行が成功しましたね。

ScalaMockでモックする

次はScalaMockを使ってモックを行ってみようと思います。
例えば以下のようなトレイトとクラスがあったとします。

trait Language {
  def greeting(): String = "Hello!!"
}

class Person(val lang: Language) {
  def saySomeThing(): String = lang.greeting()
}

class Japanese extends Language {
  override def greeting(): String = "こんにちは"
}

このPersonクラスのsaySomeThingメソッドをテストし、Languageトレイトをモックするとします。
テストは以下のように記述します。

class PersonSpec extends AnyFlatSpec with should.Matchers with MockFactory {

  it should "Languageをモックする" in {
    val mockLang = mock[Language]
    (mockLang.greeting _).expects().returning("Hello, ScalaMock!!").once()

    val target = new Person(mockLang)
    target.saySomeThing() should be ("Hello, ScalaMock!!")
  }
}

MockFactoryをミクスインすることで、ScalaMockの構文を利用することができます。
まずは、val mockLang = mock[Language]のところでMockオブジェクトを作成します。
次に、(mockLang.greeting _).expects().returning("Hello, ScalaMock!!").once()のところで、モックの設定をしてます。
書いてあるとおりですがreturningでモックのが返す値を モックが引数を期待する場合はexpects()の引数に渡すようです。
例えばなんでも良いがモックが1つの引数を期待する場合はexpects(*)とかけば良いようです。
そして最後に、once()ですがこのモックが1回実行されることを確認しています。

このテストを実行すると以下のような結果になります。

> testOnly PersonSpec
[info] PersonSpec:
[info] - should Languageをモックする
[info] Run completed in 132 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 0 s, completed 2021/05/30 1:08:24

非同期でテストする

ScalaTestとScalaMockを非同期にするのは簡単でAsyncFlatSpecAsyncMockFactoryを利用するだけです。
先程のPersonTestのテストを非同期で行なう用に書き換えます。

import org.scalamock.scalatest.AsyncMockFactory
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should

class PersonSpec extends AsyncFlatSpec with should.Matchers with AsyncMockFactory {

  it should "Languageをモックする1" in {
    val mockLang = mock[Language]
    (mockLang.greeting _).expects().returning("Hello, ScalaMock!").once()

    val target = new Person(mockLang)
    target.saySomeThing() should be("Hello, ScalaMock!")
  }


  it should "Languageをモックする2" in {
    val mockLang = mock[Language]
    (mockLang.greeting _).expects().returning("Hello, ScalaMock!!").once()

    val target = new Person(mockLang)
    target.saySomeThing() should be("Hello, ScalaMock!!")
  }
}

AnyFlatSpecMockFactoryを単純に置き換えただけです。
AnyFlatSpecに対するAsyncFlatSpecの用にスタイルごとに非同期用のものが用意されているようです。

実行すると以下のような結果になります。

> testOnly PersonSpec
[info] PersonSpec:
[info] - should Languageをモックする1
[info] - should Languageをモックする2
[info] Run completed in 151 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 0 s, completed 2021/05/30 1:10:53

Spring Boot 2.5についてメモ

はじめに

今週(2020/5/20)にSpring Boot 2.5が出ましたが、どんなのが出たのかまとめて、なんとなく違いを把握おこうと思います。 基本的にはRelease Notesの内容を個人的に気になったところを少し深ぼって、自分の理解をまとめようと思うので、正確な情報は本家のブログやそれに付随する記事等をご参照ください(なるべく、情報ソースのリンクはつけます)。 ちなみに、ドキュメントに書かれた変更点を自分の理解でまとめたものなので、基本的に機能に対しては動作検証などは行っておりません。

2.4からの変更点

このブログでは以下のような変更点についてまとめます。

  • DataSourceの初期化スクリプトに対するサポートの変更
  • Actuator /infoエンドポイントがよりセキュアに
  • 環境変数のプリフィクス
  • HTTP/2 over TCP (h2c)
  • R2DBCを用いたデータ初期化
  • Docker Imageバインディングサポート
  • ActuatorのPrometheusエンドポイントでOpenMetrics形式でのリソース公開

DataSourceの初期化スクリプトに対するサポートの変更

全体を眺めていると1番メインの変更っぽいです。
schema.sqldata.sqlに関するサポートがリデザインされています。
まず1つ目としてspring.datasource.*は非推奨になりspring.sql.init.*プロパティが新しく用意されました。
SqlInitializationProperties.javaでいろいろと定義されているみたいですね。
主なものをいかにまとめようと思います。spring.sql.init.のプリフィクスの部分は省略します。

プロパティ名 説明
schemaLocations スキーマDDLスクリプトのロケーション
dataLocations データ(DMLスクリプトのロケーション
username 初期化スクリプトを事項するユーザ名。もし別途設定が必要な場合
password 初期化スクリプトを事項するパスワード。もし別途設定が必要な場合
continueOnError エラーが発生してもスクリプト継続するか否か(デフォルトfalse)

spring.datasource.*から初期化スクリプトに関わるプロパティが切り出された感じっぽいですね。
また、usernamepasswordに関しては専用のものが用意されたみたいです。

Actuator /infoエンドポイントがよりセキュアに

デフォルトではActuatorの/infoエンドポイントがデフォルトでは公開されないようになりました。
しかし、Spring Securityがクラスパスにある場合は認証が求められるようになります。

2.5からの新しい機能

環境変数のプリフィクス

この機能で、同じ環境で複数のSpring Bootアプリケーションを実行、設定を行なうことができます。
SpringApplication.setEnvironmentPrefix(…​)を利用することで、この機能を利用できます。
例えばmyappというプリフィクスを付けてアプリケーションを起動する場合は以下のようにすれば可能になります。

SpringApplication application = new SpringApplication(MyApp.class);
application.setEnvironmentPrefix("myapp");
application.run(args);

この設定を行なうことで、すべてのプロパティを環境変数で変更するためにプリフィクスをつける必要があります。
例えば、ポートを変更したい場合はMYAPP_SERVER_PORTを設定する必要があります。
ちょっと動かしてみましたが、上記の設定が行われている場合SERVER_PORTではポートの変更を行なうことはできませんでした。

HTTP/2 over TCP (h2c)

マニュアル設定は無しで、HTTP/2 over TCP (h2c)、TSLではないHTTPでのHTTP/2の利用が可能となりました。
この機能を利用するためにはserver.http2.enabled=trueを設定して、かつserver.ssl.enabled=falseに設定する必要があります。
実際の実行結果は以下のようになります。

$ curl http://localhost:8080/hello -v --http2
*   Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /hello HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Connection: Upgrade, HTTP2-Settings
> Upgrade: h2c
> HTTP2-Settings: AAMAAABkAARAAAAAAAIAAAAA
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 101 
< Connection: Upgrade
< Upgrade: h2c
< Date: Sun, 23 May 2021 08:59:28 GMT
* Received 101
* Using HTTP2, server supports multi-use
* Connection state changed (HTTP/2 confirmed)
* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0
* Connection state changed (MAX_CONCURRENT_STREAMS == 100)!
< HTTP/2 200 
< content-type: text/plain;charset=UTF-8
< content-length: 10
< date: Sun, 23 May 2021 08:59:28 GMT
< 
* Connection #0 to host localhost left intact
Hello, 2.5

R2DBCを用いたデータ初期化

R2DBCを通して、スクリプトベースの初期化が可能になりました。
クラスパスにschema.sqldata.sqlというスクリプトファイルがある場合は自動的に実行されるようになります。
また、JDBC同様にspring.sql.init.*プロパティを通して設定が可能です。

Docker Imageバインディングサポート

Custom Buildpacksのサポート

Custom BuildpacksがMavenでもGradleでもサポートされるようになりました。
buildpacksプロパティにディレクトリ、tar.gz、ビルダー、Dockerイメージを指定することができます。

バインディング

Mavanでも、GradleでもVolumeバインディングがサポートされました。
この機能のおかげでBuildpacksnがローカルのパスやDocker Volumeをバインドすることができます。

ActuatorのPrometheusエンドポイントでOpenMetrics形式でのリソース公開

Actuatorの/actuator/prometheusエンドポイントで通常のPrometheusの形式とOpenMetricsの形式でレスポンスを返すことが可能になってます。
application/openmetrics-text;version=1.0.0のようなヘッダーをつけることで、OpenMetricsのレスポンスを受け取ることが可能です。

Dependency Upgrades

ここに書かれるように複数の依存がアップグレードされています。

Ktorのログ出力をJsonに変える

はじめに

まえにQuarkusで同じようなことをやったのですが、Ktorだとどうなるんだろうとふと思ってやってみようと思います。
先にお伝えしておくと、タイトル詐欺ではないですが、ほぼLogbackの設定の話になりKtor特有のものはなさそうでした。
主題とは関係ないですが、KtorにはCallLoggingと呼ばれるようなリクエストのロギングを行ってれるモジュール?は用意されているみたいなのでそちらもおまけとして使ってみようと思います。

やってみる

実行環境

アプリの実行環境は以下のようになっています。

$ uname -srvmpio
Linux 5.4.0-72-generic #80-Ubuntu SMP Mon Apr 12 17:35:00 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal

$ java -version
openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)

$ mvn --version
Apache Maven 3.6.3
Maven home: /usr/share/maven
Java version: 1.8.0_292, vendor: Oracle Corporation, runtime: /home/yuya-hirooka/.sdkman/candidates/java/8.0.292-open/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "5.4.0-73-generic", arch: "amd64", family: "unix"

プロジェクトを作成する

Ktor Project Generatorを使ってプロジェクトを作成します。
設定は以下のとおりにします。

f:id:yuya_hirooka:20210521115755p:plain

プロジェクトは個人の趣味でMavenで作成してます。
プロジェクトをダウンロードして解答し、適当なIDEなり何なりで開きます。

起動してみる

デフォルトでは以下のようなApplication.ktというクラスがされています。

fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

@Suppress("unused") // Referenced in application.conf
@kotlin.jvm.JvmOverloads
fun Application.module(testing: Boolean = false) {
    routing {
        get("/") {
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }
    }
}

localhost:8080/HELLO WORLD!の文字列を返すハンドラーが用意されているみたいです。
起動してみます。

$ mvn compile exec:java -Dexec.mainClass=dev.hirooka.ApplicationKt

現段階では起動時に以下のようなログが出力されます。

2021-05-21 20:22:32.657 [dev.hirooka.ApplicationKt.main()] TRACE Application - {
    # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 6
    "application" : {
        # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 7
        "modules" : [
            # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 7
            "dev.hirooka.ApplicationKt.module"
        ]
    },
    # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 2
    "deployment" : {
        # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 3
        "port" : 8080
    },
    # Content hidden
    "security" : "***"
}

2021-05-21 20:22:32.690 [dev.hirooka.ApplicationKt.main()] INFO  Application - Autoreload is disabled because the development mode is off.
2021-05-21 20:22:32.901 [dev.hirooka.ApplicationKt.main()] INFO  Application - Responding at http://0.0.0.0:8080

curlでエンドポイントにアクセスしてみます。

$ curl localhost:8080
HELLO WORLD!

アクセスログを出すようにしてみる

前述の通り、Ktorでは流入するリクエストの情報を出力してくれるCallLogging言うのものを用意してくれています。
使い方は簡単でただinstallするだけでOkです。 Application.ktを以下のように書き換えます。

@Suppress("unused") // Referenced in application.conf
@kotlin.jvm.JvmOverloads
fun Application.module(testing: Boolean = false) {
    routing {
        get("/") {
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }
    }
    // 追加した部分
    install(CallLogging)
}

アプリケーションを再起動して先ほどと同様のリクエストを送ると以下のようなログ出力が行われるようになります。

2021-05-21 20:46:37.840 [eventLoopGroupProxy-4-1] TRACE Application - 200 OK: GET - /

ログをJsonで出力するように変更

KtorはSLF4Jを利用したロギングが行えるようで、Project Generatorでアプリを作成した場合その実装はlogback-classicになるようです。
なので、Ktorのログ出力を変えたい場合をLogbackの出力をJsonに変える方法と同様な方法で行えます。
今回は、Logstashを使ってみたいと思います。
まずはpomの依存に以下を追加します。

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>6.6</version>
</dependency>

バージョンは最新を確認して入れるようにしてください。
次にLogbackの設定を変更してエンコーダーをLogstashのものを使うようにします。
resourceディレクトリ配下にあるlogback.xmlを以下のように修正します。

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!--        <encoder>-->
<!--            <pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
<!--        </encoder>-->
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>
    <root level="trace">
        <appender-ref ref="STDOUT"/>
    </root>
    <logger name="org.eclipse.jetty" level="INFO"/>
    <logger name="io.netty" level="INFO"/>
</configuration>

もともとあったencoderタグの部分をコメントアウトして、net.logstash.logback.encoder.LogstashEncoderエンコーダーとして使用するようにしてます。
これで設定は完了です。 プロジェクトを再起動してみると今度はLogがJson形式で出力されるようになったのがわかります。

{"@timestamp":"2021-05-21T20:40:48.994+09:00","@version":"1","message":"{\n    # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 6\n    \"application\" : {\n        # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 7\n        \"modules\" : [\n            # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 7\n            \"dev.hirooka.ApplicationKt.module\"\n        ]\n    },\n    # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 2\n    \"deployment\" : {\n        # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 3\n        \"port\" : 8080\n    },\n    # Content hidden\n    \"security\" : \"***\"\n}\n","logger_name":"Application","thread_name":"dev.hirooka.ApplicationKt.main()","level":"TRACE","level_value":5000}
{"@timestamp":"2021-05-21T20:40:49.038+09:00","@version":"1","message":"Autoreload is disabled because the development mode is off.","logger_name":"Application","thread_name":"dev.hirooka.ApplicationKt.main()","level":"INFO","level_value":20000}
{"@timestamp":"2021-05-21T20:40:49.339+09:00","@version":"1","message":"Responding at http://0.0.0.0:8080","logger_name":"Application","thread_name":"dev.hirooka.ApplicationKt.main()","level":"INFO","level_value":20000}

スタックトレースを出力する

上記の設定ではスタックトレースの出力がされず別途設定が必要のようです。
logback.xmlを以下のように書き換えます。

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!--    <appender name="STDOUT" class="net.logstash.logback.appender.AccessEventAsyncDisruptorAppender">-->
        <!--        <encoder>-->
        <!--            <pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
        <!--        </encoder>-->
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
                <maxDepthPerThrowable>30</maxDepthPerThrowable>
                <maxLength>2048</maxLength>
                <shortenedClassNameLength>20</shortenedClassNameLength>
                <exclude>sun\.reflect\..*\.invoke.*</exclude>
                <exclude>net\.sf\.cglib\.proxy\.MethodProxy\.invoke</exclude>
                <rootCauseFirst>true</rootCauseFirst>
                <inlineHash>true</inlineHash>
            </throwableConverter>
        </encoder>
    </appender>
    <root level="trace">
        <appender-ref ref="STDOUT"/>
    </root>
    <logger name="org.eclipse.jetty" level="INFO"/>
    <logger name="io.netty" level="INFO"/>
</configuration>

これでスタックトレースの情報もログ出力されるようになります。
Application.ktを以下のように書き換えます。

@Suppress("unused") // Referenced in application.conf
@kotlin.jvm.JvmOverloads
fun Application.module(testing: Boolean = false) {
    routing {
        get("/") {
            throw RuntimeException("opps!!")
            //call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }
    }
    install(CallLogging)
}

curlでリクエストを送ると以下のようなログが出力されます。

{"@timestamp":"2021-05-21T21:38:19.628+09:00","@version":"1","message":"{\n    # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 6\n    \"application\" : {\n        # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 7\n        \"modules\" : [\n            # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 7\n            \"dev.hirooka.ApplicationKt.module\"\n        ]\n    },\n    # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 2\n    \"deployment\" : {\n        # application.conf @ file:/home/yuya-hirooka/source/kotlin/ktor-sandbox/logging/target/classes/application.conf: 3\n        \"port\" : 8080\n    },\n    # Content hidden\n    \"security\" : \"***\"\n}\n","logger_name":"Application","thread_name":"main","level":"TRACE","level_value":5000}
{"@timestamp":"2021-05-21T21:38:19.670+09:00","@version":"1","message":"Autoreload is disabled because the development mode is off.","logger_name":"Application","thread_name":"main","level":"INFO","level_value":20000}
{"@timestamp":"2021-05-21T21:38:19.957+09:00","@version":"1","message":"Responding at http://0.0.0.0:8080","logger_name":"Application","thread_name":"main","level":"INFO","level_value":20000}
{"@timestamp":"2021-05-21T21:38:19.957+09:00","@version":"1","message":"Application started: io.ktor.application.Application@39d9314d","logger_name":"Application","thread_name":"main","level":"TRACE","level_value":5000}
{"@timestamp":"2021-05-21T21:38:21.593+09:00","@version":"1","message":"hello","logger_name":"Application","thread_name":"eventLoopGroupProxy-4-1","level":"INFO","level_value":20000}
{"@timestamp":"2021-05-21T21:38:21.597+09:00","@version":"1","message":"Unhandled: GET - /","logger_name":"Application","thread_name":"eventLoopGroupProxy-4-1","level":"ERROR","level_value":40000,"stack_trace":"<#f0019090> j.l.RuntimeException: hello\n\tat d.h.ApplicationKt$module$1$1.invokeSuspend(Application.kt:17)\n\tat d.h.ApplicationKt$module$1$1.invoke(Application.kt)\n\tat i.k.u.p.SuspendFunctionGun.loop(SuspendFunctionGun.kt:246)\n\tat i.k.u.p.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:116)\n\tat i.k.u.p.SuspendFunctionGun.execute(SuspendFunctionGun.kt:136)\n\tat i.k.u.p.Pipeline.execute(Pipeline.kt:79)\n\tat i.k.routing.Routing.executeResult(Routing.kt:155)\n\tat i.k.routing.Routing.interceptor(Routing.kt:39)\n\tat i.k.r.Routing$Feature$install$1.invokeSuspend(Routing.kt:107)\n\tat i.k.r.Routing$Feature$install$1.invoke(Routing.kt)\n\tat i.k.u.p.SuspendFunctionGun.loop(SuspendFunctionGun.kt:246)\n\tat i.k.u.p.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:116)\n\tat i.k.f.CallLogging$Feature$install$2.invokeSuspend(CallLogging.kt:139)\n\tat i.k.f.CallLogging$Feature$install$2.invoke(CallLogging.kt)\n\tat i.k.u.p.SuspendFunctionGun.loop(SuspendFunctionGun.kt:246)\n\tat i.k.u.p.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:116)\n\tat i.k.u.p.SuspendFunctionGun.execute(SuspendFunctionGun.kt:136)\n\tat i.k.u.p.Pipeline.execute(Pipeline.kt:79)\n\tat i.k.s.e.DefaultEnginePipelineKt$defaultEnginePipeline$2.invokeSuspend(DefaultEnginePipeline.kt:124)\n\tat i.k.s.e.DefaultEnginePipelineKt$defaultEnginePipeline$2.invoke(DefaultEnginePipeline.kt)\n\tat i.k.u.p.SuspendFunctionGun.loop(SuspendFunctionGun.kt:246)\n\tat i.k.u.p.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:116)\n\tat i.k.u.p.SuspendFunctionGun.execute(SuspendFunctionGun.kt:136)\n\tat i.k.u.p.Pipeline.execute(Pipeline.kt:79)\n\tat i.k.s.n.NettyApplicationCallHandler$handleRequest$1.invokeSuspend(NettyApplicationCallHandler.kt:123)\n\tat i.k.s.n.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)\n\tat k.c.i.UndispatchedKt.startCoroutineUndispatched(Undispatched.kt:55)\n\tat k.c.BuildersKt__Builders_commonKt.startCoroutineImpl(Builders.common.kt:194)\n\tat k.c.BuildersKt.startCoroutineImpl(Unknown Source:1)\n\tat k.c.AbstractCoroutine.start(AbstractCoroutine.kt:145)\n\t... 1..."}