JKになりたい

何か書きたいことを書きます。主にWeb方面の技術系記事が多いかも。

並列処理入門(後編・cats-effect3(CE3)を利用したScalaの並列・並行処理)

3. cats-effect3(CE3)を利用したScalaの並列・並行処理

前編の続き。

並列処理入門(前編) - JKになりたい

3.1 実装サンプル

詳細な説明の前に、GoやRubyと同じ実装サンプルを提示しざっくりと動き方を確認する。
CE3でも、Goと同じくN:Mモデルを採用している。M個のネイティブスレッドに対してN個のFiberと呼ばれる軽量スレッドが動作する仕組み。

実装例(1) Ruby,Goと同じく、並列にheavyComputationを実行する例

object Main extends IOApp {
  def heavyComputation(): IO[Int] =
    IO.delay {
      var result = 0
      for (i <- 0 until 5_000_000_0)
        result += 1
      result
    }

  def run(args: List[String]): IO[ExitCode] =
    for {
      start <- IO.realTimeInstant
      _ <- Console[IO].println(s"IOApp: 開始 $start")
      f1 <- heavyComputation().start # Fiberの生成~実行開始
      f2 <- heavyComputation().start

      a <- f1.joinWithNever
      b <- f2.joinWithNever
      _ <- Console[IO].println(s"IO1 結果 $a")
      _ <- Console[IO].println(s"IO2 結果 $b")

      end <- IO.realTimeInstant
      elapsed = (end.toEpochMilli - start.toEpochMilli).millis
      _ <- Console[IO].println(s"両方のIOが終了しました。経過時間: $elapsed")
    } yield ExitCode.Success
}
IOApp: 開始 2024-02-13T08:17:16.320654Z
IO1 結果 50000000
IO2 結果 50000000
両方のIOが終了しました。経過時間: 339 milliseconds

実装例(2) Ruby,Goと同じくネイティブスレッドを1に絞って実行する例

object Main extends IOApp {

  override protected def computeWorkerThreadCount: Int = 1

  def heavyComputation(): IO[Int] =
    IO.delay {
      var result = 0
      for (i <- 0 until 5_000_000_0)
        result += 1
      result
    }

  def run(args: List[String]): IO[ExitCode] =
    for {
      start <- IO.realTimeInstant
      _ <- Console[IO].println(s"IOApp: 開始 $start")
      f1 <- heavyComputation().start
      f2 <- heavyComputation().start

      a <- f1.joinWithNever
      b <- f2.joinWithNever
      _ <- Console[IO].println(s"IO1 結果 $a")
      _ <- Console[IO].println(s"IO2 結果 $b")

      end <- IO.realTimeInstant
      elapsed = (end.toEpochMilli - start.toEpochMilli).millis
      _ <- Console[IO].println(s"両方のIOが終了しました。経過時間: $elapsed")
    } yield ExitCode.Success
}
IOApp: 開始 2024-02-13T08:16:48.488469Z
IO1 結果 50000000
IO2 結果 50000000
両方のIOが終了しました。経過時間: 435 millisecond

実装例(3) Thread.sleepで比較

上記コードにThread.sleepを入れてみる

def heavyComputation(): IO[Int] =
    IO.delay {
      Thread.sleep(10000)
      (0 until 5_000_000_0).foldLeft(0)((acc, _) => acc + 1)
    }

override protected def computeWorkerThreadCount: Int = 1の場合

IOApp: 開始 2024-02-13T08:06:42.216908Z
IO1 結果 50000000
IO2 結果 50000000
両方のIOが終了しました。経過時間: 20743 milliseconds

override protected def computeWorkerThreadCount: Int = super.computeWorkerThreadCountの場合

IOApp: 開始 2024-02-13T08:08:06.291909Z
IO1 結果 50000000
IO2 結果 50000000
両方のIOが終了しました。経過時間: 10475 milliseconds

