以下を参考にSpark MLlibをPythonで動かしてレコメンデーションの結果を得たいと考えています.
参考にしたQiita記事のURL
model = ALS.train(ratings, rank, numIterations)
データの学習のところでつまずいているのですが,どのような修正をすればALS.trainがつかえるようになりますでしょうか?
初心者ですみませんがよろしくお願いいたします.
環境
mac os 11.6
Python 3.5.2 :: Anaconda custom (x86_64)
python
# Launch Spark and train an ALS recommender on the MovieLens sample data.
#
# Fixes vs. the original:
#   * The script used Python 2 `print` statements, but the environment runs
#     Python 3.5 — every `print "..."` was a SyntaxError.  All prints are now
#     print() calls.
#   * The py4j zip path was hard-coded to 0.8.2.1 while the traceback shows
#     Spark 2.1.0 ships py4j-0.10.4; the path is now discovered with glob so
#     it matches whatever version is installed under $SPARK_HOME/python/lib.
#   * DataFrame.as_matrix() is deprecated/removed in modern pandas; replaced
#     by the equivalent DataFrame.values.
import glob
import os
import sys
from datetime import datetime as dt

import numpy as np
import pandas as pd

print("loading PySpark setting...")
spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))

# Pick up whichever py4j source zip this Spark installation actually ships
# instead of hard-coding a version that may not exist on disk.
py4j_zips = glob.glob(os.path.join(spark_home, 'python/lib/py4j-*-src.zip'))
if not py4j_zips:
    raise ValueError('no py4j-*-src.zip found under $SPARK_HOME/python/lib')
sys.path.insert(0, py4j_zips[0])

# NOTE(review): the original also tried
#     execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
# execfile() does not exist in Python 3; the equivalent is
#     exec(open(path).read())
# It is left out here on the assumption that `sc` (a SparkContext) is
# already available in the session -- confirm before running standalone.

# Load the example MovieLens ratings file (fields separated by '::':
# uid::iid::rating::time).
df = pd.read_csv(os.path.join(spark_home, 'data/mllib/als/sample_movielens_ratings.txt'),
                 delimiter='::', names=('uid', 'iid', 'rating', 'time'),
                 engine='python')

# Pivot into a user x item rating matrix purely for visual inspection.
pv_rating = df.pivot(index='uid', columns='iid', values='rating').fillna(0)
print(pv_rating)

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Build the training RDD of Rating(user, product, rating) records.
# `sc` is the SparkContext assumed to exist in the running session.
sc_rating = sc.parallelize(df.values)
ratings = sc_rating.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Train the recommender with ALS (Alternating Least Squares).
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
エラー
Py4JJavaError Traceback (most recent call last)
<ipython-input-16-9e6099cbfb83> in <module>()
8 rank = 10
9 numIterations = 10
---> 10 model = ALS.train(ratings, rank, numIterations)
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/mllib/recommendation.py in train(cls, ratings, rank, iterations, lambda_, blocks, nonnegative, seed)
270 (default: None)
271 """
--> 272 model = callMLlibFunc("trainALSModel", cls.prepare(ratings), rank, iterations,
273 lambda, blocks, nonnegative, seed)
274 return MatrixFactorizationModel(model)
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/mllib/recommendation.py in _prepare(cls, ratings)
227 raise TypeError("Ratings should be represented by either an RDD or a DataFrame, "
228 "but got %s." % type(ratings))
--> 229 first = ratings.first()
230 if isinstance(first, Rating):
231 pass
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py in first(self)
1359 ValueError: RDD is empty
1360 """
-> 1361 rs = self.take(1)
1362 if rs:
1363 return rs[0]
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
967
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/Cellar/apache-spark/2.1.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
あなたの回答
tips
プレビュー