JKになりたい

何か書きたいことを書きます。主にWeb方面の技術系記事が多いかも。

cats.effectのIOモナドとcontextshiftを丁寧に理解する

並行処理と並列処理

並行処理(concurrent processing)

1つのCPU(コア)スレッドを切り替えながら複数のタスクを実行すること。

人間の目には複数のタスクが同時に進んでいるように見える

実行するスレッドを切り替える必要があるのでコンテキストスイッチのコストがかかることに留意(=スレッドをたくさん作って、それらを並行処理するとコンテキストスイッチによる無駄な時間が多くなりパフォーマンスが悪化する)

並行処理はCPUのアイドル時間をなくすために用いる。
以下のようにCPUバウンドな処理とIOバウンドな処理を繰り返す例を思い浮かべると効果がわかりやすい

TASK_A: CPU処理ー>IO処理ー>CPU処理ー>IO処

TASK_B: IO処理ー>CPU処理ー>IO処理ー>CPU処理

並列処理(multiple processing)

複数のコアを使って同時に複数のタスクを実行すること。

ネイティブスレッドとグリーンスレッド

ネイティブスレッド

wiki ( https://ja.wikipedia.org/wiki/スレッド_(コンピュータ)) を引用

ライトウェイトプロセス(light-weight process、LWP)または軽量プロセスとは、
スレッドを複数並行して実行するためのカーネル内の機構。
マルチプロセッシングにおいて、ひとつのプロセス内のスレッドを複数個同時に実行する仕組みである。
カーネルスレッドとLWPを総称してネイティブスレッドと呼ぶこともある。

カーネルレベルで実現されるスレッド。

コンテキストスイッチのコストはプロセス並みに大きい(マイクロ秒くらいらしい)

ネイティブスレッドはOSのスケジューラーによって資源割当を行うため、並列処理が可能

グリーンスレッド

ユーザ空間(ユーザープロセスの動作するアドレス空間)で実装されたスレッド機構のうち、特にVM上で動くものがグリーンスレッド。

OSのスケジューラーが関与しないため、マルチコア対応が不可能。 コンテキストスイッチのコストが低い(ナノ秒くらいらしい)ため1コアに対して大量のスレッド必要な場合などに有用(ref: C10K問題)

多くの言語でネイティブスレッドが標準で採用されているが、RubyPythonはマルチコアの恩恵を受けにくいのと、Goはネイティブスレッドと軽量スレッドのハイブリッドパターンになっている。

Ruby・・GVLの機構により同時に実行されるネイティブスレッドは基本は1つのみ(ただし、ネットワークIO、ディスクIO等のAPIはその限りではないらしい) ※GVL(Giant VM lock)=GIL(グローバルインタプリタロック)

Python・・GILあり

Go・・ネイティブスレッド&軽量スレッド(goroutine)

ゴルーチンのスケジューラーが「GOMAXPROCS」の数だけ生成したネイティブスレッドに対して軽量スレッドをいい感じにスケジュールしてくれるらしい

Go1.5以前はスケジューラが未熟でGOMAXPROCS=1以外だとパフォーマンス劣化したりしたらしいが、今は改善されているっぽい

(補足)

  • PythonRubyはスレッドセーフでないC言語のライブラリを使って作られているのでGILの機構が必要
  • RubyPython x 並列とかでググると、並列処理にスレッドを使えという記事が散見されるが、これらは誤り。マルチプロセスでやらないといけない。

(Scala)Future x ExecutionContextによる並列処理

cats.effect.IOの話の前に。

Scalaでは scala.concurrent.Future というAPIが提供されており、これにより並列処理を実現できる。

Futureを生成すると、内部的にはJavaのRunnableに変換される。 このRunnableをExecutionContextが良い感じにスレッドに割り当ててくれる。 スレッドは先述した通り、OSのスケジューラーにより均等に割り当てられ、実行されていく。

※つまり、Futureはスレッドを新しく作るというわけではないことに注意

よく「Futureでブロッキングが発生するをするな」という話があるが、これはどういうことか。

まず、Futureを実行するときにExecutionContextを求められるので脳死import scala.concurrent.ExecutionContext.Implicits.global をすることがあると思う。

globalで提供されるExecutionContextは標準で「利用可能なコア数の同数のスレッド」を持つ。

このExecutionContext上でブロッキングする処理を入れてしまうと、単純に1スレッドが無駄に専有されてしまう。
この状況下ではCPUが処理をしていないのにも関わらず、スレッドに別の仕事をさせることができなくなり、パフォーマンスが劣化する。

では、ブロッキング含む処理をしたいときはどうしたら良いか。 ブロッキング処理専用のExecutionContext(新しいスレッド)を作り、そこにFutureを割り当ててる。そうすると、globalのECを汚染することはない

ただ、OSのスケジューラーはスレッドに対して均等に処理時間を割り当てるのでスレッドを増やしすぎてもコンテキストスイッチのコストが上がってしまう。

(スレッドの優先度は一応設定できるものの、実際どの程度スケジューリングに影響されるかはわからない。システムコールによりネイティブスレッドの優先度を変更するため、OSによって挙動は異なる) https://docs.oracle.com/javase/jp/8/docs/api/java/lang/Thread.html#setPriority-int-

IOモナドとコンテキストシフト

これから、cats.effect.IOをみていく。

IOとFutureの違い

そもそもFutureと何が違うのか。これは、catsのドキュメントがわかりやすい。

f:id:sakata_harumi:20200830225414p:plain

(引用: https://typelevel.org/cats-effect/datatypes/io.html

IOモナドはFutureと違い純粋。評価時に正確に1つの結果が得られる。 コードで挙動の違いをみていく。

val f = Future {
  println(s"future_println")
}
val i = IO {
  println("io_println")
}
println("====START=====")
Await.result(f, Duration.Inf)
Await.result(f, Duration.Inf)
i.unsafeRunSync
i.unsafeRunSync
future_println
====START=====
io_println
io_println

Futureは値を生成した時に評価され、その後は改めて評価されることはない。つまり、値の作成に副作用が存在してしまう。

一方IOは unsafeRunSync のタイミングで評価されていることがわかる。つまり、値の生成時点でIOは副作用を産まず、参照透過に扱うことができる。

IOモナドを使ったスレッドのスケジューリング

IO.shift を使うと計算を別のスレッドプールにシフトしたり、同じスレッドプールに再度スケジュールすることができる

import java.util.concurrent.Executors
import cats.effect.{ContextShift, Fiber, IO}
import scala.concurrent.ExecutionContext

val ecOne = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
val ecTwo = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

val csOne: ContextShift[IO] = IO.contextShift(ecOne)
val csTwo: ContextShift[IO] = IO.contextShift(ecTwo)

def infiniteIO(id: Int)(cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
  def repeat: IO[Unit] = IO(println(id)).flatMap(_ => repeat)

  repeat.start(cs)
}
val prog =
  for {
    _ <- infiniteIO(1)(csOne)
    _ <- infiniteIO(11)(csOne)
  } yield ()

prog.unsafeRunSync()

(引用: https://typelevel.org/cats-effect/concurrency/basics.html#thread-scheduling )

上記の例では、スレッドが1つのExecutionContextを生成し、これを用いてIOの実行を割当ている。 infiniteIO(1)infiniteIO(11) が同じECを使ってどちらも同じスレッドにタスクを割り当てようとする。しかし、前者の infiniteIO(1)のタスクが完了することはないため、スレッドが専有された状態になるため、後者は実行されることはない。

val prog =
  for {
    _ <- infiniteIO(1)(csOne)
        _ <- infiniteIO(11)(csOne)
    _ <- infiniteIO(2)(csTwo)
        _ <- infiniteIO(22)(csTwo)
  } yield ()

prog.unsafeRunSync()

(引用: https://typelevel.org/cats-effect/concurrency/basics.html#thread-scheduling )

これだと12が動くようになる。それぞれのスレッドは専有されているが、OSのスケジューラーは平等に実行時間を割当てる。

ただし、各々のスレッドは開放されることがないので 1122 が動くことはない

IO.shiftを利用して、スレッドに再スケジュールしてやることで、11,22も動かすことができる

def infiniteIO(id: Int)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
  def repeat: IO[Unit] = IO(println(id)).flatMap(_ => IO.shift *> repeat)
  repeat.start
}

(引用: https://typelevel.org/cats-effect/concurrency/basics.html#thread-scheduling )

repeatが実行されると IO(println(id))が実行された後に「 IO.shift を挟んでから」再度repeatをコールするような形に変更されている。

これにより IO.shiftのタイミングでタスクの再割当てが起こり、別のタスクがスケジューリングされる余地が産まれる。

(今回は同じECに再割当てを行っているが、別のECに割当てることも勿論可能)