質問編集履歴

7

ソース修正

2016/05/14 07:45

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -338,7 +338,23 @@
338
338
 
339
339
  {
340
340
 
341
+ var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA);
342
+
343
+ var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB);
344
+
345
+
346
+
347
+ Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]",
348
+
349
+ DateTime.Now.ToString("HH:mm:ss"),
350
+
341
- ...
351
+ grouped.Key,
352
+
353
+ grouped.Count(),
354
+
355
+ typeA?.Timestamp.ToString("HH:mm:ss"),
356
+
357
+ typeB?.Timestamp.ToString("HH:mm:ss")));
342
358
 
343
359
  }
344
360
 

6

記述補足

2016/05/14 07:45

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -256,7 +256,7 @@
256
256
 
257
257
 
258
258
 
259
- #内容補足
259
+ #内容補足(追加1)
260
260
 
261
261
 
262
262
 

5

「更に試してみたこと」追加

2016/05/14 07:44

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -16,8 +16,6 @@
16
16
 
17
17
  - この処理をReactive Extensionsで書きたい
18
18
 
19
- ここに質問したいことを詳細に書いてください
20
-
21
19
 
22
20
 
23
21
  #書いてみたソースコード
@@ -308,163 +306,41 @@
308
306
 
309
307
 
310
308
 
311
- なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のリームで扱われてもどちらでも良いです。#やりたいことの概要
309
+ なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のイベントで扱われてもどちらでも良いです。
312
-
313
-
314
-
315
- Reactive Extensionsの書き方に関する質問です。
310
+
316
-
317
- やりたいことの概要は以下のようなものです。
311
+
318
-
319
-
320
-
321
- - イメージとしては、複数のセンサーを持つ機器(複数台)のイベントをサンプリングするIoTのようなもの
312
+
322
-
323
- - 機器はIdを持ち、センサーはSensorTypeによって識別され、その2つをキーとするイベントがストリームとして発生する
324
-
325
- - このストリームから、一定のサンプリング間隔で、機器(Id)毎に各センサーの直近の値を取得する
326
-
327
- - この処理をReactive Extensionsで書きたい
328
-
329
- ここに質問したいことを詳細に書いてください
330
-
331
-
332
-
333
- #書いてみたソースコード
313
+ #更に試してみたこと(追加2)
334
-
335
-
336
-
314
+
315
+
316
+
337
- ##デタ構造
317
+ 更に以下のようなコドを書いてみました。
338
318
 
339
319
 
340
320
 
