Akka/Pekko Actorの初歩

Akka/Pekko logo

Akka/Pekkoとは

Akka/Pekkoとは、JavaScalaをメインにしたJVMで利用できるパイプライン的に非同期処理を行うためのライブラリです。 実際にはかなり多くの機能の集合体なので、この説明も正確ではないかもしれませんが、肝となっているのは非同期処理をうまく管理する部分だと思います。

公式ページの説明を抜粋すると以下のように記載されています。

Akka は、10 年以上にわたって運用環境で使用されてきたレスポンシブなアプリケーションを構築および実行するためのライブラリ セットを提供します。

Akkaには、オープンソース版としてフォークされたApache Pekkoがあります。 この記事の内容は、AkkaとPekkoのどちらにも共通した内容になっています。

本記事はAkkaの中のコアとなっているAkka Actorについて、どういう動きなのか、使う時の注意点はなにかなどを中心に初歩的な使い方をまとめたいと思います。 公式説明を補完するような内容となっているため、実際にActorを活用するときには以下の公式説明も参照ください。

Actorの簡単な例

まずは簡単なActorの例をみてみます。

import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.scaladsl.Behaviors

// HelloWorld アクター
object HelloWorld {

  // アクターで扱うメッセージの型
  sealed trait HelloWorldCommand
  final case class SayHello(name: String) extends HelloWorldCommand
  final case class SayGoodbye(name: String) extends HelloWorldCommand

  // メッセージ受信時の処理
  def greet(): Behavior[HelloWorldCommand] = {
      // Behaviorsがもつ各種メソッドで、Behaviorインスタンスを生成する
      // これがActorとしての処理本体となる
      Behaviors.receiveMessage { message =>
        // Actorがメッセージを受け取ったときの処理を実装する
        message match {
            case SayHello(name) =>
                // メッセージを受けて行う処理
                println(s"[HelloWorld] Hello ${name}!")

                Behaviors.same

            case SayGoodbye(name) =>
                println(s"[HelloWorld] Goodbye ${name}!")

                Behaviors.same
        }
      }
  }
}

object Main extends App {
    // HelloWorldを動作させる本体となるActorSystemを生成する
    val system: ActorSystem[HelloWorld.HelloWorldCommand] =
        ActorSystem(
            // ActorSystemに処理させたいActorを登録する
            HelloWorld.greet(),
            // Actorの名前をつける
            "hello"
        )

    println("[Main] Send Hello")
  
    // メッセージを送信
    system ! HelloWorld.SayHello("World")

    println("[Main] Send Goodbye")
    // メッセージを送信
    system ! HelloWorld.SayGoodbye("App")

    println("[Main] Finish")

    // ActorSystemの終了を指示
    system.terminate()
}

AkkaにはClassicとTypedの2パターンの記述方法があります。 上記はTypedの方式でActorを実装した例になります。 基本的にはTypedを使えばよいと思いますが、インターネットや書籍で調べる時にClassicの例で説明されていることもあるので読み解くときに注意が必要です。

このプログラムを実行すると以下のようなログが表示されます。

[Main] Send Hello
[Main] Send Goodbye
[Main] Finish
[HelloWorld] Hello World!
[HelloWorld] Goodbye App!

system ! HelloWorld.SayHello("World")のところがActorにメッセージを送っている処理です。 !tellの別名で非同期的にメッセージを送るメソッドになっています(system.!(~~), system.tell(~~)と書くのと同じです)。
!はメッセージを送るのみで、そのメッセージが処理されるのを待ち合わせていません。 実行ログをみるとMainクラスでの"Finish"の後にHelloWorldアクター内のログが表示されています。 つまり、Actor内の処理はメインスレッドとは別スレッドで実行されていることがわかります。

ActorとFutureの違い

Actorが非同期的に処理されることがわかったところで、ScalaのFutureとの違いを考えてみます。 ScalaではFutureを使って手軽にマルチスレッド処理を実装できます。 先ほどの例も以下のように手軽にFutureで実装できます。

Future {
    println(s"Hello $name") 
}

Futureとの違いを考えるために、先の例でのActorの内部の動きを図示してみます。

Actorの処理フロー

比較としてFutureの処理フローは以下のようなイメージになります。

Futureの処理フロー

