前提・実現したいこと
Reactive Extensionsで入力→変換→出力の"変換"の部分で非同期メソッドを使用するが、それを再入せず、順番に実行したいです。
また、変換→出力部分を後で停止できるようにもしたいです。
ここではサンプルとして、1秒毎に値を発行するObservable.Interval
を入力、文字列への変換をするメソッドConvertAsync
を変換、時刻などを付加してコンソールへプリントするメソッドWriteLineDebug
を出力としています。
変換メソッドConvertAsync
は内部でawait Task.Delay
をしていますが、実際に使用する場合はここがファイルやネットワークアクセスになります。
このConvertAsync
の開始・終了が前後しないように順番に行いたいです。
発生している問題・エラーメッセージ
SelectManyの場合
変換メソッドをSelectMany
で呼んだ場合、変換メソッドConvertAsync
の順番が前後してしまいます。
ただし、変換→出力部分の停止はできています。
以下では0~3個目までは問題ないですが、3個目の開始Start ConvertAsync 3
の後に、3個目の完了End ConvertAsync 3
前に4個目が開始Start ConvertAsync 4
してしまいます。
出力結果
00.065|Thread:1|Start Subscription 00.615|Thread:5|Start ConvertAsync 0 00.745|Thread:4|End ConvertAsync 0 00.750|Thread:6|Subscrive inputValue:[Converted:0 delayMilliSec:100] 01.100|Thread:6|Start ConvertAsync 1 01.210|Thread:6|End ConvertAsync 1 01.210|Thread:5|Subscrive inputValue:[Converted:1 delayMilliSec:100] 01.600|Thread:4|Start ConvertAsync 2 01.710|Thread:5|End ConvertAsync 2 01.710|Thread:4|Subscrive inputValue:[Converted:2 delayMilliSec:100] 02.102|Thread:5|Start ConvertAsync 3 02.597|Thread:5|Start ConvertAsync 4 02.838|Thread:5|End ConvertAsync 3 02.839|Thread:4|Subscrive inputValue:[Converted:3 delayMilliSec:723] 03.107|Thread:4|Start ConvertAsync 5 03.605|Thread:5|Start ConvertAsync 6 04.105|Thread:4|Start ConvertAsync 7 04.115|Thread:4|End ConvertAsync 4 04.115|Thread:4|Subscrive inputValue:[Converted:4 delayMilliSec:1513] 04.145|Thread:4|End ConvertAsync 5 04.145|Thread:4|Subscrive inputValue:[Converted:5 delayMilliSec:1029] 04.575|Thread:4|End ConvertAsync 6 04.575|Thread:4|Subscrive inputValue:[Converted:6 delayMilliSec:948] 04.605|Thread:1|Dispose Subscription 04.605|Thread:5|Start ConvertAsync 8 06.090|Thread:5|End ConvertAsync 7 06.289|Thread:5|End ConvertAsync 8
Scan+Concatの場合
変換メソッドをScan+Concat
で呼んだ場合、変換メソッドConvertAsync
の順番は保たれますが、
変換部分がすぐ停止しません。出力部分だけがすぐ停止されます。
以下では5個目完了ぐらいのタイミングで停止処理disposerA.Dispose()
しても、12個目まで変換は続きます。
大体止めた個数の倍ぐらいまで変換が続くようです。20個目で停止→40個目ぐらいまで変換される。
出力結果
00.065|Thread:1|Start Subscription 00.613|Thread:4|Start ConvertAsync 0 00.745|Thread:4|End ConvertAsync 0 00.751|Thread:5|Subscrive inputValue:[Converted:0 delayMilliSec:100] 01.121|Thread:5|Start ConvertAsync 1 01.233|Thread:5|End ConvertAsync 1 01.233|Thread:5|Subscrive inputValue:[Converted:1 delayMilliSec:100] 01.614|Thread:6|Start ConvertAsync 2 01.725|Thread:4|End ConvertAsync 2 01.725|Thread:4|Subscrive inputValue:[Converted:2 delayMilliSec:100] 02.112|Thread:4|Start ConvertAsync 3 03.978|Thread:6|End ConvertAsync 3 03.979|Thread:6|Start ConvertAsync 4 03.979|Thread:5|Subscrive inputValue:[Converted:3 delayMilliSec:1858] 05.823|Thread:7|End ConvertAsync 4 05.823|Thread:7|Start ConvertAsync 5 05.823|Thread:6|Subscrive inputValue:[Converted:4 delayMilliSec:1836] 06.920|Thread:1|Dispose Subscription 07.223|Thread:4|End ConvertAsync 5 07.223|Thread:4|Start ConvertAsync 6 08.545|Thread:6|End ConvertAsync 6 (略) 15.244|Thread:4|Start ConvertAsync 12 16.961|Thread:6|End ConvertAsync 12
該当のソースコード
csharp
1class Program 2{ 3 static Random rand = new Random(); 4 static Stopwatch stopwatch = new Stopwatch(); 5 6 static async Task Main(string[] args) 7 { 8 stopwatch.Start(); 9 10 //[入力] 500msec間隔で{0,1,2,3,4,5,6,,,}と流れてくる 11 var timer = Observable.Interval(TimeSpan.FromMilliseconds(500)); 12 13 WriteLineDebug($"Start Subscription"); 14 15 var disposerA = timer 16 //[変換] 文字列に変えるが、3個目以降は遅くなる 17 //Case1-SelectMany 18 .SelectMany(l => ConvertAsync(l)) 19 //Case2-Scan+Concat 20 //.Scan(Task.FromResult(""), async (previousTask, l) => 21 //{ 22 // await previousTask.ConfigureAwait(false); 23 // return await ConvertAsync(l).ConfigureAwait(false); 24 //}) 25 //.Concat() 26 27 //[出力] 現在時刻などを追加してプリント 28 .Subscribe(x => 29 WriteLineDebug($"Subscrive inputValue:[{x}]")); 30 31 //Enter押下したら、購読を停止 32 Console.ReadLine(); 33 34 WriteLineDebug("Dispose Subscription"); 35 disposerA.Dispose(); 36 37 //停止を確認 38 Console.ReadLine(); 39 } 40 41 private async static Task<string> ConvertAsync(long l) 42 { 43 WriteLineDebug($"Start ConvertAsync {l}"); 44 45 //実際はファイルアクセスやネットワークアクセスがあるとする 46 int delayMilliSec = l >= 3 ? rand.Next(600, 2000) : 100; 47 await Task.Delay(delayMilliSec).ConfigureAwait(false); 48 49 WriteLineDebug($"End ConvertAsync {l}"); 50 51 return $"Converted:{l} delayMilliSec:{delayMilliSec}"; 52 } 53 54 private static void WriteLineDebug(string message) => 55 Console.WriteLine($"{stopwatch.ElapsedMilliseconds * 0.001:00.000}|Thread:{Thread.CurrentThread.ManagedThreadId}|{message}"); 56}
試したこと
以下のブログの目次を参考に、Switch()
、スケージューラーの固定を検討しましたが、期待した動作にはなりませんでした。
Reactive Extensions再入門
補足情報(FW/ツールのバージョンなど)
C# 8.0
.NET 5.0
"System.Reactive" Version="5.0.0"
VisualStudio 2019
回答2件
あなたの回答
tips
プレビュー