###Prerequisites / What I want to achieve
I am building a batch application with Apache Spark (Java) + Spring Boot.
When I run a Spark action (foreach, etc.) and try to use a DI'd object inside it,
a "Task not serializable" error occurs.
I have confirmed that it works if I create the object with a plain `new` instead of DI,
but I have not figured out how to inject it normally via DI.
**I have uploaded sample code to GitHub.** It should run as long as Java 8 is installed.
https://github.com/guzzle0527/spark-spring-sample
###Problem / Error message
```
Caused by: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at sample.HelloWorld.run(HelloWorld.java:29) [classes/:na]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) [spring-boot-1.3.1.RELEASE.jar:1.3.1.RELEASE]
	... 7 common frames omitted
Caused by: java.io.NotSerializableException: sample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74
Serialization stack:
	- object not serializable (class: sample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74, value: sample.HelloWorld$$EnhancerBySpringCGLIB$$5e382b74@2a225dd7)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class sample.HelloWorld, functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeSpecial sample/HelloWorld.lambda$0:(Ljava/lang/String;)V, instantiatedMethodType=(Ljava/lang/String;)V, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class sample.HelloWorld$$Lambda$2/1447521302, sample.HelloWorld$$Lambda$2/1447521302@3bb5ceb)
	- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, name: f$14, type: interface org.apache.spark.api.java.function.VoidFunction)
	- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) ~[spark-core_2.10-1.6.0.jar:1.6.0]
	... 20 common frames omitted
```
###Relevant source code
```java
@Service
@Scope("prototype")
public class TestServiceImpl implements TestService, Serializable {
    @Override
    public void test(String data) {
        System.out.println(data);
    }
}
```
```java
@SpringBootApplication
public class HelloWorld implements CommandLineRunner {

    @Autowired
    private TestService testService;

    @Autowired
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplication.run(HelloWorld.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "Hello");
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));

        rdd.foreach(data -> {
            testService.test(data); // Task not serializable error
            TestService t1 = context.getBean(TestService.class); // also Task not serializable
            TestService t2 = new TestServiceImpl(); // this works
        });
    }
}
```
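For context on why plain `new` works but the injected field does not: a Java 8 lambda that reads an instance field captures the enclosing `this`, and here `this` is the Spring CGLIB proxy of `HelloWorld` (exactly the class named in the `NotSerializableException`), which Spark then cannot serialize as part of the closure. The standalone sketch below (no Spark or Spring involved; `CaptureDemo`, `Service`, and `serializable` are made-up names for illustration only) reproduces the capture behavior and shows that copying the field into a local variable changes what the closure drags along. In the real application this only helps if the captured bean itself serializes cleanly, which a scoped proxy may not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

public class CaptureDemo {

    // A Consumer that is also Serializable, analogous to Spark's VoidFunction
    interface SerConsumer<T> extends Consumer<T>, Serializable {}

    static class Service implements Serializable {
        void test(String data) {
            System.out.println(data);
        }
    }

    private Service service = new Service(); // stands in for the @Autowired field

    // Reading the instance field desugars to this.service, so the lambda
    // captures `this` (a CaptureDemo), which is not Serializable -> fails.
    SerConsumer<String> capturesThis() {
        return data -> service.test(data);
    }

    // Copying the field into a local first: the lambda captures only the
    // local reference, and Service *is* Serializable -> succeeds.
    SerConsumer<String> capturesLocal() {
        final Service local = service;
        return data -> local.test(data);
    }

    static boolean serializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException lands here
            return false;
        }
    }

    public static void main(String[] args) {
        CaptureDemo d = new CaptureDemo();
        System.out.println(serializable(d.capturesThis()));  // false: closure drags in `this`
        System.out.println(serializable(d.capturesLocal())); // true: only the Service is captured
    }
}
```

This also matches the stack trace above: `capturedArgs` of the `SerializedLambda` holds the one captured object, and for `testService.test(data)` that object is the `HelloWorld` proxy itself.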
###Supplementary information (language/FW/tool versions, etc.)
Java 8, Apache Spark 1.6, Spring Boot 1.3.1