🎄teratailクリスマスプレゼントキャンペーン2024🎄』開催中!

\teratail特別グッズやAmazonギフトカード最大2,000円分が当たる!/

詳細はこちら
BigQuery

BigQueryは、Google Cloud Platformが提供しているビッグデータ解析サービス。数TB(テラバイト)またはPB(ペタバイト)の膨大なデータに対し、SQL風のクエリを実行し、高速で集計・分析を行うサービスです。

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Q&A

解決済

1回答

3032閲覧

BigQueryのストリーミングインサートで挿入先の既存データとの重複を避ける方法はありますか??

amaturePy

総合スコア131

BigQuery

BigQueryは、Google Cloud Platformが提供しているビッグデータ解析サービス。数TB(テラバイト)またはPB(ペタバイト)の膨大なデータに対し、SQL風のクエリを実行し、高速で集計・分析を行うサービスです。

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

0グッド

0クリップ

投稿2021/01/14 04:35

編集2021/01/14 05:18

CSVのデータをBigQueryへ5〜6秒おきにストリーミングで挿入しているのですが、luidというレコードデータが既存のデータにある物は省きたいのですが、これは可能でしょうか??
当たり前ですが、現状既存のデータに蓄積されてデータが入っております。

安直にも挿入の直前でレコードデータを削除も考えたのですが、レコードデータの削除はできないようで、テーブルの削除→再作成→ストリーミング挿入の流れも考えたのですが、処理が重くなり、エラーの危惧とストリーミング後のデータは90分間バッファとして保存され、削除、変更などの処理ができないようなので諦めました。

何か良い方法があればご教授頂きたいです。
よろしくお願いします。

def stream_upload(): with open('./test.csv','w') as f: f.write(response_get.text) print("made csv") # BigQuery client = bigquery.Client() project_id = 'test' dataset_name = 'test' table_name = "test" full_table_name = dataset_name + '.' + table_name json_rows = [] with open('./test.csv','r') as f: for line in csv.DictReader(f): del line[None] line_json = dict(line) json_rows.append(line_json) errors = client.insert_rows_json( full_table_name,json_rows,row_ids=[row['luid'] for row in json_rows] ) if errors == []: print("New rows have been added.") else: print("Encountered errors while inserting rows: {}".format(errors)) print("end") schedule.every(0.5).seconds.do(stream_upload) while True: schedule.run_pending() time.sleep(0.1)

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答1

0

ベストアンサー

おそらくですが、値を見ての重複除去はできないと思うんですよねぇ。

https://cloud.google.com/bigquery/streaming-data-into-bigquery?hl=ja#dataconsistency
のように insertId を付与じゃダメなんですよね?

なので、読み出し時の重複除去しながらの SELECT が一般的ではと思います。

select max(xxx) from dataset.table group by yyy と集約できるようなデータであれば、BigQuery のマテビューが使えるかも?

投稿2021/01/14 10:50

68user

総合スコア2022

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

amaturePy

2021/01/14 11:53

なるほどですね。ありがとうございます。一応row_idsの部分がinsertIdに当たるのですが、重複して入ってしまっています。自由にカラムを指定することはできないのかもしれないです。 やはり呼び出しの時に工夫してあげる必要がありそうですね。。。
68user

2021/01/14 12:14

簡単にできるといいですよねぇ。 前段に pubsub -> dataflow -> bq としたとしても、値によるdedup はできない認識です。 https://www.case-k.jp/entry/2020/06/04/142656#1-%E3%83%A1%E3%83%83%E3%82%BB%E3%83%BC%E3%82%B8%E9%87%8D%E8%A4%87 あと、今の quota だと特定テーブルに 1分に一度 INSERTというのができるはずなので、1分間遅延してよいなら 1分おきに一時テーブルに入れて、INSERT INTO TARGET_TABLE WHERE NOT EXISTS (SELECT * FROM TMP_TABLE WHERE ....) もアリかもしれません。
amaturePy

2021/01/14 12:24

なるほどです。 とても参考になります。 これがどうしても数秒おきに処理しなくては行けないんです。。。。 ちなみに経験不足ゆえ、ご意見を頂戴したいのですが、MySQLなどでもこのように高頻度にデータを入れ続けるのは可能なのでしょうか??
68user

2021/01/14 12:32

MySQL にコネクション張りっぱなしにして INSERT INTO .... や MERGE や INSERT後の重複除去などを数秒おきにやればいいと思いますので、MySQL のほうが向いているかもしれませんね。最終的にはデータ量・SELECTクエリの速度・MySQL (Cloud SQL) 管理コストとのバランス次第での判断なるとは思います。 なお、数秒おきに重複除去というのがいわゆるクラウド的ではないので (重複除去ということは過去データを参照しなければならない=集約しなければならない=分散処理に向いていない)、その設計を求める人と小一時間議論したいところではあります。
amaturePy

2021/01/14 12:48 編集

なるほどです。Cloud SQLも最初考えたのですが、BigQUeryでもなんとかいけるものかと突き進んでしまいました。進めて行くうちにstreamingで入れたデータは90分ほどバッファとして保存されるため、削除などできないと他にもいろいろ制約があるのを知りました。 人材の関係で社内でコードを書けるのが私だけなので、こういったご意見を頂けると大変勉強になります! ありがとうございます!
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.36%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問