質問する ログイン 新規登録

回答編集履歴

6

修正

2019/11/27 04:38

投稿

Zuishin
Zuishin

スコア28675

answer CHANGED
@@ -5,6 +5,7 @@
5
5
  ```C#
6
6
  using System;
7
7
  using System.Collections.Generic;
8
+ using System.Linq;
8
9
  using System.Reactive.Linq;
9
10
 
10
11
  namespace ConsoleApp1
@@ -15,17 +16,10 @@
15
16
  {
16
17
  Observable
17
18
  .Range(1, 15)
18
- .Case(
19
- a => a % 15 == 0,
20
- a => Console.WriteLine("FizzBuzz"))
19
+ .Case(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz"))
21
- .Case(
22
- a => a % 3 == 0,
23
- a => Console.WriteLine("Fizz"))
20
+ .Case(a => a % 3 == 0, _ => Console.WriteLine("Fizz"))
24
- .Case(
25
- a => a % 5 == 0,
26
- a => Console.WriteLine("Buzz"))
21
+ .Case(a => a % 5 == 0, _ => Console.WriteLine("Buzz"))
27
- .Subscribe(
28
- a => Console.WriteLine(a));
22
+ .Subscribe(a => Console.WriteLine(a));
29
23
  }
30
24
  }
31
25
 
@@ -58,11 +52,11 @@
58
52
  DisconnectUpstream();
59
53
  if (OnCompletedAction != null)
60
54
  {
61
- OnCompletedAction(Observers);
55
+ OnCompletedAction(Observers.ToArray());
62
56
  }
63
57
  else
64
58
  {
65
- foreach (var observer in Observers)
59
+ foreach (var observer in Observers.ToArray())
66
60
  {
67
61
  observer.OnCompleted();
68
62
  }
@@ -75,11 +69,11 @@
75
69
  DisconnectUpstream();
76
70
  if (OnErrorAction != null)
77
71
  {
78
- OnErrorAction(error, Observers);
72
+ OnErrorAction(error, Observers.ToArray());
79
73
  }
80
74
  else
81
75
  {
82
- foreach (var observer in Observers)
76
+ foreach (var observer in Observers.ToArray())
83
77
  {
84
78
  observer.OnError(error);
85
79
  }
@@ -92,11 +86,11 @@
92
86
  {
93
87
  try
94
88
  {
95
- OnNextAction?.Invoke(value, Observers);
89
+ OnNextAction?.Invoke(value, Observers.ToArray());
96
90
  }
97
- catch (Exception error)
91
+ catch (Exception e)
98
92
  {
99
- OnError(error);
93
+ OnError(e);
100
94
  }
101
95
  }
102
96
 
@@ -106,7 +100,9 @@
106
100
  Observers.Add(observer);
107
101
  if (upstreamDisposable == null)
108
102
  {
103
+ var connectable = Upstream?.Publish();
109
- upstreamDisposable = Upstream?.Subscribe(this);
104
+ upstreamDisposable = connectable?.Subscribe(this);
105
+ connectable?.Connect();
110
106
  }
111
107
  return new Disposable(() =>
112
108
  {
@@ -142,6 +138,15 @@
142
138
 
143
139
  public static class AbyssExtension
144
140
  {
141
+ public static IObservable<TDownstream> Abyss<TUpstream, TDownstream>(
142
+ this IObservable<TUpstream> upstream,
143
+ Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
144
+ Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
145
+ Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
146
+ {
147
+ return new Abyss<TUpstream, TDownstream>(upstream, onNext, onError, onCompleted);
148
+ }
149
+
145
150
  public static IObservable<T> Case<T>(
146
151
  this IObservable<T> upstream,
147
152
  Func<T, bool> condition,

5

修正

2019/11/27 04:38

投稿

Zuishin
Zuishin

スコア28675

answer CHANGED
@@ -13,7 +13,6 @@
13
13
  {
14
14
  static void Main()
15
15
  {
16
- var list = new List<string>();
17
16
  Observable
18
17
  .Range(1, 15)
19
18
  .Case(

4

修正

2019/11/27 02:38

投稿

Zuishin
Zuishin

スコア28675

answer CHANGED
@@ -1,11 +1,10 @@
1
1
  tamoto さんの回答のように Publish を使うのが一般的だと思います。
2
2
  しかし、あえて自分用に作ってみました。
3
- IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド When です。
3
+ IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド Case です。
4
4
 
5
5
  ```C#
6
6
  using System;
7
7
  using System.Collections.Generic;
8
- using System.Reactive.Disposables;
9
8
  using System.Reactive.Linq;
10
9
 
11
10
  namespace ConsoleApp1
