What I want to achieve
Using Spark SQL, I want to add a new column that stores, for each row, the difference in seconds between that row's Timestamp and the Timestamp of the row below it.
That is, row 1 gets the difference in seconds between the timestamps of rows 1 and 2, row 2 gets the difference between rows 2 and 3, and so on.
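To make the expected result concrete, here is a minimal toy sketch (the data and the "staying time" values are hypothetical, only to illustrate what I am aiming for; subtracting the current timestamp from the next one gives a positive number of seconds):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lead, unix_timestamp
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical toy rows, only to show the expected output shape
toy = spark.createDataFrame(
    [("2014-04-01 12:00:00",), ("2014-04-01 12:00:05",), ("2014-04-01 12:00:12",)],
    ["Timestamp"],
).withColumn("Timestamp", col("Timestamp").cast("timestamp"))

# One global ordering by Timestamp; lead() reads the next row's value
w = Window.orderBy("Timestamp")

toy.withColumn("next_t", lead("Timestamp", 1).over(w)) \
   .select(
       "Timestamp",
       (unix_timestamp("next_t") - unix_timestamp("Timestamp")).alias("staying time"),
   ).show(truncate=False)

# Expected "staying time": 5 for the first row, 7 for the second,
# and null for the last row (there is no next row)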
Problem / error message
Py4JJavaError                             Traceback (most recent call last)
/var/folders/3f/cj3qrr9x2dlgwkp147wyd6280000gn/T/ipykernel_13721/276948560.py in <module>
      4
      5 sort_df_FD.withColumn("previous_t", lead(sort_df_FD.Timestamp, 1).over(w))\
----> 6 .select(sort_df_FD.Timestamp, (unix_timestamp(sort_df_FD.Timestamp) - unix_timestamp(col("previous_t"))).alias("staying time"))\
      7 .show()

/opt/anaconda3/envs/tf/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    376         """
    377         if isinstance(truncate, bool) and truncate:
--> 378             print(self._jdf.showString(n, 20, vertical))
    379         else:
    380             print(self._jdf.showString(n, int(truncate), vertical))

/opt/anaconda3/envs/tf/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/anaconda3/envs/tf/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/anaconda3/envs/tf/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1088.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(Timestamp#137 ASC NULLS FIRST, 200)
+- *(1) FileScan csv [Timestamp#137] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/iwayamayuto/Desktop/yoochoose-clicks.dat], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Timestamp:timestamp>
Relevant source code
from pyspark.sql.types import *
from pyspark.sql.functions import to_timestamp

myManualSchema = StructType([
    StructField("Session_ID", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Item_ID", StringType(), True),
    StructField("Category", StringType(), True)])

df_FD = spark \
    .read \
    .format("csv").schema(myManualSchema) \
    .load(directory+"/yoochoose-clicks.dat")

sort_df_FD = df_FD.sort("Timestamp")
sort_df_FD.show(10)

+----------+--------------------+---------+--------+
|Session_ID|           Timestamp|  Item_ID|Category|
+----------+--------------------+---------+--------+
|    351646|2014-04-01 12:00:...|214717005|       0|
|    389654|2014-04-01 12:00:...|214826705|       0|
|    263073|2014-04-01 12:00:...|214716982|       0|
|    210798|2014-04-01 12:00:...|214581827|       0|
|    375257|2014-04-01 12:00:...|214644307|       0|
|    171168|2014-04-01 12:00:...|214594678|       0|
|    164903|2014-04-01 12:00:...|214716935|       0|
|    171168|2014-04-01 12:00:...|214820231|       0|
|     33388|2014-04-01 12:00:...|214544780|       0|
|    460339|2014-04-01 12:00:...|214835287|       0|
+----------+--------------------+---------+--------+

from pyspark.sql.functions import col, lead, unix_timestamp
from pyspark.sql.window import Window

w = Window.partitionBy("Timestamp").orderBy("Timestamp")

sort_df_FD.withColumn("previous_t", lead(sort_df_FD.Timestamp, 1).over(w))\
    .select(sort_df_FD.Timestamp, (unix_timestamp(sort_df_FD.Timestamp) - unix_timestamp(col("previous_t"))).alias("staying time"))\
    .show()
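I am not sure whether this is related to the error above, but I also suspect my window definition: Window.partitionBy("Timestamp").orderBy("Timestamp") puts every distinct timestamp into its own partition, so lead() would return null for almost every row, and the name previous_t is misleading because lead() fetches the next row, not the previous one. Below is a sketch of what I think the window should look like instead (assuming I want the gap to the next click across the whole dataset; partitioning by Session_ID would be the per-session variant). It reuses df_FD from the code above:

from pyspark.sql.functions import col, lead, unix_timestamp
from pyspark.sql.window import Window

# No partitionBy: one global ordering by Timestamp.
# (Spark warns that this moves all rows to a single partition.)
w2 = Window.orderBy("Timestamp")
# Per-session variant (an assumption about the intended grouping):
# w2 = Window.partitionBy("Session_ID").orderBy("Timestamp")

(df_FD
    .withColumn("next_t", lead(col("Timestamp"), 1).over(w2))
    # next minus current, so the duration comes out positive
    .select(
        "Timestamp",
        (unix_timestamp(col("next_t")) - unix_timestamp(col("Timestamp")))
            .alias("staying time"))
    .show(10))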