🎄teratailクリスマスプレゼントキャンペーン2024🎄』開催中!

\teratail特別グッズやAmazonギフトカード最大2,000円分が当たる!/

詳細はこちら
Rx.NET

Rx.NETは、リアクティブプログラミングが可能なライブラリの一つ。.NET Framework向けReactive Extensionsライブラリです。マイクロソフト社が初めてRxライブラリとして提供し、ここから多くの言語にRxが移植されるようになりました。

C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

LINQ

LINQとはLanguage INtegrated Queryの略で、「統合言語クエリ」という意味です。C#やVisual Basicといった言語のコード内に記述することができるクエリです。

Q&A

2回答

6405閲覧

C# Rxで、1つのストリームを条件によって複数Subscribeする方法

ymstrya1922

総合スコア14

Rx.NET

Rx.NETは、リアクティブプログラミングが可能なライブラリの一つ。.NET Framework向けReactive Extensionsライブラリです。マイクロソフト社が初めてRxライブラリとして提供し、ここから多くの言語にRxが移植されるようになりました。

C#

C#はマルチパラダイムプログラミング言語の1つで、命令形・宣言型・関数型・ジェネリック型・コンポーネント指向・オブジェクティブ指向のプログラミング開発すべてに対応しています。

LINQ

LINQとはLanguage INtegrated Queryの略で、「統合言語クエリ」という意味です。C#やVisual Basicといった言語のコード内に記述することができるクエリです。

0グッド

1クリップ

投稿2019/11/25 03:34

C#でSystem.Reactive.Linqを用いてリアクティブプログラミングを行なっています。
一つのストリームから発行された値を、Subscribeしてある処理を行なっているのですが、その発行された値によって異なる処理を行いたいと考えています。

C#