@@ -14,102 +13,155 @@
14
13
  {
15
14
  static void Main()
16
15
  {
16
+ var list = new List<string>();
17
17
  Observable
18
18
  .Range(1, 15)
19
+ .Case(
20
+ a => a % 15 == 0,
19
- .When(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz"))
21
+ a => Console.WriteLine("FizzBuzz"))
22
+ .Case(
23
+ a => a % 3 == 0,
20
- .When(a => a % 3 == 0, _ => Console.WriteLine("Fizz"))
24
+ a => Console.WriteLine("Fizz"))
25
+ .Case(
26
+ a => a % 5 == 0,
21
- .When(a => a % 5 == 0, _ => Console.WriteLine("Buzz"))
27
+ a => Console.WriteLine("Buzz"))
28
+ .Subscribe(
22
- .Subscribe(a => Console.WriteLine(a));
29
+ a => Console.WriteLine(a));
23
30
  }
24
31
  }
25
32
 
26
- public static class AbyssExtension
33
+ public class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
27
34
  {
28
- public static IObservable<T> When<T>(
35
+ public Abyss(
29
- this IObservable<T> upstream,
36
+ IObservable<TUpstream> upstream,
30
- Func<T, bool> condition,
37
+ Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
38
+ Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
31
- Action<T> action)
39
+ Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
32
40
  {
33
- Action<T, Action<T>> onNext = (T data, Action<T> next) =>
34
- {
35
- if (condition?.Invoke(data) ?? false)
36
- {
37
- action?.Invoke(data);
41
+ Upstream = upstream;
38
- }
39
- else
40
- {
41
- next?.Invoke(data);
42
+ OnNextAction = onNext;
42
- }
43
- };
43
+ OnErrorAction = onError;
44
- return new Abyss<T, T>(upstream, onNext);
44
+ OnCompletedAction = onCompleted;
45
45
  }
46
46
 
47
- private class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
48
- {
49
- public Abyss(
50
- IObservable<TUpstream> upstream,
51
- Action<TUpstream, Action<TDownstream>> onNext = null,
52
- Action<Exception, Action<Exception>> onError = null,
53
- Action<Action> onCompleted = null)
47
+ private IDisposable upstreamDisposable = null;
54
- {
55
- Upstream = upstream;
56
- OnNextAction = onNext;
57
- OnErrorAction = onError;
58
- OnCompletedAction = onCompleted;
59
- }
60
48
 
61
- private IDisposable upstreamDisposable = null;
49
+ protected IObservable<TUpstream> Upstream { get; private set; }
50
+ protected Action<TUpstream, IEnumerable<IObserver<TDownstream>>> OnNextAction { get; private set; }
51
+ protected Action<Exception, IEnumerable<IObserver<TDownstream>>> OnErrorAction { get; private set; }
52
+ protected Action<IEnumerable<IObserver<TDownstream>>> OnCompletedAction { get; private set; }
62
53
 
63
- protected IObservable<TUpstream> Upstream { get; private set; }
64
- protected Action<TUpstream, Action<TDownstream>> OnNextAction { get; private set; }
54
+ protected HashSet<IObserver<TDownstream>> Observers { get; }
65
- protected Action<Exception, Action<Exception>> OnErrorAction { get; private set; }
66
- protected Action<Action> OnCompletedAction { get; private set; }
55
+ = new HashSet<IObserver<TDownstream>>();
67
56
 
68
- protected HashSet<IObserver<TDownstream>> Observers { get; }
69
- = new HashSet<IObserver<TDownstream>>();
70
-
71
- public void OnCompleted()
57
+ public void OnCompleted()
58
+ {
59
+ DisconnectUpstream();
60
+ if (OnCompletedAction != null)
72
61
  {
62
+ OnCompletedAction(Observers);
63
+ }
64
+ else
65
+ {
73
66
  foreach (var observer in Observers)
74
67
  {
75
- OnCompletedAction?.Invoke(observer.OnCompleted);
68
+ observer.OnCompleted();
76
69
  }
77
70
  }
71
+ Observers.Clear();
72
+ }
78
73
 
79
- public void OnError(Exception error)
74
+ public void OnError(Exception error)
75
+ {
76
+ DisconnectUpstream();
77
+ if (OnErrorAction != null)
80
78
  {
79
+ OnErrorAction(error, Observers);
80
+ }
81
+ else
82
+ {
81
83
  foreach (var observer in Observers)
82
84
  {
83
- OnErrorAction?.Invoke(error, observer.OnError);
85
+ observer.OnError(error);
84
86
  }
85
87
  }
88
+ Observers.Clear();
89
+ }
86
90
 
91
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "<保留中>")]
87
- public void OnNext(TUpstream value)
92
+ public void OnNext(TUpstream value)
93
+ {
94
+ try
88
95
  {
89
- foreach (var observer in Observers)
96
+ OnNextAction?.Invoke(value, Observers);
97
+ }
98
+ catch (Exception error)
99
+ {
100
+ OnError(error);
101
+ }
102
+ }
103
+
104
+ public IDisposable Subscribe(IObserver<TDownstream> observer)
105
+ {
106
+ if (observer == null) throw new ArgumentNullException(nameof(observer));
107
+ Observers.Add(observer);
108
+ if (upstreamDisposable == null)
109
+ {
110
+ upstreamDisposable = Upstream?.Subscribe(this);
111
+ }
112
+ return new Disposable(() =>
113
+ {
114
+ Observers.Remove(observer);
115
+ if (Observers.Count == 0)
90
116
  {
91
- OnNextAction?.Invoke(value, observer.OnNext);
117
+ DisconnectUpstream();
92
118
  }
119
+ });
120
+ }
121
+
122
+ private void DisconnectUpstream()
123
+ {
124
+ upstreamDisposable?.Dispose();
125
+ upstreamDisposable = null;
126
+ }
127
+
128
+ private class Disposable : IDisposable
129
+ {
130
+ public Disposable(Action action)
131
+ {
132
+ disposeAction = action;
93
133
  }
94
134
 
95
- public IDisposable Subscribe(IObserver<TDownstream> observer)
135
+ private readonly Action disposeAction;
136
+
137
+ public void Dispose()
96
138
  {
97
- if (observer == null) throw new ArgumentNullException(nameof(observer));
98
- Observers.Add(observer);
139
+ disposeAction?.Invoke();
140
+ }
141
+ }
142
+ }
143
+
144
+ public static class AbyssExtension
145
+ {
146
+ public static IObservable<T> Case<T>(
99
- if (upstreamDisposable == null)
147
+ this IObservable<T> upstream,
148
+ Func<T, bool> condition,
149
+ Action<T> action)
150
+ {
151
+ return new Abyss<T, T>(upstream, (data, observers) =>
152
+ {
153
+ if (condition?.Invoke(data) ?? false && action != null)
100
154
  {
101
- upstreamDisposable = Upstream?.Subscribe(this);
155
+ action.Invoke(data);
102
156
  }
103
- return Disposable.Create(() =>
157
+ else
104
158
  {
105
- Observers.Remove(observer);
159
+ foreach (var observer in observers)
106
- if (Observers.Count == 0)
107
160
  {
108
- upstreamDisposable?.Dispose();
109
- upstreamDisposable = null;
161
+ observer?.OnNext(data);
110
162
  }
163
+ }
111
- });
164
+ });
112
- }
113
165
  }
