質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

.NET Framework

.NET Framework は、Microsoft Windowsのオペレーティングシステムのために開発されたソフトウェア開発環境/実行環境です。多くのプログラミング言語をサポートしています。

Q&A

解決済

2回答

4297閲覧

Reactive Extensionsを使った一定間隔毎のイベントサンプリング方法

hossy

総合スコア15

C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

.NET Framework

.NET Framework は、Microsoft Windowsのオペレーティングシステムのために開発されたソフトウェア開発環境/実行環境です。多くのプログラミング言語をサポートしています。

1グッド

1クリップ

投稿2016/05/10 10:24

編集2016/05/14 07:45

#やりたいことの概要

Reactive Extensionsの書き方に関する質問です。
やりたいことの概要は以下のようなものです。

  • イメージとしては、複数のセンサーを持つ機器(複数台)のイベントをサンプリングするIoTのようなもの
  • 機器はIdを持ち、センサーはSensorTypeによって識別され、その2つをキーとするイベントがストリームとして発生する
  • このストリームから、一定のサンプリング間隔で、機器(Id)毎に各センサーの直近の値を取得する
  • この処理をReactive Extensionsで書きたい

#書いてみたソースコード

##データ構造

cs

1// センサー種別 2public enum SensorType 3{ 4 TypeA, 5 TypeB 6} 7 8// 発生するイベントのデータ構造 9public class DataEvent 10{ 11 public int Id { get; set; } 12 13 public SensorType SensorType { get; set; } 14 15 public DateTime Timestamp { get; set; } 16}

##イベントストリームの疑似

cs

1// イベントストリームの疑似Observable 2var observable = Observable.Create<DataEvent>(observer => 3{ 4 int counter = 0; 5 return new System.Threading.Timer(state => 6 { 7 var data = new DataEvent { Id = counter / 2 % 2, SensorType = (SensorType)(counter % 2), Timestamp = DateTime.Now }; 8 Debug.WriteLine(String.Format("{0} : Data : Id=[{1}], Type=[{2}]", 9 data.Timestamp.ToString("HH:mm:ss"), 10 data.Id, 11 data.Type)); 12 counter++; 13 14 observer.OnNext(data); 15 }, null, 0, 1000); 16});

##自分が作った処理

cs

1// 一定期間毎に、Id単位で直近の値をサンプリング 2var timeObservable = Observable.Interval(TimeSpan.FromSeconds(5)).Publish().RefCount(); 3 4observable 5 .GroupBy(_ => _.Id) 6 .Subscribe(groupedObservable => 7 { 8 groupedObservable 9 .Buffer(timeObservable) 10 .Subscribe(datas => 11 { 12 var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA); 13 var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB); 14 15 Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]", 16 DateTime.Now.ToString("HH:mm:ss"), 17 groupedObservable.Key, 18 datas.Count, 19 typeA?.Timestamp.ToString("HH:mm:ss"), 20 typeB?.Timestamp.ToString("HH:mm:ss"))); 21 }); 22 });

#実行例

##出力

16:57:05 : Data : Id=[0], Type=[TypeA] 16:57:06 : Data : Id=[0], Type=[TypeB] 16:57:07 : Data : Id=[1], Type=[TypeA] 16:57:08 : Data : Id=[1], Type=[TypeB] 16:57:09 : Data : Id=[0], Type=[TypeA] 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 16:57:10 : Data : Id=[0], Type=[TypeB] 16:57:11 : Data : Id=[1], Type=[TypeA] 16:57:12 : Data : Id=[1], Type=[TypeB] 16:57:13 : Data : Id=[0], Type=[TypeA] 16:57:14 : Data : Id=[0], Type=[TypeB] 16:57:15 : Event : Id=[0], Count=[3], TypeA=[16:57:13], typeB=[16:57:14] 16:57:15 : Event : Id=[1], Count=[2], TypeA=[16:57:11], typeB=[16:57:12] 16:57:15 : Data : Id=[1], Type=[TypeA] 16:57:16 : Data : Id=[1], Type=[TypeB] 16:57:17 : Data : Id=[0], Type=[TypeA] 16:57:18 : Data : Id=[0], Type=[TypeB] 16:57:19 : Data : Id=[1], Type=[TypeA] 16:57:20 : Event : Id=[0], Count=[2], TypeA=[16:57:17], typeB=[16:57:18] 16:57:20 : Event : Id=[1], Count=[3], TypeA=[16:57:19], typeB=[16:57:16]