こちらの方が違いがわかりやすい。Goと違いきっちりスレッドがブロッキングされ続ける。
「Fiberが長期間専有している場合、処理を譲る」ということも自分たちのコントロール下におけるのが良いところでもあり、悪いところでもある。

補足(1)

上記はFiberのstart~joinをコールしている。必要に応じてcancelしたりなどできる低レベルなインターフェースである。
一方、実コードではparSequence、parTraverseなどをよく利用する。これはそれぞれのIOに対して新しいFiberを生成し、それらをjoinする、例外があればcancelする、などの処理を背後で行っている。
こっちの方がハイレベルなAPIとなり扱いやすい。

補足(2)

実行速度がGoにくらべて遅いように見えるが、JVMJITコンパイラを利用しているため初回のコールは遅い。
これをGraalVMを使い、AOTコンパイラを利用してネイティブコードを生成しておくことで、60ms程度に短縮される。

3.2.1 CE3の目標と戦略

「スレッドのコンテキストスイッチの回数を最小にし、コンピュートプールが有用なタスクをしている時間を最大化すること」これが目標

そのために・・・

  • 利用可能なプロセッサの数とほぼ同じ数の単一のスレッドプール(=コンピュートプール)

  • ブロック処理用の数の制限のないスレッドプール

  • 非同期I/Oイベントを処理するための、1つか2つの優先度の高いスレッド

例えば、(CPUバウンドな)ある処理単位でネイティブスレッドをガンガン作成する戦略を取るとする。すると、その処理単位が100個あったら、当然100回コンテキストスイッチが必要になる。
実際に並列で処理できるのはCPUコア数だけなので、コア数のスレッドだけが存在していると、システムコールを伴うコンテキストスイッチのコストが最低限になる。

固定長のスレッドプールを用意しておく場合、そのスレッドプール上でブロッキング処理があると最悪。
例えばファイルシステム操作系のAPIやIO#unsafeRunSync、Thread#sleepなど。
知らないと罠にかかることもある。URL#equalsも実はブロッキングIOを伴う。
ref: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/net/URL.html#equals(java.lang.Object))
” Since hosts comparison requires name resolution, this operation is a blocking operation.”

IO待ちの間、そのスレッドはブロッキングされてしまい他の処理を行うことができなくなる。
3個しかないネイティブスレッドにを3個ともブロッキングしてしまうとその間アプリケーションが完全に停止しているののと同じ。
そのため、これを逃してやるためのスレッドプールが必要。(勿論理想的には全て非同期IO、ノンブロッキングIOだが)

最後に、非同期I/Oを処理するためのプール。これは「新しい未処理の非同期 IO 通知があるかどうかをカーネルに問い合わせるだけ」のスレッドプール。
その後の処理はコンピュートプールで行うので、これは少数の固定数スレッドプールで良い。
このスレッドプールの優先度はこれらの中で最も高くしておく。(優先度が高い故に、問い合わせたその後の処理はコンピュートIOで行わないと総合的なパフォーマンスが低下することに注意)

ref: https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c

この3つの考え方を元に、目標を達成するための道具がCE3

3.2.2 (余談)Scala標準のscala.concurrent.ExecutionContext.globalを使わない理由

ScalaのFutureは標準でscala.concurrent.ExecutionContext.globalでimportされるExecutionContextの元で実行される。
内部実装はForkJoinプールとなっている(ref: https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/concurrent/ForkJoinPool.html )

これは一見良さそうに見えるが、(名前の通り)グローバルなスレッドプールなため任意のコード(主にライブラリコード)によってアクセスされるが、このスレッドプールが適切に利用されているかということは簡単にはわからない。
また、ブロッキングが伴う処理
そのため、私たちのアプリケーションを動かすためのスレッドプールとしては信頼性が低いため、利用しないことを推奨。

ref: https://gist.github.com/djspiewak/46b543800958cf61af6efa8e072bfd5c

3.3 Fiberの実行戦略

