質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
SQL

SQL(Structured Query Language)は、リレーショナルデータベース管理システム (RDBMS)のデータベース言語です。大きく分けて、データ定義言語(DDL)、データ操作言語(DML)、データ制御言語(DCL)の3つで構成されており、プログラム上でSQL文を生成して、RDBMSに命令を出し、RDBに必要なデータを格納できます。また、格納したデータを引き出すことも可能です。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Spark

Spark(Apache Spark)とは、膨大なデータを迅速、柔軟に分散並行処理を行うフレームワークです。分析ツールであるApache Hadoopと比較し、最大で100倍の速度でデータ処理ができるとされています。

Q&A

0回答

1103閲覧

Spark SQLの2つのカラムの値の差を計算して新しい行に追加する方法を教えてください

shosinsya2

総合スコア4

SQL

SQL(Structured Query Language)は、リレーショナルデータベース管理システム (RDBMS)のデータベース言語です。大きく分けて、データ定義言語(DDL)、データ操作言語(DML)、データ制御言語(DCL)の3つで構成されており、プログラム上でSQL文を生成して、RDBMSに命令を出し、RDBに必要なデータを格納できます。また、格納したデータを引き出すことも可能です。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Spark

Spark(Apache Spark)とは、膨大なデータを迅速、柔軟に分散並行処理を行うフレームワークです。分析ツールであるApache Hadoopと比較し、最大で100倍の速度でデータ処理ができるとされています。

0グッド

1クリップ

投稿2021/10/26 11:11

前提・実現したいこと

Spark sqlで新しい行を作成して、それぞれのTimestampカラムの値とその下の値の差の秒数を格納したいです。
1行目のカラムは1行目と2行目のtimestampの差の秒数を格納、2行目は2行目と3行目のtimestampの差...という感じです。

発生している問題・エラーメッセージ

Py4JJavaError Traceback (most recent call last) /var/folders/3f/cj3qrr9x2dlgwkp147wyd6280000gn/T/ipykernel_13721/276948560.py in <module> 4 5 sort_df_FD.withColumn("previous_t", lead(sort_df_FD.Timestamp, 1).over(w))\ ----> 6 .select(sort_df_FD.Timestamp, (unix_timestamp(sort_df_FD.Timestamp) - unix_timestamp(col("previous_t"))).alias("staying time"))\ 7 .show() /opt/anaconda3/envs/tf/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical) 376 """ 377 if isinstance(truncate, bool) and truncate: --> 378 print(self._jdf.showString(n, 20, vertical)) 379 else: 380 print(self._jdf.showString(n, int(truncate), vertical)) /opt/anaconda3/envs/tf/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args) 1255 answer = self.gateway_client.send_command(command) 1256 return_value = get_return_value( -> 1257 answer, self.gateway_client, self.target_id, self.name) 1258 1259 for temp_arg in temp_args: /opt/anaconda3/envs/tf/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /opt/anaconda3/envs/tf/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.\n". --> 328 format(target_id, ".", name), value) 329 else: 330 raise Py4JError( Py4JJavaError: An error occurred while calling o1088.showString. : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange rangepartitioning(Timestamp#137 ASC NULLS FIRST, 200) +- *(1) FileScan csv [Timestamp#137] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/iwayamayuto/Desktop/yoochoose-clicks.dat], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Timestamp:timestamp>

該当のソースコード

from pyspark.sql.types import * from pyspark.sql.functions import to_timestamp myManualSchema = StructType([ StructField("Session_ID", StringType(), True), StructField("Timestamp", TimestampType(), True), StructField("Item_ID", StringType(), True), StructField("Category", StringType(), True)]) df_FD = spark \ .read \ .format("csv").schema(myManualSchema) \ .load(directory+"/yoochoose-clicks.dat") sort_df_FD = df_FD.sort("Timestamp") sort_df_FD.show(10) +----------+--------------------+---------+--------+ |Session_ID| Timestamp| Item_ID|Category| +----------+--------------------+---------+--------+ | 351646|2014-04-01 12:00:...|214717005| 0| | 389654|2014-04-01 12:00:...|214826705| 0| | 263073|2014-04-01 12:00:...|214716982| 0| | 210798|2014-04-01 12:00:...|214581827| 0| | 375257|2014-04-01 12:00:...|214644307| 0| | 171168|2014-04-01 12:00:...|214594678| 0| | 164903|2014-04-01 12:00:...|214716935| 0| | 171168|2014-04-01 12:00:...|214820231| 0| | 33388|2014-04-01 12:00:...|214544780| 0| | 460339|2014-04-01 12:00:...|214835287| 0| +----------+--------------------+---------+--------+ from pyspark.sql.functions import col, lead, unix_timestamp from pyspark.sql.window import Window w = Window.partitionBy("Timestamp").orderBy("Timestamp") sort_df_FD.withColumn("previous_t", lead(sort_df_FD.Timestamp, 1).over(w))\ .select(sort_df_FD.Timestamp, (unix_timestamp(sort_df_FD.Timestamp) - unix_timestamp(col("previous_t"))).alias("staying time"))\ .show()

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

まだ回答がついていません

会員登録して回答してみよう

アカウントをお持ちの方は

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問