fs2を試す。

はじめに

ScalaでReactorみたいなリアクティブなプログラミングをするにはどうすれば良い?みたいなのをScalaに詳しい同僚に聞いてみたところ。
FS2というライブラリを教えて頂いたので試してみようかと思います。
ただ、この時期ではリアクティブのところまでは行かずにまずは基本的な使い方を確認します。

FS2とは

純粋な関数型の多形性を持つストリームプロセスをサポートするライブラリです。
I/O(netwrking, flies)のストリーム処理をリソースセーフに実行することを可能とします。
Scala 2.12、 2.13、3で利用できるみたいです。 また、FS2はCatsCats Effectを利用しているライブラリで、逆にhttp4sskunkdoobieに用いられるようです。
ちなみに、名前はFunctional Streams for ScalaでFS2もしくはFSSらしいです。

動かしてみる

環境

ソースコードの実行環境は以下の通り。

$ uname -srvmpio
Linux 5.4.0-72-generic #80-Ubuntu SMP Mon Apr 12 17:35:00 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

$ lsb_release -a
LSB Version:    core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.2 LTS
Release:    20.04
Codename:   focal

$ sbt --version
[info] 1.2.7
sbt script version: 1.5.2


$ scala --version
Scala code runner version 3.0.0 -- Copyright 2002-2021, LAMP/EPFL

プロジェクトの作成

sbtを使ってプロジェクトを作成します。
以下のコマンドを実行します。

$ sbt new sbt/scala-seed.g8
[info] welcome to sbt 1.5.2 (Oracle Corporation Java 1.8.0_292)
[info] set current project to new (in build file:/tmp/sbt_c0223d2e/new/)

A minimal Scala project. 

name [Scala Seed Project]: fs2-example

出来上がったbuild.sbtに依存をつかします。

lazy val root = (project in file("."))
  .settings(
    name := "fs2",
    libraryDependencies += scalaTest % Test,
    // 以下を追加
    libraryDependencies += "co.fs2" %% "fs2-core" % "3.0.0",
    libraryDependencies += "co.fs2" %% "fs2-io" % "3.0.0",
    libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "3.0.0"
  )

Streamの作成と副作用を持つStreamの実行

FS2ではStream[F, 0]というクラスが基本のデータクラスっぽいです。
ここでF エフェクト型と呼ばで、Oはアウトプット型と呼ばれるようです。
Streamは以下のようにして作成します。

import cats.effect.{IO, IOApp}
import fs2.{INothing, Pure, Stream}

object Fs2Example {
  
  val streamEmpty: Stream[Pure, INothing] = Stream.empty
  val streamOne: Stream[Pure, Int] = Stream.emit(1)
  val streamThree: Stream[Pure, Int] = Stream(1, 2, 3)
  val streamList: Stream[Pure, List[Int]] = Stream(List(1, 2, 3))
}

例えば、上記のstreamOneStream[Pure, Int]を持っており、アウトプット型がIntでエフェクト型がPureとなります。
Pureは実行時に副作用が存在しないことを示すようです。

StreamはtoListtoVectorをそれぞれ呼び出すことで、ListVectorに変換できるようです。
またStreamはlist-likeな以下のようなメソッドを持っているようです。

val filteredStream: Stream[Pure, Int] = Stream(1, 2, 3, 4, 5, 6).filter(_ > 0)
val foldSum: List[Int] = Stream(1, 2, 3).fold(0)(_ + _).toList
val repeat: Stream[Pure, Int] = Stream(1, 2, 3).repeat.take(9)
val collect: Stream[Pure, Int] = Stream(None, Some(0), Some(1)).collect { case Some(i)=> 1}

副作用を含んたStreamの作成も行なうことができます。

val eff: Stream[IO, Int] = Stream.eval(IO {
  println("Hello! FS2"); 1 + 1
})

IOはエフェクト型で、この型を作ること自体は副作用を起こしません。
また、Stream.evalも作成されたタイミングでは何もしません。
この副採用を含んだストリームを実行するためにはまずcompileメソッドを呼び出してIOを取り出します。
compileは更に以下のようなメソッドを持ちます。

val vector: IO[Vector[Int]] = eff.compile.toVector
private val drain: IO[Unit] = eff.compile.drain
private val value: IO[Int] = eff.compile.fold(0)(_ + _)

toVectorはすべてのアウトプットを1つのベクターに入れ込み、drainはすべての実行結果を捨て、 foldは結果を畳み込みます。
compileを実行するだけではまだ、実行はおこなわれません。
これを実行するには以下のように

import cats.effect.unsafe.implicits.global

object Main extends App {

  val foldResult: IO[Int] = eff.compile.fold(0)(_ + _)
  vector.unsafeRunSync
}

import cats.effect.unsafe.implicits.globalに関してはまだ理解が甘いですが、今回利用するIORumtimeクラスの指定しているようです。
実行結果は以下の通り。

Hello! FS2

Streamの基本操作

Streamには便利なオペレーターが用意されています。
代表的なものには++mapflatMapbracketのようなものがあります。

object MainOperator extends App {

  val result = (Stream(Some(1), Some(2)) ++ Stream(Some(3), Some(4), None))
    .flatMap(i => Stream(i, i))
    .map {
      case Some(i) => i
      case _ => -1
    }.map(_ + 1)

  result.toList.foreach(println)
}

これの実行結果は以下のようになります。

2
2
3
3
4
4
5
5
0
0

エラーハンドリング

Streamでエラーを投げたい場合はStream.raseErrorを使います。
以下のような使い方ができるようです。

//
val err = Stream.raiseError[IO](new Exception("Oops! 1"))

val err2 = Stream(1, 2, 3) ++ (throw new Exception("Oops! 2"))

val err3 = Stream.eval(IO(throw new Exception("Oops! 3")))

このハンドリングは以下のように行えます。

  try err.compile.toList.unsafeRunSync() catch {
    case e: Exception => println(e)
  }

  try err2.toList catch {
    case e: Exception => println(e)
  }

  try err3.compile.drain.unsafeRunSync() catch {
    case e: Exception => println(e)
  }
}

実行結果は以下の通り

java.lang.Exception: Oops! 1
java.lang.Exception: Oops! 2
java.lang.Exception: Oops! 3