🎄teratailクリスマスプレゼントキャンペーン2024🎄』開催中!

\teratail特別グッズやAmazonギフトカード最大2,000円分が当たる!/

詳細はこちら
並列処理

複数の計算が同時に実行される手法

マルチスレッド

マルチスレッドは、どのように機能がコンピュータによって実行したのかを、(一般的にはスレッドとして参照される)実行の複合的な共同作用するストリームへ区分することが出来ます。

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

配列

配列は、各データの要素(値または変数)が連続的に並べられたデータ構造です。各配列は添え字(INDEX)で識別されています。

Q&A

解決済

1回答

4810閲覧

pythonを使って1つの配列に対して並列処理させたい

pokemonta

総合スコア170

並列処理

複数の計算が同時に実行される手法

マルチスレッド

マルチスレッドは、どのように機能がコンピュータによって実行したのかを、(一般的にはスレッドとして参照される)実行の複合的な共同作用するストリームへ区分することが出来ます。

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

配列

配列は、各データの要素(値または変数)が連続的に並べられたデータ構造です。各配列は添え字(INDEX)で識別されています。

0グッド

1クリップ

投稿2021/01/29 07:47

編集2021/02/01 01:43

itemsは配列でその下に多数のオブジェクトが格納されています。
現在は、シングルスレッドで一つずつ処理していますが
マルチスレッドで並列処理させたいです。

私の方で100件ずつ処理するプログラムを考えましたが
いかんせん、初心者のためヘンテコな構造となっております。
1つの配列に対してマルチスレッドで処理する方法をご教示頂けないでしょうか

single

1items =[ 2 {'blog': 'blog1', 'count': '111', 'user_name': 'user1'}, 3 {'blog': 'blog2', 'count': '112', 'user_name': 'user2'}, 4 {'blog': 'blog3', 'count': '113', 'user_name': 'user3'}, 5   ・ 6   ・ 7   ・ 8 {'blog': 'blog1000', 'count': '11113', 'user_name': 'user1000'} 9 ] 10 11 table = dynamodb.Table('Movies') 12 for item in items: 13 response = table.put_item(Item=item)

Multithread

1items =[ 2 {'blog': 'blog1', 'count': '111', 'user_name': 'user1'}, 3 {'blog': 'blog2', 'count': '112', 'user_name': 'user2'}, 4 {'blog': 'blog3', 'count': '113', 'user_name': 'user3'}, 5   ・ 6   ・ 7   ・ 8 {'blog': 'blog1000', 'count': '11113', 'user_name': 'user1000'} 9 ] 10 11 process_list = [] 12 with ThreadPoolExecutor() as executor: 13 i = 0 14  table = dynamodb.Table('Movies') 15  for item in items: 16  if i > 100: 17   process_list.append(executor.submit(table.put_item(Item=item))) 18  elif i > 200: 19   process_list.append(executor.submit(table.put_item(Item=item))) 20   ・ 21   ・ 22   ・ 23  elif i > 900: 24   process_list.append(executor.submit(table.put_item(Item=item))) 25  else: 26   process_list.append(executor.submit(table.put_item(Item=item))) 27   28   i+=1 29 for process in process_list: 30 print(process.result())

items =[ {'blog': 'blog1', 'count': '111', 'user_name': 'user1'}, {'blog': 'blog2', 'count': '112', 'user_name': 'user2'}, {'blog': 'blog3', 'count': '113', 'user_name': 'user3'},   ・   ・   ・ {'blog': 'blog1000', 'count': '11113', 'user_name': 'user1000'} ] #リスト分ける newList = split_list(items,100) #スレッドプールを作る tpe = ThreadPoolExecutor(max_workers=100) for _ in range(newList.len): tpe.submit(put_to_db) tpe.shutdown() print("完了") #リストを分ける処理 def split_list(l, n): for idx in range(0, len(l), n): return l[idx:idx + n] #DBに登録する処理 def put_to_db(Items): table = dynamodb.Table('Movies') for item in Items: response = table.put_item(Item=item)

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

この書き方では「100個ずつ処理する」にはならず、単に1つずつ別のスレッドで処理するようになってますね。 同じ行で呼んだからといって、同じスレッドで処理されるわけではなく、それは別の登録になります。

何通りかやりかたはありますが、基本的なのはこんな感じでしょう。
・ 値のリストをうけとって、それをループで順にDBに登録する処理する関数(put_to_dbとか)を作る。
・全体のリストを、適当なサイズの塊に分ける。この場合は100個ずつでしょうか。
・スレッドプールを作る。
・ループで、作った関数と、100個ずつ分けたデータをsubmitする。
・結果をうけとる


