質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.50%
Java

Javaは、1995年にサン・マイクロシステムズが開発したプログラミング言語です。表記法はC言語に似ていますが、既存のプログラミング言語の短所を踏まえていちから設計されており、最初からオブジェクト指向性を備えてデザインされています。セキュリティ面が強力であることや、ネットワーク環境での利用に向いていることが特徴です。Javaで作られたソフトウェアは基本的にいかなるプラットフォームでも作動します。

Spark

Spark(Apache Spark)とは、膨大なデータを迅速、柔軟に分散並行処理を行うフレームワークです。分析ツールであるApache Hadoopと比較し、最大で100倍の速度でデータ処理ができるとされています。

Spring Boot

Spring Bootは、Javaのフレームワークの一つ。Springプロジェクトが提供する様々なフレームワークを統合した、アプリケーションを高速で開発するために設計されたフレームワークです。

Q&A

1回答

4636閲覧

Apache Spark(Java) + Spring Boot構成で、Sparkアクション実行時にDIできない

guzzle

総合スコア43

Java

Javaは、1995年にサン・マイクロシステムズが開発したプログラミング言語です。表記法はC言語に似ていますが、既存のプログラミング言語の短所を踏まえていちから設計されており、最初からオブジェクト指向性を備えてデザインされています。セキュリティ面が強力であることや、ネットワーク環境での利用に向いていることが特徴です。Javaで作られたソフトウェアは基本的にいかなるプラットフォームでも作動します。

Spark

Spark(Apache Spark)とは、膨大なデータを迅速、柔軟に分散並行処理を行うフレームワークです。分析ツールであるApache Hadoopと比較し、最大で100倍の速度でデータ処理ができるとされています。

Spring Boot

Spring Bootは、Javaのフレームワークの一つ。Springプロジェクトが提供する様々なフレームワークを統合した、アプリケーションを高速で開発するために設計されたフレームワークです。

0グッド

2クリップ

投稿2016/07/01 05:30

編集2022/01/12 10:55

###前提・実現したいこと
Apache Spark(Java) + Spring Boot構成で、バッチアプリを作成しています。
Sparkのアクション(foreachなど)実行の際、別オブジェクトをDIしようとすると、
Task not serializableエラーが発生します。
DIせずに普通にnewした場合は動作する事は確認済みなのですが、
どのようにすれば普通にDIできるのか現在分かっていません。

**サンプルコードをgithubにUPしました。**Java8が入ってれば動作すると思います。
https://github.com/guzzle0527/spark-spring-sample

###発生している問題・エラーメッセージ

Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46) ~[spark-core_2.10-1.6.0.jar:1.6.0] at sample.HelloWorld.run(HelloWorld.java:29) [classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE] ... 7 common frames omitted Caused by: java.io.NotSerializableException: sample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74 Serialization stack: - object not serializable (class: sample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74, value: sample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74@2a225dd7) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class sample.HelloWorld, functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeSpecial sample/HelloWorld.lambda$0:(Ljava/lang/String;)V, instantiatedMethodType=(Ljava/lang/String;)V, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class sample.HelloWorld$$Lambda$2/1447521302, sample.HelloWorld$$Lambda$2/1447521302@3bb5ceb) - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, name: f$14, type: interface org.apache.spark.api.java.function.VoidFunction) - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) ~[spark-core_2.10-1.6.0.jar:1.6.0] at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) ~[spark-core_2.10-1.6.0.jar:1.6.0] ... 20 common frames omitted

###該当のソースコード

Java

1@Service 2@Scope("prototype") 3public class TestServiceImpl implements TestService, Serializable { 4 @Override 5 public void test(String data) { 6 System.out.println(data); 7 } 8}

Java

1@SpringBootApplication 2public class HelloWorld implements CommandLineRunner { 3 4 @Autowired 5 private TestService testService; 6 7 @Autowired 8 private ApplicationContext context; 9 10 public static void main(String[] args) { 11 SpringApplication.run(HelloWorld.class, args); 12 } 13 14 @Override 15 public void run(String... args) throws Exception { 16 JavaSparkContext sc = new JavaSparkContext("local[*]", "Hello"); 17 JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e")); 18 19 rdd.foreach(data -> { 20 testService.test(data); // Task not serializableエラー 21 TestService t1 = context.getBean(TestService.class); // これもTask not serializableエラー 22 TestService t2 = new TestServiceImpl(); // これは動作する 23 }); 24 } 25}

###補足情報(言語/FW/ツール等のバージョンなど)
Java8, Apache Spark1.6, Spring Boot1.3.1

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

eripong

2016/07/02 03:57

例外のスタックトレースとエラーメッセージをできるだけ提示してください。
guzzle

2016/07/03 01:58

コメントありがとうございます。 例外のスタックトレースとエラーメッセージを本文に追記しました。 およびサンプルコードをGitHubにUPしましたのでそのリンクも本文に記述しております。 Spark+Springの組み合わせは情報が少ないのでいろいろ苦労しております。。。
eripong

2016/07/03 02:37

調べた範囲で書きました。サンプルコード、試せたら試してみます。
guest

回答1

0

試せていないのと、情報が新しくないかもしれませんが、

https://github.com/lgscofield/spring-spark
にある様な対応が必要なのかも知れません。

Sparkが分散実行前提のため、そこで使うクラスは基本的にSerializableである必要があるようです。
そして、今の場合、Springが生成するsample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74クラスがSerializableでないことが直接的な原因です。

ただ、なぜこのクラスがSerializableである必要があるかについては、
Springの内部まで踏み入らないと分からないと思います。

そこには、

Dependency injection in spark using spring is not possible without any further effort, because of the heavy usage of serialization.

というように、大きな苦労があるようです。
直接的な解決になるかは分かりませんが、上にあるspring-sparkを試してみたらいかがでしょうか?

以下のURLも参考になるかも知れません。
scala - Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects - Stack Overflow

投稿2016/07/03 02:26

eripong

総合スコア1546

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guzzle

2016/07/04 02:41

情報ありがとうございました。 教えて頂いたspring-sparkのサイトを参考に試してみたところ、 サンプルコードは動作する事は確認できました。 しかしながら、実際のプロジェクトはKryo serializationを使っていたり、 DIしたオブジェクト先でDBコネクションを取ったりで、 この場合はまだうまく動作できていません。。。 とりあえずヒントとなる情報は大変助かりました。 いずれにせよ一筋縄ではいかなそうです。 シリアライズ周りはどうも難解。。。 継続して調査は続けますが、いったんレスさせて頂きました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

まだベストアンサーが選ばれていません

会員登録して回答してみよう

アカウントをお持ちの方は

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.50%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問