質問編集履歴
2
質問変更
title
CHANGED
File without changes
|
body
CHANGED
@@ -3,35 +3,24 @@
|
|
3
3
|
[リンク内容](https://recipe.kc-cloud.jp/archives/8083)
|
4
4
|
|
5
5
|
コンシューマプログラムは、
|
6
|
-
以下のコマンドを実行して生成されたコマンドを実行することで
|
6
|
+
以下のコマンドを実行して生成されたコマンドを実行することで
|
7
|
-
|
7
|
+
kinesisからデータを取り出すことに成功するのですが、logの出力に失敗します。
|
8
8
|
```python
|
9
9
|
python samples/amazon_kclpy_helper.py --print_command --java `which java` --properties sample.properties
|
10
10
|
```
|
11
11
|
```log
|
12
|
-
|
13
|
-
|
14
|
-
2021-08-12 07:39:59,604 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
15
|
-
2021-08-12 07:40:29,615 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
16
|
-
2021-08-12 07:40:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
|
17
|
-
2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
|
18
|
-
2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ...
|
19
|
-
2021-08-12 07:40:59,626 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
20
|
-
2021-08-12 07:41:29,644 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
|
21
|
-
2021-08-12 07:41:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
|
22
|
-
2021-08-12 07:41:30,927 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - WorkerId 4d81e613-5ca4-t979-95f5-027e62337193 is leader, running the periodic shard sync task
|
23
|
-
2021-08-12 07:41:30,936 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - Skipping shard sync for words due to the reason - Hash Ranges are complete for words
|
24
|
-
2021-08-12 07:41:33,646 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
|
12
|
+
^C2021-08-14 06:00:36,469 [multi-lang-daemon-0001] ERROR s.a.k.multilang.DrainChildSTDERRTask [NONE] - Received error line from subprocess [Record (Partition Key: cat, Sequence Number:
|
13
|
+
49621021207286676391593542136605032695602445830552813570, Subsequence Number: 0, Data Size: 3Encountered an exception while processing records. Exception was write() argument must be str, not bytes] for shard shardId-000000000000
|
25
14
|
```
|
26
15
|
|
27
16
|
プロデューサー側は、
|
28
|
-
以下のコマンドを実行することでkinesisへput
|
17
|
+
以下のコマンドを実行することでkinesisへputするのですが
|
18
|
+
引数の文字(dog,bird,lobster)が、バイナリとして認識されているため
|
19
|
+
エラーになっているようです。これをstrとして認識して処理するためにはどうすればよいですか?
|
29
20
|
```python
|
30
21
|
sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster --region ap-northeast-1
|
31
22
|
```
|
32
|
-
AWSコンソールからputされたことは確認できました。
|
33
23
|
|
34
|
-
しかし、実際にkinesisからデータを取り出して処理しているように見えません。
|
35
24
|
sample_kclpy_app.pyのprocess_record関数に
|
36
25
|
以下のプログラムを追加しただけです。
|
37
26
|
```pyyhon
|
@@ -39,30 +28,4 @@
|
|
39
28
|
with open('./test.log', mode='a') as f:
|
40
29
|
f.write(data)
|
41
30
|
return
|
42
|
-
```
|
31
|
+
```
|
43
|
-
もう少し範囲を拡大
|
44
|
-
```pyyhon
|
45
|
-
def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
|
46
|
-
"""
|
47
|
-
Called for each record that is passed to process_records.
|
48
|
-
|
49
|
-
:param str data: The blob of data that was contained in the record.
|
50
|
-
:param str partition_key: The key associated with this recod.
|
51
|
-
:param int sequence_number: The sequence number associated with this record.
|
52
|
-
:param int sub_sequence_number: the sub sequence number associated with this record.
|
53
|
-
"""
|
54
|
-
####################################
|
55
|
-
# Insert your processing logic here
|
56
|
-
####################################
|
57
|
-
self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
|
58
|
-
.format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))
|
59
|
-
|
60
|
-
print(data)
|
61
|
-
with open('./test.log', mode='a') as f:
|
62
|
-
f.write(data)
|
63
|
-
return
|
64
|
-
```
|
65
|
-
|
66
|
-
./test.logが出力されることを期待していますが、test.logができません。
|
67
|
-
コンソールにもprint(data)の結果が表示されません。
|
68
|
-
対応方法をご教示願います
|
1
誤記訂正
title
CHANGED
File without changes
|
body
CHANGED
@@ -63,5 +63,6 @@
|
|
63
63
|
return
|
64
64
|
```
|
65
65
|
|
66
|
-
./test.logが出力されることを期待しています
|
66
|
+
./test.logが出力されることを期待していますが、test.logができません。
|
67
|
+
コンソールにもprint(data)の結果が表示されません。
|
67
68
|
対応方法をご教示願います
|