CE2ではグローバルな単一のワークキュー(Fiberのキュー)を提供していた。
このキューが他のスレッドに実行を割り当てていたのだが、これはワーカー数が増えると割当作業そのものと競合しパフォーマンスが劣っていくという問題があった。

CE3からWorkStealPoolによる実装となっている。
これは、コアごとにワークキュー(Fiberの実行キュー)を構築し、あるワークキューが空になったら他のワークキューから半分をStealしてくる仕組み。

故に、全てのワークキューに実行を待機しているファイバーがある場合、他のスレッドからStealは起こらない。
つまり、最大のパフォーマンスが必要なときに不要なワーカーに割当のような不要なタスクが発生しない。

ちなみにスレッドを必ず特定のコアで実行することを「スレッド アフィニティ」と呼ぶが、これに近いことが実質的に行えることを意味する。
Fiberのプリエンプション時などで、他のコアに処理が移動することが少なくなるため、パフォーマンスの観点で非常に有用。

private final class WorkerThread(
    idx: Int,
    // Local queue instance with exclusive write access.
    private[this] var queue: LocalQueue,
    // The state of the `WorkerThread` (parked/unparked).
    private[unsafe] var parked: AtomicBoolean,
    // External queue used by the local queue for offloading excess fibers, as well as
    // for drawing fibers when the local queue is exhausted.
    private[this] val external: ScalQueue[AnyRef],
    // A worker-thread-local weak bag for tracking suspended fibers.
    private[this] var fiberBag: WeakBag[Runnable],
    private[this] var sleepers: TimerSkipList,
    // Reference to the `WorkStealingThreadPool` in which this thread operates.
    private[this] val pool: WorkStealingThreadPool
) extends Thread with BlockContext

(CE3 のWorkerThreadのシグネチャ、実装: https://github.com/typelevel/cats-effect/blob/e99a1c35c416e02a4e6633cc2a9ee7fed9ac384b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala

上記実装のランループを見ると、WorkerThreadがどのように動作するかがわかる。

WorkerThreadは0番〜4番(以上)のStateを持ち、現在のStateによってその時々の動作が変わる。ワークキュー(上記シグネチャでいうqueue: LocalQueue)からFiberを取り出して実行するのは4番以上の場合。
このときの動作を見ていく。

case _ =>
  // Call all of our expired timers:
  var cont = true
  while (cont) {
    val cb = sleepers.pollFirstIfTriggered(now)
    if (cb ne null) {
      cb(RightUnit)
    } else {
      cont = false
    }
  }

  // Check the queue bypass reference before dequeueing from the local
  // queue.
  val fiber = if (cedeBypass eq null) {
    // The queue bypass reference is empty.
    // Fall back to the local queue.
    queue.dequeue(self)
  } else {
    // Fetch and null out the queue bypass reference.
    val f = cedeBypass
    cedeBypass = null
    f
  }
  if (fiber ne null) {
    // Run the fiber.
    try fiber.run()
    catch {
      case t if NonFatal(t) => pool.reportFailure(t)
      case t: Throwable => IOFiber.onFatalFailure(t)
    }
    // Continue executing fibers from the local queue.
    state += 1
  } else {
    // Transition to checking the external queue.
    state = 1
  }
}

とはいうものの、fiberを取り出してrunしてエラーハンドリングしているだけ。

細かい部分として気になるのはsleepers.pollFirstIfTriggered(now)やcedeBypass はなにか?ということくらいだろうか。

sleepersはTimerSkipListのインスタンスで、Fiberの一定時間経過後のキャンセル処理など割り込みに使う。

cedeBypassは IO.cede により、公平性の境界を挿入された際に利用するもの。

IO.cedeとは何かを簡単に説明する。

スレッドがあるFiberの処理に専有されると、他のFiberが実行できなくなるのは自明。
CE3ではこれを防ぐために、自動的に境界が設定される。これは cats.effect.unsafe.IORuntimeConfig.autoYieldThresholdで設定を変更できる。
しかし、これはIOバウンドなものに対して有効なものであり、コアを専有するCPUバウンドな処理については已然として当該スレッドが専有され他のFiberが実行できなくなる。

