質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

90.50%

  • C#

    7382questions

    C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

  • .NET Framework

    478questions

    .NET Framework は、Microsoft Windowsのオペレーティングシステムのために開発されたソフトウェア開発環境/実行環境です。多くのプログラミング言語をサポートしています。

Reactive Extensionsを使った一定間隔毎のイベントサンプリング方法

解決済

回答 2

投稿 編集

  • 評価
  • クリップ 1
  • VIEW 1,337

hossy

score 9

やりたいことの概要

Reactive Extensionsの書き方に関する質問です。
やりたいことの概要は以下のようなものです。

  • イメージとしては、複数のセンサーを持つ機器(複数台)のイベントをサンプリングするIoTのようなもの
  • 機器はIdを持ち、センサーはSensorTypeによって識別され、その2つをキーとするイベントがストリームとして発生する
  • このストリームから、一定のサンプリング間隔で、機器(Id)毎に各センサーの直近の値を取得する
  • この処理をReactive Extensionsで書きたい

書いてみたソースコード

データ構造

// センサー種別
public enum SensorType
{
    TypeA,
    TypeB
}

// 発生するイベントのデータ構造
public class DataEvent
{
    public int Id { get; set; }

    public SensorType SensorType { get; set; }

    public DateTime Timestamp { get; set; }
}

イベントストリームの疑似

// イベントストリームの疑似Observable
var observable = Observable.Create<DataEvent>(observer =>
{
    int counter = 0;
    return new System.Threading.Timer(state =>
    {
        var data = new DataEvent { Id = counter / 2 % 2, SensorType = (SensorType)(counter % 2), Timestamp = DateTime.Now };
        Debug.WriteLine(String.Format("{0} : Data  : Id=[{1}], Type=[{2}]",
            data.Timestamp.ToString("HH:mm:ss"),
            data.Id,
            data.Type));
        counter++;

        observer.OnNext(data);
    }, null, 0, 1000);
});

自分が作った処理

// 一定期間毎に、Id単位で直近の値をサンプリング
var timeObservable = Observable.Interval(TimeSpan.FromSeconds(5)).Publish().RefCount();

observable
    .GroupBy(_ => _.Id)
    .Subscribe(groupedObservable =>
    {
        groupedObservable
            .Buffer(timeObservable)
            .Subscribe(datas =>
            {
                var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA);
                var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB);

                Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]",
                    DateTime.Now.ToString("HH:mm:ss"),
                    groupedObservable.Key,
                    datas.Count,
                    typeA?.Timestamp.ToString("HH:mm:ss"),
                    typeB?.Timestamp.ToString("HH:mm:ss")));
            });
    });

実行例

出力

16:57:05 : Data  : Id=[0], Type=[TypeA]
16:57:06 : Data  : Id=[0], Type=[TypeB]
16:57:07 : Data  : Id=[1], Type=[TypeA]
16:57:08 : Data  : Id=[1], Type=[TypeB]
16:57:09 : Data  : Id=[0], Type=[TypeA]
16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06]
16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08]
16:57:10 : Data  : Id=[0], Type=[TypeB]
16:57:11 : Data  : Id=[1], Type=[TypeA]
16:57:12 : Data  : Id=[1], Type=[TypeB]
16:57:13 : Data  : Id=[0], Type=[TypeA]
16:57:14 : Data  : Id=[0], Type=[TypeB]
16:57:15 : Event : Id=[0], Count=[3], TypeA=[16:57:13], typeB=[16:57:14]
16:57:15 : Event : Id=[1], Count=[2], TypeA=[16:57:11], typeB=[16:57:12]
16:57:15 : Data  : Id=[1], Type=[TypeA]
16:57:16 : Data  : Id=[1], Type=[TypeB]
16:57:17 : Data  : Id=[0], Type=[TypeA]
16:57:18 : Data  : Id=[0], Type=[TypeB]
16:57:19 : Data  : Id=[1], Type=[TypeA]
16:57:20 : Event : Id=[0], Count=[2], TypeA=[16:57:17], typeB=[16:57:18]
16:57:20 : Event : Id=[1], Count=[3], TypeA=[16:57:19], typeB=[16:57:16]