114
166
  }
115
167
  }

3

修正

2019/11/27 02:37

投稿

Zuishin
Zuishin

スコア28675

answer CHANGED
@@ -96,13 +96,17 @@
96
96
  {
97
97
  if (observer == null) throw new ArgumentNullException(nameof(observer));
98
98
  Observers.Add(observer);
99
+ if (upstreamDisposable == null)
100
+ {
99
- upstreamDisposable = Upstream?.Subscribe(this);
101
+ upstreamDisposable = Upstream?.Subscribe(this);
102
+ }
100
103
  return Disposable.Create(() =>
101
104
  {
102
105
  Observers.Remove(observer);
103
106
  if (Observers.Count == 0)
104
107
  {
105
108
  upstreamDisposable?.Dispose();
109
+ upstreamDisposable = null;
106
110
  }
107
111
  });
108
112
  }

2

修正

2019/11/25 11:44

投稿

Zuishin
Zuishin

スコア28675

answer CHANGED
@@ -94,6 +94,7 @@
94
94
 
95
95
  public IDisposable Subscribe(IObserver<TDownstream> observer)
96
96
  {
97
+ if (observer == null) throw new ArgumentNullException(nameof(observer));
97
98
  Observers.Add(observer);
98
99
  upstreamDisposable = Upstream?.Subscribe(this);
99
100
  return Disposable.Create(() =>

1

修正

2019/11/25 11:36

投稿

Zuishin
Zuishin

スコア28675

answer CHANGED
@@ -34,11 +34,11 @@
34
34
  {
35
35
  if (condition?.Invoke(data) ?? false)
36
36
  {
37
- action(data);
37
+ action?.Invoke(data);
38
38
  }
39
39
  else
40
40
  {
41
- next(data);
41
+ next?.Invoke(data);
42
42
  }
43
43
  };
44
44
  return new Abyss<T, T>(upstream, onNext);