回答編集履歴

6

修正

2019/11/27 04:38

投稿

Zuishin
Zuishin

スコア28662

test CHANGED
@@ -12,6 +12,8 @@
12
12
 
13
13
  using System.Collections.Generic;
14
14
 
15
+ using System.Linq;
16
+
15
17
  using System.Reactive.Linq;
16
18
 
17
19
 
@@ -32,27 +34,13 @@
32
34
 
33
35
  .Range(1, 15)
34
36
 
35
- .Case(
36
-
37
- a => a % 15 == 0,
38
-
39
- a => Console.WriteLine("FizzBuzz"))
37
+ .Case(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz"))
40
-
41
- .Case(
38
+
42
-
43
- a => a % 3 == 0,
44
-
45
- a => Console.WriteLine("Fizz"))
39
+ .Case(a => a % 3 == 0, _ => Console.WriteLine("Fizz"))
46
-
47
- .Case(
40
+
48
-
49
- a => a % 5 == 0,
50
-
51
- a => Console.WriteLine("Buzz"))
41
+ .Case(a => a % 5 == 0, _ => Console.WriteLine("Buzz"))
52
-
53
- .Subscribe(
42
+
54
-
55
- a => Console.WriteLine(a));
43
+ .Subscribe(a => Console.WriteLine(a));
56
44
 
57
45
  }
58
46
 
@@ -118,7 +106,7 @@
118
106
 
119
107
  {
120
108
 
121
- OnCompletedAction(Observers);
109
+ OnCompletedAction(Observers.ToArray());
122
110
 
123
111
  }
124
112
 
@@ -126,7 +114,7 @@
126
114
 
