質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

88.35%

Spark-submitについて

解決済

回答 1

投稿

  • 評価
  • クリップ 0
  • VIEW 1,961

Yusei

score 11

前提・実現したいこと

今、ScalaとSparkとCassandraを組み合わせてAPIを設計しています。
その最中、spark-submitをしようとした際に問題が起こりました。

発生している問題・エラーメッセージ

sbt assemblyを行って出来たjarファイルを用いて行った結果が以下になります。

% spark-submit --master spark://localhost:7077 target/scala-2.11/Test-assembly-1.0.jar
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$1.TakeOrdered()Lorg/apache/spark/sql/execution/SparkStrategies$TakeOrdered$;
    at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$1.<init>(CassandraSQLContext.scala:90)
    at org.apache.spark.sql.cassandra.CassandraSQLContext.<init>(CassandraSQLContext.scala:85)
    at Test$.getData(AnalysisOfBeacon.scala:50)
    at Test$.main(AnalysisOfBeacon.scala:29)
    at Test.main(AnalysisOfBeacon.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


このような形になります。一方sbt packageを実行して出来たjarファイルで試すと

% spark-submit --master spark://localhost:7077 target/scala-2.11/test_2.11-1.0.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/cassandra/CassandraSQLContext
    at AnalysisOfBeacon$.getData(AnalysisOfBeacon.scala:50)
    at AnalysisOfBeacon$.main(AnalysisOfBeacon.scala:29)
    at AnalysisOfBeacon.main(AnalysisOfBeacon.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.CassandraSQLContext
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 12 more


となります。

該当のソースコード

build.sbtです

lazy val root = (project in file(".")).
  settings(
    name := "Test",
    version := "1.0",
    scalaVersion := "2.11.7"
  )
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" % "spark-streaming_2.11" % "1.4.1",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.2",
  "org.apache.spark" %% "spark-sql" % "1.4.1"
)

resolvers ++= Seq(
  "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/",
  "Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("netty", "handler", xs @ _*)         => MergeStrategy.first
    case PathList("netty", "buffer", xs @ _*)     => MergeStrategy.first
    case PathList("netty", "common", xs @ _*)     => MergeStrategy.first
    case PathList("netty", "transport", xs @ _*)     => MergeStrategy.first
    case PathList("netty", "codec", xs @ _*)     => MergeStrategy.first

    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".dtd" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".xsd" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
    case x => old(x)
  }
}

以下が実行コードになります。実務に関わるものなので、恐らく関わっているであろう部分についてのみ掲載させていただきます。

import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.streaming.{Time, Durations, Seconds, StreamingContext}
import com.datastax.spark.connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.cassandra.CassandraSQLContext
object Test {

  def main(args: Array[String]){

    val sc = new SparkConf().setAppName("Test")//.setMaster("local[*]").set("spark.cassandra.connection.host","127.0.0.1")
    val sct = new SparkContext(sc);
    val ssc = new StreamingContext(sct, Seconds(10))

  }
}

試したこと

バージョンによって

...TakeOrdered()...


というようなエラーが出たり(これはSpark 1.4.1まで)

...DataSourceStrategy...


というエラーに変わる事は観測しています。(Spark1.5.0~1.6.1)
1.4.1までのエラーは、Spark1.4.2で解決したと言っているissueを見かけたのですが、存在しないバージョンのようで解決の糸口になりませんでした。
また、このコードはsbt runでは正しく動作します。spark-submitの時にエラーが起きてしまうのです。

補足情報(言語/FW/ツール等のバージョンなど)

scalaは2.11.7で統一
sbtは0.13.11
sparkやspark-sqlのバージョンなどは色々試しています。(1.3.1~1.6.1)

色々いたらない点があるとは思いますが、
解決のために必要な情報等ありましたら出せる範囲で出しますので宜しくお願いします。

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 1

check解決した方法

+1

解決方法

現状ですが、build.sbtにおいてScalaのバージョンを2.10系にしてsbt assemblyを行うと
上手く実行する事が出来ました
恐らく、Sparkが現状Scala2.10系を主にサポートしているのが原因かと思われます
もしScala2.11でも出来た方がいらっしゃいましたら是非やり方を教えて下さい

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 88.35%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る