341
321
  ```cs
342
322
 
343
- // センサー種別
344
-
345
- public enum SensorType
346
-
347
- {
348
-
349
- TypeA,
323
+ observable
350
-
351
- TypeB
324
+
352
-
353
- }
354
-
355
-
356
-
357
- // 発生するイベントのデータ構造
358
-
359
- public class DataEvent
360
-
361
- {
362
-
363
- public int Id { get; set; }
364
-
365
-
366
-
367
- public SensorType SensorType { get; set; }
325
+ .GroupBy(_ => new { _.Id, _.SensorType })
368
-
369
-
370
-
371
- public DateTime Timestamp { get; set; }
326
+
372
-
373
- }
374
-
375
- ```
376
-
377
-
378
-
379
- ##イベントストリームの疑似
380
-
381
-
382
-
383
- ```cs
384
-
385
- // イベントストリームの疑似Observable
327
+ .Select(_ => _.Sample(timeObservable))
328
+
386
-
329
+ .Merge()
330
+
387
- var observable = Observable.Create<DataEvent>(observer =>
331
+ .Buffer(timeObservable)
388
-
389
- {
332
+
390
-
391
- int counter = 0;
333
+ .Subscribe(datas =>
392
-
393
- return new System.Threading.Timer(state =>
394
334
 
395
335
  {
396
336
 
397
- var data = new DataEvent { Id = counter / 2 % 2, SensorType = (SensorType)(counter % 2), Timestamp = DateTime.Now };
398
-
399
- Debug.WriteLine(String.Format("{0} : Data : Id=[{1}], Type=[{2}]",
400
-
401
- data.Timestamp.ToString("HH:mm:ss"),
402
-
403
- data.Id,
404
-
405
- data.Type));
406
-
407
- counter++;
408
-
409
-
410
-
411
- observer.OnNext(data);
412
-
413
- }, null, 0, 1000);
414
-
415
- });
416
-
417
- ```
418
-
419
-
420
-
421
- ##自分が作った処理
422
-
423
-
424
-
425
- ```cs
426
-
427
- // 一定期間毎に、Id単位で直近の値をサンプリング
428
-
429
- var timeObservable = Observable.Interval(TimeSpan.FromSeconds(5)).Publish().RefCount();
430
-
431
-
432
-
433
- observable
434
-
435
- .GroupBy(_ => _.Id)
337
+ foreach (var grouped in datas.GroupBy(_ => _.Id))
436
-
437
- .Subscribe(groupedObservable =>
338
+
438
-
439
- {
339
+ {
440
-
441
- groupedObservable
340
+
442
-
443
- .Buffer(timeObservable)
444
-
445
- .Subscribe(datas =>
341
+ ...
446
-
342
+
447
- {
343
+ }
448
-
449
- var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA);
450
-
451
- var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB);
452
-
453
-
454
-
455
- Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]",
456
-
457
- DateTime.Now.ToString("HH:mm:ss"),
458
-
459
- groupedObservable.Key,
460
-
461
- datas.Count,
462
-
463
- typeA?.Timestamp.ToString("HH:mm:ss"),
464
-
465
- typeB?.Timestamp.ToString("HH:mm:ss")));
466
-
467
- });
468
344
 
469
345
  });
470
346
 
@@ -472,150 +348,6 @@
472
348
 
473
349
 
474
350
 
475
- #実行例
476
-
477
-
478
-
479
- ##出力
480
-
481
-
482
-
483
- ```
484
-
485
- 16:57:05 : Data : Id=[0], Type=[TypeA]
486
-
487
- 16:57:06 : Data : Id=[0], Type=[TypeB]
488
-
489
- 16:57:07 : Data : Id=[1], Type=[TypeA]
490
-
491
- 16:57:08 : Data : Id=[1], Type=[TypeB]
492
-
493
- 16:57:09 : Data : Id=[0], Type=[TypeA]
494
-
495
- 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06]
496
-
497
- 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08]
498
-
499
- 16:57:10 : Data : Id=[0], Type=[TypeB]
500
-
501
- 16:57:11 : Data : Id=[1], Type=[TypeA]
502
-
503
- 16:57:12 : Data : Id=[1], Type=[TypeB]
504
-
505
- 16:57:13 : Data : Id=[0], Type=[TypeA]
506
-
507
- 16:57:14 : Data : Id=[0], Type=[TypeB]
508
-
509
- 16:57:15 : Event : Id=[0], Count=[3], TypeA=[16:57:13], typeB=[16:57:14]
510
-
511
- 16:57:15 : Event : Id=[1], Count=[2], TypeA=[16:57:11], typeB=[16:57:12]
512
-
513
- 16:57:15 : Data : Id=[1], Type=[TypeA]
514
-
515
- 16:57:16 : Data : Id=[1], Type=[TypeB]
516
-
517
- 16:57:17 : Data : Id=[0], Type=[TypeA]
518
-
519
- 16:57:18 : Data : Id=[0], Type=[TypeB]
520
-
521
- 16:57:19 : Data : Id=[1], Type=[TypeA]
522
-
523
- 16:57:20 : Event : Id=[0], Count=[2], TypeA=[16:57:17], typeB=[16:57:18]
524
-
525
- 16:57:20 : Event : Id=[1], Count=[3], TypeA=[16:57:19], typeB=[16:57:16]
526
-
527
- ```
528
-
529
-
530
-
531
- ##解説
532
-
533
-
534
-
535
- ```
536
-
537
- 1 16:57:05 : Data : Id=[0], Type=[TypeA]
538
-
539
- 2 16:57:06 : Data : Id=[0], Type=[TypeB]
540
-
541
- 3 16:57:07 : Data : Id=[1], Type=[TypeA]
542
-
543
- 4 16:57:08 : Data : Id=[1], Type=[TypeB]
544
-
545
- 5 16:57:09 : Data : Id=[0], Type=[TypeA]
546
-
547
- 6 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 5と2が直近
548
-
549
- 7 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 3と4が直近
550
-
551
- ```
552
-
553
-
554
-
555
- #質問事項
556
-
557
-
558
-
559
- あえずやりたことは出来たのですが、Reactive Extensionsを使った書き方でもっと効率の良い書き方はできしょうか?
351
+ このコードはバッファするオブジェクト自体は最小になってお、惜しいのですが致命的な問題があ認識す。
560
-
561
-
562
-
563
-
564
-
565
- 例えば、この方法だとサンプリング間隔間のデータはすべてListで保持され、Subscribe()の中でLastOrDefault()で直近の値を取得していますが、Take()等によりストリームの段階で直近の1件のみを処理するようにする等。
352
+
566
-
567
- それしてみようと思ったのですが、Take()したいのはIdSensorTypeの復号キー単位だが最終的取得したいのはId単位た、場合にSample()等をどのタ呼び出のかがわからなり、ギブアップしました。
353
+ 本来は同じのとして処理しいtimeObservableを、キー毎Sample()イベント発火間隔Buffer()で個別処理ているめ、厳密処理にならずそのめか処理だとtimeObservableが5秒間隔に対して、初回トが10秒後に発生しその後は5秒間隔発生る、っというよう動きになりました。
568
-
569
-
570
-
571
- #内容補足
572
-
573
-
574
-
575
- 自分のやりたい内容を、以下に整理してみました。
576
-
577
-
578
-
579
- ```
580
-
581
- source |-A1-A2-B1----B2----A3-A4-B3-B4---
582
-
583
- boundary |----------X-----Y--------------Z-
584
-
585
- A2 B2 A4
586
-
587
- B1 B4
588
-
589
- ```
590
-
591
-
592
-
593
- sourceストームのA、B等がキーで、boundaryストリームのイベント発生のタイミングで同じキーの最後の値のみを取得したいのです。
594
-
595
-
596
-
597
-
598
-
599
- 現状出来ている内容だと以下のような内容で、イベント発生後に1A、1C、1Dを捨てる形になるので、boundaryのイベント発生間隔が長くなると、その間sourceストリームで発生したオブジェクト全てが保持されてしまいます。
600
-
601
-
602
-
603
- ```
604
-
605
- source |-A1-A2-B1----B2----A3-A4-B3-B4---
606
-
607
- boundary |----------X-----Y--------------Z-
608
-
609
- A1 B2 A3
610
-
611
- A2 A4
612
-
613
- B1 B3
614
-
615
- B4
616
-
617
- ```
618
-
619
-
620
-
621
- なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のイベントで扱われてもどちらでも良いです。

4

文法の修正

2016/05/14 07:43

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -618,4 +618,4 @@
618
618
 
619
619
 
620
620
 
621
- なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のリームで扱われてもどちらでも良いです。
621
+ なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のイベントで扱われてもどちらでも良いです。

3

サンプル誤り修正

2016/05/14 05:14

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -292,17 +292,327 @@
292
292
 
293
293
  ```