127
115
  {
128
116
 
129
- foreach (var observer in Observers)
117
+ foreach (var observer in Observers.ToArray())
130
118
 
131
119
  {
132
120
 
@@ -152,7 +140,7 @@
152
140
 
153
141
  {
154
142
 
155
- OnErrorAction(error, Observers);
143
+ OnErrorAction(error, Observers.ToArray());
156
144
 
157
145
  }
158
146
 
@@ -160,7 +148,7 @@
160
148
 
161
149
  {
162
150
 
163
- foreach (var observer in Observers)
151
+ foreach (var observer in Observers.ToArray())
164
152
 
165
153
  {
166
154
 
@@ -186,15 +174,15 @@
186
174
 
187
175
  {
188
176
 
189
- OnNextAction?.Invoke(value, Observers);
177
+ OnNextAction?.Invoke(value, Observers.ToArray());
190
-
178
+
191
- }
179
+ }
192
-
180
+
193
- catch (Exception error)
181
+ catch (Exception e)
194
-
182
+
195
- {
183
+ {
196
-
184
+
197
- OnError(error);
185
+ OnError(e);
198
186
 
199
187
  }
200
188
 
@@ -214,7 +202,11 @@
214
202
 
215
203
  {
216
204
 
205
+ var connectable = Upstream?.Publish();
206
+
217
- upstreamDisposable = Upstream?.Subscribe(this);
207
+ upstreamDisposable = connectable?.Subscribe(this);
208
+
209
+ connectable?.Connect();
218
210
 
219
211
  }
220
212
 
@@ -286,6 +278,24 @@
286
278
 
287
279
  {
288
280
 
281
+ public static IObservable<TDownstream> Abyss<TUpstream, TDownstream>(
282
+
283
+ this IObservable<TUpstream> upstream,
284
+
285
+ Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
286
+
287
+ Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
288
+
289
+ Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
290
+
291
+ {
292
+
293
+ return new Abyss<TUpstream, TDownstream>(upstream, onNext, onError, onCompleted);
294
+
295
+ }
296
+
297
+
298
+
289
299
  public static IObservable<T> Case<T>(
290
300
 
291
301
  this IObservable<T> upstream,

5

修正

2019/11/27 04:38

投稿

Zuishin
Zuishin

スコア28662

test CHANGED
@@ -28,8 +28,6 @@
28
28
 
29
29
  {
30
30
 
31
- var list = new List<string>();
32
-
33
31
  Observable
34
32
 
35
33
  .Range(1, 15)

4

修正

2019/11/27 02:38

投稿

Zuishin
Zuishin

スコア28662

test CHANGED
@@ -2,7 +2,7 @@
2
2
 
3
3
  しかし、あえて自分用に作ってみました。
4
4
 
5
- IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド When です。
5
+ IObserver<T>, IObservable<T> を実装し、ラムダ式で振る舞いを変えられる汎用クラスである Abyss と、それを使用する拡張メソッド Case です。
6
6
 
7
7
 
8
8
 
@@ -12,8 +12,6 @@
12
12
 
13
13
  using System.Collections.Generic;
14
14
 
15
- using System.Reactive.Disposables;
16
-
17
15
  using System.Reactive.Linq;
18
16
 
19
17
 
@@ -30,17 +28,33 @@
30
28
 
31
29
  {
32
30
 
31
+ var list = new List<string>();
32
+
33
33
  Observable
34
34
 
35
35
  .Range(1, 15)
36
36
 
37
+ .Case(
38
+
39
+ a => a % 15 == 0,
40
+
37
- .When(a => a % 15 == 0, _ => Console.WriteLine("FizzBuzz"))
41
+ a => Console.WriteLine("FizzBuzz"))
42
+
38
-
43
+ .Case(
44
+
45
+ a => a % 3 == 0,
46
+
39
- .When(a => a % 3 == 0, _ => Console.WriteLine("Fizz"))
47
+ a => Console.WriteLine("Fizz"))
48
+
40
-
49
+ .Case(
50
+
51
+ a => a % 5 == 0,
52
+
41
- .When(a => a % 5 == 0, _ => Console.WriteLine("Buzz"))
53
+ a => Console.WriteLine("Buzz"))
54
+
42
-
55
+ .Subscribe(
56
+
43
- .Subscribe(a => Console.WriteLine(a));
57
+ a => Console.WriteLine(a));
44
58
 
45
59
  }
46
60
 
@@ -48,11 +62,233 @@
48
62
 
49
63
 
50
64
 
65
+ public class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
66
+
67
+ {
68
+
69
+ public Abyss(
70
+
71
+ IObservable<TUpstream> upstream,
72
+
73
+ Action<TUpstream, IEnumerable<IObserver<TDownstream>>> onNext = null,
74
+
75
+ Action<Exception, IEnumerable<IObserver<TDownstream>>> onError = null,
76
+
77
+ Action<IEnumerable<IObserver<TDownstream>>> onCompleted = null)
78
+
79
+ {
80
+
81
+ Upstream = upstream;
82
+
83
+ OnNextAction = onNext;
84
+
85
+ OnErrorAction = onError;
86
+
87
+ OnCompletedAction = onCompleted;
88
+
89
+ }
90
+
91
+
92
+
93
+ private IDisposable upstreamDisposable = null;
94
+
95
+
96
+
97
+ protected IObservable<TUpstream> Upstream { get; private set; }
98
+
99
+ protected Action<TUpstream, IEnumerable<IObserver<TDownstream>>> OnNextAction { get; private set; }
100
+
101
+ protected Action<Exception, IEnumerable<IObserver<TDownstream>>> OnErrorAction { get; private set; }
102
+
103
+ protected Action<IEnumerable<IObserver<TDownstream>>> OnCompletedAction { get; private set; }
104
+
105
+
106
+
107
+ protected HashSet<IObserver<TDownstream>> Observers { get; }
108
+
109
+ = new HashSet<IObserver<TDownstream>>();
110
+
111
+
112
+
113
+ public void OnCompleted()
114
+
115
+ {
116
+
117
+ DisconnectUpstream();
118
+
119
+ if (OnCompletedAction != null)
120
+
121
+ {
122
+
123
+ OnCompletedAction(Observers);
124
+
125
+ }
126
+
127
+ else
128
+
129
+ {
130
+
131
+ foreach (var observer in Observers)
132
+
133
+ {
134
+
135
+ observer.OnCompleted();
136
+
137
+ }
138
+
139
+ }
140
+
141
+ Observers.Clear();
142
+
143
+ }
144
+
145
+
146
+
147
+ public void OnError(Exception error)
148
+
149
+ {
150
+
151
+ DisconnectUpstream();
152
+
153
+ if (OnErrorAction != null)
154
+
155
+ {
156
+
157
+ OnErrorAction(error, Observers);
158
+
159
+ }
160
+
161
+ else
162
+
163
+ {
164
+
165
+ foreach (var observer in Observers)
166
+
167
+ {
168
+
169
+ observer.OnError(error);
170
+
171
+ }
172
+
173
+ }
174
+
175
+ Observers.Clear();
176
+
177
+ }
178
+
179
+
180
+
181
+ [System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "<保留中>")]
182
+
183
+ public void OnNext(TUpstream value)
184
+
185
+ {
186
+
187
+ try
188
+
189
+ {
190
+
191
+ OnNextAction?.Invoke(value, Observers);
192
+
193
+ }
194
+
195
+ catch (Exception error)
196
+
197
+ {
198
+
199
+ OnError(error);
200
+
201
+ }
202
+
203
+ }
204
+
205
+
206
+
207
+ public IDisposable Subscribe(IObserver<TDownstream> observer)
208
+
209
+ {
210
+
211
+ if (observer == null) throw new ArgumentNullException(nameof(observer));
212
+
213
+ Observers.Add(observer);
214
+
215
+ if (upstreamDisposable == null)
216
+
217
+ {
218
+
219
+ upstreamDisposable = Upstream?.Subscribe(this);
220
+
221
+ }
222
+
223
+ return new Disposable(() =>
224
+
225
+ {
226
+
227
+ Observers.Remove(observer);
228
+
229
+ if (Observers.Count == 0)
230
+
231
+ {
232
+
233
+ DisconnectUpstream();
234
+
235
+ }
236
+
237
+ });
238
+
239
+ }
240
+
241
+
242
+
243
+ private void DisconnectUpstream()
244
+
245
+ {
246
+
247
+ upstreamDisposable?.Dispose();
248
+
249
+ upstreamDisposable = null;
250
+
251
+ }
252
+
253
+
254
+
255
+ private class Disposable : IDisposable
256
+
257
+ {
258
+
259
+ public Disposable(Action action)
260
+
261
+ {
262
+
263
+ disposeAction = action;
264
+
265
+ }
266
+
267
+
268
+
269
+ private readonly Action disposeAction;
270
+
271
+
272
+
273
+ public void Dispose()
274
+
275
+ {
276
+
277
+ disposeAction?.Invoke();
278
+
279
+ }
280
+
281
+ }
282
+
283
+ }
284
+
285
+
286
+
51
287
  public static class AbyssExtension
52
288
 
53
289
  {
54
290
 
55
- public static IObservable<T> When<T>(
291
+ public static IObservable<T> Case<T>(
56
292
 
57
293
  this IObservable<T> upstream,
58
294
 
@@ -62,15 +298,15 @@
62
298
 
63
299
  {
64
300
 
65
- Action<T, Action<T>> onNext = (T data, Action<T> next) =>
301
+ return new Abyss<T, T>(upstream, (data, observers) =>
66
-
302
+
67
- {
303
+ {
68
-
304
+
69
- if (condition?.Invoke(data) ?? false)
305
+ if (condition?.Invoke(data) ?? false && action != null)
70
-
306
+
71
- {
307
+ {
72
-
308
+
73
- action?.Invoke(data);
309
+ action.Invoke(data);
74
310
 
75
311
  }
76
312
 
@@ -78,149 +314,17 @@
78
314
 
79
315
  {
80
316
 
81
- next?.Invoke(data);
82
-
83
- }
84
-
85
- };
86
-
87
- return new Abyss<T, T>(upstream, onNext);
88
-
89
- }
90
-
91
-
92
-
93
- private class Abyss<TUpstream, TDownstream> : IObserver<TUpstream>, IObservable<TDownstream>
94
-
95
- {
96
-
97
- public Abyss(
98
-
99
- IObservable<TUpstream> upstream,
100
-
101
- Action<TUpstream, Action<TDownstream>> onNext = null,
102
-
103
- Action<Exception, Action<Exception>> onError = null,
104
-
105
- Action<Action> onCompleted = null)
106
-
107
- {
108
-
109
- Upstream = upstream;
110
-
111
- OnNextAction = onNext;
112
-
113
- OnErrorAction = onError;
114
-
115
- OnCompletedAction = onCompleted;
116
-
117
- }
118
-
119
-
120
-
121
- private IDisposable upstreamDisposable = null;
122
-
123
-
124
-
125
- protected IObservable<TUpstream> Upstream { get; private set; }
126
-
127
- protected Action<TUpstream, Action<TDownstream>> OnNextAction { get; private set; }
128
-
129
- protected Action<Exception, Action<Exception>> OnErrorAction { get; private set; }
130
-
131
- protected Action<Action> OnCompletedAction { get; private set; }
132
-
133
-
134
-
135
- protected HashSet<IObserver<TDownstream>> Observers { get; }
136
-
137
- = new HashSet<IObserver<TDownstream>>();
138
-
139
-
140
-
141
- public void OnCompleted()
142
-
143
- {
144
-
145
- foreach (var observer in Observers)
317
+ foreach (var observer in observers)
146
-
147
- {
148
-
149
- OnCompletedAction?.Invoke(observer.OnCompleted);
150
-
151
- }
152
-
153
- }
154
-
155
-
156
-
157
- public void OnError(Exception error)
158
-
159
- {
160
-
161
- foreach (var observer in Observers)
162
-
163
- {
164
-
165
- OnErrorAction?.Invoke(error, observer.OnError);
166
-
167
- }
168
-
169
- }
170
-
171
-
172
-
173
- public void OnNext(TUpstream value)
174
-
175
- {
176
-
177
- foreach (var observer in Observers)
178
-
179
- {
180
-
181
- OnNextAction?.Invoke(value, observer.OnNext);
182
-
183
- }
184
-
185
- }
186
-
187
-
188
-
189
- public IDisposable Subscribe(IObserver<TDownstream> observer)
190
-
191
- {
192
-
193
- if (observer == null) throw new ArgumentNullException(nameof(observer));
194
-
195
- Observers.Add(observer);
196
-
197
- if (upstreamDisposable == null)
198
-
199
- {
200
-
201
- upstreamDisposable = Upstream?.Subscribe(this);
202
-
203
- }
204
-
205
- return Disposable.Create(() =>
206
-
207
- {
208
-
209
- Observers.Remove(observer);
210
-
211
- if (Observers.Count == 0)
212
318
 
213
319
  {
214
320
 
215
- upstreamDisposable?.Dispose();
216
-
217
- upstreamDisposable = null;
321
+ observer?.OnNext(data);
218
322
 
219
323
  }
220
324
 
325
+ }
326
+
221
- });
327
+ });
222
-
223
- }
224
328
 
225
329
  }
226
330
 

3

修正

2019/11/27 02:37

投稿

Zuishin
Zuishin

スコア28662

test CHANGED
@@ -194,7 +194,13 @@
194
194
 
195
195
  Observers.Add(observer);
196
196
 
197
+ if (upstreamDisposable == null)
198
+
199
+ {
200
+
197
- upstreamDisposable = Upstream?.Subscribe(this);
201
+ upstreamDisposable = Upstream?.Subscribe(this);
202
+
203
+ }
198
204
 
199
205
  return Disposable.Create(() =>
200
206
 
@@ -208,6 +214,8 @@
208
214
 
209
215
  upstreamDisposable?.Dispose();
210
216
 
217
+ upstreamDisposable = null;
218
+
211
219
  }
212
220
 
213
221
  });

2

修正

2019/11/25 11:44

投稿

Zuishin
Zuishin

スコア28662

test CHANGED
@@ -190,6 +190,8 @@
190
190
 
191
191
  {
192
192
 
193
+ if (observer == null) throw new ArgumentNullException(nameof(observer));
194
+
193
195
  Observers.Add(observer);
194
196
 
195
197
  upstreamDisposable = Upstream?.Subscribe(this);

1

修正

2019/11/25 11:36

投稿

Zuishin
Zuishin

スコア28662

test CHANGED
@@ -70,7 +70,7 @@
70
70
 
71
71
  {
72
72
 
73
- action(data);
73
+ action?.Invoke(data);
74
74
 
75
75
  }
76
76
 
@@ -78,7 +78,7 @@
78
78
 
79
79
  {
80
80
 
81
- next(data);
81
+ next?.Invoke(data);
82
82
 
83
83
  }
84
84