質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

Q&A

解決済

3回答

7069閲覧

C#のマルチタスクとタスクの入れ子などについて

matsu1

総合スコア19

C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

0グッド

3クリップ

投稿2016/11/25 04:41

###前提・実現したいこと
C#のマルチタスクとタスクの入れ子について基本的なところを質問させて頂きたく、よろしくお願い致します。
具体的な例として、windows上でマルチタスクで異なるPCのフォルダ間で複数ファイルの同期を取るシステムで
述べさせて頂きます。

PC1とPC2間,PC1とPC3間で同期を取るものとし、以下のようなものです。
[PC1とPC2間]
・PC1下のフォルダ1~3下のファイルが、PC2下のフォルダ1~3下に同期されコピーされる。
・フォルダ1下にはファイル1,フォルダ2下にはファイル2,フォルダ3下にはファイル3がある。
・ファイル1は500ms,ファイル2は1000ms,ファイル3は1500ms周期でそれぞれrobocopyで同期される。

[PC1とPC3間]
・PC1下のフォルダ4~6下のファイルが、PC3下のフォルダ4~6下に同期されコピーされる。
・フォルダ4下にはファイル4,フォルダ5下にはファイル5,フォルダ6下にはファイル6がある。
・ファイル4は500ms,ファイル5は1000ms,ファイル6は1500ms周期でそれぞれrobocopyで同期される。

PC1とPC2間の同期用にタスクA,PC1とPC3間の同期用にタスクBを起動します。
さらに、robocopyはwindowsプロセスを用いて実行させるため、そのためのタスクをタスクA,Bそれぞれから
子タスクとして起動します。
タスクA,B内では各ファイルの前回同期からの経過時刻を計っており、それが同期周期に達したら同期を取るように
させています。

上記実装にあたり、質問させて頂きたいことは以下です。
その他、指摘事項ありましたら併せてご指摘のほどお願い致します。

Q1.各タスクの同期・非同期をawaitを用いて正しく実装できているか。
なるたけフォルダ同期アプリのCPU負荷を下げ、かつ、同期周期からの遅延を少なくして実装したいので
そのような観点から同期・非同期の使い方で誤っている箇所があればご指摘をお願い致します。
※コード部に★Q1.1~★Q1.6に具体的な内容を記載しましたのでご回答をよろしくお願い致します。
(基本的な部分の認識が十分でないので質問内容自体にも認識の誤りがあるかも知れませんがご容赦願います)

Q2.各ファイルの実測した同期時間が同期周期から遅延してしまう場合、どのような解決策を用いればよいか。
上記で述べました例ではファイル数が少ない場合、同期周期の遅延は発生しないのですが、
例えば、同期先のPCが10個あり、フォルダ同期タスクを10個生成し、かつ、各フォルダ下のファイル数が万オーダーで
あるような場合、現状のコードでは30秒の同期周期に対して実測周期が60秒程度かかる場合が頻発するなど、
遅延時間が膨大になってきてしまっています。

別の方法として、タイマー:System.Threading.Timeを用いてみました。 10個のフォルダ同期タスク下でそれぞれ同期周期の種類数分のタイマー (上の例の場合500,1000,1500msのタイマー)を起動し、同期周期の遅延が少なくなるよう試みましたが だめでした。 アプリ全体として10個×同期周期の種類数分のタイマーが生成されるということで、種類が少ないうちは 遅延が発生しませんが、一定数を超えるとCPU負荷が膨大になりアプリが動作しなくなってしまいました。 何かこれ以外でよい方法があればご教授お願い致します。

以下にコードを記載しますので、よろしくご回答のほどお願い致します。

###該当のソースコード

/* ◆フォルダ同期アプリクラス /
public partial class CFolderSyncApp : Form
{
/
◇メンバ変数 */
private DateTime[] m_TaskAPreTime = new DateTime[3]; // ファイル1,2,3の前回の同期時刻
private DateTime[] m_TaskBPreTime = new DateTime[3]; // ファイル4,5,6の前回の同期時刻

private int[] m_TaskACycle = {500, 1000, 1500}; // ファイル1,2,3の同期周期
private int[] m_TaskBCycle = {500, 1000, 1500}; // ファイル4,5,6の同期周期

/* ◇フォルダを同期する */
private async void SyncFolder()
{
var task_a = FolderSyncTaskA();
var task_b = FolderSyncTaskB();
await Task.WhenAll(task_a, task_b); // タスクA,タスクBを並列実行する
}

/* ◇フォルダ同期タスクA(PC1とPC2間) */
private Task FolderSyncTaskA()
{
return Task.Run(async () => {
while (true) {
SyncFileA();
await Task.Delay(500); // 500ms周期でファイルを同期する(PC1とPC2間)
// ★Q1.1 awaitの有り無しでそれぞれどのような動きになりますか?
// ★Q1.2 await Whenall()でFolderSyncTaskA()が待たれ、その中のawaitであるので
// 本タスク内で500msを同期的に待つという認識で正しいでしょうか?
// ★Q1.3 結論としてawaitは要りますか?
}
});
}

/* ◇フォルダ同期タスクB(PC1とPC3間) */
private Task FolderSyncTaskB()
{
return Task.Run(async () => {
while (true) {
SyncFileB();
await Task.Delay(500); // 500ms周期でファイルを同期する(PC1とPC3間)
}
});
}

