#やりたいことの概要
Reactive Extensionsの書き方に関する質問です。
やりたいことの概要は以下のようなものです。
- イメージとしては、複数のセンサーを持つ機器(複数台)のイベントをサンプリングするIoTのようなもの
- 機器はIdを持ち、センサーはSensorTypeによって識別され、その2つをキーとするイベントがストリームとして発生する
- このストリームから、一定のサンプリング間隔で、機器(Id)毎に各センサーの直近の値を取得する
- この処理をReactive Extensionsで書きたい
#書いてみたソースコード
##データ構造
cs
1// センサー種別 2public enum SensorType 3{ 4 TypeA, 5 TypeB 6} 7 8// 発生するイベントのデータ構造 9public class DataEvent 10{ 11 public int Id { get; set; } 12 13 public SensorType SensorType { get; set; } 14 15 public DateTime Timestamp { get; set; } 16}
##イベントストリームの疑似
cs
1// イベントストリームの疑似Observable 2var observable = Observable.Create<DataEvent>(observer => 3{ 4 int counter = 0; 5 return new System.Threading.Timer(state => 6 { 7 var data = new DataEvent { Id = counter / 2 % 2, SensorType = (SensorType)(counter % 2), Timestamp = DateTime.Now }; 8 Debug.WriteLine(String.Format("{0} : Data : Id=[{1}], Type=[{2}]", 9 data.Timestamp.ToString("HH:mm:ss"), 10 data.Id, 11 data.Type)); 12 counter++; 13 14 observer.OnNext(data); 15 }, null, 0, 1000); 16});
##自分が作った処理
cs
1// 一定期間毎に、Id単位で直近の値をサンプリング 2var timeObservable = Observable.Interval(TimeSpan.FromSeconds(5)).Publish().RefCount(); 3 4observable 5 .GroupBy(_ => _.Id) 6 .Subscribe(groupedObservable => 7 { 8 groupedObservable 9 .Buffer(timeObservable) 10 .Subscribe(datas => 11 { 12 var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA); 13 var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB); 14 15 Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]", 16 DateTime.Now.ToString("HH:mm:ss"), 17 groupedObservable.Key, 18 datas.Count, 19 typeA?.Timestamp.ToString("HH:mm:ss"), 20 typeB?.Timestamp.ToString("HH:mm:ss"))); 21 }); 22 });
#実行例
##出力
16:57:05 : Data : Id=[0], Type=[TypeA] 16:57:06 : Data : Id=[0], Type=[TypeB] 16:57:07 : Data : Id=[1], Type=[TypeA] 16:57:08 : Data : Id=[1], Type=[TypeB] 16:57:09 : Data : Id=[0], Type=[TypeA] 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 16:57:10 : Data : Id=[0], Type=[TypeB] 16:57:11 : Data : Id=[1], Type=[TypeA] 16:57:12 : Data : Id=[1], Type=[TypeB] 16:57:13 : Data : Id=[0], Type=[TypeA] 16:57:14 : Data : Id=[0], Type=[TypeB] 16:57:15 : Event : Id=[0], Count=[3], TypeA=[16:57:13], typeB=[16:57:14] 16:57:15 : Event : Id=[1], Count=[2], TypeA=[16:57:11], typeB=[16:57:12] 16:57:15 : Data : Id=[1], Type=[TypeA] 16:57:16 : Data : Id=[1], Type=[TypeB] 16:57:17 : Data : Id=[0], Type=[TypeA] 16:57:18 : Data : Id=[0], Type=[TypeB] 16:57:19 : Data : Id=[1], Type=[TypeA] 16:57:20 : Event : Id=[0], Count=[2], TypeA=[16:57:17], typeB=[16:57:18] 16:57:20 : Event : Id=[1], Count=[3], TypeA=[16:57:19], typeB=[16:57:16]
##解説
1 16:57:05 : Data : Id=[0], Type=[TypeA] 2 16:57:06 : Data : Id=[0], Type=[TypeB] 3 16:57:07 : Data : Id=[1], Type=[TypeA] 4 16:57:08 : Data : Id=[1], Type=[TypeB] 5 16:57:09 : Data : Id=[0], Type=[TypeA] 6 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 5と2が直近 7 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 3と4が直近
#質問事項
とりあえずやりたいことは出来たのですが、Reactive Extensionsを使った書き方でもっと効率の良い書き方はできるのでしょうか?
例えば、この方法だとサンプリング間隔間のデータはすべてListで保持され、Subscribe()の中でLastOrDefault()で直近の値を取得していますが、Take()等によりストリームの段階で直近の1件のみを処理するようにする等。
それも試してみようと思ったのですが、Take()したいのはIdとSensorTypeの復号キー単位だが最終的に取得したいのはId単位な点、また、その場合にSample()等をどのタイミングで呼び出すのかがわからなくなり、ギブアップしました。
#内容補足(追加1)
自分のやりたい内容を、以下に整理してみました。
source |-A1-A2-B1----B2----A3-A4-B3-B4--- boundary |----------X-----Y--------------Z- A2 B2 A4 B1 B4
sourceストームのA、B等がキーで、boundaryストリームのイベント発生のタイミングで同じキーの最後の値のみを取得したいのです。
現状出来ている内容だと以下のような内容で、イベント発生後に1A、1C、1Dを捨てる形になるので、boundaryのイベント発生間隔が長くなると、その間sourceストリームで発生したオブジェクト全てが保持されてしまいます。
source |-A1-A2-B1----B2----A3-A4-B3-B4--- boundary |----------X-----Y--------------Z- A1 B2 A3 A2 A4 B1 B3 B4
なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のイベントで扱われてもどちらでも良いです。
#更に試してみたこと(追加2)
更に以下のようなコードを書いてみました。
cs
1observable 2 .GroupBy(_ => new { _.Id, _.SensorType }) 3 .Select(_ => _.Sample(timeObservable)) 4 .Merge() 5 .Buffer(timeObservable) 6 .Subscribe(datas => 7 { 8 foreach (var grouped in datas.GroupBy(_ => _.Id)) 9 { 10 var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA); 11 var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB); 12 13 Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]", 14 DateTime.Now.ToString("HH:mm:ss"), 15 grouped.Key, 16 grouped.Count(), 17 typeA?.Timestamp.ToString("HH:mm:ss"), 18 typeB?.Timestamp.ToString("HH:mm:ss"))); 19 } 20 });
このコードはバッファするオブジェクト自体は最小になっており、惜しいのですが致命的な問題がある認識です。
本来は同じものとして処理したいtimeObservableを、キー毎のSample()用とイベント発火間隔Buffer()で個別に処理しているため、厳密な処理にならず、そのためか、この処理だとtimeObservableが5秒間隔に対して、初回イベントが10秒後に発生しその後は5秒間隔で発生する、っというような動きになりました。
回答2件
あなたの回答
tips
プレビュー
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
2016/05/14 07:46 編集
2016/05/14 07:47
2016/05/16 02:29