質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.39%
C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

.NET Framework

.NET Framework は、Microsoft Windowsのオペレーティングシステムのために開発されたソフトウェア開発環境/実行環境です。多くのプログラミング言語をサポートしています。

Q&A

解決済

6回答

15184閲覧

Parallel.ForEachでのawaitは可能でしょうか

testset

総合スコア221

C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

.NET Framework

.NET Framework は、Microsoft Windowsのオペレーティングシステムのために開発されたソフトウェア開発環境/実行環境です。多くのプログラミング言語をサポートしています。

0グッド

0クリップ

投稿2020/01/14 17:00

編集2020/01/15 12:16

概要

Parallel.ForEachのdelegate上でawaitを使いたいのですが、
すべての処理が実行されずにParallel.ForEachが抜けてしまいます。
Parallel.ForEachのdelegate上では同期処理を書かなくてはならないのでしょうか。

背景

listのデータに対して順序にかかわらず上限のある並列数(1~)で実行したい。
システム全体でのスレッドの上限があるため、極力節約したい。
※目くじら立てる程でもないのかもしれませんが、ほとんどの処理をAsync化してしまったので、wait等は避けようとしてどうしてこうなった。
(追記 2020/01/15 21:15 JST)
なぜParallel.ForEachを使い、並列数を指定しているのかの、説明ができていませんでした。
ループ処理内でネットワーク通信を挟んでいるためです:
ネットワーク通信先では同時接続数の上限もあるため、同時アクセス数の制限を設けたい。(Server busyとなってしまう)
なお、ネットワーク接続先は複数箇所であるため、クライアント側のソケットも使い果たすとマズイため、一部処理を並列化し、終了を簡単に待つことができる機構を求めた結果、この質問になってしまいました。

環境

  • .NET Framework 4.5.2
  • VisualStudio 2015 Community Edition
  • Windows 10 home

もしくは

  • .NET Core 2.2
  • VisualStudio 2017 Professional
  • Windows 10 Pro

備考(オフトピック)

コード確認はできていませんが、Task.Wait()メソッド(引数無し)では通常、呼び出し元スレッドを維持してしまう認識であっているか、教えていただける幸いです。

検証

コード

cs