Actorにメッセージを送るとメッセージキューにメッセージが追加されます。 Actor側ではメインスレッドとは別のスレッドを使ってキューのメッセージを逐次的に処理します。 この点がFutureを使う場合との違いの1つとなります。

  • Actor: メインスレッドとは並列で処理されるが、メッセージは1つのスレッドで逐次的に実行される
  • Future: Futureを呼び出すごとにそれぞれの処理が別々のスレッドで並列に実行される

Actorだからできること

次に別のActorをみてみます。

object StateMachine {

  sealed trait Command
  final case class Run(query: String) extends Command
  final case object Done extends Command

  // 待ち受け状態
  def idle(): Behavior[Command] = {
    Behaviors.receiveMessage { message =>
      message match {
        case Run(query) =>
          println(s"start running query=$query")
          running(query)

        case Done =>
          println(s"now idling! Invalid event: ${message}")
          Behaviors.same
      }
    }
  }

  // 実行中状態
  def running(query: String): Behavior[Command] = {
    Behaviors.receiveMessage { message =>
      message match {
        case Run(_) =>
          println(s"now running '$query' query. Invalid event: ${message}")
          Behaviors.same

        case Done =>
          println(s"done. query=$query")
          idle()
      }
    }
  }
}

object Main extends App {
  val system: ActorSystem[StateMachine.Command] =
    ActorSystem(StateMachine.idle(), "state")

  system ! StateMachine.Run("find")
  system ! StateMachine.Run("update")
  system ! StateMachine.Done
  system ! StateMachine.Run("update")
  system ! StateMachine.Done

  println("[Main] Finish")

  system.terminate()
}

実行結果は以下です。

[Main] Finish
start running query=find
now running 'find' query! invalid event: Run(update)
done. query=find
start running query=update
done. query=update

Behaviors.receiveMessageはメッセージを処理するためのBehaviorインスタンスを生成するメソッドです。 このreceiveMessageの中で実行する処理ブロックは何らかの処理をしたあとにBehaviorを戻り値として返します。 返答したBehaviorは次のメッセージを処理する時に使用されます。 Behavior.sameは次のメッセージに対しても同じ処理を行うように指示することを意味します。

ActorでのBehaviorの切り替え

この例では、Run("find")メッセージを処理したときにrunningメソッドで生成したBehaviorを返しています。 そのため、次に来たRun("update")メッセージはrunning側のロジックで処理され、「now running 'find' query」と表示されました。

このようにメッセージごとにロジックを切り替えられることで、状態遷移などを簡単に実装できるようになっています。

また、ロジックを変えるだけでなくActorで利用する値を保持させるような使い方もできます。

object Counter {

  sealed trait Command
  final case object Up extends Command
  final case object Reset extends Command

  // counterで回数を保持する
  def count(counter: Int): Behavior[Command] = {
      Behaviors.receiveMessage { message =>
        message match {
            case Up =>
                val newCount = counter + 1
                println(s"[Counter] count: ${newCount}")
                // 次の処理としてcounterが1増えた状態でのBehaviorを生成する
                count(newCount)

            case Reset =>
                val newCount = 0
                println(s"[Counter] reset count: ${newCount}")
                // 次の処理としてcounterが0になった状態でのBehaviorを生成する
                count(newCount)
        }
      }
  }
}

この時のポイントとしてActorの内部は1スレッドで処理しているということです。 そのため、Actorの状態として保持する情報(例でのcounterやqueryなど)は、マルチスレッド処理での同時更新の防止(synchronize)などを考える必要がありません。 つまり、以下のようにメッセージ送信を並列で行っても、カウンターの値は正確にカウントできます。

    Future { system ! Counter.Up }
    Future { system ! Counter.Up }
    Future { system ! Counter.Up }

Actorから値を取得する方法

Actorはメッセージを投げたらその処理完了を待ち合わせない使い方が基本となっているように思います。 ですが、Actorの処理を待ち合わせて値を取得する使い方もできます。

import scala.concurrent._
import scala.util._
import org.apache.pekko.actor.typed.scaladsl.AskPattern._
import org.apache.pekko.util.Timeout
import scala.concurrent.duration._

object CoinLocker {

  sealed trait Command
  final case class Put(item: String) extends Command

  // 値の返却先のActorのActorRefを渡す
  final case class Get(replyTo: ActorRef[Option[String]]) extends Command

  // アイテムを保持していない状態
  def empty(): Behavior[Command] = {
      Behaviors.receiveMessage { message =>
        message match {
            case Put(item) =>
                println(s"[Locker] item=$item")
                store(item)

            case Get(replyTo) =>
                println(s"[Locker] empty")
                replyTo ! None
                Behaviors.same
        }
      }
  }