解説

1 16:57:05 : Data  : Id=[0], Type=[TypeA]
2 16:57:06 : Data  : Id=[0], Type=[TypeB]
3 16:57:07 : Data  : Id=[1], Type=[TypeA]
4 16:57:08 : Data  : Id=[1], Type=[TypeB]
5 16:57:09 : Data  : Id=[0], Type=[TypeA]
6 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 52が直近
7 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 34が直近

質問事項

とりあえずやりたいことは出来たのですが、Reactive Extensionsを使った書き方でもっと効率の良い書き方はできるのでしょうか?

例えば、この方法だとサンプリング間隔間のデータはすべてListで保持され、Subscribe()の中でLastOrDefault()で直近の値を取得していますが、Take()等によりストリームの段階で直近の1件のみを処理するようにする等。
それも試してみようと思ったのですが、Take()したいのはIdとSensorTypeの復号キー単位だが最終的に取得したいのはId単位な点、また、その場合にSample()等をどのタイミングで呼び出すのかがわからなくなり、ギブアップしました。

内容補足(追加1)

自分のやりたい内容を、以下に整理してみました。

source   |-A1-A2-B1----B2----A3-A4-B3-B4---
boundary |----------X-----Y--------------Z-
                    A2    B2             A4
                    B1                   B4

sourceストームのA、B等がキーで、boundaryストリームのイベント発生のタイミングで同じキーの最後の値のみを取得したいのです。

現状出来ている内容だと以下のような内容で、イベント発生後に1A、1C、1Dを捨てる形になるので、boundaryのイベント発生間隔が長くなると、その間sourceストリームで発生したオブジェクト全てが保持されてしまいます。

source   |-A1-A2-B1----B2----A3-A4-B3-B4---
boundary |----------X-----Y--------------Z-
                    A1    B2             A3
                    A2                   A4
                    B1                   B3
                                         B4

なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のイベントで扱われてもどちらでも良いです。

更に試してみたこと(追加2)

更に以下のようなコードを書いてみました。

observable
    .GroupBy(_ => new { _.Id, _.SensorType })
    .Select(_ => _.Sample(timeObservable))
    .Merge()
    .Buffer(timeObservable)
    .Subscribe(datas =>
    {
        foreach (var grouped in datas.GroupBy(_ => _.Id))
        {
            var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA);
            var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB);

            Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]",
                DateTime.Now.ToString("HH:mm:ss"),
                grouped.Key,
                grouped.Count(),
                typeA?.Timestamp.ToString("HH:mm:ss"),
                typeB?.Timestamp.ToString("HH:mm:ss")));
        }
    });

このコードはバッファするオブジェクト自体は最小になっており、惜しいのですが致命的な問題がある認識です。
本来は同じものとして処理したいtimeObservableを、キー毎のSample()用とイベント発火間隔Buffer()で個別に処理しているため、厳密な処理にならず、そのためか、この処理だとtimeObservableが5秒間隔に対して、初回イベントが10秒後に発生しその後は5秒間隔で発生する、っというような動きになりました。

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 2

check解決した方法

0

下記の拡張メソッドを作成することで対処することにしました。

拡張メソッド

public static IObservable<IList<TSource>> SampleBy<TSource, TBufferBoundary, TKey>(
    this IObservable<TSource> source,
    IObservable<TBufferBoundary> bufferBoundaries,
    Func<TSource, TKey> keySelector)
{
    return new AnonymousObservable<IList<TSource>>(observer =>
    {
        var map = new Dictionary<TKey, TSource>();
        return new CompositeDisposable
        {
            source.Subscribe(_ =>
            {
                lock (map)
                {
                    map[keySelector(_)] = _;
                }
            }),
            bufferBoundaries.Subscribe(_ =>
            {
                lock (map)
                {
                    observer.OnNext(map.Values.ToList());
                    map.Clear();
                }
            })
        };
    });
}