294
294
 
295
- source |-1A-2A-1B----2B----1C-1D-2C-2D---
295
+ source |-A1-A2-B1----B2----A3-A4-B3-B4---
296
296
 
297
297
  boundary |----------X-----Y--------------Z-
298
298
 
299
- 1A 2B 1C
299
+ A1 B2 A3
300
-
300
+
301
- 2A 1D
301
+ A2 A4
302
-
302
+
303
- 1B 2C
303
+ B1 B3
304
-
304
+
305
- 2D
305
+ B4
306
+
307
+ ```
308
+
309
+
310
+
311
+ なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のストリームで扱われてもどちらでも良いです。#やりたいことの概要
312
+
313
+
314
+
315
+ Reactive Extensionsの書き方に関する質問です。
316
+
317
+ やりたいことの概要は以下のようなものです。
318
+
319
+
320
+
321
+ - イメージとしては、複数のセンサーを持つ機器(複数台)のイベントをサンプリングするIoTのようなもの
322
+
323
+ - 機器はIdを持ち、センサーはSensorTypeによって識別され、その2つをキーとするイベントがストリームとして発生する
324
+
325
+ - このストリームから、一定のサンプリング間隔で、機器(Id)毎に各センサーの直近の値を取得する
326
+
327
+ - この処理をReactive Extensionsで書きたい
328
+
329
+ ここに質問したいことを詳細に書いてください
330
+
331
+
332
+
333
+ #書いてみたソースコード
334
+
335
+
336
+
337
+ ##データ構造
338
+
339
+
340
+
341
+ ```cs
342
+
343
+ // センサー種別
344
+
345
+ public enum SensorType
346
+
347
+ {
348
+
349
+ TypeA,
350
+
351
+ TypeB
352
+
353
+ }
354
+
355
+
356
+
357
+ // 発生するイベントのデータ構造
358
+
359
+ public class DataEvent
360
+
361
+ {
362
+
363
+ public int Id { get; set; }
364
+
365
+
366
+
367
+ public SensorType SensorType { get; set; }
368
+
369
+
370
+
371
+ public DateTime Timestamp { get; set; }
372
+
373
+ }
374
+
375
+ ```
376
+
377
+
378
+
379
+ ##イベントストリームの疑似
380
+
381
+
382
+
383
+ ```cs
384
+
385
+ // イベントストリームの疑似Observable
386
+
387
+ var observable = Observable.Create<DataEvent>(observer =>
388
+
389
+ {
390
+
391
+ int counter = 0;
392
+
393
+ return new System.Threading.Timer(state =>
394
+
395
+ {
396
+
397
+ var data = new DataEvent { Id = counter / 2 % 2, SensorType = (SensorType)(counter % 2), Timestamp = DateTime.Now };
398
+
399
+ Debug.WriteLine(String.Format("{0} : Data : Id=[{1}], Type=[{2}]",
400
+
401
+ data.Timestamp.ToString("HH:mm:ss"),
402
+
403
+ data.Id,
404
+
405
+ data.Type));
406
+
407
+ counter++;
408
+
409
+
410
+
411
+ observer.OnNext(data);
412
+
413
+ }, null, 0, 1000);
414
+
415
+ });
416
+
417
+ ```
418
+
419
+
420
+
421
+ ##自分が作った処理
422
+
423
+
424
+
425
+ ```cs
426
+
427
+ // 一定期間毎に、Id単位で直近の値をサンプリング
428
+
429
+ var timeObservable = Observable.Interval(TimeSpan.FromSeconds(5)).Publish().RefCount();
430
+
431
+
432
+
433
+ observable
434
+
435
+ .GroupBy(_ => _.Id)
436
+
437
+ .Subscribe(groupedObservable =>
438
+
439
+ {
440
+
441
+ groupedObservable
442
+
443
+ .Buffer(timeObservable)
444
+
445
+ .Subscribe(datas =>
446
+
447
+ {
448
+
449
+ var typeA = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeA);
450
+
451
+ var typeB = datas.LastOrDefault(_ => _.SensorType == SensorType.TypeB);
452
+
453
+
454
+
455
+ Debug.WriteLine(String.Format("{0} : Event : Id=[{1}], Count=[{2}], TypeA=[{3}], typeB=[{4}]",
456
+
457
+ DateTime.Now.ToString("HH:mm:ss"),
458
+
459
+ groupedObservable.Key,
460
+
461
+ datas.Count,
462
+
463
+ typeA?.Timestamp.ToString("HH:mm:ss"),
464
+
465
+ typeB?.Timestamp.ToString("HH:mm:ss")));
466
+
467
+ });
468
+
469
+ });
470
+
471
+ ```
472
+
473
+
474
+
475
+ #実行例
476
+
477
+
478
+
479
+ ##出力
480
+
481
+
482
+
483
+ ```
484
+
485
+ 16:57:05 : Data : Id=[0], Type=[TypeA]
486
+
487
+ 16:57:06 : Data : Id=[0], Type=[TypeB]
488
+
489
+ 16:57:07 : Data : Id=[1], Type=[TypeA]
490
+
491
+ 16:57:08 : Data : Id=[1], Type=[TypeB]
492
+
493
+ 16:57:09 : Data : Id=[0], Type=[TypeA]
494
+
495
+ 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06]
496
+
497
+ 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08]
498
+
499
+ 16:57:10 : Data : Id=[0], Type=[TypeB]
500
+
501
+ 16:57:11 : Data : Id=[1], Type=[TypeA]
502
+
503
+ 16:57:12 : Data : Id=[1], Type=[TypeB]
504
+
505
+ 16:57:13 : Data : Id=[0], Type=[TypeA]
506
+
507
+ 16:57:14 : Data : Id=[0], Type=[TypeB]
508
+
509
+ 16:57:15 : Event : Id=[0], Count=[3], TypeA=[16:57:13], typeB=[16:57:14]
510
+
511
+ 16:57:15 : Event : Id=[1], Count=[2], TypeA=[16:57:11], typeB=[16:57:12]
512
+
513
+ 16:57:15 : Data : Id=[1], Type=[TypeA]
514
+
515
+ 16:57:16 : Data : Id=[1], Type=[TypeB]
516
+
517
+ 16:57:17 : Data : Id=[0], Type=[TypeA]
518
+
519
+ 16:57:18 : Data : Id=[0], Type=[TypeB]
520
+
521
+ 16:57:19 : Data : Id=[1], Type=[TypeA]
522
+
523
+ 16:57:20 : Event : Id=[0], Count=[2], TypeA=[16:57:17], typeB=[16:57:18]
524
+
525
+ 16:57:20 : Event : Id=[1], Count=[3], TypeA=[16:57:19], typeB=[16:57:16]
526
+
527
+ ```
528
+
529
+
530
+
531
+ ##解説
532
+
533
+
534
+
535
+ ```
536
+
537
+ 1 16:57:05 : Data : Id=[0], Type=[TypeA]
538
+
539
+ 2 16:57:06 : Data : Id=[0], Type=[TypeB]
540
+
541
+ 3 16:57:07 : Data : Id=[1], Type=[TypeA]
542
+
543
+ 4 16:57:08 : Data : Id=[1], Type=[TypeB]
544
+
545
+ 5 16:57:09 : Data : Id=[0], Type=[TypeA]
546
+
547
+ 6 16:57:10 : Event : Id=[0], Count=[3], TypeA=[16:57:09], typeB=[16:57:06] 5と2が直近
548
+
549
+ 7 16:57:10 : Event : Id=[1], Count=[2], TypeA=[16:57:07], typeB=[16:57:08] 3と4が直近
550
+
551
+ ```
552
+
553
+
554
+
555
+ #質問事項
556
+
557
+
558
+
559
+ とりあえずやりたいことは出来たのですが、Reactive Extensionsを使った書き方でもっと効率の良い書き方はできるのでしょうか?
560
+
561
+
562
+
563
+
564
+
565
+ 例えば、この方法だとサンプリング間隔間のデータはすべてListで保持され、Subscribe()の中でLastOrDefault()で直近の値を取得していますが、Take()等によりストリームの段階で直近の1件のみを処理するようにする等。
566
+
567
+ それも試してみようと思ったのですが、Take()したいのはIdとSensorTypeの復号キー単位だが最終的に取得したいのはId単位な点、また、その場合にSample()等をどのタイミングで呼び出すのかがわからなくなり、ギブアップしました。
568
+
569
+
570
+
571
+ #内容補足
572
+
573
+
574
+
575
+ 自分のやりたい内容を、以下に整理してみました。
576
+
577
+
578
+
579
+ ```
580
+
581
+ source |-A1-A2-B1----B2----A3-A4-B3-B4---
582
+
583
+ boundary |----------X-----Y--------------Z-
584
+
585
+ A2 B2 A4
586
+
587
+ B1 B4
588
+
589
+ ```
590
+
591
+
592
+
593
+ sourceストームのA、B等がキーで、boundaryストリームのイベント発生のタイミングで同じキーの最後の値のみを取得したいのです。
594
+
595
+
596
+
597
+
598
+
599
+ 現状出来ている内容だと以下のような内容で、イベント発生後に1A、1C、1Dを捨てる形になるので、boundaryのイベント発生間隔が長くなると、その間sourceストリームで発生したオブジェクト全てが保持されてしまいます。
600
+
601
+
602
+
603
+ ```
604
+
605
+ source |-A1-A2-B1----B2----A3-A4-B3-B4---
606
+
607
+ boundary |----------X-----Y--------------Z-
608
+
609
+ A1 B2 A3
610
+
611
+ A2 A4
612
+
613
+ B1 B3
614
+
615
+ B4
306
616
 
