KafkaからStormにデータを流すものを作りたいのですが、Storm側でKafkaから送られてきたデータをDeserializeする方法がわからなかったため、質問させて頂きました。
ここを参考に、KafkaSpoutを作成し、kafka-console-producer から送られてきたデータを受取り、StringScheme()を使って KafkaSpout() から emit されているところまでは確認することが出来ました。
しかし、 kafka connect jdbc を使って sqlite からデータを送る際に、送られてきたデータが単純な String でないため、 StringScheme() では正しくDeserializeすることが出来ませんでした。
こういう場合は、 Scheme を自分で作るというように書いてあったのですが、 Scheme の deserialize メソッドで byteBuffer を読み込む方法がわかりませんでした。
java初心者なので、初歩的な質問かもしれませんが、ご回答よろしくお願いします!!!
sql
1create table example( 2 id int primary key autoincrement not null, 3 text varchar 4);
java
1 public static class SpoutScheme implements Scheme { 2 private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; 3 4 public List<Object> deserialize(ByteBuffer bytes) { 5 // ここの bytes を OutputFields に合うようにパースしたいです! 6 return new Values(deserializeString(bytes)); 7 } 8 9 public static String deserializeString(ByteBuffer string) { 10 if (string.hasArray()) { 11 int base = string.arrayOffset(); 12 return new String(string.array(), base + string.position(), string.remaining()); 13 } else { 14 return new String(Utils.toByteArray(string), UTF8_CHARSET); 15 } 16 } 17 18 public Fields getOutputFields() { 19 return new Fields("id", "url"); 20 }
環境
centos 6
java 1.8
storm 1.0.2
kafka_2.10
あなたの回答
tips
プレビュー