回答編集履歴
6
修正
test
CHANGED
@@ -12,6 +12,8 @@
|
|
12
12
|
|
13
13
|
using System.Collections.Generic;
|
14
14
|
|
15
|
+
using System.Linq;
|
16
|
+
|
15
17
|
using System.Reactive.Linq;
|
16
18
|
|
17
19
|
|
@@ -32,27 +34,13 @@
|
|
32
34
|
|
33
35
|
.Range(1, 15)
|
34
36
|
|
35
|
-
.Case(
|
36
|
-
|
37
|
-
a => a % 15 == 0,
|
38
|
-
|
39
|
-
|
37
|
+
.Case(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz"))
|
40
|
-
|
41
|
-
|
38
|
+
|
42
|
-
|
43
|
-
a => a % 3 == 0,
|
44
|
-
|
45
|
-
|
39
|
+
.Case(a => a % 3 == 0, _ => Console.WriteLine("Fizz"))
|
46
|
-
|
47
|
-
|
40
|
+
|
48
|
-
|
49
|
-
a => a % 5 == 0,
|
50
|
-
|
51
|
-
|
41
|
+
.Case(a => a % 5 == 0, _ => Console.WriteLine("Buzz"))
|
52
|
-
|
53
|
-
|
42
|
+
|
54
|
-
|
55
|
-
|
43
|
+
.Subscribe(a => Console.WriteLine(a));
|
56
44
|
|
57
45
|
}
|
58
46
|
|
@@ -118,7 +106,7 @@
|
|
118
106
|
|
119
107
|
{
|
120
108
|
|
121
|
-
OnCompletedAction(Observers);
|
109
|
+
OnCompletedAction(Observers.ToArray());
|
122
110
|
|
123
111
|
}
|
124
112
|
|
@@ -126,7 +114,7 @@
|
|
126
114
|
|
127
115
|
{
|
128
116
|
|
129
|
-
foreach (var observer in Observers)
|
117
|
+
foreach (var observer in Observers.ToArray())
|
130
118
|
|
131
119
|
{
|
132
120
|
|
@@ -152,7 +140,7 @@
|
|
152
140
|
|
153
141
|
{
|
154
142
|
|
155
|
-
OnErrorAction(error, Observers);
|
143
|
+
OnErrorAction(error, Observers.ToArray());
|
156
144
|
|
157
145
|
}
|
158
146
|
|
@@ -160,7 +148,7 @@
|
|
160
148
|
|
161
149
|
{
|
162
150
|
|
163
|
-
foreach (var observer in Observers)
|
151
|
+
foreach (var observer in Observers.ToArray())
|
164
152
|
|
165
153
|
{
|
166
154
|
|
@@ -186,15 +174,15 @@
|
|
186
174
|
|
187
175
|
{
|
188
176
|
|
189
|
-
OnNextAction?.Invoke(value, Observers);
|
177
|
+
OnNextAction?.Invoke(value, Observers.ToArray());
|
190
|
-
|
178
|
+
|
191
|
-
}
|
179
|
+
}
|
192
|
-
|
180
|
+
|
193
|
-
catch (Exception e
|
181
|
+
catch (Exception e)
|
194
|
-
|
182
|
+
|
195
|
-
{
|
183
|
+
{
|
196
|
-
|
184
|
+
|
197
|
-
OnError(e
|
185
|
+
OnError(e);
|
198
186
|
|
199
187
|
}
|
200
188
|
|
@@ -214,7 +202,11 @@
|
|
214
202
|
|
215
203
|
{
|
216
204
|
|
205
|
+
var connectable = Upstream?.Publish();
|
206
|
+
|
217
|
-
upstreamDisposable =
|
207
|
+
upstreamDisposable = connectable?.Subscribe(this);
|
208
|
+
|
209
|
+
connectable?.Connect();
|
218
210
|
|
219
211
|
}
|
220
212
|
|
@@ -286,6 +278,24 @@
|
|
286
278
|
|
287
279
|
{
|
288
280
|
|
281
|
+
public static IObservable<TDownstream> Abyss<TUpstream, TDownstream>(
|
282
|
+
|
283
|
+
this IObservable<TUpstream> upstream,
|
284
|
+
|
285
|
+
Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
|
286
|
+
|
287
|
+
Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
|
288
|
+
|
289
|
+
Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
|
290
|
+
|
291
|
+
{
|
292
|
+
|
293
|
+
return new Abyss<TUpstream, TDownstream>(upstream, onNext, onError, onCompleted);
|
294
|
+
|
295
|
+
}
|
296
|
+
|
297
|
+
|
298
|
+
|
289
299
|
public static IObservable<T> Case<T>(
|
290
300
|
|
291
301
|
this IObservable<T> upstream,
|
5
修正
test
CHANGED
@@ -28,8 +28,6 @@
|
|
28
28
|
|
29
29
|
{
|
30
30
|
|
31
|
-
var list = new List<string>();
|
32
|
-
|
33
31
|
Observable
|
34
32
|
|
35
33
|
.Range(1, 15)
|
4
修正
test
CHANGED
@@ -2,7 +2,7 @@
|
|
2
2
|
|
3
3
|
しかし、あえて自分用に作ってみました。
|
4
4
|
|
5
|
-
IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド
|
5
|
+
IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド Case です。
|
6
6
|
|
7
7
|
|
8
8
|
|
@@ -12,8 +12,6 @@
|
|
12
12
|
|
13
13
|
using System.Collections.Generic;
|
14
14
|
|
15
|
-
using System.Reactive.Disposables;
|
16
|
-
|
17
15
|
using System.Reactive.Linq;
|
18
16
|
|
19
17
|
|
@@ -30,17 +28,33 @@
|
|
30
28
|
|
31
29
|
{
|
32
30
|
|
31
|
+
var list = new List<string>();
|
32
|
+
|
33
33
|
Observable
|
34
34
|
|
35
35
|
.Range(1, 15)
|
36
36
|
|
37
|
+
.Case(
|
38
|
+
|
39
|
+
a => a % 15 == 0,
|
40
|
+
|
37
|
-
|
41
|
+
a => Console.WriteLine("FizzBuzz"))
|
42
|
+
|
38
|
-
|
43
|
+
.Case(
|
44
|
+
|
45
|
+
a => a % 3 == 0,
|
46
|
+
|
39
|
-
|
47
|
+
a => Console.WriteLine("Fizz"))
|
48
|
+
|
40
|
-
|
49
|
+
.Case(
|
50
|
+
|
51
|
+
a => a % 5 == 0,
|
52
|
+
|
41
|
-
|
53
|
+
a => Console.WriteLine("Buzz"))
|
54
|
+
|
42
|
-
|
55
|
+
.Subscribe(
|
56
|
+
|
43
|
-
|
57
|
+
a => Console.WriteLine(a));
|
44
58
|
|
45
59
|
}
|
46
60
|
|
@@ -48,11 +62,233 @@
|
|
48
62
|
|
49
63
|
|
50
64
|
|
65
|
+
public class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
|
66
|
+
|
67
|
+
{
|
68
|
+
|
69
|
+
public Abyss(
|
70
|
+
|
71
|
+
IObservable<TUpstream> upstream,
|
72
|
+
|
73
|
+
Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
|
74
|
+
|
75
|
+
Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
|
76
|
+
|
77
|
+
Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
|
78
|
+
|
79
|
+
{
|
80
|
+
|
81
|
+
Upstream = upstream;
|
82
|
+
|
83
|
+
OnNextAction = onNext;
|
84
|
+
|
85
|
+
OnErrorAction = onError;
|
86
|
+
|
87
|
+
OnCompletedAction = onCompleted;
|
88
|
+
|
89
|
+
}
|
90
|
+
|
91
|
+
|
92
|
+
|
93
|
+
private IDisposable upstreamDisposable = null;
|
94
|
+
|
95
|
+
|
96
|
+
|
97
|
+
protected IObservable<TUpstream> Upstream { get; private set; }
|
98
|
+
|
99
|
+
protected Action<TUpstream, IEnumerable<IObserver<TDownstream>>> OnNextAction { get; private set; }
|
100
|
+
|
101
|
+
protected Action<Exception, IEnumerable<IObserver<TDownstream>>> OnErrorAction { get; private set; }
|
102
|
+
|
103
|
+
protected Action<IEnumerable<IObserver<TDownstream>>> OnCompletedAction { get; private set; }
|
104
|
+
|
105
|
+
|
106
|
+
|
107
|
+
protected HashSet<IObserver<TDownstream>> Observers { get; }
|
108
|
+
|
109
|
+
= new HashSet<IObserver<TDownstream>>();
|
110
|
+
|
111
|
+
|
112
|
+
|
113
|
+
public void OnCompleted()
|
114
|
+
|
115
|
+
{
|
116
|
+
|
117
|
+
DisconnectUpstream();
|
118
|
+
|
119
|
+
if (OnCompletedAction != null)
|
120
|
+
|
121
|
+
{
|
122
|
+
|
123
|
+
OnCompletedAction(Observers);
|
124
|
+
|
125
|
+
}
|
126
|
+
|
127
|
+
else
|
128
|
+
|
129
|
+
{
|
130
|
+
|
131
|
+
foreach (var observer in Observers)
|
132
|
+
|
133
|
+
{
|
134
|
+
|
135
|
+
observer.OnCompleted();
|
136
|
+
|
137
|
+
}
|
138
|
+
|
139
|
+
}
|
140
|
+
|
141
|
+
Observers.Clear();
|
142
|
+
|
143
|
+
}
|
144
|
+
|
145
|
+
|
146
|
+
|
147
|
+
public void OnError(Exception error)
|
148
|
+
|
149
|
+
{
|
150
|
+
|
151
|
+
DisconnectUpstream();
|
152
|
+
|
153
|
+
if (OnErrorAction != null)
|
154
|
+
|
155
|
+
{
|
156
|
+
|
157
|
+
OnErrorAction(error, Observers);
|
158
|
+
|
159
|
+
}
|
160
|
+
|
161
|
+
else
|
162
|
+
|
163
|
+
{
|
164
|
+
|
165
|
+
foreach (var observer in Observers)
|
166
|
+
|
167
|
+
{
|
168
|
+
|
169
|
+
observer.OnError(error);
|
170
|
+
|
171
|
+
}
|
172
|
+
|
173
|
+
}
|
174
|
+
|
175
|
+
Observers.Clear();
|
176
|
+
|
177
|
+
}
|
178
|
+
|
179
|
+
|
180
|
+
|
181
|
+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "<保留中>")]
|
182
|
+
|
183
|
+
public void OnNext(TUpstream value)
|
184
|
+
|
185
|
+
{
|
186
|
+
|
187
|
+
try
|
188
|
+
|
189
|
+
{
|
190
|
+
|
191
|
+
OnNextAction?.Invoke(value, Observers);
|
192
|
+
|
193
|
+
}
|
194
|
+
|
195
|
+
catch (Exception error)
|
196
|
+
|
197
|
+
{
|
198
|
+
|
199
|
+
OnError(error);
|
200
|
+
|
201
|
+
}
|
202
|
+
|
203
|
+
}
|
204
|
+
|
205
|
+
|
206
|
+
|
207
|
+
public IDisposable Subscribe(IObserver<TDownstream> observer)
|
208
|
+
|
209
|
+
{
|
210
|
+
|
211
|
+
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
212
|
+
|
213
|
+
Observers.Add(observer);
|
214
|
+
|
215
|
+
if (upstreamDisposable == null)
|
216
|
+
|
217
|
+
{
|
218
|
+
|
219
|
+
upstreamDisposable = Upstream?.Subscribe(this);
|
220
|
+
|
221
|
+
}
|
222
|
+
|
223
|
+
return new Disposable(() =>
|
224
|
+
|
225
|
+
{
|
226
|
+
|
227
|
+
Observers.Remove(observer);
|
228
|
+
|
229
|
+
if (Observers.Count == 0)
|
230
|
+
|
231
|
+
{
|
232
|
+
|
233
|
+
DisconnectUpstream();
|
234
|
+
|
235
|
+
}
|
236
|
+
|
237
|
+
});
|
238
|
+
|
239
|
+
}
|
240
|
+
|
241
|
+
|
242
|
+
|
243
|
+
private void DisconnectUpstream()
|
244
|
+
|
245
|
+
{
|
246
|
+
|
247
|
+
upstreamDisposable?.Dispose();
|
248
|
+
|
249
|
+
upstreamDisposable = null;
|
250
|
+
|
251
|
+
}
|
252
|
+
|
253
|
+
|
254
|
+
|
255
|
+
private class Disposable : IDisposable
|
256
|
+
|
257
|
+
{
|
258
|
+
|
259
|
+
public Disposable(Action action)
|
260
|
+
|
261
|
+
{
|
262
|
+
|
263
|
+
disposeAction = action;
|
264
|
+
|
265
|
+
}
|
266
|
+
|
267
|
+
|
268
|
+
|
269
|
+
private readonly Action disposeAction;
|
270
|
+
|
271
|
+
|
272
|
+
|
273
|
+
public void Dispose()
|
274
|
+
|
275
|
+
{
|
276
|
+
|
277
|
+
disposeAction?.Invoke();
|
278
|
+
|
279
|
+
}
|
280
|
+
|
281
|
+
}
|
282
|
+
|
283
|
+
}
|
284
|
+
|
285
|
+
|
286
|
+
|
51
287
|
public static class AbyssExtension
|
52
288
|
|
53
289
|
{
|
54
290
|
|
55
|
-
public static IObservable<T>
|
291
|
+
public static IObservable<T> Case<T>(
|
56
292
|
|
57
293
|
this IObservable<T> upstream,
|
58
294
|
|
@@ -62,15 +298,15 @@
|
|
62
298
|
|
63
299
|
{
|
64
300
|
|
65
|
-
|
301
|
+
return new Abyss<T, T>(upstream, (data, observers) =>
|
66
|
-
|
302
|
+
|
67
|
-
{
|
303
|
+
{
|
68
|
-
|
304
|
+
|
69
|
-
if (condition?.Invoke(data) ?? false)
|
305
|
+
if (condition?.Invoke(data) ?? false && action != null)
|
70
|
-
|
306
|
+
|
71
|
-
{
|
307
|
+
{
|
72
|
-
|
308
|
+
|
73
|
-
action
|
309
|
+
action.Invoke(data);
|
74
310
|
|
75
311
|
}
|
76
312
|
|
@@ -78,149 +314,17 @@
|
|
78
314
|
|
79
315
|
{
|
80
316
|
|
81
|
-
next?.Invoke(data);
|
82
|
-
|
83
|
-
}
|
84
|
-
|
85
|
-
};
|
86
|
-
|
87
|
-
return new Abyss<T, T>(upstream, onNext);
|
88
|
-
|
89
|
-
}
|
90
|
-
|
91
|
-
|
92
|
-
|
93
|
-
private class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
|
94
|
-
|
95
|
-
{
|
96
|
-
|
97
|
-
public Abyss(
|
98
|
-
|
99
|
-
IObservable<TUpstream> upstream,
|
100
|
-
|
101
|
-
Action<TUpstream, Action<TDownstream>> onNext = null,
|
102
|
-
|
103
|
-
Action<Exception, Action<Exception>> onError = null,
|
104
|
-
|
105
|
-
Action<Action> onCompleted = null)
|
106
|
-
|
107
|
-
{
|
108
|
-
|
109
|
-
Upstream = upstream;
|
110
|
-
|
111
|
-
OnNextAction = onNext;
|
112
|
-
|
113
|
-
OnErrorAction = onError;
|
114
|
-
|
115
|
-
OnCompletedAction = onCompleted;
|
116
|
-
|
117
|
-
}
|
118
|
-
|
119
|
-
|
120
|
-
|
121
|
-
private IDisposable upstreamDisposable = null;
|
122
|
-
|
123
|
-
|
124
|
-
|
125
|
-
protected IObservable<TUpstream> Upstream { get; private set; }
|
126
|
-
|
127
|
-
protected Action<TUpstream, Action<TDownstream>> OnNextAction { get; private set; }
|
128
|
-
|
129
|
-
protected Action<Exception, Action<Exception>> OnErrorAction { get; private set; }
|
130
|
-
|
131
|
-
protected Action<Action> OnCompletedAction { get; private set; }
|
132
|
-
|
133
|
-
|
134
|
-
|
135
|
-
protected HashSet<IObserver<TDownstream>> Observers { get; }
|
136
|
-
|
137
|
-
= new HashSet<IObserver<TDownstream>>();
|
138
|
-
|
139
|
-
|
140
|
-
|
141
|
-
public void OnCompleted()
|
142
|
-
|
143
|
-
{
|
144
|
-
|
145
|
-
foreach (var observer in
|
317
|
+
foreach (var observer in observers)
|
146
|
-
|
147
|
-
{
|
148
|
-
|
149
|
-
OnCompletedAction?.Invoke(observer.OnCompleted);
|
150
|
-
|
151
|
-
}
|
152
|
-
|
153
|
-
}
|
154
|
-
|
155
|
-
|
156
|
-
|
157
|
-
public void OnError(Exception error)
|
158
|
-
|
159
|
-
{
|
160
|
-
|
161
|
-
foreach (var observer in Observers)
|
162
|
-
|
163
|
-
{
|
164
|
-
|
165
|
-
OnErrorAction?.Invoke(error, observer.OnError);
|
166
|
-
|
167
|
-
}
|
168
|
-
|
169
|
-
}
|
170
|
-
|
171
|
-
|
172
|
-
|
173
|
-
public void OnNext(TUpstream value)
|
174
|
-
|
175
|
-
{
|
176
|
-
|
177
|
-
foreach (var observer in Observers)
|
178
|
-
|
179
|
-
{
|
180
|
-
|
181
|
-
OnNextAction?.Invoke(value, observer.OnNext);
|
182
|
-
|
183
|
-
}
|
184
|
-
|
185
|
-
}
|
186
|
-
|
187
|
-
|
188
|
-
|
189
|
-
public IDisposable Subscribe(IObserver<TDownstream> observer)
|
190
|
-
|
191
|
-
{
|
192
|
-
|
193
|
-
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
194
|
-
|
195
|
-
Observers.Add(observer);
|
196
|
-
|
197
|
-
if (upstreamDisposable == null)
|
198
|
-
|
199
|
-
{
|
200
|
-
|
201
|
-
upstreamDisposable = Upstream?.Subscribe(this);
|
202
|
-
|
203
|
-
}
|
204
|
-
|
205
|
-
return Disposable.Create(() =>
|
206
|
-
|
207
|
-
{
|
208
|
-
|
209
|
-
Observers.Remove(observer);
|
210
|
-
|
211
|
-
if (Observers.Count == 0)
|
212
318
|
|
213
319
|
{
|
214
320
|
|
215
|
-
upstreamDisposable?.Dispose();
|
216
|
-
|
217
|
-
|
321
|
+
observer?.OnNext(data);
|
218
322
|
|
219
323
|
}
|
220
324
|
|
325
|
+
}
|
326
|
+
|
221
|
-
|
327
|
+
});
|
222
|
-
|
223
|
-
}
|
224
328
|
|
225
329
|
}
|
226
330
|
|
3
修正
test
CHANGED
@@ -194,7 +194,13 @@
|
|
194
194
|
|
195
195
|
Observers.Add(observer);
|
196
196
|
|
197
|
+
if (upstreamDisposable == null)
|
198
|
+
|
199
|
+
{
|
200
|
+
|
197
|
-
upstreamDisposable = Upstream?.Subscribe(this);
|
201
|
+
upstreamDisposable = Upstream?.Subscribe(this);
|
202
|
+
|
203
|
+
}
|
198
204
|
|
199
205
|
return Disposable.Create(() =>
|
200
206
|
|
@@ -208,6 +214,8 @@
|
|
208
214
|
|
209
215
|
upstreamDisposable?.Dispose();
|
210
216
|
|
217
|
+
upstreamDisposable = null;
|
218
|
+
|
211
219
|
}
|
212
220
|
|
213
221
|
});
|
2
修正
test
CHANGED
@@ -190,6 +190,8 @@
|
|
190
190
|
|
191
191
|
{
|
192
192
|
|
193
|
+
if (observer == null) throw new ArgumentNullException(nameof(observer));
|
194
|
+
|
193
195
|
Observers.Add(observer);
|
194
196
|
|
195
197
|
upstreamDisposable = Upstream?.Subscribe(this);
|
1
修正
test
CHANGED
@@ -70,7 +70,7 @@
|
|
70
70
|
|
71
71
|
{
|
72
72
|
|
73
|
-
action(data);
|
73
|
+
action?.Invoke(data);
|
74
74
|
|
75
75
|
}
|
76
76
|
|
@@ -78,7 +78,7 @@
|
|
78
78
|
|
79
79
|
{
|
80
80
|
|
81
|
-
next(data);
|
81
|
+
next?.Invoke(data);
|
82
82
|
|
83
83
|
}
|
84
84
|
|