1using System; 2using Microsoft.VisualStudio.TestTools.UnitTesting; 3using System.Collections.Generic; 4using System.Threading.Tasks; 5using System.Threading; 6using System.Diagnostics; 7using System.Collections; 8using System.Linq; 9 10namespace ParallelTest 11{ 12 [TestClass] 13 public class UnitTest1 14 { 15 [TestMethod] 16 public void TestMethod1() 17 { 18 var list = Enumerable.Range(1, 30).ToList(); 19 ParallelFor(10, list, async (item) => 20 { 21 Trace.TraceInformation("Inner start {0}", item); 22 await Task.Delay(100); 23 Trace.TraceInformation("Inner end {0}", item); 24 }); 25 26 } 27 /// <summary> 28 /// 同時実行数制限付き並列動作 29 /// </summary> 30 /// <typeparam name="T">リストのデータタイプ</typeparam> 31 /// <param name="degree">並列数</param> 32 /// <param name="list">並列処理するリスト</param> 33 /// <param name="task">並列処理内容</param> 34 static void ParallelFor<T>(int degree, IList<T> list, Func<T,Task> task) 35 { 36 Trace.TraceInformation("Enter"); 37 Parallel.ForEach(list, new ParallelOptions (){ MaxDegreeOfParallelism = degree }, async (data) => 38 { 39 Trace.TraceInformation("Task start {0}", data); 40 await task(data); 41 Trace.TraceInformation("Task end {0}", data); 42 }); 43 Trace.TraceInformation("End"); 44 } 45 } 46} 47

結果

Task endが呼び出されることなく、終了してしまう。

txt

1Enter 2Task start 10 3Task start 4 4Inner start 10 5Task start 7 6Inner start 7 7Task start 13 8Task start 1 9Inner start 1 10Inner start 4 11Task start 8 12Inner start 8 13Task start 9 14Inner start 9 15Task start 14 16Inner start 14 17Task start 11 18Inner start 11 19Task start 12 20Inner start 12 21Inner start 13 22Task start 19 23Inner start 19 24Task start 20 25Inner start 20 26Task start 21 27Inner start 21 28Task start 22 29Inner start 22 30Task start 16 31Inner start 16 32Task start 17 33Inner start 17 34Task start 18 35Inner start 18 36Task start 25 37Inner start 25 38Task start 26 39Inner start 26 40Task start 27 41Inner start 27 42Task start 28 43Inner start 28 44Task start 2 45Inner start 2 46Task start 3 47Inner start 3 48Task start 23 49Inner start 23 50Task start 29 51Inner start 29 52Task start 30 53Inner start 30 54Task start 5 55Inner start 5 56Task start 6 57Inner start 6 58Task start 24 59Inner start 24 60Task start 15 61Inner start 15 62End

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

Zuishin

2020/01/14 23:04

ParallelFor を非同期にしないのに意味はありますか?
testset

2020/01/15 11:45

listの処理が終わった後、同期処理を行うため、非同期にしていません。 なぜParallel.ForEachを使い、並列数を指定しているのかの、説明ができていませんでした。 ループ処理内でネットワーク通信を挟んでいるためです: ネットワーク通信先では同時接続数の上限もあるため、同時アクセス数の制限を設けたい。(Server busyとなってしまう) なお、ネットワーク接続先は複数箇所であるため、クライアント側のソケットも使い果たすとマズイため、一部処理を並列化し、終了を簡単に待つことができる機構を求めた結果、この質問になってしまいました。
testset

2020/01/16 16:17 編集

この質問を投稿すべきか数週間悩んでいましたが、 結果として、いろいろな方面からご回答いただき、非常に勉強になりました。 PLINQのパターンや、Taskを単純にキックする物を後日(未定)対応し、 質問本文にでも記載させていただきたく存じます。 → [記載しました](#reply-342783)
guest

回答6

0

ベストアンサー

こんにちは。

Parallel.ForEach に渡す各処理を非同期メソッドにしたいという質問だと認識しました。
おそらくですが、それをやるのは無意味です。
Parallel.ForEach は指定数のスレッドを確保し、その中でそれぞれの処理を並列に実行するものですが、非同期メソッド (Task) は実処理がスレッドから切り離されているため、正しく並行性の管理ができません。

この場合、何も考えずに通常のループなどで非同期メソッドを全て起動してしまえば、Task はスレッドプールを利用するため、スレッドが際限なく増えるなどは起きず、システムが持つスレッドを良い具合に使ってくれます。

Parallel は、大量の (比較的) 短い「同期」処理を、指定した量のスレッドを予め確保して最速で走らせるためのメソッドだと考えるのが良いと思います。

投稿2020/01/15 08:03

tamoto

総合スコア4200

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

0

この例みたいに、
同時実行数制御をしたいのなら、
https://qiita.com/TsuyoshiUshio@github/items/79ad787899cddaa3ac1c
の記事みたいに、
await semaphoreSlim.WaitAsync();
semaphoreSlim.Release();
を使うとできるよ。
PLINQでは、適切にできている保証がないわ。

投稿2020/01/16 05:22

kiichi54321

総合スコア1984

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

[TestMethod] public void MyTestMethod30() { System.Console.WriteLine("S"); var list = Enumerable.Range(1, 30) .AsParallel() .WithDegreeOfParallelism(4) .Select(async n => { System.Console.WriteLine($"Start{n}"); await Task.Delay(100); System.Console.WriteLine($"End{n}"); }).ToArray(); Task.WaitAll(list); System.Console.WriteLine("E"); }

やりたいことなら、PLINQを使って、こんな感じでいいのでは。

投稿2020/01/15 05:01

kiichi54321

総合スコア1984

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

testset

2020/01/15 11:53

PLinqは思い浮かびませんでした。 試してみます。
guest

0

どこかにぶら下げようかと思いましたが、コード量が多いので、
回答として新たに記載させていただきます。
ご回答いただいた皆様、誠にありがとうございます。

誤った実装

※期待した実装は下に記載しています。
次のコードで正しく動作しているように見えましたが、対応を誤っていました。
理由は、10個ずつしか処理をしていません。

cs

1[TestMethod] 2public void TestMethod3() 3{ 4 int count = 0; 5 var list = Enumerable.Range(1, 30).ToList(); 6 ParallelForDoNot(10, list, async (item) => 7 { 8 Trace.TraceInformation("Inner start {0}", item); 9 await Task.Delay(100); 10 Trace.TraceInformation("Inner end {0}", item); 11 Interlocked.Increment(ref count); 12 }); 13 Assert.AreEqual(list.Count, count); 14} 15static void ParallelForDoNot<T>(int degree, IList<T> list, Func<T, Task> task) 16{ 17 foreach (var items in list 18 .Select((item, i) => new { Item = item, Index = i }) 19 .GroupBy(g => g.Index / degree) 20 ) 21 { 22 Task.WaitAll(items.Select(item => task(item.Item)).ToArray()); 23 } 24}

実行結果

txt

1Inner start 1 2Inner start 2 3Inner start 3 4Inner start 4 5Inner start 5 6Inner start 6 7Inner start 7 8Inner start 8 9Inner start 9 10Inner start 10 11Inner end 10 12Inner end 8 13Inner end 7 14Inner end 5 15Inner end 9 16Inner end 3 17Inner end 6 18Inner end 1 19Inner end 4 20Inner end 2 21Inner start 11 22Inner start 12 23Inner start 13 24Inner start 14 25Inner start 15 26Inner start 16 27Inner start 17 28Inner start 18 29Inner start 19 30Inner start 20 31Inner end 20 32Inner end 17 33Inner end 16 34Inner end 15 35Inner end 14 36Inner end 13 37Inner end 12 38Inner end 11 39Inner end 19 40Inner end 18 41Inner start 21 42Inner start 22 43Inner start 23 44Inner start 24 45Inner start 25 46Inner start 26 47Inner start 27 48Inner start 28 49Inner start 29 50Inner start 30 51Inner end 29 52Inner end 28 53Inner end 26 54Inner end 25 55Inner end 30 56Inner end 23 57Inner end 21 58Inner end 24 59Inner end 27 60Inner end 22

期待した実装

10個分の並列稼働を実現します。
念のためasyncを使っていない場合のためのオーバーロードも用意しています。

cs

1 2[TestMethod] 3public void TestMethod1() 4{ 5 int count = 0; 6 var list = Enumerable.Range(1, 30).ToList(); 7 ParallelFor(10, list, async (item) => 8 { 9 Trace.TraceInformation("Inner start {0}", item); 10 await Task.Delay(100); 11 Trace.TraceInformation("Inner end {0}", item); 12 Interlocked.Increment(ref count); 13 }); 14 Assert.AreEqual(list.Count, count); 15} 16 17[TestMethod] 18public void TestMethod2() 19{ 20 int count = 0; 21 var list = Enumerable.Range(1, 30).ToList(); 22 ParallelFor(10, list, (item) => 23 { 24 Trace.TraceInformation("Inner start {0}", item); 25 Thread.Sleep(100); 26 Trace.TraceInformation("Inner end {0}", item); 27 Interlocked.Increment(ref count); 28 }); 29 Assert.AreEqual(list.Count, count); 30} 31 32/// <summary> 33/// 同時実行数制限付き並列動作 34/// </summary> 35/// <typeparam name="T">リストのデータタイプ</typeparam> 36/// <param name="degree">並列数</param> 37/// <param name="list">並列処理するリスト</param> 38/// <param name="task">並列処理内容</param> 39static void ParallelFor<T>(int degree, IEnumerable<T> list, Func<T, Task> task) 40{ 41 using (var sem = new SemaphoreSlim(degree)) 42 { 43 // async記述により、Taskが返される 44 // Taskの配列をWaitする 45 Task.WaitAll(list.Select(async (item) => 46 { 47 await sem.WaitAsync(); 48 try 49 { 50 await task(item); 51 } 52 finally 53 { 54 sem.Release(); 55 } 56 }).ToArray()); 57 } 58} 59/// <summary> 60/// 同時実行数制限付き並列動作 61/// </summary> 62/// <typeparam name="T">リストのデータタイプ</typeparam> 63/// <param name="degree">並列数</param> 64/// <param name="list">並列処理するリスト</param> 65/// <param name="task">並列処理内容</param> 66static void ParallelFor<T>(int degree, IEnumerable<T> list, Action<T> task) 67{ 68 Parallel.ForEach(list, new ParallelOptions() { MaxDegreeOfParallelism = degree }, task); 69}

実行結果

txt

1テスト名: TestMethod1 2テストの完全名: ParallelTest.UnitTest1.TestMethod1 3テスト成果: 成功 4テスト継続時間: 0:00:00.3635226 5 6Inner start 1 7Inner start 2 8Inner start 3 9Inner start 4 10Inner start 5 11Inner start 6 12Inner start 7 13Inner start 8 14Inner start 9 15Inner start 10 16Inner end 8 17Inner end 7 18Inner start 12 19Inner end 6 20Inner start 13 21Inner end 5 22Inner start 14 23Inner end 4 24Inner start 15 25Inner end 3 26Inner start 16 27Inner end 2 28Inner start 17 29Inner end 1 30Inner start 18 31Inner start 11 32Inner end 10 33Inner start 19 34Inner end 9 35Inner start 20 36Inner end 11 37Inner start 21 38Inner end 17 39Inner start 22 40Inner end 14 41Inner start 23 42Inner end 15 43Inner start 24 44Inner end 12 45Inner start 25 46Inner end 13 47Inner start 26 48Inner end 18 49Inner start 27 50Inner end 16 51Inner start 28 52Inner end 20 53Inner start 29 54Inner end 19 55Inner start 30 56Inner end 28 57Inner end 27 58Inner end 25 59Inner end 23 60Inner end 22 61Inner end 21 62Inner end 24 63Inner end 26 64Inner end 30 65Inner end 29

投稿2020/01/16 16:01

testset

総合スコア221

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

Parallel.ForEach, Parallel.For メソッドのオーバロードの中には引数に ParallelOptions を取るものがあるので、その MaxDegreeOfParallelism プロパティを使って望むこと (上限のある並列数で実行) はできませんか?

(UI への応答性を高めるためのバックグラウンド処理をしたいのか、マルチコアを利用して処理時間の短縮を図りたいのか目的が分かりません。なんとなく見当違いのことのように思いますが、それはとりあえず置いときます)

【追記】

上に書いたことを検証してみました。

listのデータに対して順序にかかわらず上限のある並列数(1~)で実行したい。システム全体でのスレッドの上限があるため、極力節約したい。

その目的のために await を使う意味が分かりません。ParallelOptions.MaxDegreeOfParallelism を設定して、普通に以下のコードのようにすれば上記の目的は果たせるはずですが?

var list = Enumerable.Range(1, 30).ToList(); ParallelOptions option = new ParallelOptions(); option.MaxDegreeOfParallelism = 2; Parallel.ForEach(list, option, i => { Console.WriteLine($"Processing {i} on thread {Thread.CurrentThread.ManagedThreadId}"); });

上記のコード(MaxDegreeOfParallelism は 2 に制限)の実行結果は以下の通りとなります。質問者さんのコードでは 10 に制限しているように見えますが、スレッドの数はそのようになりませんか?

Processing 1 on thread 1
Processing 16 on thread 3
Processing 17 on thread 3
Processing 18 on thread 3
Processing 19 on thread 3
Processing 20 on thread 3
Processing 21 on thread 3
Processing 22 on thread 3
Processing 23 on thread 3
Processing 24 on thread 3
Processing 25 on thread 3
Processing 26 on thread 3
Processing 27 on thread 3
Processing 28 on thread 3
Processing 29 on thread 3
Processing 30 on thread 3
Processing 4 on thread 3
Processing 5 on thread 3
Processing 6 on thread 3
Processing 7 on thread 3
Processing 8 on thread 3
Processing 9 on thread 3
Processing 10 on thread 3
Processing 11 on thread 3
Processing 12 on thread 3
Processing 13 on thread 3
Processing 14 on thread 3
Processing 2 on thread 1
Processing 3 on thread 1
Processing 15 on thread 3

ちなみに、ParallelOptions を設定しない場合は以下のようになります。

Processing 1 on thread 1
Processing 2 on thread 1
Processing 16 on thread 7
Processing 7 on thread 4
Processing 8 on thread 4
Processing 9 on thread 4
Processing 13 on thread 6
Processing 14 on thread 6
Processing 15 on thread 6
Processing 19 on thread 9
Processing 4 on thread 3
Processing 10 on thread 5
Processing 26 on thread 5
Processing 27 on thread 5
Processing 28 on thread 5
Processing 29 on thread 5
Processing 30 on thread 5
Processing 22 on thread 8
Processing 11 on thread 4
Processing 20 on thread 6
Processing 25 on thread 10
Processing 3 on thread 1
Processing 17 on thread 7
Processing 18 on thread 7
Processing 21 on thread 6
Processing 12 on thread 4
Processing 5 on thread 3
Processing 6 on thread 3
Processing 23 on thread 9
Processing 24 on thread 9

投稿2020/01/14 23:03

編集2020/01/15 04:27
退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

YAmaGNZ

2020/01/15 00:09

使い方が希望している動作と一致しているかは分かりませんが、提示されているコードでMaxDegreeOfParallelismプロパティを使用していますよ。
testset

2020/01/15 11:50

ご確認のほどありがとうございます。 私が記載したコードでは、Parallel.ForEachに渡している処理がasyncであるため、TaskのStartだけを行い、次の処理を行ってしまっているようです。そのため、早々にParallel.ForEachが終了し、親タスクが破棄される(実は裏で動いているかもしれない)ため、質問に記載したログのようになっている認識です。
退会済みユーザー

退会済みユーザー

2020/01/15 12:07 編集

> Parallel.ForEachに渡している処理がasyncであるため 意味が分かりません。何故そういうことをする必要があるのか目的・理由を具体的に説明いただけませんか。どう考えても「システム全体でのスレッドの上限があるため、極力節約したい」ということとは違うと思うのですが。tamoto さんが言われるように余計なことをせず OS に任せておくか、最大やってもMaxDegreeOfParallelism の設定だけでいいのではないですか。
testset

2020/01/15 12:19

全く持って、おっしゃる通りです。OSに任せるのがよいと、今では思っています。 なぜそうしたのか、ということは質問本文に記載させていただきました。
退会済みユーザー

退会済みユーザー

2020/01/16 01:19

なぜそうしたのかの理由は質問文に追記した以下と理解しています。 > ループ処理内でネットワーク通信を挟んでいるためです:ネットワーク通信先では同時接続数の上限もあるため、同時アクセス数の制限を設けたい。(Server busyとなってしまう) その理由で Parallell.ForEach とか await とかの話が出てくるのが解せません。 相手先サーバーが Server Busy とならないようにするためにクライアント側でできることは、質問者さんが書かれているように「同時アクセス数の制限」ぐらいしかなさそうですが、とすると、そもそも Parallel.ForEach を使うことが間違いだと思います。 ASP.NET のようにサーバー側の話であれば、非同期で実行すると、スレッドプールから取得したスレッドをプールに戻して、非同期実行が完了すると再びスレッドプールからスレッドを取得して処理を完了し応答を返すということが可能(すなわち、スレッドプールのスレッドが枯渇して Server Busy とならないように対処することが可能)なのですが。 サーバー側の話とごっちゃになっているような気がします。違いますか?
退会済みユーザー

退会済みユーザー

2020/01/16 02:58

同時アクセス数を制限しつつ並列に複数データを送っていくなら(暗黙の了解でサーバーに対するクライアントの同時アクセス数が決まっているなら)一応スレッド数上限を設けたParallelでも分からなくないですが awaitが出てきた理由と結論がTask全部起動なのは未だに謎。 本人が解決したと思ってるなら良いけれど、意図するところがよく分からない話でした。
testset

2020/01/16 16:16 編集

ASP.NETを使えればよいのですが、該当処理はレガシーシステムの間に入り込む通信処理で、上位からは複数の要求があり、下位へは厳しい同時アクセス制限があります。さらに、下位の通信環境は当初酷いもの(ダイアルアップなど)だと聞いていたため、並列化を考えないといけないため、このような質問が出てきました。 Taskを全部起動するというのは説明が足りていませんでしたが、SemaphoreSlimで制限をかけたうえで起動するという方法で解決させる見込みでした。
guest

0

所望の動作が厳密に分からないですが自前のParallelForからParallel.Foreachを呼び出したりせずasyncもawaitもTask.Waitも何もなしで並列化自体はできるかと。
(あれこれやらずに普通にlistにParallel.ForなりParallel.Foreachなりを使う)
その場合全てのinner endのあとにendが来ます。

Parallel.ForEachのdelegate上でawaitを使いたいのですが、
すべての処理が実行されずにParallel.ForEachが抜けてしまいます。

そもそもawait自体Task.Waitみたいに同期とって返す物ではないのでそういうものかと。

投稿2020/01/14 21:28

編集2020/01/14 21:38
退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

testset

2020/01/15 11:46

確かにおっしゃる通りです。 asyncを行わなければ、期待する動作をすることは認識しております。 早々にご回答ありがとうございます。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.39%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問