使用例

observable
    .SampleBy(Observable.Interval(TimeSpan.FromSeconds(5)), _ => new {_.Id, _.Type})
    .Subscribe(datas =>
    {
        foreach (var grouped in datas.GroupBy(_ => _.Id))
        {
            var typeA = grouped.LastOrDefault(_ => _.Type == DataType.TypeA);
            var typeB = grouped.LastOrDefault(_ => _.Type == DataType.TypeB);
            Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]",
                DateTime.Now.ToString("HH:mm:ss"),
                grouped.Key,
                grouped.Count(),
                typeA?.Timestamp.ToString("HH:mm:ss"),
                typeB?.Timestamp.ToString("HH:mm:ss")));
        }
    });

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

0

【回答編集しました。】

最新の値をペアで取得する場合は、WithLatestFromオペレータを使うと良いと思います。

var r = new Random();
var sensor = Observable.Interval( TimeSpan.FromSeconds( 1 ) )
    .Select( x => new
    {
        Id = x ,
        Timestamp = DateTime.Now ,
        Key = r.Next() % 2 == 0 ? 'A' : 'B'
    } )
    .Do( x => Console.WriteLine( $"DEBUG {x.Key}-{x.Id}->{x.Timestamp}" ) )
    .Publish()
    .RefCount();


sensor
    .Where( x => x.Key == 'A' )
    .WithLatestFrom(
        sensor.Where( x => x.Key == 'B' ) ,
        ( a , b ) => new
        {
            A = a ,
            B = b
        }
    )
    .Sample( TimeSpan.FromSeconds( 3 ) )
    .Subscribe( x => Console.WriteLine( $"{x.A.Key}:ID={x.A.Id}->{x.A.Timestamp}   {x.B.Key}:{x.B.Id}->{x.B.Timestamp}" ) );

WithLatestFromオペレータは、それぞれのシーケンスの最新の値を合成します。
Sampleオペレータは、値をサンプリングし続け、一定間隔毎に最新の値を後続に流すオペレータです。
実行結果は、次のようになります。

//--- 実行結果 ---//
DEBUG B-2->2016/05/14 17:47:18
DEBUG B-3->2016/05/14 17:47:19
DEBUG A-4->2016/05/14 17:47:20
DEBUG B-5->2016/05/14 17:47:21
A:ID=4->2016/05/14 17:47:20   B:3->2016/05/14 17:47:19
DEBUG A-6->2016/05/14 17:47:22
DEBUG B-7->2016/05/14 17:47:23
DEBUG A-8->2016/05/14 17:47:24
A:ID=6->2016/05/14 17:47:22   B:5->2016/05/14 17:47:21
DEBUG B-9->2016/05/14 17:47:25
DEBUG B-10->2016/05/14 17:47:26
A:ID=8->2016/05/14 17:47:24   B:7->2016/05/14 17:47:23
DEBUG A-11->2016/05/14 17:47:27

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2016/05/14 13:53 編集

    ありがとうございます。
    Sample自体は良いのですが、自分が解決できなかった点の一つが「イベントストリームにはキー(例だとキーはId+SensorType)があり、そのキー毎のSampleを同一タイミングで取得したい」っという点なのです。


    上記の問題がわかりやすいように、内容を整理して質問の最後に「内容補足」を追加しました。

    キャンセル

  • 2016/05/14 16:47

    更に試してみたことを追加しました。

    キャンセル

  • 2016/05/16 11:29

    拡張メソッドを用意することで対処することにしました。

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 90.50%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る

  • C#

    7382questions

    C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

  • .NET Framework

    478questions

    .NET Framework は、Microsoft Windowsのオペレーティングシステムのために開発されたソフトウェア開発環境/実行環境です。多くのプログラミング言語をサポートしています。