質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.49%
AWS(Amazon Web Services)

Amazon Web Services (AWS)は、仮想空間を機軸とした、クラスター状のコンピュータ・ネットワーク・データベース・ストーレッジ・サポートツールをAWSというインフラから提供する商用サービスです。

Q&A

0回答

403閲覧

Activity StateMachine をポーリングするFargate on ECS Taskの挙動について

zvub1123

総合スコア230

AWS(Amazon Web Services)

Amazon Web Services (AWS)は、仮想空間を機軸とした、クラスター状のコンピュータ・ネットワーク・データベース・ストーレッジ・サポートツールをAWSというインフラから提供する商用サービスです。

0グッド

1クリップ

投稿2019/05/21 04:15

編集2019/05/22 04:24

実現したいこと

テーブルA, テーブルB, テーブルC の三つのテーブルを
SQLServerからPostgreSQLへマイグレーションする仕組みを作成しております。
その際、ActivityでTaskを待ち受けるStateMachineはParallel実行とし、上記の各テーブルのマイグレーションを並列で処理しています。

以下はアクティビティ用のステートマシンです。

json

1{ 2 "Comment": "StateMachine of Tables defined by Fargate", 3 "StartAt": "Parallel", 4 "States": { 5 "Parallel": { 6 "Type": "Parallel", 7 "Next": "Final State", 8 "Branches": [ 9 { 10 "StartAt": "TableA受信0", 11 "States": { 12 "TableA受信0": { 13 "Type": "Task", 14 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableA", 15 "Next": "TableA受信1" 16 }, 17 "TableA受信1": { 18 "Type": "Task", 19 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableA", 20 "Next": "TableA受信2" 21 }, 22 "TableA受信2": { 23 "Type": "Task", 24 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableA", 25 "End": true 26 } 27 } 28 }, 29 { 30 "StartAt": "TableB受信0", 31 "States": { 32 "TableB受信0": { 33 "Type": "Task", 34 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableB", 35 "Next": "TableB受信1" 36 }, 37 "TableB受信1": { 38 "Type": "Task", 39 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableB", 40 "Next": "TableB受信2" 41 }, 42 "TableB受信2": { 43 "Type": "Task", 44 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableB", 45 "End": true 46 } 47 } 48 }, 49 { 50 "StartAt": "TableC受信0", 51 "States": { 52 "TableC受信0": { 53 "Type": "Task", 54 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableC", 55 "Next": "TableC受信1" 56 }, 57 "TableC受信1": { 58 "Type": "Task", 59 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableC", 60 "Next": "TableC受信2" 61 }, 62 "TableC受信2": { 63 "Type": "Task", 64 "Resource": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:activity:MigrateTableC", 65 "End": true 66 } 67 } 68 } 69 ] 70 }, 71 "Final State": { 72 "Type": "Pass", 73 "End": true 74 } 75 } 76}

また、各ECS TaskはCloudWatch Eventにて所定のタイミングで実行されるようになっています。
イベントターゲットは以下の通りです。

json

1{ 2 "Targets": [ 3 { 4 "RoleArn": "arn:aws:iam::XXXXXXXXXXXX:role/My_ROLE", 5 "EcsParameters": { 6 "NetworkConfiguration": { 7 "awsvpcConfiguration": { 8 "Subnets": [ 9 "subnet-00AAAAAAAABCDE" 10 ], 11 "SecurityGroups": [ 12 "sg-XYZXYZXYZ", 13 "sg-ABC123DEF" 14 ], 15 "AssignPublicIp": "ENABLED" 16 } 17 }, 18 "LaunchType": "FARGATE", 19 "TaskDefinitionArn": "arn:aws:ecs:ap-northeast-1:XXXXXXXXXXXX:task-definition/MigrateTableAFargate", 20 "TaskCount": 1 21 }, 22 "Id": "MigrateTableAFargate", 23 "Arn": "arn:aws:ecs:ap-northeast-1:XXXXXXXXXXXX:cluster/test-cluster" 24 } 25 ] 26}

想定していた動作

「Fargate用のイベントとActivity用のStateMachineを同時に実行すると、並列でTableA, B, Cのマイグレーション処理が開始され、
それぞれのジョブの完了後にステートマシンが終了する」という流れを想定していました。

発生していること

上記の想定通りに実行できるかと思っておりましたが、実際にステートマシン、タスクを共に起動すると
3つのそれぞれのタスクの開始のタイミングが並列になっていることは確認できたのですが、
「TableBのマイグレーションが終わり、TableA, Cの終了を待つ」
のではなく
「TableBのマイグレーション終了後、TableA, Cが実行中の場合、TableBのマイグレーション用のTaskが再実行されてしまう」
という事象が発生しております。

TableBの処理は一度は完了するものの、A, Cの処理が完了していないためECSのTaskのポーリングが繰り返されているのでは無いかと推測しているのですが
それらしい情報やナレッジ等が見つからず困っております。

ご助力いただけますでしょうか。。

追記:
Fargate用のTaskを、Branch内の処理が終了したら落とすように改修したところ、無限ループからは抜け出せました。

改修前:

java

1while (true) { 2 GetActivityTaskResult getActivityTaskResult = client.getActivityTask( 3 new GetActivityTaskRequest().withActivityArn(ACTIVITY_ARN)); 4 5 if (getActivityTaskResult.getTaskToken() != null) { 6 try { 7 // Fargate Task 処理 8 9 client.sendTaskSuccess(new SendTaskSuccessRequest() 10 .withOutput(sfResult) 11 .withTaskToken(getActivityTaskResult.getTaskToken())); 12 System.out.println("Worker SendTask Success."); 13 } catch (Exception e) { 14 client.sendTaskFailure(new SendTaskFailureRequest() 15 .withTaskToken(getActivityTaskResult.getTaskToken())); 16 logger.error("StepFunctions Failed\n" + e.getMessage() + "\n"); 17 System.out.println("Worker Failure."); 18 } 19 } else { 20 sleep(1000); 21 } 22}

改修後

java

1while (true) { 2 GetActivityTaskResult getActivityTaskResult = client.getActivityTask( 3 new GetActivityTaskRequest().withActivityArn(ACTIVITY_ARN)); 4 5 if (getActivityTaskResult.getTaskToken() != null) { 6 try { 7 // Fargate Task 処理 8 9 client.sendTaskSuccess(new SendTaskSuccessRequest() 10 .withOutput(sfResult) 11 .withTaskToken(getActivityTaskResult.getTaskToken())); 12 System.out.println("Worker SendTask Success."); 13 } catch (Exception e) { 14 client.sendTaskFailure(new SendTaskFailureRequest() 15 .withTaskToken(getActivityTaskResult.getTaskToken())); 16 logger.error("StepFunctions Failed\n" + e.getMessage() + "\n"); 17 System.out.println("Worker Failure."); 18 } 19 } else { 20 break; // loop out 21 } 22}

当初の問題は解決していますが、モヤモヤが残っている状態です。

Taskの側ではStateMachineのParallel処理におけるBranchごとの終了ステータスを見ていないのでしょうか。(正常な動作ではあると思いますが。。。)
また、その場合、Activityをポーリングするタスクというのはソース上などから明示的に閉じる必要があるのでしょうか。
その辺りに詳しい方が居ましたら、ご教示いただけると幸いです。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

まだ回答がついていません

会員登録して回答してみよう

アカウントをお持ちの方は

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.49%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問