コメントの処理をみてみました。 コメントはインデントなくなっているので、インデントはあつずっぽいです。間にコメントいれます。

python

1#リスト分ける 2newList = split_list(items,100) 3#スレッドプールを作る 4tpe = ThreadPoolExecutor(max_workers=100)

python

1for _ in range(newList.len): 2 tpe.submit(put_to_db) 3tpe.shutdown() 4print("完了")

submitが返すfutureを保存していませが、このfutureで結果を取得できるので、結果が必要なら、保存が必要です。、

python

1#リストを分ける処理 2def split_list(l, n): 3 for idx in range(0, len(l), n): 4 return l[idx:idx + n]

for文で回しても、1つでリターンしてしまったら、1つしか取得できません。
分けたリストを取っておくリストを作って、最後にそれを返すようにします。

python

1#DBに登録する処理 2def put_to_db(Items): 3 table = dynamodb.Table('Movies') 4 for item in Items: 5 response = table.put_item(Item=item)

responsで受けた結果をどこにも保存していないので、返してもいません。
結果返却用のリストを作って、そこにresponseを保存して、最後に返すようにします。


で、ざっと直してみました。動かしていないので、間違えているところあるはずですが、正解に近付いているとは思います。
もっと効率のいい書き方などあるのですが、元の方式を残す形にしました。
また、読み易さを考えて冗長に書いたところもあります。

python

1#リスト分ける 2newList = split_list(items,100) 3#スレッドプールを作る 4tpe = ThreadPoolExecutor(max_workers=100) 5 6future_list = [] 7for l in newList: 8 f = tpe.submit(put_to_db, l) 9 future_list.append(f) 10tpe.shutdown() 11 12for r in future_list: 13 print(r.result) 14 15print("完了") 16 17 18#リストを分ける処理 19def split_list(l, n): 20 new_list 21 for idx in range(0, len(l), n): 22 new_list.append(l[idx:idx + n]) 23 return nuw_list 24 25 26#DBに登録する処理 27def put_to_db(item_list): 28 result_list = [] 29 table = dynamodb.Table('Movies') 30 for item in Items: 31 result = table.put_item(Item=item) 32 result_list.append(result) 33 return result_list

投稿2021/01/29 10:39

編集2021/02/01 05:20
TakaiY

総合スコア13767

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

pokemonta

2021/01/29 10:50 編集

ご回答ありがとうございます。 教えて頂いたイメージに近い感じで 実装したつもりなんですけど、 どこが決定的に違うんでしょうか? (リストを100個ずつに分割して並列に処理させたつもりです)
TakaiY

2021/01/29 11:08

だいぶかけはなれてますよ。 ・関数作っていない ・リストを100個ずつにわける処理がない ・よって、スレッドに適切に処理が渡せていない。 です。 process_list.append(executor.submit(table.put_item(Item=item))) この記述だと、 table.put_item(Item=item)は、渡されるのでなくここで実行されます。 なので、executer.submit()にはその返り値(何かは不明)が渡されています。 で、その結果がprocess_listに格納されています。 まずは、最初の2つをやってみてください。
pokemonta

2021/02/01 01:41

手探りでご教示頂いた方法を作ってみました。 #リスト分ける newList = split_list(items,100) #スレッドプールを作る tpe = ThreadPoolExecutor(max_workers=100) for _ in range(newList.len): tpe.submit(put_to_db) tpe.shutdown() print("完了") #リストを分ける処理 def split_list(l, n): for idx in range(0, len(l), n): return l[idx:idx + n] #DBに登録する処理 def put_to_db(Items): table = dynamodb.Table('Movies') for item in Items: response = table.put_item(Item=item) 以下の件がよくわからなかったのですが、どのように実装すればよいのでしょうか? ・ループで、作った関数と、100個ずつ分けたデータをsubmitする。 ・結果をうけとる
pokemonta

2021/02/01 04:55

内容確認させて頂います。ありがとうございます。
pokemonta

2021/02/01 06:14

items の数が100でスレッド1で処理した場合、 改修前の8倍場合遅くなります。 一方でitemsの数が500でスレッド5で処理した場合、 itemsの数が1000でスレッド10で処理した場合より遅くなります。 あとは、チューニングになってくるのでしょうか?(分割=スレッド数)
TakaiY

2021/02/01 06:39

うごいたみたいですね。 そうですね、後はチューニングですね。 分割するのにも、タスクを登録するのにも、結果を集めるのにもそれぞれ時間がかかります。最適なところは探ってみる必要があります。 今は、threadですがprocessにしてみるとまた結果は変わります。 受ける DBの処理能力にもよります。
pokemonta

2021/02/01 07:33

ありがとうございました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.36%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問