fs2を試す。
はじめに
ScalaでReactorみたいなリアクティブなプログラミングをするにはどうすれば良い?みたいなのをScalaに詳しい同僚に聞いてみたところ。
FS2というライブラリを教えて頂いたので試してみようかと思います。
ただ、この時期ではリアクティブのところまでは行かずにまずは基本的な使い方を確認します。
FS2とは
純粋な関数型の多形性を持つストリームプロセスをサポートするライブラリです。
I/O(netwrking, flies)のストリーム処理をリソースセーフに実行することを可能とします。
Scala 2.12、 2.13、3で利用できるみたいです。
また、FS2はCatsやCats Effectを利用しているライブラリで、逆にhttp4sやskunk、doobieに用いられるようです。
ちなみに、名前はFunctional Streams for ScalaでFS2もしくはFSSらしいです。
動かしてみる
環境
ソースコードの実行環境は以下の通り。
$ uname -srvmpio Linux 5.4.0-72-generic #80-Ubuntu SMP Mon Apr 12 17:35:00 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux $ lsb_release -a LSB Version: core-11.1.0ubuntu2-noarch:security-11.1.0ubuntu2-noarch Distributor ID: Ubuntu Description: Ubuntu 20.04.2 LTS Release: 20.04 Codename: focal $ sbt --version [info] 1.2.7 sbt script version: 1.5.2 $ scala --version Scala code runner version 3.0.0 -- Copyright 2002-2021, LAMP/EPFL
プロジェクトの作成
sbtを使ってプロジェクトを作成します。
以下のコマンドを実行します。
$ sbt new sbt/scala-seed.g8 [info] welcome to sbt 1.5.2 (Oracle Corporation Java 1.8.0_292) [info] set current project to new (in build file:/tmp/sbt_c0223d2e/new/) A minimal Scala project. name [Scala Seed Project]: fs2-example
出来上がったbuild.sbt
に依存をつかします。
lazy val root = (project in file(".")) .settings( name := "fs2", libraryDependencies += scalaTest % Test, // 以下を追加 libraryDependencies += "co.fs2" %% "fs2-core" % "3.0.0", libraryDependencies += "co.fs2" %% "fs2-io" % "3.0.0", libraryDependencies += "co.fs2" %% "fs2-reactive-streams" % "3.0.0" )
Streamの作成と副作用を持つStreamの実行
FS2ではStream[F, 0]
というクラスが基本のデータクラスっぽいです。
ここでF
エフェクト型と呼ばで、O
はアウトプット型と呼ばれるようです。
Streamは以下のようにして作成します。
import cats.effect.{IO, IOApp} import fs2.{INothing, Pure, Stream} object Fs2Example { val streamEmpty: Stream[Pure, INothing] = Stream.empty val streamOne: Stream[Pure, Int] = Stream.emit(1) val streamThree: Stream[Pure, Int] = Stream(1, 2, 3) val streamList: Stream[Pure, List[Int]] = Stream(List(1, 2, 3)) }
例えば、上記のstreamOne
はStream[Pure, Int]
を持っており、アウトプット型がInt
でエフェクト型がPure
となります。
Pure
は実行時に副作用が存在しないことを示すようです。
StreamはtoList
かtoVector
をそれぞれ呼び出すことで、List
とVector
に変換できるようです。
またStreamはlist-like
な以下のようなメソッドを持っているようです。
val filteredStream: Stream[Pure, Int] = Stream(1, 2, 3, 4, 5, 6).filter(_ > 0) val foldSum: List[Int] = Stream(1, 2, 3).fold(0)(_ + _).toList val repeat: Stream[Pure, Int] = Stream(1, 2, 3).repeat.take(9) val collect: Stream[Pure, Int] = Stream(None, Some(0), Some(1)).collect { case Some(i)=> 1}
副作用を含んたStreamの作成も行なうことができます。
val eff: Stream[IO, Int] = Stream.eval(IO { println("Hello! FS2"); 1 + 1 })
IOはエフェクト型で、この型を作ること自体は副作用を起こしません。
また、Stream.eval
も作成されたタイミングでは何もしません。
この副採用を含んだストリームを実行するためにはまずcompile
メソッドを呼び出してIO
を取り出します。
compile
は更に以下のようなメソッドを持ちます。
val vector: IO[Vector[Int]] = eff.compile.toVector private val drain: IO[Unit] = eff.compile.drain private val value: IO[Int] = eff.compile.fold(0)(_ + _)
toVector
はすべてのアウトプットを1つのベクターに入れ込み、drain
はすべての実行結果を捨て、 fold
は結果を畳み込みます。
compileを実行するだけではまだ、実行はおこなわれません。
これを実行するには以下のように
import cats.effect.unsafe.implicits.global object Main extends App { val foldResult: IO[Int] = eff.compile.fold(0)(_ + _) vector.unsafeRunSync }
import cats.effect.unsafe.implicits.global
に関してはまだ理解が甘いですが、今回利用するIORumtime
クラスの指定しているようです。
実行結果は以下の通り。
Hello! FS2
Streamの基本操作
Stream
には便利なオペレーターが用意されています。
代表的なものには++
、 map
、flatMap
、bracket
のようなものがあります。
object MainOperator extends App { val result = (Stream(Some(1), Some(2)) ++ Stream(Some(3), Some(4), None)) .flatMap(i => Stream(i, i)) .map { case Some(i) => i case _ => -1 }.map(_ + 1) result.toList.foreach(println) }
これの実行結果は以下のようになります。
2 2 3 3 4 4 5 5 0 0
エラーハンドリング
Stream
でエラーを投げたい場合はStream.raseError
を使います。
以下のような使い方ができるようです。
// val err = Stream.raiseError[IO](new Exception("Oops! 1")) val err2 = Stream(1, 2, 3) ++ (throw new Exception("Oops! 2")) val err3 = Stream.eval(IO(throw new Exception("Oops! 3")))
このハンドリングは以下のように行えます。
try err.compile.toList.unsafeRunSync() catch { case e: Exception => println(e) } try err2.toList catch { case e: Exception => println(e) } try err3.compile.drain.unsafeRunSync() catch { case e: Exception => println(e) } }
実行結果は以下の通り
java.lang.Exception: Oops! 1 java.lang.Exception: Oops! 2 java.lang.Exception: Oops! 3