質問編集履歴

2

質問変更

2021/08/14 06:21

投稿

pokemonta
pokemonta

スコア170

test CHANGED
File without changes
test CHANGED
@@ -8,9 +8,9 @@
8
8
 
9
9
  コンシューマプログラムは、
10
10
 
11
- 以下のコマンドを実行して生成されたコマンドを実行することで動作します。
11
+ 以下のコマンドを実行して生成されたコマンドを実行することで
12
12
 
13
- 正常動作していよう見えます。
13
+ kinesisからデータを取り出すこと成功すのですが、logの出力失敗します。
14
14
 
15
15
  ```python
16
16
 
@@ -20,31 +20,9 @@
20
20
 
21
21
  ```log
22
22
 
23
- 2021-08-12 07:39:31,594 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
23
+ ^C2021-08-14 06:00:36,469 [multi-lang-daemon-0001] ERROR s.a.k.multilang.DrainChildSTDERRTask [NONE] - Received error line from subprocess [Record (Partition Key: cat, Sequence Number:
24
24
 
25
- 2021-08-12 07:39:31,594 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ...
26
-
27
- 2021-08-12 07:39:59,604 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
28
-
29
- 2021-08-12 07:40:29,615 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
30
-
31
- 2021-08-12 07:40:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
32
-
33
- 2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
34
-
35
- 2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ...
36
-
37
- 2021-08-12 07:40:59,626 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
38
-
39
- 2021-08-12 07:41:29,644 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
40
-
41
- 2021-08-12 07:41:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
42
-
43
- 2021-08-12 07:41:30,927 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - WorkerId 4d81e613-5ca4-t979-95f5-027e62337193 is leader, running the periodic shard sync task
44
-
45
- 2021-08-12 07:41:30,936 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - Skipping shard sync for words due to the reason - Hash Ranges are complete for words
46
-
47
- 2021-08-12 07:41:33,646 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
25
+ 49621021207286676391593542136605032695602445830552813570, Subsequence Number: 0, Data Size: 3Encountered an exception while processing records. Exception was write() argument must be str, not bytes] for shard shardId-000000000000
48
26
 
49
27
  ```
50
28
 
@@ -52,7 +30,11 @@
52
30
 
53
31
  プロデューサー側は、
54
32
 
55
- 以下のコマンドを実行することでkinesisへputしま
33
+ 以下のコマンドを実行することでkinesisへputするのですが
34
+
35
+ 引数の文字(dog,bird,lobster)が、バイナリとして認識されているため
36
+
37
+ エラーになっているようです。これをstrとして認識して処理するためにはどうすればよいですか?
56
38
 
57
39
  ```python
58
40
 
@@ -60,11 +42,7 @@
60
42
 
61
43
  ```
62
44
 
63
- AWSコンソールからputされたことは確認できました。
64
45
 
65
-
66
-
67
- しかし、実際にkinesisからデータを取り出して処理しているように見えません。
68
46
 
69
47
  sample_kclpy_app.pyのprocess_record関数に
70
48
 
@@ -81,55 +59,3 @@
81
59
  return
82
60
 
83
61
  ```
84
-
85
- もう少し範囲を拡大
86
-
87
- ```pyyhon
88
-
89
- def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
90
-
91
- """
92
-
93
- Called for each record that is passed to process_records.
94
-
95
-
96
-
97
- :param str data: The blob of data that was contained in the record.
98
-
99
- :param str partition_key: The key associated with this recod.
100
-
101
- :param int sequence_number: The sequence number associated with this record.
102
-
103
- :param int sub_sequence_number: the sub sequence number associated with this record.
104
-
105
- """
106
-
107
- ####################################
108
-
109
- # Insert your processing logic here
110
-
111
- ####################################
112
-
113
- self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
114
-
115
- .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))
116
-
117
-
118
-
119
- print(data)
120
-
121
- with open('./test.log', mode='a') as f:
122
-
123
- f.write(data)
124
-
125
- return
126
-
127
- ```
128
-
129
-
130
-
131
- ./test.logが出力されることを期待していますが、test.logができません。
132
-
133
- コンソールにもprint(data)の結果が表示されません。
134
-
135
- 対応方法をご教示願います

1

誤記訂正

2021/08/14 06:21

投稿

pokemonta
pokemonta

スコア170

test CHANGED
File without changes
test CHANGED
@@ -128,6 +128,8 @@
128
128
 
129
129
 
130
130
 
131
- ./test.logが出力されることを期待しています
131
+ ./test.logが出力されることを期待していますが、test.logができません。
132
+
133
+ コンソールにもprint(data)の結果が表示されません。
132
134
 
133
135
  対応方法をご教示願います