##解説

1 16:57:05 : Data : Id=[0], Type=[TypeA] 2 16:57:06 : Data : Id=[0], Type=[TypeB] 3 16:57:07 : Data : Id=[1], Type=[TypeA] 4 16:57:08 : Data : Id=[1], Type=[TypeB] 5 16:57:09 : Data : Id=[0], Type=[TypeA] 6 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 5と2が直近 7 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 3と4が直近

#質問事項

とりあえずやりたいことは出来たのですが、Reactive Extensionsを使った書き方でもっと効率の良い書き方はできるのでしょうか?

例えば、この方法だとサンプリング間隔間のデータはすべてListで保持され、Subscribe()の中でLastOrDefault()で直近の値を取得していますが、Take()等によりストリームの段階で直近の1件のみを処理するようにする等。
それも試してみようと思ったのですが、Take()したいのはIdとSensorTypeの復号キー単位だが最終的に取得したいのはId単位な点、また、その場合にSample()等をどのタイミングで呼び出すのかがわからなくなり、ギブアップしました。

#内容補足(追加1)

自分のやりたい内容を、以下に整理してみました。

source |-A1-A2-B1----B2----A3-A4-B3-B4--- boundary |----------X-----Y--------------Z- A2 B2 A4 B1 B4

sourceストームのA、B等がキーで、boundaryストリームのイベント発生のタイミングで同じキーの最後の値のみを取得したいのです。

現状出来ている内容だと以下のような内容で、イベント発生後に1A、1C、1Dを捨てる形になるので、boundaryのイベント発生間隔が長くなると、その間sourceストリームで発生したオブジェクト全てが保持されてしまいます。

source |-A1-A2-B1----B2----A3-A4-B3-B4--- boundary |----------X-----Y--------------Z- A1 B2 A3 A2 A4 B1 B3 B4

なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のイベントで扱われてもどちらでも良いです。

#更に試してみたこと(追加2)

更に以下のようなコードを書いてみました。

cs

1observable 2 .GroupBy(_ => new { _.Id, _.SensorType }) 3 .Select(_ => _.Sample(timeObservable)) 4 .Merge() 5 .Buffer(timeObservable) 6 .Subscribe(datas => 7 { 8 foreach (var grouped in datas.GroupBy(_ => _.Id)) 9 { 10 var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA); 11 var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB); 12 13 Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]", 14 DateTime.Now.ToString("HH:mm:ss"), 15 grouped.Key, 16 grouped.Count(), 17 typeA?.Timestamp.ToString("HH:mm:ss"), 18 typeB?.Timestamp.ToString("HH:mm:ss"))); 19 } 20 });

このコードはバッファするオブジェクト自体は最小になっており、惜しいのですが致命的な問題がある認識です。
本来は同じものとして処理したいtimeObservableを、キー毎のSample()用とイベント発火間隔Buffer()で個別に処理しているため、厳密な処理にならず、そのためか、この処理だとtimeObservableが5秒間隔に対して、初回イベントが10秒後に発生しその後は5秒間隔で発生する、っというような動きになりました。

Tak1wa👍を押しています

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答2

0

【回答編集しました。】

最新の値をペアで取得する場合は、WithLatestFromオペレータを使うと良いと思います。

c#

