teratail header banner
teratail header banner
質問するログイン新規登録

質問編集履歴

2

質問変更

2021/08/14 06:21

投稿

pokemonta
pokemonta

スコア170

title CHANGED
File without changes
body CHANGED
@@ -3,35 +3,24 @@
3
3
  [リンク内容](https://recipe.kc-cloud.jp/archives/8083)
4
4
 
5
5
  コンシューマプログラムは、
6
- 以下のコマンドを実行して生成されたコマンドを実行することで動作します。
6
+ 以下のコマンドを実行して生成されたコマンドを実行することで
7
- 正常動作していよう見えます。
7
+ kinesisからデータを取り出すこと成功すのですが、logの出力失敗します。
8
8
  ```python
9
9
  python samples/amazon_kclpy_helper.py --print_command --java `which java` --properties sample.properties
10
10
  ```
11
11
  ```log
12
- 2021-08-12 07:39:31,594 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
13
- 2021-08-12 07:39:31,594 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ...
14
- 2021-08-12 07:39:59,604 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
15
- 2021-08-12 07:40:29,615 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
16
- 2021-08-12 07:40:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
17
- 2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
18
- 2021-08-12 07:40:32,617 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ...
19
- 2021-08-12 07:40:59,626 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
20
- 2021-08-12 07:41:29,644 [multi-lang-daemon-0000] INFO s.a.k.c.DiagnosticEventLogger [NONE] - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=1, largestPoolSize=1, maximumPoolSize=2147483647)
21
- 2021-08-12 07:41:30,892 [pool-15-thread-1] INFO s.a.k.leases.LeaseCleanupManager [NONE] - Number of pending leases to clean before the scan : 0
22
- 2021-08-12 07:41:30,927 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - WorkerId 4d81e613-5ca4-t979-95f5-027e62337193 is leader, running the periodic shard sync task
23
- 2021-08-12 07:41:30,936 [pool-14-thread-1] INFO s.a.k.c.PeriodicShardSyncManager [NONE] - Skipping shard sync for words due to the reason - Hash Ranges are complete for words
24
- 2021-08-12 07:41:33,646 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000
12
+ ^C2021-08-14 06:00:36,469 [multi-lang-daemon-0001] ERROR s.a.k.multilang.DrainChildSTDERRTask [NONE] - Received error line from subprocess [Record (Partition Key: cat, Sequence Number:
13
+ 49621021207286676391593542136605032695602445830552813570, Subsequence Number: 0, Data Size: 3Encountered an exception while processing records. Exception was write() argument must be str, not bytes] for shard shardId-000000000000
25
14
  ```
26
15
 
27
16
  プロデューサー側は、
28
- 以下のコマンドを実行することでkinesisへputしま
17
+ 以下のコマンドを実行することでkinesisへputするのですが
18
+ 引数の文字(dog,bird,lobster)が、バイナリとして認識されているため
19
+ エラーになっているようです。これをstrとして認識して処理するためにはどうすればよいですか?
29
20
  ```python
30
21
  sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster --region ap-northeast-1
31
22
  ```
32
- AWSコンソールからputされたことは確認できました。
33
23
 
34
- しかし、実際にkinesisからデータを取り出して処理しているように見えません。
35
24
  sample_kclpy_app.pyのprocess_record関数に
36
25
  以下のプログラムを追加しただけです。
37
26
  ```pyyhon
@@ -39,30 +28,4 @@
39
28
  with open('./test.log', mode='a') as f:
40
29
  f.write(data)
41
30
  return
42
- ```
31
+ ```
43
- もう少し範囲を拡大
44
- ```pyyhon
45
- def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
46
- """
47
- Called for each record that is passed to process_records.
48
-
49
- :param str data: The blob of data that was contained in the record.
50
- :param str partition_key: The key associated with this recod.
51
- :param int sequence_number: The sequence number associated with this record.
52
- :param int sub_sequence_number: the sub sequence number associated with this record.
53
- """
54
- ####################################
55
- # Insert your processing logic here
56
- ####################################
57
- self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}"
58
- .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))
59
-
60
- print(data)
61
- with open('./test.log', mode='a') as f:
62
- f.write(data)
63
- return
64
- ```
65
-
66
- ./test.logが出力されることを期待していますが、test.logができません。
67
- コンソールにもprint(data)の結果が表示されません。
68
- 対応方法をご教示願います

1

誤記訂正

2021/08/14 06:21

投稿

pokemonta
pokemonta

スコア170

title CHANGED
File without changes
body CHANGED
@@ -63,5 +63,6 @@
63
63
  return
64
64
  ```
65
65
 
66
- ./test.logが出力されることを期待しています
66
+ ./test.logが出力されることを期待していますが、test.logができません。
67
+ コンソールにもprint(data)の結果が表示されません。
67
68
  対応方法をご教示願います