  // アイテムを保持している状態
  def store(item: String): Behavior[Command] = {
      Behaviors.receiveMessage { message =>
        message match {
            case Put(_) =>
                println(s"[Locker] It's not empty, item=$item")
                Behaviors.same

            case Get(replyTo) =>
                println(s"[Locker] Get item=$item")
                // 値の返答先に向けて値を送信
                replyTo ! Some(item)
                empty()
        }
      }
  }
}

object Main extends App {
    val system: ActorSystem[CoinLocker.Command] =
        ActorSystem( CoinLocker.empty(), "locker")

    system ! CoinLocker.Put("my bag")

    // 2つ目のPutはNGになる
    system ! CoinLocker.Put("other bag")

    // Actorからデータ取得
    val timeout: Timeout = 3.seconds
    val result: Future[Option[String]] =
        system.ask(ref => CoinLocker.Get(ref))(timeout, system.scheduler)

    val ec = system.executionContext
    result.onComplete {
        case Success(Some(item))  => println(s"[Main] Got $item")
        case Success(None)        => println(s"[Main] No item")
        case Failure(ex)          => println(s"[Main] Failure: ${ex.getMessage}")
    }(ec)

    println("[Main] Finish")

    system.terminate()
}

実行結果は以下の通りです。

[Main] Finish
[Locker] item=my bag
[Locker] It's not empty, item=my bag
[Locker] Get item=my bag
[Main] Got my bag

Actorからデータを取得するときはメッセージの送信に!(tell)ではなく、askを使います(簡易な表記だと?)。 askに渡す処理ブロックのrefが戻り値を受け取るためのActorRefです。これに値を送信する(!する)とaskで取得したFutureを通して値を受け取れるようになっています。

ここまででかなり初歩となるActorの使い方を紹介しました。 より実践的な使い方は、公式のドキュメントに目を通すことをお勧めします。

Actorの使い所や注意点

ここからは、個人的に気になった点や注意点について記載していきます。

1スレッドで処理する利点

そもそもActorを使う利点を考えてみます。 Futureでのマルチスレッド処理に対して、Actorを使うことで以下のような利点があると思います。

  • アプリ全体での並列処理数を制限できる
  • Actor内の処理はsynchroniezedなどのマルチスレッドを意識した実装をする必要がない
    • システム全体で利用する情報(キャッシュやマスタ的な情報)の更新管理などを実装しやすい
      • シングルトンな情報の更新が複数スレッドで並列に実行されることを防ぐ
  • 単一のリソースを扱う処理を安全に実装できる
    • DBアクセスなどでのコネクション数を制限できる
    • 単一のファイルを読み書きするような操作をする処理を実装できる
      • 複数のスレッドで同時に同じファイルを編集/読み込みすることはむづかしい

逆に言うと上記のようなことを気にする必要がないのであれば、わざわざAkka/Pekko Actorを使わずともFutureを使う方がシンプルで良いと思います。

スレッドローカルと合わせて使うには

Java/Scalaではスレッドローカルな値を使うことがあります。 Actorの利用においてスレッドローカル変数を使うときには注意が必要です。 Actorのメッセージの送信元のスレッドと送信先のスレッドは別々のスレッドであるためスレッドローカルな値の受け渡しが必要です。

よく利用されるものとしてはlogbackでのMDCがあります。以下はこのMDCを使った例です。
(以下はサンプルとしての実装で、実用するにはもっと良い設計があると思います。)

object HelloWorld {

  sealed trait HelloWorldCommand
  final case class SayHello(name: String, mdccontext: Map[String, String]) extends HelloWorldCommand
  final case class SayGoodbye(name: String, mdccontext: Map[String, String]) extends HelloWorldCommand

  def greet(): Behavior[HelloWorldCommand] = {
    Behaviors.receiveMessage { message =>
      try {
        message match {
          case SayHello(name, mdccontext) =>
            MDC.setContextMap(mdccontext) // スレッドローカルに値を保持させる
            logger.info(s"Hello ${name}!") // このログ出力にMDCが利用させれる
            Behaviors.same

          case SayGoodbye(name, mdccontext) =>
            MDC.setContextMap(mdccontext)
            logger.info(s"Goodbye ${name}!")
            Behaviors.same
        }
      } finally {
        MDC.clear() // スレッドローカルな値をクリアする
      }
    }
  }
}