1var r = new Random(); 2var sensor = Observable.Interval( TimeSpan.FromSeconds( 1 ) ) 3 .Select( x => new 4 { 5 Id = x , 6 Timestamp = DateTime.Now , 7 Key = r.Next() % 2 == 0 ? 'A' : 'B' 8 } ) 9 .Do( x => Console.WriteLine( $"DEBUG {x.Key}-{x.Id}->{x.Timestamp}" ) ) 10 .Publish() 11 .RefCount(); 12 13 14sensor 15 .Where( x => x.Key == 'A' ) 16 .WithLatestFrom( 17 sensor.Where( x => x.Key == 'B' ) , 18 ( a , b ) => new 19 { 20 A = a , 21 B = b 22 } 23 ) 24 .Sample( TimeSpan.FromSeconds( 3 ) ) 25 .Subscribe( x => Console.WriteLine( $"{x.A.Key}:ID={x.A.Id}->{x.A.Timestamp} {x.B.Key}:{x.B.Id}->{x.B.Timestamp}" ) );

WithLatestFromオペレータは、それぞれのシーケンスの最新の値を合成します。
Sampleオペレータは、値をサンプリングし続け、一定間隔毎に最新の値を後続に流すオペレータです。
実行結果は、次のようになります。

//--- 実行結果 ---//
DEBUG B-2->2016/05/14 17:47:18
DEBUG B-3->2016/05/14 17:47:19
DEBUG A-4->2016/05/14 17:47:20
DEBUG B-5->2016/05/14 17:47:21
A:ID=4->2016/05/14 17:47:20 B:3->2016/05/14 17:47:19
DEBUG A-6->2016/05/14 17:47:22
DEBUG B-7->2016/05/14 17:47:23
DEBUG A-8->2016/05/14 17:47:24
A:ID=6->2016/05/14 17:47:22 B:5->2016/05/14 17:47:21
DEBUG B-9->2016/05/14 17:47:25
DEBUG B-10->2016/05/14 17:47:26
A:ID=8->2016/05/14 17:47:24 B:7->2016/05/14 17:47:23
DEBUG A-11->2016/05/14 17:47:27

投稿2016/05/13 12:44

編集2016/05/14 08:58
Temarin

総合スコア14

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

hossy

2016/05/14 07:46 編集

ありがとうございます。 Sample自体は良いのですが、自分が解決できなかった点の一つが「イベントストリームにはキー(例だとキーはId+SensorType)があり、そのキー毎のSampleを同一タイミングで取得したい」っという点なのです。 上記の問題がわかりやすいように、内容を整理して質問の最後に「内容補足」を追加しました。
hossy

2016/05/14 07:47

更に試してみたことを追加しました。
hossy

2016/05/16 02:29

拡張メソッドを用意することで対処することにしました。
guest

0

自己解決

下記の拡張メソッドを作成することで対処することにしました。

#拡張メソッド

cs

1public static IObservable<IList<TSource>> SampleBy<TSource, TBufferBoundary, TKey>( 2 this IObservable<TSource> source, 3 IObservable<TBufferBoundary> bufferBoundaries, 4 Func<TSource, TKey> keySelector) 5{ 6 return new AnonymousObservable<IList<TSource>>(observer => 7 { 8 var map = new Dictionary<TKey, TSource>(); 9 return new CompositeDisposable 10 { 11 source.Subscribe(_ => 12 { 13 lock (map) 14 { 15 map[keySelector(_)] = _; 16 } 17 }), 18 bufferBoundaries.Subscribe(_ => 19 { 20 lock (map) 21 { 22 observer.OnNext(map.Values.ToList()); 23 map.Clear(); 24 } 25 }) 26 }; 27 }); 28}

#使用例

cs

1observable 2 .SampleBy(Observable.Interval(TimeSpan.FromSeconds(5)), _ => new {_.Id, _.Type}) 3 .Subscribe(datas => 4 { 5 foreach (var grouped in datas.GroupBy(_ => _.Id)) 6 { 7 var typeA = grouped.LastOrDefault(_ => _.Type == DataType.TypeA); 8 var typeB = grouped.LastOrDefault(_ => _.Type == DataType.TypeB); 9 Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]", 10 DateTime.Now.ToString("HH:mm:ss"), 11 grouped.Key, 12 grouped.Count(), 13 typeA?.Timestamp.ToString("HH:mm:ss"), 14 typeB?.Timestamp.ToString("HH:mm:ss"))); 15 } 16 });

投稿2016/05/16 02:28

hossy

総合スコア15

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問