/* ◇ファイルを同期する(PC1とPC2間) */
private void SyncFileA()
{
for (int i=0; i < 3; i++) {
double ms = (DateTime.Now - m_TaskAPreTime[i]).TotalMilliseconds; // 前回の同期時刻との差[ms]

if (ms > m_TaskACycle[i]) { if (i == 0) { // PC1下のファイル1を500ms周期でPC2下に同期する // ★Q1.4 RobocopyTask()の前にawaitの有り無しでそれぞれどのような動きになりますか? // ★Q1.5 結論としてRobocopyTask()の前にawaitは要りますか? CCommon common = new CCommon(); common.RobocopyTask("(PC1のフォルダ1のパス)", "(PC2のフォルダ1のパス)", "ファイル1"); } else if (i == 1) { // PC1下のファイル2を1000ms周期でPC2下に同期する CCommon common = new CCommon(); common.RobocopyTask("(PC1のフォルダ2のパス)", "(PC2のフォルダ2のパス)", "ファイル2"); } else if (i == 2) { // PC1下のファイル3を1500ms周期でPC2下に同期する CCommon common = new CCommon(); common.RobocopyTask("(PC1のフォルダ3のパス)", "(PC2のフォルダ3のパス)", "ファイル3"); } m_TaskAPreTime[i] = DateTime.Now; } }

}

/* ◇ファイルを同期する(PC1とPC3間) */
private void SyncFileB()
{
for (int i=0; i < 3; i++) {
double ms = (DateTime.Now - m_TaskBPreTime[i]).TotalMilliseconds; // 前回の同期時刻との差[ms]

if (ms > m_TaskBCycle[i]) { if (i == 0) { // PC1下のファイル4を500ms周期でPC3下に同期する CCommon common = new CCommon(); common.RobocopyTask("(PC1のフォルダ4のパス)", "(PC3のフォルダ4のパス)", "ファイル4"); } else if (i == 1) { // PC1下のファイル5を1000ms周期でPC3下に同期する CCommon common = new CCommon(); common.RobocopyTask("(PC1のフォルダ5のパス)", "(PC3のフォルダ5のパス)", "ファイル5"); } else if (i == 2) { // PC1下のファイル6を1500ms周期でPC3下に同期する CCommon common = new CCommon(); common.RobocopyTask("(PC1のフォルダ6のパス)", "(PC3のフォルダ6のパス)", "ファイル6"); } m_TaskBPreTime[i] = DateTime.Now; } }

}
};

/* ◆共通クラス /
public class CCommon
{
/
◇メンバ変数 */
private bool m_IsExit; // windowsプロセスが終了したか[true:終了/false:未終了]

/* ◇robocopyをwindowsプロセスで実行するタスク */
public Task RobocopyTask(string src_folder, string dst_folder, string file)
{
return Task.Run(() => {
// ファイル単位でrobocopyをwindowsプロセスで実行するための設定をする
Process process = new Process();
process.StartInfo.FileName = "robocopy.exe";
process.StartInfo.Arguments = src_folder + " " + dst_folder + " /IF " + file;
// プロセス終了ハンドラを登録する
process.Exited += new EventHandler(Process_Exit);
process.EnableRaisingEvents = true;
m_IsExit = false;

// robocopyプロセスを開始し、10ms周期でプロセスが終了したか確認する // ★Q1.6 Thread.Sleep()よりawait Task.Delayを用いるべきでしょうか?(その場合「Task.Run(async()」に変える) // 両者でそれぞれどのような動きになりますか? process.Start(); while (true) { Thread.Sleep(10); if (m_IsExit) { break; } } // プロセスを閉じる process.Exited -= new EventHandler(Process_Exit); process.Close(); });

}

/* ◇プロセス終了時呼ばれるイベントハンドラ */
private void Process_Exit(object sender, EventArgs e)
{
m_IsExit = true;
}
};

###補足情報(言語/FW/ツール等のバージョンなど)
VisualStudio2015 .Net4.6 C#

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

ishi9

2016/11/25 04:58

Q2に関しては、もう少し前提条件の詳細が無いと回答が難しいですね。まずは遅延そのものは許容されているのかどうかです。遅延そのものが許容されていないなら、処理の高速化をするしかありません。遅延自体は許容されているなら、コピーが完了してから一定時間待ってコピーする等の手段もありかと思います。
matsu1

2016/11/25 05:28

ishi9さん ご回答ありがとうございます。同期周期に関しては厳密な精度は求めてはおりません。同期周期が500msや1000msであれば±20%程度の誤差、同期周期が10秒や100秒であれば±10%くらいにはおさめたいかなという感じです。よろしくお願い致します。
ishi9

2016/11/25 05:38 編集

同期の周期は実測値に応じて、いくら遅れても大丈夫ということですか?であれば、stopwatch等でタスクの時間を計測しておいて、かかった時間に応じてdelayするのでもいいと思います。あるいは計測などせずコピータスク完了後、一律で1秒待つ。つまり同期周期はコピーにかかった時間+1秒にしてしまう
matsu1

2016/11/25 06:36

同期の周期は実測値に応じて、いくら遅れても大丈夫ということですか?→先ほど記載しましたように、同期周期が500msや1000msとプログラムで組んでいるものは実測の同期時間も±20%、すなわち500msであれば400~600ms程度に、1000msであれば800~1200ms程度に。10秒であれば±10%、すなわち9~11秒程度になっていてほしいと考えています。よろしくお願い致します。
ishi9

2016/11/25 06:39

失礼しました。では実処理時間を高速化するしかありませんね。そちらに関しては回答してみます。
guest

回答3

0

ベストアンサー

こんにちは。
質問の内容はひとまず置いておいて、少し気になったことがあるので回答します。
疑問点に対する直接の回答でなくて申し訳ないですが……。


まず最初に。非同期や並列で「解決できる事柄」を勘違いしているように思います。
並列にすることでパフォーマンスが出せるというのは、複数のコアを持つCPUがあり、複数の処理を文字通り「同時に」処理できることが前提となります。
では、ファイルのコピーやネットワークはどうでしょうか?RAID構成でもなければHDDは基本的に1台でしょうし、ネットワークもほとんどの環境で1本でしょう。ということは、複数のタスクを同時に処理しようとすればするほど1本のパイプを圧迫していくため、ある段階から詰まったように遅くなっていくはずです。
並列で特にパフォーマンスを得られる箇所は「CPUによる計算処理」、そしてそもそもが並列である「分散処理」環境だけなのです。


質問者さんのこのような問題の解決手法として、「不定期(定期)に実行される多数の処理を一度一つの実行キューに並べ、それを順番に実行していく」という実装方法を提案します。このようなデザインのことを「スケジューラー」といいます。
スケジューラーに処理を登録(予約)すると、スケジューラーがそれを勝手に順次実行していくので、使用者側では定期的に処理をスケジューラーに投げるだけでよくなります。

お節介ですが、ちょうど少し前に似たような問題に適用できるスケジューラーの汎用実装を作ったものがあったので、もしよろしければこちらをお使いください。(100行くらいあるけど回答欄に貼ってもいいんですかね……?)

(12/13 コードを更新)

csharp

1 2public class ActionQueueScheduler 3{ 4 private readonly ConcurrentQueue<Func<Task>> queue; 5 6 private int isRunning; 7 8 private Func<Task> immediate; 9 10 public ActionQueueScheduler() 11 { 12 this.queue = new ConcurrentQueue<Func<Task>>(); 13 } 14 15 public Task Add(Action action) 16 => this.Add(() => { action(); return default(object); }); 17 18 public Task<T> Add<T>(Func<T> action) 19 { 20 var tcs = new TaskCompletionSource<T>(); 21 this.queue.Enqueue(() => CreateTask(action, tcs)); 22 this.Run(); 23 return tcs.Task; 24 } 25 26 public Task Add(Func<Task> action) 27 => this.Add(async () => { await action(); return default(object); }); 28 29 public Task<T> Add<T>(Func<Task<T>> action) 30 { 31 var tcs = new TaskCompletionSource<T>(); 32 this.queue.Enqueue(() => CreateTaskAsync(action, tcs)); 33 this.Run(); 34 return tcs.Task; 35 } 36 37 public Task Interrupt(Action action) 38 => this.Interrupt(() => { action(); return default(object); }); 39 40 public Task<T> Interrupt<T>(Func<T> action) 41 { 42 var tcs = new TaskCompletionSource<T>(); 43 Task.Run(() => 44 { 45 var spin = new SpinWait(); 46 while (Interlocked.CompareExchange(ref this.immediate, () => CreateTask(action, tcs), null) != null) 47 spin.SpinOnce(); 48 this.Run(); 49 }); 50 return tcs.Task; 51 } 52 53 public Task Interrupt(Func<Task> action) 54 => this.Interrupt(async () => { await action(); return default(object); }); 55 56 public Task<T> Interrupt<T>(Func<Task<T>> action) 57 { 58 var tcs = new TaskCompletionSource<T>(); 59 Task.Run(() => 60 { 61 var spin = new SpinWait(); 62 while (Interlocked.CompareExchange(ref this.immediate, async () => await CreateTaskAsync(action, tcs).ConfigureAwait(false), null) != null) 63 spin.SpinOnce(); 64 this.Run(); 65 }); 66 return tcs.Task; 67 } 68 69 private Task CreateTask<T>(Func<T> action, TaskCompletionSource<T> tcs) 70 { 71 try 72 { 73 tcs.TrySetResult(action()); 74 } 75 catch (Exception exception) 76 { 77 tcs.TrySetException(exception); 78 } 79 return Task.FromResult(default(object)); 80 } 81 82 private async Task CreateTaskAsync<T>(Func<Task<T>> asyncAction, TaskCompletionSource<T> tcs) 83 { 84 try 85 { 86 tcs.TrySetResult(await asyncAction().ConfigureAwait(false)); 87 } 88 catch (Exception exception) 89 { 90 tcs.TrySetException(exception); 91 } 92 } 93 94 private void Run() 95 { 96 if (Interlocked.Exchange(ref this.isRunning, 1) != 0) 97 return; 98 Task.Run(async () => 99 { 100 var action = default(Func<Task>); 101 while (true) 102 { 103 if (this.immediate != null) 104 await Interlocked.Exchange(ref this.immediate, null).Invoke().ConfigureAwait(false); 105 else if (this.queue.TryDequeue(out action)) 106 await action().ConfigureAwait(false); 107 else 108 break; 109 } 110 Interlocked.Exchange(ref this.isRunning, 0); 111 if (this.queue.Count != 0) 112 this.Run(); 113 }); 114 } 115}

以下、少しだけ使い方を解説します。

csharp

1private ActionQueueScheduler scheduler = new ActionQueueScheduler(); 2 3private async Task FolderSync1() 4{ 5 while (true) 6 { 7 await this.scheduler.Add(async () => 8 { 9 CCommon common = new CCommon(); 10 await common.RobocopyTask("(PC1のフォルダ1のパス)", "(PC2のフォルダ1のパス)", "ファイル1"); 11 }).ConfigureAwait(false); // queueの中に処理を入れてawaitで完了を待つ 12 await Task.Delay(500).ConfigureAwait(false); // 500ms周期でファイルを同期する(PC1とPC2間) 13 } 14}

ファイル毎にループメソッドを作り、一つのスケジューラーにそれぞれ独立でAddしていくだけで良いです。
これだけの記述で、Syncの処理が「完了してから」500ms後に、再びSyncが実行されるようにスケジュールされます。
他のメソッドが増えたとしても、このメソッドをコピーしたものを作成し、それを同時に開始するだけで大丈夫です。
「同期処理の処理速度より、スケジューラーのタスク登録速度のほうが明らかに速い」という条件でさえ無ければ、問題無く動作するはずです。

AddメソッドとInterruptメソッドを提供しており、それぞれに同期メソッド、非同期メソッドを登録可能で、実行結果を受け取ることもできます。各メソッドは「登録したメソッドが実行完了したかどうかを表すTask(Task<T>)」を返すため、これをawaitすることで、実行結果を受け取ったり、スケジューリングから次回実行までの時間調整などが簡単に行えます。
上記の使用例であれば、同期処理が完了してから次の同期処理を再登録という流れになるので、「完了してないのに次の同期が始まる」ということが起こることもないため、安全です。
Interruptはキューを無視して処理を割り込み実行するためのメソッドなので使用には注意してください。

投稿2016/11/25 15:48

編集2016/12/13 11:49
tamoto

総合スコア4105

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

matsu1

2016/11/28 09:42

tamotoさん いつもお世話になります。 並列と非同期とが混同していたようです。ご説明で理解できました。 また、わかりやすいキューのコードを載せて頂き誠にありがとうございます。 これより開発環境で試してみたいと思います。 その際、またわからないことがありましたら質問させて頂きます。よろしくお願い致します。
matsu1

2016/12/13 02:10

tamotoさん お世話になります。 キューのコードについて解析したのですが、わからない所がありまして、お手数ですがご教授のほどお願い致します。 (1)Add() 以下の4個のAdd()が多重定義されています。 ①Task Add(Action action) ②Task<T> Add<T>(Func<T> action) ③Task Add(Func<Task> action) ④Task<T> Add<T>(Func<Task<T>> action) ファイル毎にループメソッドを作りAddする際は、同期タスクの場合:①,②,非同期タスクの場合:③,④を用いればよさそうですが、①内のthis.Addは②のAdd()を呼び出していますでしょうか?同様に③内のthis.Addは④のAdd()を呼び出していますでしょうか? (2)Interrupt() SpinWaitとSpinOnceの意味を調べたのですが、いまいちよくわかりませんでした。 Interrupt()内でのこれらはどのように動くものでしょうか。 (割込みタスクが未生成の場合はSpinOnce()は実行せず、割込みタスクが生成済の場合はSpinOnce()を実行すると言う所までは解析したのですが) (3)Run() 割込みタスクを実行する際や割り込みではないキューに登録されたタスクを実行する際にawaitとConfigureAwait(false)が用いられています。 実際に私の方でこれらのコードを実装する場合、例えばRun()をUIスレッドから呼出した場合に、await以降の行の処理がUIスレッドに戻らず、非同期メソッドを実行したスレッド側で行われるようになると思うのですが、これによりUIがフリーズしたりなどはないのでしょうか?すみません、ここら辺をあまりよくわかっていないので、ご教授のほどお願い致します。 以上よろしくお願い致します。
tamoto

2016/12/13 03:09 編集

どうもこんにちは。回答します。 (1) その通りです。例えばActionを取るAddは、ActionをFunc<T>にラップしなおし、Add(Func<T>)に送ります。 (2) SpinWaitはいわゆるwhileループ待ちのような待機を行うためのクラスです。Interruptの割り込みは、同時多発的に発生するような使用法はそもそも想定しておらず、「万が一」衝突した時のためになるべく軽量にループ待機を行うために採用しています。もしも、Interruptが大量に発生するような使用法(非推奨です)を考える場合は、割り込み待ちのimmediateのほうもConcurrentQueue化し、順次処理を実装することを考えなければなりません。 (3) Runメソッドはプライベートメソッドであり、ActionQueueのインフラとして使用されるものになります。パブリックAPIが返却するTaskは全てTaskCompletionSourceから作成されたものになるので、Run内で動作するスレッドプール処理とは物理的に分断されています。 これと直接は関係ない話ですが、ConfigureAwait(false)の指定というのは上層に伝播するものではなく、例えばUIスレッド上でAddを呼び、そこでawaitする際にConfigureAwait(true)にしている場合は、たとえその内部でConfigureAwait(false)が呼び出されていたとしても、Addを呼び出した行のコンテキストは保存されているので、その後のUI処理も問題なく行えます。 一つだけ、注意しなければならないのは、「Addに渡すデリゲート内でUIに関する処理は絶対に入れてはいけません」。Addに渡したデリゲートは処理待ちキューに入り、その後スレッドプール上で処理されることになるので、ここでUIに触れると例外を吐くことになります。 --- このSchedulerはそのまま実用可能ですが、まだ改良の余地はあります。例えば、現状では、一度Queueに入れた処理を後からキャンセルする方法が無いので、Runメソッド内のTask.RunにCancellationTokenを渡すようにし、Scheduler自体にIDisposableを実装し、明示的なキャンセル処理を実装するなどの追加設計を考えることができます。 また何かわからないことがあったら遠慮なく質問して下さい。
matsu1

2016/12/13 09:33

ご回答ありがとうございます。 コードについてですが、以下のビルドエラーが発生します。 ◆(1.1)CreateTask<T>(Func<T> action, TaskCompletionSource<T> tcs)内の return Task.CompletedTask; ◆(1.2)Run()内のTask.CompletedTask.ConfigureAwait(false) 上記はともに「CS0117 'Task' に 'CompletedTask' の定義がありません」が出ます。 Task.CompletedTaskプロパティの説明ページ: https://msdn.microsoft.com/ja-jp/library/system.threading.tasks.task.completedtask(v=vs.110).aspx によれば、 ①ユニバーサル Windows プラットフォーム10以降で使用可能 ②.NET Framework4.6以降で使用可能 とのことです。 こちらの開発環境はVisualStudio2015,.Netバージョン:4.6.01055のため②はクリアしていると思いますが、①の方が満たしていないものかと考えています。OSはWindows7なのですが、①を開発環境にインストールしてきたりは可能なのでしょうか? もし①がWindows7では満たせない場合、代替手段が必要かと思いますが、調べてみると Task.CompletedTaskをTask.FromResult(0)で置換する案がありましたが、可能でしょうか? ◆(2)Run()内の「else if (this.m_TaskQueue.TryDequeue(out var action))」 これについては ・CS1003 構文エラーです。',' が必要です。 ・CS0103 現在のコンテキストに 'var' という名前は存在しません。 ・CS0103 現在のコンテキストに 'action' という名前は存在しません。 などのビルドエラーが発生するようです。 varによるactionの宣言を関数引数内で宣言がよくない?と思い、関数の外で宣言しましたがうまくいかず。 お手数ですが上記に付きましてご教授をお願い致します。
tamoto

2016/12/13 11:51

はわわわ……out var使ったコード載せちゃっ…… (2)については、えっと、次のC#の新構文を使っちゃってました。VS2017が出たら書けるようになります……すいません…… (1)については、インストールされた.NET Frameworkが4.6だとしても、「プロジェクトがターゲットにしているFramework Versionが古い」という可能性があります。例えば、4.6がインストールされたPCでもプロジェクトターゲットを4.5で作成すると、「4.5までの機能しか使えない代わりに、4.5がインストールされたPCでも動く」というアプリになります。おそらくCompletedTaskはプロジェクトのバージョンを上げることで使用可能になると思います。 バージョンを変えずにFromResultでの置換でも可能です。CompletedTaskが使えるバージョンがターゲットであるなら、絶対にそっちを使った方がいい、という程度ですね。 回答のほう、問題の箇所についてコードを更新しました。念のためTask.FromResultを使うようにしてあります。ほかには、一箇所到達不可能なコードパスがあったので削除してます。
matsu1

2017/01/10 06:52

tamotoさん 今年もよろしくお願い致します。 (1)はプロジェクトターゲットを4.6に上げると使用可能になりました。 (2)は更新後のコードで問題なく動きました。 上記ご回答ありがとうございます。 改良の余地として述べられた部分を以下のように実装しようと考えています。 ◆(1)Scheduler自体にIDisposableを実装 →ActionQueueSchedulerのクラスを以下のようにする。 public class ActionQueueScheduler : IDisposable { } ◆(2)Runメソッド内のTask.RunにCancellationTokenを渡す →スケジューラのAddメソッドの呼出元で使用するCancellationTokenを、 スケジューラに渡すようにします。 ◆(3)アンマネージリソースの破棄におけるIDisposable,usingの関係 IDisposableはアンマネージリソースを破棄するためのインターフェースで、IDisposableを実装したクラス内のStreamReaderなどのアンマネージリソースの使用箇所でusingを用いてやれば、適切にリソースを解放してくれると以下に書いてありました。 ・http://garicchi.hatenablog.jp/entry/2014/09/14/200000 「Scheduler自体にIDisposableを実装」した際にアンマネージリソースは何になりますでしょうか?またIDisposableとスケジューラに渡すCancellationTokenの関連性はどういったものでしょうか?よろしくご回答をお願い致します。
tamoto

2017/01/10 08:12

よろしくお願いします。早速ですが回答していきます。 (1)はその記述のとおりですね。単なるインターフェースの実装です。実装の内容については(3)で説明します。 (2)については、Addした各アクションそのものを単独でキャンセルするのは非常に難しいです。各アクションをどのようにキャンセル処理を行うべきかが定まらないためです。なので、この機能は実装せず、キャンセルはスケジューラ側で行うのではなく、各アクションメソッドの方でキャンセル時の処理を記述するようにしたほうが良いです。 CancellationTokenによるキャンセルは、Schedulerそのものの動作の停止を目的とします。これも続きは(3)で説明します。 (3)について。 IDisposableインターフェースがアンマネージリソースを破棄するためのものという話は、IDisposableの一面に過ぎません。IDisposableインターフェースが表すのは「Dispose()という『終了処理』を記述するメソッドを提供する」というただ一つのみです。終了処理を行うので、アンマネージリソースの破棄にも使うというだけなのです。IDisposable自体はアンマネージリソースと関係なく使用することができます。 今回はアンマネージリソースは持っていないので、IDisposableインターフェースで行うのは「Queueに溜まった残りの処理を捨て、Schedulerの動作を停止させる」というものになります。 Schedulerの実際の動作はRunメソッド内のTask.Runで行われているので、このTask.RunにCancellationTokenを渡しておき、「Dispose()が呼び出されたらTaskをキャンセルする」という動作を実装すればよいわけです。さらに、SchedulerにAddされている各処理を管理するTaskにもキャンセルを通知する必要があります。でないと、実行されなくなったTaskが永久に完了しないまま止まってしまうからです。 この部分はメモリのリーク防止や同時実行制御なども影響する箇所なので、結構難しい実装になります。今すぐではないですが、自分の方でも組んでみますね。
matsu1

2017/01/16 01:21

tamotoさん IDisposeについての説明ありがとうございます。理解しました。 私の開発環境で以下のように、アプリ終了処理とDispose処理を実装してみましたがいかがでしょうか。 私の方では一定周期でAdd()を呼んでいるタスクがあり、まずそのタスクをタスクキャンセルし、キャンセルが完了するまで待ちます。これにより以降キューにタスクがAddされることはなくなります。 次に、アプリのmain処理側からScheduler(IDisposableを継承済とする)のDispose()を呼びます。このDispose()はRunメソッド(予めCancellationTokenを渡しておく)に対してタスクキャンセルを行うようにします。 Runメソッドはそれを受けてタスクキャンセルし、catch (OperationCanceledException)処理の中で、キューのCountプロパティ(残タスク数)を見てそれが0でない場合は0になるまでTryDequeue()するようにします。このときアプリのmain処理側ではキューの残タスク数が0になるまで、TryDequeue()とは非同期で待つようにさせておきます。 残タスク数が0になったらアプリを終了させます。
tamoto

2017/01/16 11:12

こんにちは。 非同期プログラミングには見えづらい罠が多いため、表層の説明のみで処理が大丈夫かどうかを断定するのは難しいです……が、考え方は間違っていないと思います。いくつかのやり方はありますが、そのうちの一つではあると思います。一つだけ気をつけなければいけないのは、SchedulerをDisposeしたとき、実行されなくなった残りのTaskは単に捨て置かず、ちゃんとキャンセル状態にしてやらないといけないです。そうしないと、もしどこかでそのTaskをawaitしていた場合、永久にプログラムが先に進まなくなります。 自分もキャンセル処理を念頭に置いて書き直してみたので、置いておきますね。 https://git.io/vMrbO キャンセルを自然に組み込むために全力で破壊的変更してるので、ついでにいろいろ修正してます。基本的な考え方は変わってないですが、キャンセル処理をスレッドセーフに行うためにステート管理を書き直したり、Addを呼ぶときは引数にCancellationTokenが入るようにして、簡単にキャンセル処理が書けるようにしたりしてます。あと、一気にいじってから検証が足りてない気がするので不具合があるかもしれないです。。
matsu1

2017/01/17 06:22 編集

tamotoさん 新しいキャンセルのコードありがとうございました。それにて再実装&検証してみようと思います。 コードについて質問ですが、まず仕様としては以下の理解でよいでしょうか。 ・Add()やInterrupt()はそれぞれ別々のキューにpushされ、Run()がInterrupt()によりpushされたタスクが有る限りそれをまず処理する、それがない場合はAdd()によりpushされたタスクが処理される。 ・ActionQueueScheduler.Dispose()を呼ぶとAdd()とInterrupt()に対してキャンセルがかかり、その結果Run()はwhile文をbreakで抜ける。breakで抜けるまでをDispose()内でRun.Wait()で待っている。これにより、キャンセル時にキューにpushされていたタスクは全てキャンセルされた上でキューから取出される。 次に質問点ですが ●質問(1) 例えばAdd()をアプリ側から呼ぶ際は、最初に教えて頂いたコードでは以下のような引数の渡し方でしたが、 ・await Add(() => { /*タスク処理*/ }).ConfigureAwait(false); 今回の場合はどのようにタスク処理とキャンセルトークンソースをAdd()の引数に渡せばよいでしょうか。わからないのでご教授をお願い致します。 ●質問(2) メンバ変数のcancellerと、Add()の引数のCancellationTokenについて以下のように認識しましたが正しいでしょうか? ・cancellerはアプリ側からActionQueueScheduler.Dispose()を呼んだ際に使用され、Add()やInterrupt()の処理をキャンセルするもの。 ・Add()の引数のCancellationTokenは、アプリ側でAdd()の引数のCancellationToken.Cancel()を実行した際に、キューにpushされたタスクの処理をキャンセルするもの。 ・以上により、キューおよびキューにpushしたタスクの破棄処理は、以下のようになる。 ①アプリ側でActionQueueScheduler.Dispose()を呼ぶ →キューへのpushが停止される→キューの全タスクが取出される→キューが破棄される ②アプリ側でAdd()の引数のCancellationToken.Cancel()を呼ぶ →キューにpushされたタスクの処理がキャンセルされる 長文になりましたが、よろしくご回答のほどお願い致します。
tamoto

2017/01/17 07:03 編集

仕様の方ですが、「Add()とInterrupt()に対してキャンセルがかかり」というのは少し違って、「Add()とInterrupt()で追加され、実行待ちキューに残っているActionが全てキャンセル待ちになる」という感じです。「その結果Run()はwhile文をbreakで抜ける」というのは違って、「Queueの中身が空になるまで回して、全てのActionをキャンセルしてからbreakする」というものですね。 質問1 ・await Add(token => { /*タスク処理*/ }).ConfigureAwait(false); という書き方になります。このtokenはSchedulerがDisposeされたときにCanceledになるCancellationTokenなので、DisposeされたときにActionを実行途中でも安全に終了させるために使います。/*タスク処理*/の中で、tokenのIsCancellationRequestedを見て条件分岐するような使い方ですね。 質問2 CancellationTokenにCancel()というメソッドはなく、Tokenそのものができることは、「外から処理のキャンセルがリクエストされたとき、処理側でそれを認識できるようにする」だけなので、それを利用して安全なキャンセル手続きを自分で記述しなければなりません。このときの「外からのキャンセル」というのは「SchedulerのDispose」のことです。 「アプリ側からタスクのキャンセル」は、一つ一つのActionを個別にキャンセルしたい場合は、ラムダ式の変数キャプチャなどを使用して個別にキャンセル処理を実装してやればいいだけです。 よって、①は正しく、②はそもそも存在しないです。
matsu1

2017/01/18 07:47 編集

tamotoさん ご回答ありがとうございます。Run()内の以下について質問させて頂きます。 if (Interlocked.CompareExchange(ref this.state, 1, 0) != 0) return this.Run(); return this.runner = Task.Run(async () => 上述部は以下のように解析しました。 ①state=0(停止)の場合は、state=1(セットアップ)にしてTask.Run(…)以下を非同期で実行する。 ②state=1(セットアップ)or2(実行中)の場合は、Run()を再帰的に実行する。 私の環境では、複数の非同期タスク1~nでただ1つのキューを共用し、そのキューに対して各非同期タスクがAdd()を呼んでいます。 このような場合に、例えば、タスク1からRun()が呼ばれたとします。すると、CompareExchange()が実行されstateが0→1に変更開始されます。 この変更中にタスク2からRun()が呼ばれたとします。タスク1で呼ばれたRun()が実行中ですので、タスク2から呼ばれたRun()はその実行を待たされます。 タスク1で呼ばれたRun()がstateを0→1に変更完了し、次行のTask.Run()を実行すると、処理権がタスク2で呼ばれたRun()側に移ります。すると、その際にはstate=1になっているので、return this.Run()の部分が無限に繰り返されます。 と、言うようなことが起こりませんでしょうか? 私の環境ではRun()の何回目かの実行時にSystem.StackOverflowExceptionが発生し、上述が原因ではないのかと考えております。 対策としては if (Interlocked.CompareExchange(ref this.state, 1, 0) != 0) return this.Run(); 上記の2行を削除し、stateの定義からもセットアップを削除し停止と実行中のみで運用すればよいのではないかと考えました。(すみません、なぜセットアップのstateが必要なのか意図がわかりませんでした) 解析が違っていたらすみません、ご見解のほどよろしくお願い致します。
tamoto

2017/01/18 09:20

はわ……実際にスタック溢れが発生したわけですね。すみません……。state:1が必要なのは、Task.Runでタスクの作成を開始してからthis.runnerに代入されるまでの僅かな隙間でだけ、this.runnerの状態が期待しない状態になるためでした。確かにスタック溢れを起こしそうな可能性はあるコードでしたが、即座に書き換えるので大丈夫かなと思ってました……今見たら実際普通にダメですねこれ…… ちょっとだけ書き直したので確認してください。変更点はrevisionsにあります。動作検証がまだできてないですが、その問題は起きなくなるはず……です。
matsu1

2017/01/19 08:17

tamotoさん rev3のコードを見させてもらいました。ありがとうございます。 private Task Run() { /* ①while部 */ while (Interlocked.CompareExchange(ref this.state, 1, 0) != 0) if (this.state == 2) return this.runner; /* ②Task.Run部 */ this.runner = Task.Run(async () => { … Interlocked.Exchange(ref this.state, 0); }); /* ③Exchange部 */ Interlocked.Exchange(ref this.state, 2); return this.runner; } 動作順序としては①→③→②。 あるタスク1から①でstate=0→1に変更後③が実行されるまでに、別タスク2から①が実行された場合、 ①のwhileループから永久に抜けれなくなるコードとなっていますが、 ①と③間でタスクの切替えは起こることはないため、①でstate=0→1になった後は 必ず③でstate=1→2になり、そのような事象は起こりえない。 との認識でよろしいでしょうか? 現状の動作確認では①で永久に抜けられなくなることは発生していません。 となりますと、state=1が必要なのか少し疑問に思ってしまいました。 以下のようにstate=1(セットアップ)なしで、state=0(停止),2(実行中)だけで実現できる のではないかと考えましたが、ご見解いかがでしょうか?よろしくご回答をお願い致します。 (※解析が誤っていたらすみません) private Task Run() { /* ①while部 */ while (Interlocked.CompareExchange(ref this.state, 2, 0) != 0) if (this.state == 2) return this.runner; /* ②Task.Run部 */ this.runner = Task.Run(async () => { … Interlocked.Exchange(ref this.state, 0); }); /* ③Exchange部 */ (削除) }
tamoto

2017/01/19 09:22 編集

その認識で正しいです。stateは②のTask.Runが複数同時に行われる事を避けるためだけに存在するので、stateを0->1にすることに成功したタスクはその後必ずstateを1->2へ移行します。state:2への移行が完了すれば、whileループはif文内のreturnにより脱出します。 state:1を削除した場合(コメントのコード)の問題点は、①にてstateを2にした後、②でTask.Runが実行され、「this.runnerにTaskが代入されるまでの間に」別タスクが①に入ってしまった場合、その別タスクには「this.runnerが更新されていない(最初であればnull)ものが返される」という状態になります。これが期待されないので、state:1が必要なのです。 今気がついたんですが、state:1になる状態をなるべく短くするためにrev2からrev3に書き換えたんですが、stateを1->2にするExchangeと2->0にするExchangeが別のタスクで実行されるコードになってしまったため、Task.Runの中身が後で実行されることを暗黙的に期待するコードになっていますね……rev3でもまず問題は起きないと思いますが、rev2に巻き戻したほうが安全かもです。戻します。。 (rev4: 念のためのチェックを追加して更新しました。)
matsu1

2017/01/20 08:18

tamotoさん rev4のコードを見させて頂きました、ありがとうございます。 各revで動作確認をしました。 rev2はマルチタスク下で実行された際に、タスク1がstate=1になった状態でタスク2からRun()が実行されると「while (Interlocked.CompareExchange(ref this.state, 1, 0) != 0)」から抜けられなくなり、はまるので、このrev2は使用できないかと思います。 rev1は先述の通りスタックオーバーフローが発生するので使えません。 rev3,rev4はそれらは起こらず、tamotoさんがおっしゃられたように、Task.Runの中身が後で実行されることを暗黙的に期待するコードとなっていますが、先述の通り①while部と③Exchange部で処理権の切り替えは起こりえないと考えられますので問題ないかと思います。 rev4は以下のようにrev3から変更されています。 [rev3] Interlocked.Exchange(ref this.state, 0); → [rev4] while (Interlocked.CompareExchange(ref this.state, 0, 2) != 2) ; rev4のwhile文ではstate=0 or 1の場合に待ちが発生しますが、マルチタスク下でここで抜けられなくなる状況は思い付きませんでした。個人的な所感ですが、whileとCompareExchangeを用いたチェックは無限待ちに陥る可能性が常にないか考える必要があり、本当に全ての動作パターンを推定・網羅できているか少々心配になります。if (state==2) {Exchange(state, 0)}とした方がそのようなことを考える必要がなくなりプログラムが簡単なのではないかと思ってしまいました。※解析が誤っていたらすみません。 以上よろしくお願い致します。
tamoto

2017/01/20 08:47 編集

マルチスレッディングでの状態遷移は、「どこが並列に動作するか」を図に描いてみるとわかりやすいかもしれません。 whileでCompareExchangeを行うコードは、「そこにたどり着くコードが実行されたとき、最終的に『必ず』stateが2になる」ことが保証されていれば、無限ループにはまることはありえないです。もちろん、ちょっとでも間違えればすぐに動かなくなるため、コードを書くときは細心の注意を払わないといけません。 rev4のコードを見てみましょう。Task.Runが実行される行では「必ずstateが1である」ことが保証されています。次に、「そのTaskの作成が完了した瞬間にstateが2になる」ことも保証されています。そして、「stateが2以外になるにはTask.Run内のwhile(CompareExchange)だけ(state:2->0)」になります。よって、必ず0->1->2->0->...の順で状態が変化するようにつくられているわけです。whileを使ったCompareExchangeはいわば「保険」であり、ほとんどのパターンで1回きり実行されて次の行に移ることが期待されています。言い換えると、「もしこの行にきたときにstateが『まだ』xじゃなかったら、ちょっと待つ」という待機コードなのです。 状態の管理を確実にすることは必須で、例えばコメントのif (state==2) {Exchange(state, 0)}と書いたとしたら、if文に辿り着いた時に「もしもまだstateが2になっていなかったら?」一生stateが0に戻ることはなくなり、Schedulerは破損状態になり、以後正常に動作しなくなるわけです。 ところで、whileとInterlockedを使っているのは、並列動作時に操作がぶつかることは少なめであると仮定して、ぶつからない場合になるべく高速に動作させることを考えているためで、効率を一切考慮しないならlockステートメントなどで同期を取ってしまってもいいわけです。
tamoto

2017/01/20 09:43

rev2について、stateが1になる(==Task.Runの行へ移行する)と、「その後必ずstate:2になる(Task.Run内)」ようになっているので、もしstateが1の時に別のRunが実行されwhileのループにはまっても、その後stateが2になることが保証されているので、2になったときにif文で脱出するという動きになります。よって、rev2は問題なく動作するはずです。
matsu1

2017/01/24 03:56

tamotoさん ご回答ありがとうございます。 rev2は勘違いしていました。非同期でTask.Run()内でstate:2になるのではまりは発生しないですね。 rev4についても理解しました。 whileとInterlockedの意味についても理解が深まりました。 これより、最新のrev4で実装して動作確認と性能評価(CPU負荷や同期周期の遅延など)をしてみたいと思います。 よろしくお願い致します。
tamoto

2017/01/24 04:08

一応、CancellationTokenとCanceledTaskの取り扱いについて一部自然な形に修正したrev5をアップしてありますので、確認してみて下さい。スレッドセーフティ関係の修正はrev4で完成しているため、そちらに変更はありません。よろしくお願いします。
matsu1

2017/01/24 07:47

tamotoさん ありがとうございます。rev5を確認しました、実装に使わさせて頂きます。
matsu1

2017/01/25 07:32

tamotoさん 以前ご回答頂いた以下について質問させてください。 「質問1 ・await Add(token => { /*タスク処理*/ }).ConfigureAwait(false); という書き方になります。このtokenはSchedulerがDisposeされたときにCanceledになるCancellationTokenなので、DisposeされたときにActionを実行途中でも安全に終了させるために使います。/*タスク処理*/の中で、tokenのIsCancellationRequestedを見て条件分岐するような使い方ですね。」 私の環境では、Add()呼出元タスクというものがあり、そこからAdd()を呼びキューに上述の「/*タスク処理*/」を追加する形を取っています。呼出元タスクはキュータスクとは別のキャンセルトークンソース(以下呼出元CTS)を持っています。 キャンセル対象タスクは、以下の2つです。 ①呼出元タスク(呼出元タスクをキャンセルすることは上述の「/*タスク処理*/」のキャンセルと等価) ②キュータスク ①,②の両タスクのキャンセル方法は以下の2通りが考えられます。 (1) /*タスク処理*/の中で、tokenのIsCancellationRequestedを見ておき、タスクキューのCTS.Cancel()で上述の①と②の両タスクをキャンセルする方法。 (2) 呼出元CTS.Cancel()で①のタスクをキャンセルし、タスクキューのCTS.Cancel()で②のタスクをキャンセルする方法。 どちらを用いるべきか考えた際に、Add()元である呼出元を先にキャンセル完了させた後、キュータスクをキャンセルしたいと考えています。(2)ですとCTSが別なので、そのように実装するのは容易ですが、(1)はCTSが共通なため①と②のタスクがどちらが先にキャンセルされるかわからず、別途①→②の順にキャンセルされるように対策が必要になるのではないかと思ってしまいました。 これについてご見解いかがでしょうか。よろしくお願い致します。
tamoto

2017/01/25 07:43

呼び出し元タスクというのは、中でAdd()をawaitしているのですよね?そうであれば、Add()した/*タスク処理*/が完了するまで呼び出し元タスクはawaitの行に留まるわけなので、その間にCTSのCancelが呼び出された場合、単純にAdd()が完了してから次のAdd()が呼ばれるまでの間に抜けるだけで良いと思います。Addした/*タスク処理*/を個別にキャンセルする必要はありません。呼び出し元タスクと/*タスク処理*/は同期的なフローで動作しているからです。 「/*タスク処理*/を実行中に、それを一刻も早く停止させたい」という要件であれば、/*タスク処理*/の中でCTS.TokenのThrowIfCancellationRequestedを呼べばいいでしょう。その場合Add()はCanceled状態になり、連鎖的に呼び出し元タスクもCanceled状態に移行します。
matsu1

2017/02/07 04:32

tamotoさん キャンセル方法についてご回答ありがとうございました。理解できました。 現在、同期周期の遅延について性能調査をしています。 以前、tamotoさんが記載頂いた以下について質問させて頂きます。 while (true) { await this.scheduler.Add(async () => { await "何らかの処理"; }).ConfigureAwait(false); await Task.Delay(500).ConfigureAwait(false); } ●質問1:実質的な周期について 上記は500ms周期で処理をキューにAddして実行するものですが、キューが処理中で実行を待たされる場合は、500msの待ち+キューの実行待ち時間が、実質的な周期になりますでしょうか? ●質問2:同期遅延の解決策について 質問1が正しいと仮定します。私の環境では300個のキューへAddするタスクが動作しています。 そのうち250個は、500ms周期でファイルを1個or数10個をコピー元→先へコピーする処理、 残り50個は10秒,30秒,60秒などでファイルを1個or数10個をコピー元→先へコピーする処理、 を行わせています。 この場合の同期周期の遅延は10秒,30秒,60秒などで平均数秒の遅延が発生しています。 この遅延が減るように解決したいのですが、tamotoさんが書かれたように「同期処理の処理速度より、スケジューラーのタスク登録速度のほうが明らかに速い」場合に当てはまってしまいなかなか難しいでしょうか? 以下にいくつか試してみた方法を記載します。 (1)処理時間を考慮した上でDelay時間を決める方法 ・以下により、より正確に同期周期を保てるようになりました。  しかし、この方法でもキューでの実行待ち時間分は遅延されてしまいます。 while (true) { DateTime start_time = DateTime.Now; // 処理開始時刻 await this.scheduler.Add(async () => { start_time = DateTime.Now; // 処理開始時刻をセットする await "何らかの処理"; }).ConfigureAwait(false); int process_time = (int)(DateTime.Now - start_time).TotalMilliseconds; // 処理時間 if (process_time < 同期周期) await Task.Delay(同期周期 - process_time).ConfigureAwait(false); else await Task.Delay(1).ConfigureAwait(false); } (2)前回の処理開始時刻との差分からDelay時間を決める方法 ・前回の処理開始時刻との差分からDelay時間を決めれば、キューの実行待ち時間も考慮してDelayできて正確だったのですが、これをすると他のタスクの実行も全て遅れてきてしまうので不可でした。 while (true) { DateTime start_time = DateTime.Now; await this.scheduler.Add(async () => { int process_time = (int)(DateTime.Now - start_time).TotalMilliseconds; if (process_time < 同期周期) await Task.Delay(同期周期 - process_time).ConfigureAwait(false); else await Task.Delay(1).ConfigureAwait(false); start_time = DateTime.Now; await "何らかの処理"; }).ConfigureAwait(false); } (3)キューにAddした後、個別にDelayさせて実行させる方法 ・Addした後、各タスクがキューにより実行される段階でDelay時間を計算してDelayする方法を用いてはどうかと考えたのですが、そうして各タスクをDelayさせていざ割込みキューで実行しようとしても、他のタスクも割込み待ちをしますから、結局はキューの待ち時間による遅延は回避できないと思いました。 (4)同期周期にオフセットを加算する方法 ・例えば同期周期が2秒,4秒の2つのタスクがあった場合、2秒時は1個のタスクのみ実行ですが、4秒時は2個のタスクが同時にキューに並びます。しかし、同期周期が3秒,7秒などの組み合わせであれば、同時にキューに並ぶ回数が減り、キューの実行待ち遅延の発生回数も減ります。 しかし、現実には同期周期にオフセットを乗せることは仕様とことなる周期で動くことになるので、よろしくないと考えました。 (5)同期処理の開始時刻にオフセットを加算する方法 ・(4)とは異なり、同期周期にはオフセットを乗せず、各タスクの最初のAddする時刻にオフセットを乗せることを考えてみました。例えば、同期周期が2秒と4秒の2つのタスクがあった場合、4秒の方は2秒の方から1秒ずらしてAdd開始してやれば、同時にキューに並ぶことはなくなります。 もちろん、実際はタスクの数ももっと多いですし、このように全てをうまくずらし合うことは不可能ですが、例えば100msずつずらすなどしてやれば少しはましになるのではないかと考えました。 以上が私の方で思い付いたものですが、なかなか決定打はありませんでした。 もし何か良いアイデアをお持ちでしたらご教授頂ければ助かります。 よろしくお願い致します。
tamoto

2017/02/07 05:03 編集

こんにちは。元コードの「Taskが完了してから500ms待つ」というのでは都合が悪いのですね。 書き換えるとして、自分だったら「タスクを実行開始して500ms待つ、その時Taskが完了していれば即座に次のループへ、完了していなければ待つ」という書き方をします。 while (true) { var task = this.scheduler.Add(async () => { await "何らかの処理"; }).ConfigureAwait(false); // 実行開始だけ await Task.Delay(500).ConfigureAwait(false); // 500ms待つ await task; // 完了していなければ完了するまで待つ } --- 追記 「500msに1回のペースで必ず処理を実行したい」というなら、むしろ「Taskの完了状態を無視して、500msに1回のペースを保ってスケジューラにAddし続ける」というやり方も考えられます。 while (true) { this.scheduler.Add(async () => { await "何らかの処理"; }).ConfigureAwait(false); await Task.Delay(500).ConfigureAwait(false); } この方法なら、その時ごとのスケジューラの混雑状況に左右されず、定期的に処理を入れることが可能になります(実際の処理間隔はスケジューラの混雑状況によって、一瞬で2回実行されたり、1秒以上間隔が空くことはあります)。しかし、各Taskの管理責任を完全にスケジューラに任せてしまうことになるので、タスク管理的に都合が悪い状況が発生する可能性があります。また、このやり方だと、実際の処理に掛かる時間を次の処理のAdd速度が上回っていた場合、少しずつスケジューラのキューが溜まっていき、長期間実行するとメモリが溢れることになるので、あまりオススメしません。 --- もう一度追記 すみません、質問に対する直接の回答を含んでなかったです。 質問1は、その通りです。 質問2は、「50個の処理」というのが若干ヘビーなものなのでしょうか、スケジューラは処理を一列に並べて実行するものなので、周期的なAdd処理が各タスクの完了を確認しなければならない現状では、その分の遅延は発生してしまいます。1つ目の提案では遅延を軽減可能です。2つ目の提案なら完全に問題を回避できますが、メモリリークなど致命的な問題に繋がる可能性があるのでオススメできないところです。
matsu1

2017/02/07 09:36

tamotoさん ご回答ありがとうございます。 同期周期の遅延をなくすことへのアプローチ方法は以下の2つあるかと思います。 (1)「this.scheduler.Add」が正確に周期通り行われること。 (2)「await "何らかの処理";」が正確に周期通り行われること。 先ほどのご回答で追記の上の回答、及び、追記の方は共に(1)にあたるかと思います。 こちらの環境で確認しましたが、Addの周期は正確に500msを刻んでくれました。 しかし、今回私の方では(2)の何らかの処理が周期通りに行われることを目指していまして、良い方法がないか悩んでおります。 例えば、追記の上の方法の実行順は 1.var task = this.scheduler.Add(async () => 2.await Task.Delay(500).ConfigureAwait(false); 3.await task; 4.await "何らかの処理"; キューが実行中で待ち時間があった場合、それが上記1の時点から4が実行されるまでの時間となります。 それを例えば50msとします。そして、何らかの処理が100msで終わったとします。すると350ms後にawait task;が実行完了し、次のAdd処理が開始されます。 次のAdd時のキューの実行待ち時間が0msであったとします。するとすぐに何らかの処理が開始されます。このような場合、前回の何らかの処理と今回の何らかの処理の間の時間は450msとなってしまい、周期の500msとはずれてきてしまいます。 何らかの処理の実行フローを以下に示します。 1回目のAdd() → キューの実行待ち時間q1 → ★await 何らかの処理開始 → 何らかの処理の処理時間p1 → Delay(d1) → 2回目のAdd() → キューの実行待ち時間q2 → ★await 何らかの処理開始 上記で1回目と2回目の「★await 何らかの処理開始」の差が同期周期になっていればよいので 「Delay時間d1=同期周期 - 何らかの処理の処理時間p1 - キューの実行待ち時間q2」 で求まるかと思います。 となると、Delayするのはq2がわかってからということになり、Addしたタスク内でDelayさせなければならなくなります。しかし、そのようにキュー内でDelayを実行するとキューの処理全体がその分、待たされて遅くなってしまうので、これでは使い物になりません。 ということで、どのようにしたら良いか困っているのですが、ご見解頂ければ幸いです。 よろしくお願い致します。
tamoto

2017/02/07 09:54

えーっと……"何らかの処理"が周期通り実行されることを期待するのは「不可能」です。なぜなら、同時多発的に"何らかの処理"のタスクが生成される瞬間に順番待ちが発生し、その分どちらかのタスクの実行開始時間が後ろにずれ込むからです。それでは、スケジューラを使って処理順序を直列的にした意味が全くなくなってしまいます。なんだか最初の質問まで巻き戻った感じがしますが……その"何らかの処理"はファイルのコピー処理ですよね?それを時間通りに実行しようとしたら、必ず何処かでタスクが同時に実行されることになるため、IO処理が競合して非常に大きな遅延が発生し、使い物にならなかったのではないですか。
matsu1

2017/02/10 00:29

tamotoさん ご回答ありがとうございます。 そうですね。本来の目的を見失いかけていました。 やはりタスクキューにスケジューリングしてもらって、キューの混み具合により既定の同期時刻からは処理が速まったり遅まったりするが、平均すれば既定の同期周期になっている、と言うやり方がベターなのでしょうね。 現状では500msの同期周期処理が250個,10秒・30秒・60秒の同期周期処理が20個,20個,10個ある場合に、実測の同期周期は10秒・30秒・60秒のものに関しては5~15秒,25~35秒,55~65秒の振れ幅が出ています。いずれも平均すればきっちり10秒・30秒・60秒にはなっています。 また、上記の状況で同期周期設定を10秒・30秒・60秒から1秒,3秒,6秒に変えると、実測周期が全て平均10秒くらいになり、キュー処理がだいぶ遅れがちになっていました。ここら辺は、同期周期の設定値やキューに追加するタスク数を検討する必要があるように思えました。 CPU使用率については上記の場合で30%程度でしたので、十分余裕があり助かっております。
tamoto

2017/02/10 02:21

スケジューラというものは「処理を『依頼』さえしておけば、あとは良きに計らってくれる」というものなので、「一定時間毎に処理を依頼することを保証する」ところまでマネジメントしてしまえば要件は満たしている、と考えてしまって良いと思います。 同期周期設定の短縮で発生する大きな遅れというのは、単純に「処理に掛かっている時間が実行周期よりも大きい」というだけですね。「地球の裏側と遅延1ms以内に通信したい」とか無茶を言ってるようなものなので。処理が追いつかないのであれば、周期に遅れが発生するのは避けられないです。 念のためもう一度言っておきますが、繰り返しスケジューリングするときに、前の自タスクの完了を確認しないで次のタスクを追加するようなやり方は、安全の面から避けるべきです。例えば開発環境では問題ないプログラムが実行環境のスペックに左右され「キュー処理がだいぶ遅れがち」が発生したとすると、長時間実行するとそのキューが積み上がってしまい、再現の非常に難しいバグに繋がったりします。書いたコードが自身を管理できているようにしておかないと危険です。
matsu1

2017/02/13 05:41

tamotoさん スケジューラの概念、ご教授頂きありがとうございます。 念のための部分について質問させて頂きます。 「前の自タスクの完了を確認してから次タスクを追加するのが安全な方法」とありますが、これは「2017/02/07 14:03 」に回答頂いた以下の部分で実現されているとの認識でよいでしょうか? while (true) { var task = this.scheduler.Add(async () => { await "何らかの処理"; }).ConfigureAwait(false); // 実行開始だけ await Task.Delay(500).ConfigureAwait(false); // 500ms待つ await task; // 完了していなければ完了するまで待つ } すなわち、キューに積んだタスクが実行完了するまではawaitするので、前の自タスクが必ず完了してから、次のタスクを追加する動きになる。よろしくご回答をお願い致します。
tamoto

2017/02/13 05:52

はい、その通りです。 タスクの追加直後に500msの待機をセットしているため、500ms経過するまでの間にタスクが完了していなかった場合のみawait task;の行で完了するまで待機します。
matsu1

2017/02/13 06:40

tamotoさん ご回答理解しました。ありがとうございます。 これに関連する部分で理解を深めたいので質問させて頂きますが、 public Task<T> Add<T>(Func<CancellationToken, T> action) { var tcs = new TaskCompletionSource<T>(); this.queue.Enqueue(() => this.InvokeAction(action, tcs)); this.Run(); return tcs.Task; } 上記のAdd()の戻り値がtcs.Taskとなっています。 これは引数actionで渡されるタスク処理が実行完了した状態を返すものでしょうか。 となると、このAdd()内でactionが実行完了するまで待ってくれる?ひいては上述の「await task; // 完了していなければ完了するまで待つ」のawaitは必要なのかな?という疑問が沸いてしまいました。 すみませんが、この部分について正しい解析をご教授頂けませんでしょうか。よろしくお願い致します。
tamoto

2017/02/13 07:17 編集

このスケジューラは一つのactionを動かすために3つ以上の独立した役割を持つTaskが作られたり動いたりしてるのでちょっと難解なのです。。 tcs.Taskは、 「actionが実際に処理完了したとき『完了』となる」 「actionが何らかの例外を発生したとき『失敗』となる」 「action内でOperationCanceledExceptionが投げられたとき『キャンセル』となる」 という一つのタスクオブジェクトとなっています。 TaskCompletionSource<T>のことは理解していますか?これを利用することで、(スケジューラが勝手に実行する)actionの実行状況をTaskという形に固めて、ユーザが容易に取り扱えるようにしたものです。 Addが完了してTaskが返されるのは「actionをスケジューラに登録完了した」タイミングであり、スケジューラによってactionが実際に実行されたときに初めて『完了』状態となるため、これをawaitすることは大きな意味があります。 tcsの扱い方は、InvokeActionのコード部分を読むと分かりやすいかもしれません。 --- 追記 もしかして、this.Run()が気になっているでしょうか。これは、スケジューラにアイテムが追加された時、スケジューラのタスク処理機構を(非同期に)稼働させる命令を投げるものなので、実行後は即座に制御がAddに戻っています。
matsu1

2017/02/28 05:16

tamotoさん TaskCompletionSourceの動きについてご教授ありがとうございます。理解しました。 また、this.Run()の動きは、おっしゃる部分は理解しております。ありがとうございます。 本件についてはこちらで実装完了できましたので、ベストアンサーとさせて頂きます。 3ヶ月の長きに渡りご教授頂きまして、誠にありがとうございました。
guest

0

あんまり質問読んでないで回答するのですが(ごめんなさい...)、非同期と並列処理を混同していますでしょうか。

非同期処理は、同期処理だとロックされるスレッドを、切り離す(=コンテキストスイッチを発生させなくする)だけで、並列処理をするわけではありません。
並列処理は...並列に処理するだけです。(非同期にしたスレッドを並列で処理するって感じです。)

コンテキストスイッチは、CPUにとって中々コストがかかるので、例えばIOバウンドのときは、CPUを使ってないのにコンテキストスイッチさせてCPUを無駄に使うのが無駄=非同期処理にすべき。
逆にCPUバウンドだと、非同期してもそもそもCPU使ってるので無駄(なケースが考えられる)とかです。
つまり、今回ファイルのIO待ちの時は、並列処理をしなくても非同期処理をする価値があるのです。

async/awaitは、非同期処理です。

(ちょっと極端な例かもですが、)asyncをして、すぐ下の行でawaitすればただの非同期処理です。記載されたコードにもありますが、Taskを複数走らせて、whenAllなどで待ってあげたりすることで並列処理をすることができます。

という点がクリアになれば、async/awaitすべきところや、並列がどう動くかについて解決できるかと思いまして、書いてみました。
(ちなみに、並列処理でもスレッドウィンドウを使ってスレッドを追いながらデバッグができます...)

投稿2016/11/25 11:35

BEACHSIDE

総合スコア294

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

matsu1

2016/11/28 09:35

BEACHSIDEさん ご回答ありがとうございます。 非同期と並列について混同があり、気づかさせて頂きました。 また、CPUバウンド,IOバウンドに関しても学ばさせて頂きました。 解決に向けてヒントを頂けましたので、また調べて考えてみます。 ありがとうございます。
guest

0

Q1に関しては、使い方の解説サイト等は多分既にご覧になっていると思われますのでそれ以上の解説は私にはできません。
強いて言えば、シンプルなテストプログラムを作って、実際にいろいろいじり倒してみるのが近道だと思います。

Q2に関しては、高度な誤魔化し技術もいろいろあるとは思いますが、まずは処理を高速化すべきだと思います。
fastcopyという有名な高速コピーのソフトウェアがあるので、それの使用を検討してみてください。
まぁそれでも根本的な解決ではないので、いつかは頭打ちになる可能性はありますが。

それ以外だと、やはり前提条件次第ですね。どうしてもどこかしら妥協すべきだと思います。

投稿2016/11/25 06:48

ishi9

総合スコア1294

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

matsu1

2016/11/25 08:28

ishi9さん ご回答ありがとうございます。 Q1に関しては解説サイトを読み、プログラムも組んでいるのですが、awaitの入れ子になったときにどのように動くかいまいち頭の中で整理できておらず、わかっていらっしゃる方に回答頂きたくて質問させて頂きました。基本的な内容の質問ですが、もし可能であればお手数ですがご回答頂けると大変助かります。 Q2に関してはfastcopyのご紹介ありがとうございます。 フリーなソフトについてはrobocopy以外のものは調べてみました。 ただ開発の前提としてmicrosoft純正のコードを使用したいこだわり(microsoftが動作を保証しているため)があり、robocopyにしております。基本的にはrobocopyとfastcopyでそこまで大きく速度が変わることはないのかなと、いくつかの比較サイトを見て感じております。 前提条件はおっしゃるように妥協する必要があると感じております。 ただ、現状の遅延があまりにひどいもので、そこを多少なりとも改善できる方法があればよいかと思い質問させて頂きました。
ishi9

2016/11/25 08:41

後はまぁ小さいファイルが大量にあるケースの場合、 一度全部圧縮してから転送して、転送先で解凍した方が速いケースもあります。
matsu1

2016/11/28 09:43

ishi9さん ご回答ありがとうございます。 おっしゃるアイデアを試してみたいと思います。 教えて頂きありがとうございました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問