この場合、IO.cedeを高価な処理の前後にいれることで他のFiberへ処理を譲ることができる。
ちなみに「高価な」は10マイクロ秒以上かかる処理がひとつの目安。

fa.map(data => expensiveWork(data)) # これだとexpensiveWorkに専有される
↓
(fa <* IO.cede).map(data => expensiveWork(data)).guarantee(IO.cede) # 前後にIO.cedeを指定

Fiberのランループ内cedeIterationカウンタを減らしていき0以下になったタイミングで一旦リスケジュールする。

cedeBypassは「リスケジュールしたものの、ワークキューが空だったとき」にcedeBypassにリスケジュールされたFiberが設定される。
こうしておくことで、リスケジュールしてしまうと必要になるStore、Load操作などをスキップしてそのままFiberの実行を継続させることができる、といったパフォーマンス最適化のための工夫。

var nextCancelation = cancelationIterations - 1
    var nextAutoCede = autoCedeIterations
    if (nextCancelation <= 0) {
      // Ensure that we see cancelation.
      readBarrier()
      nextCancelation = runtime.cancelationCheckThreshold
      // automatic yielding threshold is always a multiple of the cancelation threshold
      nextAutoCede -= nextCancelation

      if (nextAutoCede <= 0) {
        resumeTag = AutoCedeR
        resumeIO = _cur0
        val ec = currentCtx
        rescheduleFiber(ec, this)
        return
      }
    }

ref: https://github.com/typelevel/cats-effect/blob/e99a1c35c416e02a4e6633cc2a9ee7fed9ac384b/core/shared/src/main/scala/cats/effect/IOFiber.scala#L210

※ちなみに、リスケジュールはFiberのワークキューの最後に追加されるわけじゃなく、適当な部分にランダム挿入される

3.3 私たちCE3を使う上ですべきこと

コア数が充分な環境を用意しよう

CE3のポテンシャルを発揮するには、充分なコア数が必要。
これは単純な話、コアが多ければ多いほどコンピューティング競合係数が低くなるため。
「スケールアウトよりスケールアップ」を意識する。

逆に大量のインスタンスをデプロイし、一つ一つのアプリケーションは1~2程度のCPUやそれ以下しか使えない環境をとるという戦略を使うならCE3は使うべきではない。
コンピューティング競合係数が高くなりすぎる。ref: https://typelevel.org/cats-effect/docs/core/starvation-and-tuning#not-enough-cpus

このような戦略は、ユーザ空間で定義されるスケジューラがカーネルスケジューラより劣っている場合に適するが、CE3はそうではないので非生産的な選択肢となる。
カーネルのスケジューラはネイティブスレッドを扱うため、コンテキストスイッチのコストに注意を払わないといけない)

「スケールアウトよりスケールアップ」はGoなどでも言えると思う。

不正なスレッドは極力排除しよう

ライブラリによって独自の新しいスレッドプールが生成されることがよくある。AWS SDKがその代表例。
スレッドが増えると当然、カーネルスケジューラの競合が多発する。

仮のそのスレッドがアイドル状態であったとしても、カーネルスケジューラのタイムスライスが短くなってしまうのでパフォーマンスに悪影響を及ぼす。(コンテキストスイッチを減らすため、タイムスライスはなるべく長くあってほしい)

そのため、JMX経由でスレッドの状態を把握して、極力不正なスレッドを減らすことに務めたい。
ただし、活発に活動している不正スレッドを消せない場合は、CE3のコンピュートスレッドプールの数を減らして調整することも必要となる。

ブロッキングを伴うIOはIO.blockを使ってブロッキング用スレッドプールを使うよう指定しよう

前述のとおり。

高価なCPUバウンドな処理(10マイクロ秒以上かかる処理)はIO.cedeで公平性の境界を挿入しよう

前述のとおり。