回答編集履歴
6
修正
answer
CHANGED
@@ -5,6 +5,7 @@
|
|
5
5
|
```C#
|
6
6
|
using System;
|
7
7
|
using System.Collections.Generic;
|
8
|
+
using System.Linq;
|
8
9
|
using System.Reactive.Linq;
|
9
10
|
|
10
11
|
namespace ConsoleApp1
|
@@ -15,17 +16,10 @@
|
|
15
16
|
{
|
16
17
|
Observable
|
17
18
|
.Range(1, 15)
|
18
|
-
.Case(
|
19
|
-
a => a % 15 == 0,
|
20
|
-
|
19
|
+
.Case(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz"))
|
21
|
-
.Case(
|
22
|
-
a => a % 3 == 0,
|
23
|
-
|
20
|
+
.Case(a => a % 3 == 0, _ => Console.WriteLine("Fizz"))
|
24
|
-
.Case(
|
25
|
-
a => a % 5 == 0,
|
26
|
-
|
21
|
+
.Case(a => a % 5 == 0, _ => Console.WriteLine("Buzz"))
|
27
|
-
.Subscribe(
|
28
|
-
|
22
|
+
.Subscribe(a => Console.WriteLine(a));
|
29
23
|
}
|
30
24
|
}
|
31
25
|
|
@@ -58,11 +52,11 @@
|
|
58
52
|
DisconnectUpstream();
|
59
53
|
if (OnCompletedAction != null)
|
60
54
|
{
|
61
|
-
OnCompletedAction(Observers);
|
55
|
+
OnCompletedAction(Observers.ToArray());
|
62
56
|
}
|
63
57
|
else
|
64
58
|
{
|
65
|
-
foreach (var observer in Observers)
|
59
|
+
foreach (var observer in Observers.ToArray())
|
66
60
|
{
|
67
61
|
observer.OnCompleted();
|
68
62
|
}
|
@@ -75,11 +69,11 @@
|
|
75
69
|
DisconnectUpstream();
|
76
70
|
if (OnErrorAction != null)
|
77
71
|
{
|
78
|
-
OnErrorAction(error, Observers);
|
72
|
+
OnErrorAction(error, Observers.ToArray());
|
79
73
|
}
|
80
74
|
else
|
81
75
|
{
|
82
|
-
foreach (var observer in Observers)
|
76
|
+
foreach (var observer in Observers.ToArray())
|
83
77
|
{
|
84
78
|
observer.OnError(error);
|
85
79
|
}
|
@@ -92,11 +86,11 @@
|
|
92
86
|
{
|
93
87
|
try
|
94
88
|
{
|
95
|
-
OnNextAction?.Invoke(value, Observers);
|
89
|
+
OnNextAction?.Invoke(value, Observers.ToArray());
|
96
90
|
}
|
97
|
-
catch (Exception
|
91
|
+
catch (Exception e)
|
98
92
|
{
|
99
|
-
OnError(
|
93
|
+
OnError(e);
|
100
94
|
}
|
101
95
|
}
|
102
96
|
|
@@ -106,7 +100,9 @@
|
|
106
100
|
Observers.Add(observer);
|
107
101
|
if (upstreamDisposable == null)
|
108
102
|
{
|
103
|
+
var connectable = Upstream?.Publish();
|
109
|
-
upstreamDisposable =
|
104
|
+
upstreamDisposable = connectable?.Subscribe(this);
|
105
|
+
connectable?.Connect();
|
110
106
|
}
|
111
107
|
return new Disposable(() =>
|
112
108
|
{
|
@@ -142,6 +138,15 @@
|
|
142
138
|
|
143
139
|
public static class AbyssExtension
|
144
140
|
{
|
141
|
+
public static IObservable<TDownstream> Abyss<TUpstream, TDownstream>(
|
142
|
+
this IObservable<TUpstream> upstream,
|
143
|
+
Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
|
144
|
+
Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
|
145
|
+
Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
|
146
|
+
{
|
147
|
+
return new Abyss<TUpstream, TDownstream>(upstream, onNext, onError, onCompleted);
|
148
|
+
}
|
149
|
+
|
145
150
|
public static IObservable<T> Case<T>(
|
146
151
|
this IObservable<T> upstream,
|
147
152
|
Func<T, bool> condition,
|
5
修正
answer
CHANGED
@@ -13,7 +13,6 @@
|
|
13
13
|
{
|
14
14
|
static void Main()
|
15
15
|
{
|
16
|
-
var list = new List<string>();
|
17
16
|
Observable
|
18
17
|
.Range(1, 15)
|
19
18
|
.Case(
|
4
修正
answer
CHANGED
@@ -1,11 +1,10 @@
|
|
1
1
|
tamoto さんの回答のように Publish を使うのが一般的だと思います。
|
2
2
|
しかし、あえて自分用に作ってみました。
|
3
|
-
IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド
|
3
|
+
IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド Case です。
|
4
4
|
|
5
5
|
```C#
|
6
6
|
using System;
|
7
7
|
using System.Collections.Generic;
|
8
|
-
using System.Reactive.Disposables;
|
9
8
|
using System.Reactive.Linq;
|
10
9
|
|
11
10
|
namespace ConsoleApp1
|
@@ -14,102 +13,155 @@
|
|
14
13
|
{
|
15
14
|
static void Main()
|
16
15
|
{
|
16
|
+
var list = new List<string>();
|
17
17
|
Observable
|
18
18
|
.Range(1, 15)
|
19
|
+
.Case(
|
20
|
+
a => a % 15 == 0,
|
19
|
-
|
21
|
+
a => Console.WriteLine("FizzBuzz"))
|
22
|
+
.Case(
|
23
|
+
a => a % 3 == 0,
|
20
|
-
|
24
|
+
a => Console.WriteLine("Fizz"))
|
25
|
+
.Case(
|
26
|
+
a => a % 5 == 0,
|
21
|
-
|
27
|
+
a => Console.WriteLine("Buzz"))
|
28
|
+
.Subscribe(
|
22
|
-
|
29
|
+
a => Console.WriteLine(a));
|
23
30
|
}
|
24
31
|
}
|
25
32
|
|
26
|
-
public
|
33
|
+
public class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
|
27
34
|
{
|
28
|
-
public
|
35
|
+
public Abyss(
|
29
|
-
|
36
|
+
IObservable<TUpstream> upstream,
|
30
|
-
|
37
|
+
Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
|
38
|
+
Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
|
31
|
-
Action<
|
39
|
+
Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
|
32
40
|
{
|
33
|
-
Action<T, Action<T>> onNext = (T data, Action<T> next) =>
|
34
|
-
{
|
35
|
-
if (condition?.Invoke(data) ?? false)
|
36
|
-
{
|
37
|
-
|
41
|
+
Upstream = upstream;
|
38
|
-
}
|
39
|
-
else
|
40
|
-
{
|
41
|
-
|
42
|
+
OnNextAction = onNext;
|
42
|
-
}
|
43
|
-
|
43
|
+
OnErrorAction = onError;
|
44
|
-
|
44
|
+
OnCompletedAction = onCompleted;
|
45
45
|
}
|
46
46
|
|
47
|
-
private class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
|
48
|
-
{
|
49
|
-
public Abyss(
|
50
|
-
IObservable<TUpstream> upstream,
|
51
|
-
Action<TUpstream, Action<TDownstream>> onNext = null,
|
52
|
-
Action<Exception, Action<Exception>> onError = null,
|
53
|
-
|
47
|
+
private IDisposable upstreamDisposable = null;
|
54
|
-
{
|
55
|
-
Upstream = upstream;
|
56
|
-
OnNextAction = onNext;
|
57
|
-
OnErrorAction = onError;
|
58
|
-
OnCompletedAction = onCompleted;
|
59
|
-
}
|
60
48
|
|
61
|
-
|
49
|
+
protected IObservable<TUpstream> Upstream { get; private set; }
|
50
|
+
protected Action<TUpstream, IEnumerable<IObserver<TDownstream>>> OnNextAction { get; private set; }
|
51
|
+
protected Action<Exception, IEnumerable<IObserver<TDownstream>>> OnErrorAction { get; private set; }
|
52
|
+
protected Action<IEnumerable<IObserver<TDownstream>>> OnCompletedAction { get; private set; }
|
62
53
|
|
63
|
-
protected IObservable<TUpstream> Upstream { get; private set; }
|
64
|
-
|
54
|
+
protected HashSet<IObserver<TDownstream>> Observers { get; }
|
65
|
-
protected Action<Exception, Action<Exception>> OnErrorAction { get; private set; }
|
66
|
-
|
55
|
+
= new HashSet<IObserver<TDownstream>>();
|
67
56
|
|
68
|
-
protected HashSet<IObserver<TDownstream>> Observers { get; }
|
69
|
-
= new HashSet<IObserver<TDownstream>>();
|
70
|
-
|
71
|
-
|
57
|
+
public void OnCompleted()
|
58
|
+
{
|
59
|
+
DisconnectUpstream();
|
60
|
+
if (OnCompletedAction != null)
|
72
61
|
{
|
62
|
+
OnCompletedAction(Observers);
|
63
|
+
}
|
64
|
+
else
|
65
|
+
{
|
73
66
|
foreach (var observer in Observers)
|
74
67
|
{
|
75
|
-
|
68
|
+
observer.OnCompleted();
|
76
69
|
}
|
77
70
|
}
|
71
|
+
Observers.Clear();
|
72
|
+
}
|
78
73
|
|
79
|
-
|
74
|
+
public void OnError(Exception error)
|
75
|
+
{
|
76
|
+
DisconnectUpstream();
|
77
|
+
if (OnErrorAction != null)
|
80
78
|
{
|
79
|
+
OnErrorAction(error, Observers);
|
80
|
+
}
|
81
|
+
else
|
82
|
+
{
|
81
83
|
foreach (var observer in Observers)
|
82
84
|
{
|
83
|
-
|
85
|
+
observer.OnError(error);
|
84
86
|
}
|
85
87
|
}
|
88
|
+
Observers.Clear();
|
89
|
+
}
|
86
90
|
|
91
|
+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "<保留中>")]
|
87
|
-
|
92
|
+
public void OnNext(TUpstream value)
|
93
|
+
{
|
94
|
+
try
|
88
95
|
{
|
89
|
-
|
96
|
+
OnNextAction?.Invoke(value, Observers);
|
97
|
+
}
|
98
|
+
catch (Exception error)
|
99
|
+
{
|
100
|
+
OnError(error);
|
101
|
+
}
|
102
|
+
}
|
103
|
+
|
104
|
+
public IDisposable Subscribe(IObserver<TDownstream> observer)
|
105
|
+
{
|
106
|
+
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
107
|
+
Observers.Add(observer);
|
108
|
+
if (upstreamDisposable == null)
|
109
|
+
{
|
110
|
+
upstreamDisposable = Upstream?.Subscribe(this);
|
111
|
+
}
|
112
|
+
return new Disposable(() =>
|
113
|
+
{
|
114
|
+
Observers.Remove(observer);
|
115
|
+
if (Observers.Count == 0)
|
90
116
|
{
|
91
|
-
|
117
|
+
DisconnectUpstream();
|
92
118
|
}
|
119
|
+
});
|
120
|
+
}
|
121
|
+
|
122
|
+
private void DisconnectUpstream()
|
123
|
+
{
|
124
|
+
upstreamDisposable?.Dispose();
|
125
|
+
upstreamDisposable = null;
|
126
|
+
}
|
127
|
+
|
128
|
+
private class Disposable : IDisposable
|
129
|
+
{
|
130
|
+
public Disposable(Action action)
|
131
|
+
{
|
132
|
+
disposeAction = action;
|
93
133
|
}
|
94
134
|
|
95
|
-
|
135
|
+
private readonly Action disposeAction;
|
136
|
+
|
137
|
+
public void Dispose()
|
96
138
|
{
|
97
|
-
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
98
|
-
|
139
|
+
disposeAction?.Invoke();
|
140
|
+
}
|
141
|
+
}
|
142
|
+
}
|
143
|
+
|
144
|
+
public static class AbyssExtension
|
145
|
+
{
|
146
|
+
public static IObservable<T> Case<T>(
|
99
|
-
|
147
|
+
this IObservable<T> upstream,
|
148
|
+
Func<T, bool> condition,
|
149
|
+
Action<T> action)
|
150
|
+
{
|
151
|
+
return new Abyss<T, T>(upstream, (data, observers) =>
|
152
|
+
{
|
153
|
+
if (condition?.Invoke(data) ?? false && action != null)
|
100
154
|
{
|
101
|
-
|
155
|
+
action.Invoke(data);
|
102
156
|
}
|
103
|
-
|
157
|
+
else
|
104
158
|
{
|
105
|
-
|
159
|
+
foreach (var observer in observers)
|
106
|
-
if (Observers.Count == 0)
|
107
160
|
{
|
108
|
-
upstreamDisposable?.Dispose();
|
109
|
-
|
161
|
+
observer?.OnNext(data);
|
110
162
|
}
|
163
|
+
}
|
111
|
-
|
164
|
+
});
|
112
|
-
}
|
113
165
|
}
|
114
166
|
}
|
115
167
|
}
|
3
修正
answer
CHANGED
@@ -96,13 +96,17 @@
|
|
96
96
|
{
|
97
97
|
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
98
98
|
Observers.Add(observer);
|
99
|
+
if (upstreamDisposable == null)
|
100
|
+
{
|
99
|
-
|
101
|
+
upstreamDisposable = Upstream?.Subscribe(this);
|
102
|
+
}
|
100
103
|
return Disposable.Create(() =>
|
101
104
|
{
|
102
105
|
Observers.Remove(observer);
|
103
106
|
if (Observers.Count == 0)
|
104
107
|
{
|
105
108
|
upstreamDisposable?.Dispose();
|
109
|
+
upstreamDisposable = null;
|
106
110
|
}
|
107
111
|
});
|
108
112
|
}
|
2
修正
answer
CHANGED
@@ -94,6 +94,7 @@
|
|
94
94
|
|
95
95
|
public IDisposable Subscribe(IObserver<TDownstream> observer)
|
96
96
|
{
|
97
|
+
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
97
98
|
Observers.Add(observer);
|
98
99
|
upstreamDisposable = Upstream?.Subscribe(this);
|
99
100
|
return Disposable.Create(() =>
|
1
修正
answer
CHANGED
@@ -34,11 +34,11 @@
|
|
34
34
|
{
|
35
35
|
if (condition?.Invoke(data) ?? false)
|
36
36
|
{
|
37
|
-
action(data);
|
37
|
+
action?.Invoke(data);
|
38
38
|
}
|
39
39
|
else
|
40
40
|
{
|
41
|
-
next(data);
|
41
|
+
next?.Invoke(data);
|
42
42
|
}
|
43
43
|
};
|
44
44
|
return new Abyss<T, T>(upstream, onNext);
|