質問編集履歴
2
質問変更
test
CHANGED
File without changes
|
test
CHANGED
@@ -8,9 +8,9 @@
|
|
8
8
|
|
9
9
|
コンシューマプログラムは、
|
10
10
|
|
11
|
-
以下のコマンドを実行して生成されたコマンドを実行することで
|
11
|
+
以下のコマンドを実行して生成されたコマンドを実行することで
|
12
12
|
|
13
|
-
|
13
|
+
kinesisからデータを取り出すことに成功するのですが、logの出力に失敗します。
|
14
14
|
|
15
15
|
```python
|
16
16
|
|
@@ -20,31 +20,9 @@
|
|
20
20
|
|
21
21
|
```log
|
22
22
|
|
23
|
-
2021-08-1
|
23
|
+
^C2021-08-14 06:00:36,469 [multi-lang-daemon-0001] ERROR s.a.k.multilang.DrainChildSTDERRTask [NONE] - Received error line from subprocess [Record (Partition Key: cat, Sequence Number:
|
24
24
|
|
25
|
-
2021
|
26
|
-
|
27
|
-
2021-08-12 07:39:59,604 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
28
|
-
|
29
|
-
2021-08-12 07:40:29,615 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
30
|
-
|
31
|
-
2021-08-12 07:40:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
|
32
|
-
|
33
|
-
2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
|
34
|
-
|
35
|
-
2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ...
|
36
|
-
|
37
|
-
2021-08-12 07:40:59,626 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
38
|
-
|
39
|
-
2021-08-12 07:41:29,644 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
40
|
-
|
41
|
-
2021-08-12 07:41:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
|
42
|
-
|
43
|
-
2021-08-12 07:41:30,927 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - WorkerId 4d81e613-5ca4-t979-95f5-027e62337193 is leader, running the periodic shard sync task
|
44
|
-
|
45
|
-
2021-08-12 07:41:30,936 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - Skipping shard sync for words due to the reason - Hash Ranges are complete for words
|
46
|
-
|
47
|
-
2021-08-12 07:41:33,646 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
|
25
|
+
49621021207286676391593542136605032695602445830552813570, Subsequence Number: 0, Data Size: 3Encountered an exception while processing records. Exception was write() argument must be str, not bytes] for shard shardId-000000000000
|
48
26
|
|
49
27
|
```
|
50
28
|
|
@@ -52,7 +30,11 @@
|
|
52
30
|
|
53
31
|
プロデューサー側は、
|
54
32
|
|
55
|
-
以下のコマンドを実行することでkinesisへput
|
33
|
+
以下のコマンドを実行することでkinesisへputするのですが
|
34
|
+
|
35
|
+
引数の文字(dog,bird,lobster)が、バイナリとして認識されているため
|
36
|
+
|
37
|
+
エラーになっているようです。これをstrとして認識して処理するためにはどうすればよいですか?
|
56
38
|
|
57
39
|
```python
|
58
40
|
|
@@ -60,11 +42,7 @@
|
|
60
42
|
|
61
43
|
```
|
62
44
|
|
63
|
-
AWSコンソールからputされたことは確認できました。
|
64
45
|
|
65
|
-
|
66
|
-
|
67
|
-
しかし、実際にkinesisからデータを取り出して処理しているように見えません。
|
68
46
|
|
69
47
|
sample_kclpy_app.pyのprocess_record関数に
|
70
48
|
|
@@ -81,55 +59,3 @@
|
|
81
59
|
return
|
82
60
|
|
83
61
|
```
|
84
|
-
|
85
|
-
もう少し範囲を拡大
|
86
|
-
|
87
|
-
```pyyhon
|
88
|
-
|
89
|
-
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
|
90
|
-
|
91
|
-
"""
|
92
|
-
|
93
|
-
Called for each record that is passed to process_records.
|
94
|
-
|
95
|
-
|
96
|
-
|
97
|
-
:param str data: The blob of data that was contained in the record.
|
98
|
-
|
99
|
-
:param str partition_key: The key associated with this recod.
|
100
|
-
|
101
|
-
:param int sequence_number: The sequence number associated with this record.
|
102
|
-
|
103
|
-
:param int sub_sequence_number: the sub sequence number associated with this record.
|
104
|
-
|
105
|
-
"""
|
106
|
-
|
107
|
-
####################################
|
108
|
-
|
109
|
-
# Insert your processing logic here
|
110
|
-
|
111
|
-
####################################
|
112
|
-
|
113
|
-
self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
|
114
|
-
|
115
|
-
.format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))
|
116
|
-
|
117
|
-
|
118
|
-
|
119
|
-
print(data)
|
120
|
-
|
121
|
-
with open('./test.log', mode='a') as f:
|
122
|
-
|
123
|
-
f.write(data)
|
124
|
-
|
125
|
-
return
|
126
|
-
|
127
|
-
```
|
128
|
-
|
129
|
-
|
130
|
-
|
131
|
-
./test.logが出力されることを期待していますが、test.logができません。
|
132
|
-
|
133
|
-
コンソールにもprint(data)の結果が表示されません。
|
134
|
-
|
135
|
-
対応方法をご教示願います
|
1
誤記訂正
test
CHANGED
File without changes
|
test
CHANGED
@@ -128,6 +128,8 @@
|
|
128
128
|
|
129
129
|
|
130
130
|
|
131
|
-
./test.logが出力されることを期待しています
|
131
|
+
./test.logが出力されることを期待していますが、test.logができません。
|
132
|
+
|
133
|
+
コンソールにもprint(data)の結果が表示されません。
|
132
134
|
|
133
135
|
対応方法をご教示願います
|