質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

3回答

1410閲覧

データパイプライン待ち受け処理の実装案

ikasoumen

総合スコア110

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

1クリップ

投稿2020/01/04 11:05

前提・実現したいこと

依存関係がある処理を待ち受けする場合に、どのように実装するか困っています。
以下のような処理を非同期・並列で実行する場合、プロセス3を実現するためのアイデアは何があるでしょうか?

・(ジョブ例)カレーを作る
プロセス1.食材を買いに行く(並列処理)
プロセス2.食材を加工する(並列処理)
プロセス3.全ての食材を同時に煮込む

・やりたいこと

ジョブはキューに積むこととする。
プロセス1・2は、可能な限り並列で作業します。(食材の数だけ処理がある)
プロセス1・2は、食材単位で依存関係があります。
プロセス1の失敗率は高いです。(売り切れや店休日など)
プロセス1はかなり重い処理です。
プロセス3は、プロセス1・プロセス2の前処理が全て完了したら開始することとする。
プロセス3は、何かしらのタスクが失敗していたら自動的には実行しない。

発生している問題・エラーメッセージ

プロセス1・2を並列で作業させるため、AWS SQSの分散メッセージキューを使おうと思っています。
しかし、並列化により、これまでは、単に逐次処理していたプロセス3の扱いに困ってしまいました。

プロセス3を実現するためには、プロセス1・プロセス2を構成する全タスクが完了したことを検知できなくてはいけません。

考えていること

・食材を買った、加工したなどの予想しているタスクの数とそのステータスを管理する(難しそう)
・依存関係と並列処理はトレードオフ?luigiなど依存関係を処理できるフレームワークを併用する?
・食材単位の並列化は難しいので、料理単位の並列化で妥協する

FW/ツールのバージョンなど

Python3.6
AWS SQS
Luigi
Airflow
Celery

データパイプラインを実装するのは初めてで、
参考書の紹介などなんでも知恵をお貸し頂けると幸いです。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答3

0

ベストアンサー

依存関係がある処理を待ち受けする場合に、どのように実装するか困っています。

以下のような処理を非同期・並列で実行する場合、プロセス3を実現するためのアイデアは何があるでしょうか?

このようなケースでは、タスク及び依存関係をグラフで管理します。
質問の例の場合、タスク (例: にんじんを買う) を点で表し、依存関係 (にんじんを調理するには、にんじんを買うタスクが完了している必要がある) を有向辺で表してグラフを作成すると、以下のようになります。

イメージ説明

有向非巡回グラフ であれば、トポロジカルソート で依存関係が守られるように点をソートできるので、そのソート結果に基づき、そのタスクが依存している1つ前のタスクが完了しているのであれば、そのタスクを新たにスレッドを作って開始するという実装になると思います。

質問に記載されているluigi を見てみたところ、上記の Dependency graph を扱う仕組みが備わっているようなので、このライブラリの使い方を調べてみると、やりたい事が実現できそうです。
(すみませんが、自分は使ったことがないので、具体的なコードは示せないです)

投稿2020/01/04 12:20

編集2020/01/04 12:21
tiitoi

総合スコア21956

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

ikasoumen

2020/01/04 15:25

ありがとうございます。 感覚的には実現できそうで自信がつきました。 自分で頑張って具体例を増やさないといけませんね。 ちなみに、有向非巡回グラフなどの言葉は大学で習うのでしょうか?
tiitoi

2020/01/04 15:42

自作することもできるとは思いますが、質問に記載のある Luigi、Airflow のようなライブラリを使うと簡単に実現できそうな気がします。 依存関係を考慮しつつ、タスクの実行を管理するアプリケーションを「ワークフローエンジン」というみたいです。 https://qiita.com/elyunim26/items/15db924e4c9833e5050a > ちなみに、有向非巡回グラフなどの言葉は大学で習うのでしょうか? グラフ理論という数学の分野の用語になります。大学で習う人もいるとは思います。 今回のような依存関係の解決の他、様々な問題でグラフ理論が使われています。 (地図アプリの経路検索、Google 検索など)
ikasoumen

2020/01/04 16:18

Airflowは多機能で興味があるんですが、学習コストが高そうな声があったので、まずは軽量なluigiを試しています。 airflowも試したいと思ってるんですが、 スケジュールの都合もあり、luigiを使いつつ、次の改善のフェーズでairflowと比較できればなと思っています。
guest

0

イメージとしてはプロセス1+2の塊を食材ごとに並列実行する感じですかね。
AWS使ってるならStep Functionsを使うのもアリかも…。

ちょっと面倒なjsonを書く必要があって書き方を覚えるのがちょっと面倒ですが、最初のHello,Worldサンプルがちょうどサンプルコードとしていい感じだと思います。(せめてYAMLで書ければいいと思うんですが、まだ対応してません…)
また、様々なサンプルプロジェクトがあります。
Parallel Stateで複数のプロセス1+2を並べて、それらすべてが終わったらプロセス3が開始するようにフローを組んでやれば行けそうな気がしますね。
プロセス1がエラーになった時のリトライなんかも定義できたはずです。

ちょっとフローが複雑になりそうなので実際に試してみないとできるかわかりませんが、参考になりましたら幸いです。

投稿2020/01/04 15:02

yu_1985

総合スコア7447

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

ikasoumen

2020/01/04 16:02

ありがとうございます。 参考になります。 現在AWSを利用しているので、興味があります。 時間を取ってトライしてみたいと思います。 ただ、この手のサービスはバージョン管理ができなかったり、 コードの管理リポジトリが一部AWSになってしまうなど、管理の面で考え事が少し必要そうですね。
guest

0

プロセス3も並列実行する必要はないでしょう。
2が終わる、というのが絶対条件となるため、2が終わったら、1の終了をチェックしてそのまま3を実行すればいいでしょうね。

投稿2020/01/04 12:00

編集2020/01/04 12:01
y_waiwai

総合スコア87774

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

ikasoumen

2020/01/04 15:11

はい。プロセス3は並列ではなありません。 tiitoi様の回答を参考にしていただけると幸いです。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問