はじめに
DTダイナミクスでSREセクションのテックリードをしている霜鳥です。
前回の記事ではSQSのメッセージをEventBridge Pipesを経由してECSタスクで処理するという話題を取り扱いました。
しかしそれと同時にタスクの異常終了の検知やリトライができないというこのアーキテクチャの課題があることも述べました。
今回はその解決編としてEventBridge PipesからStep Functions(SFn)を経由してECSタスクを起動する方法について書いていきます。
その他、霜鳥が書いた過去の記事はこちら。
- Seekable OCIでECSのタスク起動速度をお手軽に改善しよう
- CodeBuild-hosted GitHub Actions runner使ってみた①〜紹介編〜
- CodeBuild-hosted GitHub Actions runner使ってみた②〜実践編〜
- 入社エントリ〜SREしもとり〜
- Datadog Summit Tokyo 2024参加レポ
今回の構成について
まず今回お話ししていく構成について前回の構成図と比較しながら概要をお伝えします。
前回の構成
こちらは前回の構成です。
処理の概要は以下の通り。
そしてこの構成の問題点としてはEventBridge PipesによるECSタスクの起動はFIRE_AND_FORGET(非同期)
であり、タスク起動した時点でSQSのメッセージを削除してしまいエラーハンドリングができないことにありました。
今回の構成
次にこちらが今回取り扱う構成を示したものです。
処理の概要は以下の通り。
- SQSにメッセージがエンキューされる
- ポーリングしていたEventBridge PipesがStep Functionsを起動
- SFn内の1stepとしてECSタスクを同期的に起動
- ECSタスクは処理したメッセージをKinesisへ送り終了する
- ECSタスクの終了後にタスクの終了コードをチェック
- エラーがない場合 → そのまま終了
- エラーがある場合 → エラーハンドリング用のLambdaを起動する
2.でEventBridge PipesがSFnの起動に成功した時点でSQSのメッセージは削除されます。
そのためリトライ処理を行いたい場合はSFnで回数を指定してリトライし、それでも失敗していた場合はSFnでDLQに送るというステップを追加することで従来のSQS/DLQ→Lambdaの仕組みを再現できるというわけです。
ただしmeviyはリトライ&DLQ送信ではなく処理に失敗したメッセージの加工とKinesisへの送信をするようにしたため、本記事はそれに倣ったステートマシンで記述します。
実装(Terraform)
重複部分については割愛しますので前回の記事もあわせてご覧いただければと思います。
SQSとEventBridge Pipes
SQSは前回と同じなので省略します。
前回まではECSにわたす細かな設定をすべてPipesのリソース内に入れていましたが、今回はECS関連の起動設定はSFn側に移ったのでシンプルになります。
################################### ## EventBridge Pipes ## ################################### resource "awscc_pipes_pipe" "this" { name = "example-pipe" role_arn = aws_iam_role.pipes.arn # sourceは変更がないため前回同様 source = aws_sqs_queue.this.arn source_parameters = { sqs = { sqs_queue_parameters = { batch_size = 1 } } } # targetをSFnにする target = aws_sfn_state_machine.example.arn target_parameters = { step_function_state_machine_parameters = { # Pipes→SFnは非同期で発火する # SFn→ECSは同期的に動かす invocation_type = "FIRE_AND_FORGET" } # マネコン上では「ターゲット入力トランスフォーマー」という項目に該当する箇所 input_template = "{\"body\": <$.body>}" } # 以下ログやタグ関連の記述は割愛 }
また、ECSの直接起動ではなくSFnの起動になったためrole_arn
に指定しているIAMロールには以下を追加します。
statement { effect = "Allow" actions = [ "states:StartExecution" ] resources = [ aws_sfn_state_machine.example.arn ] }
Step Funcions
ステートマシンの定義はかなり長くなりがちなので別ファイル(state_machine.json.tpl
)に切り出しておき、templatefile()
関数で変数展開していきます。
resource "aws_sfn_state_machine" "example" { name = local.name role_arn = aws_iam_role.state_machine.arn definition = templatefile( "${path.module}/state_machine.json.tpl", { cluster_arn = <ECSクラスターのARN> ecs_task_arn = <ECSタスクのARN> subnet_ids = <サブネットのIDリスト> security_group_ids = <セキュリティグループのIDリスト> transform_error_function_arn = <エラーハンドリング用のLambda関数のARN> kinesis_name = <Lambdaのエラーを送る先のKinesisのname> } ) }
分かりづらいのでまずはステートマシン図をご覧ください。
ポイントは以下です。
- Run ECS Taskで
arn:aws:states:::ecs:runTask.sync
を使う- 同期的なECSタスクの起動ができる
- Overridesで
"Command.$": "States.Array('/app/example.sh', States.JsonToString($[0].body))"
を使う - リトライ回数はこのプロジェクトではゼロ(
"MaxAttempts": 0
) - エラーハンドリングの構成
{ "Comment": "Run ECS task and check for failure", "StartAt": "Run ECS Task", "States": { "Run ECS Task": { "Type": "Task", "Resource": "arn:aws:states:::ecs:runTask.sync", "Parameters": { "Cluster": "${cluster_arn}", "TaskDefinition": "${ecs_task_arn}", "LaunchType": "FARGATE", "NetworkConfiguration": { "AwsvpcConfiguration": { "Subnets": ${subnet_ids}, "SecurityGroups": ${security_group_ids}, "AssignPublicIp": "ENABLED" } }, "Overrides": { "ContainerOverrides": [{ "Name": "example-container", "Command.$": "States.Array('/app/example.sh', States.JsonToString($[0].body))" }] }, "PropagateTags": "TASK_DEFINITION" }, "Retry": [ { "ErrorEquals": ["States.ALL"], "MaxAttempts": 0 } ], "Catch": [ { "ErrorEquals": ["States.ALL"], "Next": "Convert Error Message" } ], "Next": "Success" }, "Convert Error Message": { "Type": "Task", "Resource": "${transform_error_function_arn}", "ResultPath": "$.errorMessage", "Next": "Send to Kinesis" }, "Send to Kinesis": { "Type": "Task", "Resource": "arn:aws:states:::aws-sdk:kinesis:putRecord", "Parameters": { "StreamName": "${kinesis_name}", "PartitionKey.$": "$$.Execution.Id", "Data.$": "$.errorMessage" }, "End": true }, "Success": { "Type": "Succeed" } } }
このステートマシンに与えるIAM権限は以下です。
"ecs:RunTask"
"ecs:StopTask"
"ecs:DescribeTasks"
"ecs:DescribeTaskDefinition"
"ecs:DescribeClusters"
"iam:PassRole"
"events:PutTargets"
"events:PutRule"
"events:DescribeRule"
"logs:CreateLogStream"
"logs:PutLogEvents"
"lambda:InvokeFunction"
"kinesis:PutRecord"
最後に
今回ご紹介した内容でECSタスクのアプリ外で発生するエラーハンドリングが可能になりました。
このようにDTダイナミクスのSREチームでは要件の上流からアプリ開発のチームと連携し、様々な新しいチャレンジを通して#時間戦略
や#顧客時間価値
にコミットしようとしています。
わたしたちミスミ、そしてDTダイナミクスは一緒にmeviyを通して世界の製造業を支える仲間を募集しています!
少しでも興味のある方はぜひカジュアル面談しましょう!