307
617
  ```
308
618
 

2

補足の内容をわかりやすく修正

2016/05/14 05:13

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -270,7 +270,7 @@
270
270
 
271
271
  source |-A1-A2-B1----B2----A3-A4-B3-B4---
272
272
 
273
- boundary |----------0-----1--------------3-
273
+ boundary |----------X-----Y--------------Z-
274
274
 
275
275
  A2 B2 A4
276
276
 
@@ -294,7 +294,7 @@
294
294
 
295
295
  source |-1A-2A-1B----2B----1C-1D-2C-2D---
296
296
 
297
- boundary |----------0-----1--------------3-
297
+ boundary |----------X-----Y--------------Z-
298
298
 
299
299
  1A 2B 1C
300
300
 

1

内容補足追加

2016/05/14 05:11

投稿

hossy
hossy

スコア15

test CHANGED
File without changes
test CHANGED
@@ -255,3 +255,57 @@
255
255
  例えば、この方法だとサンプリング間隔間のデータはすべてListで保持され、Subscribe()の中でLastOrDefault()で直近の値を取得していますが、Take()等によりストリームの段階で直近の1件のみを処理するようにする等。
256
256
 
257
257
  それも試してみようと思ったのですが、Take()したいのはIdとSensorTypeの復号キー単位だが最終的に取得したいのはId単位な点、また、その場合にSample()等をどのタイミングで呼び出すのかがわからなくなり、ギブアップしました。
258
+
259
+
260
+
261
+ #内容補足
262
+
263
+
264
+
265
+ 自分のやりたい内容を、以下に整理してみました。
266
+
267
+
268
+
269
+ ```
270
+
271
+ source |-A1-A2-B1----B2----A3-A4-B3-B4---
272
+
273
+ boundary |----------0-----1--------------3-
274
+
275
+ A2 B2 A4
276
+
277
+ B1 B4
278
+
279
+ ```
280
+
281
+
282
+
283
+ sourceストームのA、B等がキーで、boundaryストリームのイベント発生のタイミングで同じキーの最後の値のみを取得したいのです。
284
+
285
+
286
+
287
+
288
+
289
+ 現状出来ている内容だと以下のような内容で、イベント発生後に1A、1C、1Dを捨てる形になるので、boundaryのイベント発生間隔が長くなると、その間sourceストリームで発生したオブジェクト全てが保持されてしまいます。
290
+
291
+
292
+
293
+ ```
294
+
295
+ source |-1A-2A-1B----2B----1C-1D-2C-2D---
296
+
297
+ boundary |----------0-----1--------------3-
298
+
299
+ 1A 2B 1C
300
+
301
+ 2A 1D
302
+
303
+ 1B 2C
304
+
305
+ 2D
306
+
307
+ ```
308
+
309
+
310
+
311
+ なお、A2とB1については、boundaryストリームのイベントタイミングで発火されれば、同一のイベントで渡されてきても、別のストリームで扱われてもどちらでも良いです。