ECSタスクをSQSのコンシューマとしてイベント駆動で起動させよう②

はじめに

DTダイナミクスでSREセクションのテックリードをしている霜鳥です。
前回の記事ではSQSのメッセージをEventBridge Pipesを経由してECSタスクで処理するという話題を取り扱いました。
しかしそれと同時にタスクの異常終了の検知やリトライができないというこのアーキテクチャの課題があることも述べました。
今回はその解決編としてEventBridge PipesからStep Functions(SFn)を経由してECSタスクを起動する方法について書いていきます。

その他、霜鳥が書いた過去の記事はこちら。

今回の構成について

まず今回お話ししていく構成について前回の構成図と比較しながら概要をお伝えします。

前回の構成

こちらは前回の構成です。

イベントドリブン-ECS

処理の概要は以下の通り。

  1. SQSにメッセージがエンキューされる
  2. ポーリングしていたEventBridge PipesがECSタスクを起動
  3. ECSタスクは処理したメッセージをKinesisへ送り終了する

そしてこの構成の問題点としてはEventBridge PipesによるECSタスクの起動はFIRE_AND_FORGET(非同期)であり、タスク起動した時点でSQSのメッセージを削除してしまいエラーハンドリングができないことにありました。

今回の構成

次にこちらが今回取り扱う構成を示したものです。

イベントドリブン-StepFunctions

処理の概要は以下の通り。

  1. SQSにメッセージがエンキューされる
  2. ポーリングしていたEventBridge PipesStep Functionsを起動
  3. SFn内の1stepとしてECSタスクを同期的に起動
  4. ECSタスクは処理したメッセージをKinesisへ送り終了する
  5. ECSタスクの終了後にタスクの終了コードをチェック
    1. エラーがない場合 → そのまま終了
    2. エラーがある場合 → エラーハンドリング用のLambdaを起動する

2.でEventBridge PipesSFnの起動に成功した時点でSQSのメッセージは削除されます。
そのためリトライ処理を行いたい場合はSFnで回数を指定してリトライし、それでも失敗していた場合はSFnでDLQに送るというステップを追加することで従来のSQS/DLQ→Lambdaの仕組みを再現できるというわけです。
ただしmeviyはリトライ&DLQ送信ではなく処理に失敗したメッセージの加工とKinesisへの送信をするようにしたため、本記事はそれに倣ったステートマシンで記述します。

実装(Terraform)

重複部分については割愛しますので前回の記事もあわせてご覧いただければと思います。

SQSとEventBridge Pipes

SQSは前回と同じなので省略します。
前回まではECSにわたす細かな設定をすべてPipesのリソース内に入れていましたが、今回はECS関連の起動設定はSFn側に移ったのでシンプルになります。

###################################
##       EventBridge Pipes       ##
###################################
resource "awscc_pipes_pipe" "this" {
  name     = "example-pipe"
  role_arn = aws_iam_role.pipes.arn

  # sourceは変更がないため前回同様
  source   = aws_sqs_queue.this.arn
  source_parameters = {
    sqs = {
      sqs_queue_parameters = {
        batch_size = 1
      }
    }
  }

  # targetをSFnにする
  target = aws_sfn_state_machine.example.arn
  target_parameters = {
    step_function_state_machine_parameters = {
      # Pipes→SFnは非同期で発火する
      # SFn→ECSは同期的に動かす
      invocation_type = "FIRE_AND_FORGET"
    }
    # マネコン上では「ターゲット入力トランスフォーマー」という項目に該当する箇所
    input_template = "{\"body\": <$.body>}"
  }

  # 以下ログやタグ関連の記述は割愛
}

また、ECSの直接起動ではなくSFnの起動になったためrole_arnに指定しているIAMロールには以下を追加します。

  statement {
    effect = "Allow"
    actions = [
      "states:StartExecution"
    ]
    resources = [
      aws_sfn_state_machine.example.arn
    ]
  }

Step Funcions