メッセージでスレッドローカルな情報を受け渡しして、それをActorの中でスレッドローカル変数に保持させています。 注意として、Actorの中で保持させたスレッドローカルな値は処理完了時に忘れずにクリアする必要があります。 Actorは複数のメッセージを同じスレッドで処理します。 そのため、別のメッセージにおいてスレッドローカルな値が影響してしまわないようにする必要があります。

ActorSystemの作成ルール

Actorを実行する時のコアとなるものがActorSystemですが、これの生成方法について個人的には以下のルールで利用するのがよいと考えています。

公式に明記されているわけではないですが以下の2つの観点から基本的にはこの想定で利用するのがよいと思っています。

  • ActorSystemは固定の1つの設定ファイルを読み込む
  • ActorSystemは終了処理が必要

ActorSystemはインスタンス化するときに設定ファイルとしてリソース上のapplication.confという名前のファイルを決め打ちで読み込むようになっています。 読み込むファイルを変更できるとはいえ、基本的にはapplication.confファイルを使うことが想定されています。 設定ファイルを単一のファイルにしているということは、アプリとしてActorSystem自体も1つだけであることを想定していると思います。

次に終了処理の観点ですが、 JavaではActorSystemの内部のスレッドが終了するまでJavaプロセス自体が終了しません。 ここまでの例でもActorにメッセージを送信した後にも処理が続いています。 (Main処理のprintln("Finish")の実行後も処理が続いている)
プロセスを終了させるにはActorSystemを明示的に終了させる必要があります。そのため、むやみにActorSystemを作成しない方がよいと思います。

// ActorSystemを終了させる
system.terminate()

ちなみに、ActorSystemを終了させることがアプリケーションの終了と同義であるとするなら、以下の設定をonにするとActorSystemが終了すると同時にJavaプロセス自体も停止されるようになります。

akka.coordinated-shutdown.exit-jvm = on

Actorの動的な登録方法について

作成済みのActorSystemインスタンスに対して、後付けでActorを登録するのはClassic方式であれば簡単にできます。 一方でTyped方式の場合は簡単には後から追加できないようです。 そのため、アプリケーション全体でActorをどのように管理するかを考えて実装する必要があります。

Typed方式で動的にActorを追加したい場合はSpawnProtocolという機能を使う必要があります。

Java/Scalaでの有名なWebフレームワークのPlayframeworkは内部的にAkka/Pekkoを使っています。 このPlayframeworkではActor専用のDIの仕組みがあり、これを使うことでActorを定義できるようになっています。 なので、Playを使う場合はこのDI機能を使うことを検討してください。

スレッドの管理

Actorを動かすスレッドはどこから生成されているでしょうか?

Akka/Pekko内で使うスレッドプールはActorSystem内で管理されています。 ActorSystemの設定において「dispatcher」という項目で設定します。

デフォルトではakka.actor.default-dispatcherという名前で定義されているDispacherを使います。 これは、CPUコア数のスレッド数でのForkJoinPoolのスレッドプールです。
また、これとは別にブロッキングな処理向けの固定スレッド数のakka.actor.default-blocking-io-dispatcherも定義されています。
これらのライブラリに既存定義されているもの以外にも、自分で好きな内容のDispatcherを定義できます。
どのスレッドプールを使うべきなのかは、実装するActorの内容やシステム全体の構成を考慮して考える必要があります。

Playframeworkでの注意点

PlayframeworkではAkka/PekkoをつかってHTTPのリクエストを受けつける処理をしています。 そこではデフォルトのdefault-dispatcher(ForkJoinPool)が使われます。 そのため、自前のActorが長時間ブロックする処理をしてしまうと、HTTPのリクエストを受け付ける処理が滞ってしまう可能性があります。 もし、自前のActorを作成するときは、その処理をどのスレッドプールで行うべきか?ということも含めて検討する必要があります。

Dispatcherによるスレッドの割当

まとめ

この記事ではAkka/Pekko Actorについてかなり初歩的な部分の説明に絞って説明しました。 ですが、Actorはもっと複雑な処理を実現できる強力なライブラリです。 ぜひ公式ドキュメントを読むなどして活用方法を知っていただければと思います。

また、今回は基礎としてActorを取り上げましたが、実際にAkka/Pekkoを活用するならAkka/Pekko Streamsの方が使いやすいと思います。 Actorに続けてStreamの使い方も身につけると、様々な処理を便利に実装できるようになると思います。