1observable.Subscribe(v => { 2 if(v==1) //処理1 3 else //処理2 4});

こんな感じです(スマホで打っているのでインデントがずれていたら申し訳ありません…)。

これを、Subscribeの中で条件分岐するのではなく、このような感じにすることはできないのでしょうか?

C#

1observable.SubscribeIf(v=>v==1, v=> //処理1) 2 .subscribe(v=>//処理2); 3

Whereによるフィルターでは、2回Subscriveを行うことになるのでなんだか…と思っています。
(実際には、もっと多くの条件分岐を行うので尚更)

何かそう言ったメソッド等は用意されていないのでしょうか。
よろしくお願い致します。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

Zuishin

2019/11/29 02:56

反応がなかったら何を求めているのかわからないので他の回答もつかないでしょう。
guest

回答2

0

こんにちは。

Rx では一般的に、処理を分岐させるならその数だけ Subscribe を分けます。
それぞれのパイプラインを独立に処理できるのが Rx を採用するメリットなので、そうしないなら Rx を採用しない方が良いコードになる可能性が高いです。
分岐を扱う場合は、分岐点で Publish を使い明示的にストリームを分岐させるのが良いです。

csharp

1// パイプラインに分岐点と水栓を作る 2var branch = observable.Publish(); 3 4// 分岐点からそれぞれ Subscribe 5branch.Where(v => v == 1).Subscribe(v => { }); 6branch.Where(v => v == ??).Subscribe(v => { }); 7 8// 水栓を開ける 9branch.Connect();

投稿2019/11/25 03:44

tamoto

総合スコア4239

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

tamoto さんの回答のように Publish を使うのが一般的だと思います。
しかし、あえて自分用に作ってみました。
IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド Case です。

C#

1using System; 2using System.Collections.Generic; 3using System.Linq; 4using System.Reactive.Linq; 5 6namespace ConsoleApp1 7{ 8 class Program 9 { 10 static void Main() 11 { 12 Observable 13 .Range(1, 15) 14 .Case(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz")) 15 .Case(a => a % 3 == 0, _ => Console.WriteLine("Fizz")) 16 .Case(a => a % 5 == 0, _ => Console.WriteLine("Buzz")) 17 .Subscribe(a => Console.WriteLine(a)); 18 } 19 } 20 21 public class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream> 22 { 23 public Abyss( 24 IObservable<TUpstream> upstream, 25 Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null, 26 Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null, 27 Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null) 28 { 29 Upstream = upstream; 30 OnNextAction = onNext; 31 OnErrorAction = onError; 32 OnCompletedAction = onCompleted; 33 } 34 35 private IDisposable upstreamDisposable = null; 36 37 protected IObservable<TUpstream> Upstream { get; private set; } 38 protected Action<TUpstream, IEnumerable<IObserver<TDownstream>>> OnNextAction { get; private set; } 39 protected Action<Exception, IEnumerable<IObserver<TDownstream>>> OnErrorAction { get; private set; } 40 protected Action<IEnumerable<IObserver<TDownstream>>> OnCompletedAction { get; private set; } 41 42 protected HashSet<IObserver<TDownstream>> Observers { get; } 43 = new HashSet<IObserver<TDownstream>>(); 44 45 public void OnCompleted() 46 { 47 DisconnectUpstream(); 48 if (OnCompletedAction != null) 49 { 50 OnCompletedAction(Observers.ToArray()); 51 } 52 else 53 { 54 foreach (var observer in Observers.ToArray()) 55 { 56 observer.OnCompleted(); 57 } 58 } 59 Observers.Clear(); 60 } 61 62 public void OnError(Exception error) 63 { 64 DisconnectUpstream(); 65 if (OnErrorAction != null) 66 { 67 OnErrorAction(error, Observers.ToArray()); 68 } 69 else 70 { 71 foreach (var observer in Observers.ToArray()) 72 { 73 observer.OnError(error); 74 } 75 } 76 Observers.Clear(); 77 } 78 79 [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "<保留中>")] 80 public void OnNext(TUpstream value) 81 { 82 try 83 { 84 OnNextAction?.Invoke(value, Observers.ToArray()); 85 } 86 catch (Exception e) 87 { 88 OnError(e); 89 } 90 } 91 92 public IDisposable Subscribe(IObserver<TDownstream> observer) 93 { 94 if (observer == null) throw new ArgumentNullException(nameof(observer)); 95 Observers.Add(observer); 96 if (upstreamDisposable == null) 97 { 98 var connectable = Upstream?.Publish(); 99 upstreamDisposable = connectable?.Subscribe(this); 100 connectable?.Connect(); 101 } 102 return new Disposable(() => 103 { 104 Observers.Remove(observer); 105 if (Observers.Count == 0) 106 { 107 DisconnectUpstream(); 108 } 109 }); 110 } 111 112 private void DisconnectUpstream() 113 { 114 upstreamDisposable?.Dispose(); 115 upstreamDisposable = null; 116 } 117 118 private class Disposable : IDisposable 119 { 120 public Disposable(Action action) 121 { 122 disposeAction = action; 123 } 124 125 private readonly Action disposeAction; 126 127 public void Dispose() 128 { 129 disposeAction?.Invoke(); 130 } 131 } 132 } 133 134 public static class AbyssExtension 135 { 136 public static IObservable<TDownstream> Abyss<TUpstream, TDownstream>( 137 this IObservable<TUpstream> upstream, 138 Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null, 139 Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null, 140 Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null) 141 { 142 return new Abyss<TUpstream, TDownstream>(upstream, onNext, onError, onCompleted); 143 } 144 145 public static IObservable<T> Case<T>( 146 this IObservable<T> upstream, 147 Func<T, bool> condition, 148 Action<T> action) 149 { 150 return new Abyss<T, T>(upstream, (data, observers) => 151 { 152 if (condition?.Invoke(data) ?? false && action != null) 153 { 154 action.Invoke(data); 155 } 156 else 157 { 158 foreach (var observer in observers) 159 { 160 observer?.OnNext(data); 161 } 162 } 163 }); 164 } 165 } 166}

投稿2019/11/25 11:32

編集2019/11/27 04:38
Zuishin

総合スコア28669

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

まだベストアンサーが選ばれていません

会員登録して回答してみよう

アカウントをお持ちの方は

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.36%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問