C#でSystem.Reactive.Linqを用いてリアクティブプログラミングを行なっています。
一つのストリームから発行された値を、Subscribeしてある処理を行なっているのですが、その発行された値によって異なる処理を行いたいと考えています。
C#
1observable.Subscribe(v => { 2 if(v==1) //処理1 3 else //処理2 4});
こんな感じです(スマホで打っているのでインデントがずれていたら申し訳ありません…)。
これを、Subscribeの中で条件分岐するのではなく、このような感じにすることはできないのでしょうか?
C#
1observable.SubscribeIf(v=>v==1, v=> //処理1) 2 .subscribe(v=>//処理2); 3
Whereによるフィルターでは、2回Subscriveを行うことになるのでなんだか…と思っています。
(実際には、もっと多くの条件分岐を行うので尚更)
何かそう言ったメソッド等は用意されていないのでしょうか。
よろしくお願い致します。
気になる質問をクリップする
クリップした質問は、後からいつでもMYページで確認できます。
またクリップした質問に回答があった際、通知やメールを受け取ることができます。
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
回答2件
0
こんにちは。
Rx では一般的に、処理を分岐させるならその数だけ Subscribe を分けます。
それぞれのパイプラインを独立に処理できるのが Rx を採用するメリットなので、そうしないなら Rx を採用しない方が良いコードになる可能性が高いです。
分岐を扱う場合は、分岐点で Publish を使い明示的にストリームを分岐させるのが良いです。
csharp
1// パイプラインに分岐点と水栓を作る 2var branch = observable.Publish(); 3 4// 分岐点からそれぞれ Subscribe 5branch.Where(v => v == 1).Subscribe(v => { }); 6branch.Where(v => v == ??).Subscribe(v => { }); 7 8// 水栓を開ける 9branch.Connect();
投稿2019/11/25 03:44
総合スコア4239
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
0
tamoto さんの回答のように Publish を使うのが一般的だと思います。
しかし、あえて自分用に作ってみました。
IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド Case です。
C#
1using System; 2using System.Collections.Generic; 3using System.Linq; 4using System.Reactive.Linq; 5 6namespace ConsoleApp1 7{ 8 class Program 9 { 10 static void Main() 11 { 12 Observable 13 .Range(1, 15) 14 .Case(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz")) 15 .Case(a => a % 3 == 0, _ => Console.WriteLine("Fizz")) 16 .Case(a => a % 5 == 0, _ => Console.WriteLine("Buzz")) 17 .Subscribe(a => Console.WriteLine(a)); 18 } 19 } 20 21 public class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream> 22 { 23 public Abyss( 24 IObservable<TUpstream> upstream, 25 Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null, 26 Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null, 27 Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null) 28 { 29 Upstream = upstream; 30 OnNextAction = onNext; 31 OnErrorAction = onError; 32 OnCompletedAction = onCompleted; 33 } 34 35 private IDisposable upstreamDisposable = null; 36 37 protected IObservable<TUpstream> Upstream { get; private set; } 38 protected Action<TUpstream, IEnumerable<IObserver<TDownstream>>> OnNextAction { get; private set; } 39 protected Action<Exception, IEnumerable<IObserver<TDownstream>>> OnErrorAction { get; private set; } 40 protected Action<IEnumerable<IObserver<TDownstream>>> OnCompletedAction { get; private set; } 41 42 protected HashSet<IObserver<TDownstream>> Observers { get; } 43 = new HashSet<IObserver<TDownstream>>(); 44 45 public void OnCompleted() 46 { 47 DisconnectUpstream(); 48 if (OnCompletedAction != null) 49 { 50 OnCompletedAction(Observers.ToArray()); 51 } 52 else 53 { 54 foreach (var observer in Observers.ToArray()) 55 { 56 observer.OnCompleted(); 57 } 58 } 59 Observers.Clear(); 60 } 61 62 public void OnError(Exception error) 63 { 64 DisconnectUpstream(); 65 if (OnErrorAction != null) 66 { 67 OnErrorAction(error, Observers.ToArray()); 68 } 69 else 70 { 71 foreach (var observer in Observers.ToArray()) 72 { 73 observer.OnError(error); 74 } 75 } 76 Observers.Clear(); 77 } 78 79 [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "<保留中>")] 80 public void OnNext(TUpstream value) 81 { 82 try 83 { 84 OnNextAction?.Invoke(value, Observers.ToArray()); 85 } 86 catch (Exception e) 87 { 88 OnError(e); 89 } 90 } 91 92 public IDisposable Subscribe(IObserver<TDownstream> observer) 93 { 94 if (observer == null) throw new ArgumentNullException(nameof(observer)); 95 Observers.Add(observer); 96 if (upstreamDisposable == null) 97 { 98 var connectable = Upstream?.Publish(); 99 upstreamDisposable = connectable?.Subscribe(this); 100 connectable?.Connect(); 101 } 102 return new Disposable(() => 103 { 104 Observers.Remove(observer); 105 if (Observers.Count == 0) 106 { 107 DisconnectUpstream(); 108 } 109 }); 110 } 111 112 private void DisconnectUpstream() 113 { 114 upstreamDisposable?.Dispose(); 115 upstreamDisposable = null; 116 } 117 118 private class Disposable : IDisposable 119 { 120 public Disposable(Action action) 121 { 122 disposeAction = action; 123 } 124 125 private readonly Action disposeAction; 126 127 public void Dispose() 128 { 129 disposeAction?.Invoke(); 130 } 131 } 132 } 133 134 public static class AbyssExtension 135 { 136 public static IObservable<TDownstream> Abyss<TUpstream, TDownstream>( 137 this IObservable<TUpstream> upstream, 138 Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null, 139 Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null, 140 Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null) 141 { 142 return new Abyss<TUpstream, TDownstream>(upstream, onNext, onError, onCompleted); 143 } 144 145 public static IObservable<T> Case<T>( 146 this IObservable<T> upstream, 147 Func<T, bool> condition, 148 Action<T> action) 149 { 150 return new Abyss<T, T>(upstream, (data, observers) => 151 { 152 if (condition?.Invoke(data) ?? false && action != null) 153 { 154 action.Invoke(data); 155 } 156 else 157 { 158 foreach (var observer in observers) 159 { 160 observer?.OnNext(data); 161 } 162 } 163 }); 164 } 165 } 166}
投稿2019/11/25 11:32
編集2019/11/27 04:38総合スコア28669
バッドをするには、ログインかつ
こちらの条件を満たす必要があります。
あなたの回答
tips
太字
斜体
打ち消し線
見出し
引用テキストの挿入
コードの挿入
リンクの挿入
リストの挿入
番号リストの挿入
表の挿入
水平線の挿入
プレビュー
質問の解決につながる回答をしましょう。 サンプルコードなど、より具体的な説明があると質問者の理解の助けになります。 また、読む側のことを考えた、分かりやすい文章を心がけましょう。