ステートマシンの定義はかなり長くなりがちなので別ファイル(state_machine.json.tpl)に切り出しておき、templatefile()関数で変数展開していきます。

resource "aws_sfn_state_machine" "example" {
  name     = local.name
  role_arn = aws_iam_role.state_machine.arn

  definition = templatefile(
    "${path.module}/state_machine.json.tpl",
    {
      cluster_arn                  = <ECSクラスターのARN>
      ecs_task_arn                 = <ECSタスクのARN>
      subnet_ids                   = <サブネットのIDリスト>
      security_group_ids           = <セキュリティグループのIDリスト>
      transform_error_function_arn = <エラーハンドリング用のLambda関数のARN>
      kinesis_name                 = <Lambdaのエラーを送る先のKinesisのname>
    }
  )
}

分かりづらいのでまずはステートマシン図をご覧ください。
ステートマシン図
ポイントは以下です。

  • Run ECS Taskでarn:aws:states:::ecs:runTask.syncを使う
    • 同期的なECSタスクの起動ができる
  • Overridesで"Command.$": "States.Array('/app/example.sh', States.JsonToString($[0].body))"を使う
    • 配列形式でのコマンドの渡し方(States.Array()関数)
    • Pipesのターゲット入力トランスフォーマーで取り出した値の取得の仕方($[0].body)
  • リトライ回数はこのプロジェクトではゼロ("MaxAttempts": 0)
  • エラーハンドリングの構成
    • Lambdaではシンプルにメッセージの変換のみを行い、変換結果をSFnに返す
    • Lambdaからの返却値をSFnKinesisへPUTする
{
  "Comment": "Run ECS task and check for failure",
  "StartAt": "Run ECS Task",
  "States": {
    "Run ECS Task": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ecs:runTask.sync",
      "Parameters": {
        "Cluster": "${cluster_arn}",
        "TaskDefinition": "${ecs_task_arn}",
        "LaunchType": "FARGATE",
        "NetworkConfiguration": {
          "AwsvpcConfiguration": {
            "Subnets": ${subnet_ids},
            "SecurityGroups": ${security_group_ids},
            "AssignPublicIp": "ENABLED"
          }
        },
        "Overrides": {
          "ContainerOverrides": [{
            "Name": "example-container",
            "Command.$": "States.Array('/app/example.sh', States.JsonToString($[0].body))"
          }]
        },
        "PropagateTags": "TASK_DEFINITION"
      },
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "MaxAttempts": 0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "Convert Error Message"
        }
      ],
      "Next": "Success"
    },
    "Convert Error Message": {
      "Type": "Task",
      "Resource": "${transform_error_function_arn}",
      "ResultPath": "$.errorMessage",
      "Next": "Send to Kinesis"
    },
    "Send to Kinesis": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:kinesis:putRecord",
      "Parameters": {
        "StreamName": "${kinesis_name}",
        "PartitionKey.$": "$$.Execution.Id",
        "Data.$": "$.errorMessage"
      },
      "End": true
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

このステートマシンに与えるIAM権限は以下です。

  • "ecs:RunTask"
  • "ecs:StopTask"
  • "ecs:DescribeTasks"
  • "ecs:DescribeTaskDefinition"
  • "ecs:DescribeClusters"
  • "iam:PassRole"
  • "events:PutTargets"
  • "events:PutRule"
  • "events:DescribeRule"
  • "logs:CreateLogStream"
  • "logs:PutLogEvents"
  • "lambda:InvokeFunction"
  • "kinesis:PutRecord"

最後に

今回ご紹介した内容でECSタスクのアプリ外で発生するエラーハンドリングが可能になりました。
このようにDTダイナミクスのSREチームでは要件の上流からアプリ開発のチームと連携し、様々な新しいチャレンジを通して#時間戦略#顧客時間価値にコミットしようとしています。
わたしたちミスミ、そしてDTダイナミクスは一緒にmeviyを通して世界の製造業を支える仲間を募集しています!
少しでも興味のある方はぜひカジュアル面談しましょう!

www.wantedly.com