ECSタスクをSQSのコンシューマとしてイベント駆動で起動させよう①

はじめに

DTダイナミクスでSREセクションのテックリードをしている霜鳥です。
今回はAmazon SQSのメッセージをLambdaよりも柔軟なワークロードに対応でき、
処理時間も長くとれるECSタスクで処理させよう!という話題で全2回を予定しております。

その他、霜鳥が書いた過去の記事はこちら。

シンプルなSQS + Lambda構成とその制限

AWSを使っている方が「何らかの処理を非同期的に処理したい」と聞いて真っ先に思い浮かべるのはSQS + Lambdaという構成ではないでしょうか。
データをSQSへエンキューするとLambdaが発火し、処理の結果を後続のKinesisへ流すというありがちでシンプルな構成を考えてみましょう。
DLQの有無などの差異はあれど、概ね以下のような構成になるかと思います。

イベントドリブン-Lambda

この構成の素晴らしいところはポーリング、メッセージの削除、可視性タイムアウト、リトライ、DLQへの遷移などをコードとして記述することなく、
AWSの設定だけで構築可能という「Configuration over Code」の思想を体現したようなシンプルさです。
そのためアプリエンジニアは入力されたメッセージの処理の実装に集中できます。

しかしこの構成にもLambdaの最大15分のタイムアウトという制限があります

ECSタスクへ

meviyではCADデータの解析プログラムなど、ユーザの入力によって実行時間の幅が非常に大きい処理があるため、 コンピューティングに15分制限があるLambdaを使う構成を取れない箇所が往々にしてあります。
それでもSQSなどを使って非同期的な処理を実装したいとなった場合、 ECSサービスやEC2内に立てた常駐サービスにポーリングをはじめとしたSQSの操作処理を自前で実装する必要があります。
不可能ではありませんがアプリエンジニアが本当に実装したい処理に集中できませんし、 何よりSQSを扱うすべてのサービスに同じコードを追加しなければなりません。

そこでSQS + LambdaライクにECSタスクを扱えないかを調べたところEventBridge Pipesを用いることで実現可能であることが分かりました!

イベントドリブン-ECS

SQSにエンキューされたメッセージはEventBridge Pipesのポーリングに引っかかり、ECSタスクを起動するという流れです。
次項で実際のTerraformのサンプルコードをご紹介します。

cf: Amazon EventBridge Pipes

サンプルコード(Terraform)

ECSタスクやネットワークなどは割愛し、SQSとEventBridge Pipes、そして権限周りのみのサンプルを以下に載せます。

SQSとEventBridge Pipes

一番のポイントはECSタスクのコンテナオーバーライドのところです。
SQSメッセージのbodyを$.bodyとしてコンテナへ渡しています。
もちろんcommand以外でもオーバーライドできるところであればどこにもでSQSメッセージを入れることは可能です。

###################################
##              SQS              ##
###################################
# トリガーとなるSQSを用意します
# サンプルのためDLQは省略
resource "aws_sqs_queue" "this" {
  name = "example-queue"
}

###################################
##       EventBridge Pipes       ##
###################################
resource "awscc_pipes_pipe" "this" {
  name     = "example-pipe"
  role_arn = aws_iam_role.pipes.arn
  # パイプのソースには上で作成したSQSを指定
  source   = aws_sqs_queue.this.arn
  source_parameters = {
    sqs = {
      sqs_queue_parameters = {
        # ここを増やすと複数のメッセージをECSタスクに配列形式で渡すことになる
        batch_size = 1
      }
    }
  }

  # ターゲット = ECSタスク
  target = <ECSクラスターのARN> # ここはクラスターなので注意!
  target_parameters = {
    ecs_task_parameters = {
      task_definition_arn = <ECSタスクのARN> # ここでタスクを指定する
      launch_type         = "FARGATE"
      task_count          = 1
      # ネットワーク設定はECSタスクをPipes以外で起動するときと同じものを指定すればOK
      network_configuration = {
        awsvpc_configuration = {
          assign_public_ip = "DISABLED"
          subnets          = var.subnet_ids
          security_groups  = var.security_group_ids
        }
      }
      # ここのオーバーライドでタスクのコマンドにSQSメッセージのbodyを渡す
      overrides = {
        container_overrides = [
          {
            # SQSメッセージを渡したいコンテナ名
            name = "example-container"
            # 例としてSQSメッセージのbodyを環境変数として設定し、それを--dataオプションとして渡す
            command = [
              "sh",
              "-c",
              "/app/example.sh --data \"$EVENT_BODY\""
            ]
            environment = [
              { "name" : "EVENT_BODY", "value" : "$.body" } # ←ポイント!
            ]
          }
        ]
      }
    }
  }

  # タスクの起動失敗などPipesのログの出力先
  log_configuration = {
    cloudwatch_logs_log_destination = {
      log_group_arn = <CloudWatch LogsのロググループのARN>
    }
    # エラーログだけを捕まえる
    level = "ERROR"
  }
}

IAM

resource "aws_iam_role" "pipes" {
  name               = "example-pipes-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "pipes.amazonaws.com"
        }
      }
    ]
  })
}
resource "aws_iam_role_policy" "pipes_inline_policy" {
  role = aws_iam_role.pipes.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      # PipesがSQSからメッセージを取得できるようにする
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = [aws_sqs_queue.this.arn]
      },
      # PipesがECSタスクを起動できるようにする
      {
        Effect = "Allow"
        Action = [
          "ecs:RunTask",
          "ecs:DescribeTasks",
          "ecs:DescribeTaskDefinition",
          "ecs:DescribeClusters",
          "iam:PassRole"
        ]
        Resource = ["*"]
      },
      # Pipesがログを出力できるようにする
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = ["${aws_cloudwatch_log_group.pipes.arn}:*"]
      }
    ]
  })
}

課題と次回予告

と、ここまで書いてきましたが実はこの構成には重大な欠陥があります。
EventBridge PipesのECSタスクの起動設定はFIRE_AND_FORGET(非同期)、つまりタスク起動に成功した時点でPipesはSQSからメッセージを削除します。
そのためECSタスクの起動に失敗した場合(※)を除いてリトライ処理やDLQへの退避ができないわけです。
(※タスク実行ロールの権限不足やFargateのリソース不足、ECRとのネットワーク接続不良など) アプリケーションのエラーであれば適切なエラーハンドリングで如何様にも対処できるのですが、
OOM(OutOfMemory)などアプリケーションの外側でのエラーによるタスクの異常終了に対処できません。

この問題の解決編を次の記事で公開予定です。どうぞお楽しみに。

さいごに

DTダイナミクスのSREチームでは要件の上流からアプリ開発のチームと連携し、様々な新しいチャレンジを通して#時間戦略#顧客時間価値にコミットしようとしています。
わたしたちミスミ、そしてDTダイナミクスは一緒にmeviyを通して世界の製造業を支える仲間を募集しています!
少しでも興味のある方はぜひカジュアル面談しましょう!

www.wantedly.com