質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.35%
並列処理

複数の計算が同時に実行される手法

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

pandas

Pandasは、PythonでRにおけるデータフレームに似た型を持たせることができるライブラリです。 行列計算の負担が大幅に軽減されるため、Rで行っていた集計作業をPythonでも比較的簡単に行えます。 データ構造を変更したりデータ分析したりするときにも便利です。

Q&A

解決済

4回答

1383閲覧

Pythonの並行処理でベースのデータフレームに、それぞれ読み込んだデータを書き込みたい

Q_1986-kt

総合スコア13

並列処理

複数の計算が同時に実行される手法

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

pandas

Pandasは、PythonでRにおけるデータフレームに似た型を持たせることができるライブラリです。 行列計算の負担が大幅に軽減されるため、Rで行っていた集計作業をPythonでも比較的簡単に行えます。 データ構造を変更したりデータ分析したりするときにも便利です。

0グッド

1クリップ

投稿2020/02/12 08:52

やりたい事

Pythonでスケジュールにそって実行される複数の処理(ネット上からスクレイピングしてデータを読み込む)で、
元になるデータフレームに書き込んで1つのデータフレームにしたいのですが、うまくいきません。

質問用の簡易コード

次に示すコードは、スクレイピング部分を省略して各データフレームからベースに書き込むようにしたものですが
基本的にやりたいことは以下の通りです。

import time import sched import pandas as pd from datetime import datetime import concurrent.futures # 時間表示フォーマット datetime.now().strftime("%Y/%m/%d %H:%M:%S") # スケジューラー scheduler = sched.scheduler(time.time, time.sleep) Start_t = '2020/02/12 18:00:00' df = pd.DataFrame({'ID': ['A01', 'A02', 'A03', 'A04', 'A05', 'B01', 'B02', 'B03', 'B04', 'B05', 'C01', 'C02', 'C03', 'C04', 'C05'], 'A': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', ''], 'B': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', ''], 'C': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '']}) df.set_index('ID', inplace=True) def test_1(): id_no = ['A01', 'A02', 'A03', 'A04', 'A05'] for j in id_no: df0 = pd.DataFrame({'ID': ['A01', 'A02', 'A03', 'A04', 'A05'], 'A': [1, 2, 3, 4, 5], 'B': [1, 2, 3, 4, 5], 'C': [1, 2, 3, 4, 5]}) df0.set_index('ID', inplace=True) df.loc[j] = df0.loc[j] time.sleep(60) # 実際には1分ごとのスクレイピングのイメージ def test_2(): id_no = ['B01', 'B02', 'B03', 'B04', 'B05'] for j in id_no: df1 = pd.DataFrame({'ID': ['B01', 'B02', 'B03', 'B04', 'B05'], 'A': [11, 12, 13, 14, 15], 'B': [11, 12, 13, 14, 15], 'C': [11, 12, 13, 14, 15]}) df1.set_index('ID', inplace=True) df.loc[j] = df1.loc[j] time.sleep(60) # 実際には1分ごとのスクレイピングのイメージ def test_3(): id_no = ['C01', 'C02', 'C03', 'C04', 'C05'] for j in id_no: df2 = pd.DataFrame({'ID': ['C01', 'C02', 'C03', 'C04', 'C05'], 'A': [21, 22, 23, 24, 25], 'B': [21, 22, 23, 24, 25], 'C': [21, 22, 23, 24, 25]}) df2.set_index('ID', inplace=True) df.loc[j] = df2.loc[j] time.sleep(60) # 実際には1分ごとのスクレイピングのイメージ def test(): executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) executor.submit(test_1) executor.submit(test_2) executor.submit(test_3) if __name__ == "__main__": Start_time1 = Start_t run1 = datetime.strptime(Start_time1, '%Y/%m/%d %H:%M:%S') run1 = int(time.mktime(run1.utctimetuple())) scheduler.enterabs(run1, 1, test) scheduler.run()

補足

このままでも、3つの関数それぞれの保存するデータフレームの名前を変えれば3つのデータフレームとして保存は可能です。また、def test(): の部分を

def test(): test_1() test_2() test_3()

こんな感じで順番で処理をすれば、望むデータフレームにはなりますが、
本来は、決められたスケジュールでスクレイピングが始まり、
それぞれが決められた時間ごとに同時にデータを読みに行く処理をしています。
そして、1つの元になるデータフレームの指定のIDの場所に書き込み1つのデータフレームにしたいのですが、
思うように書き込めません。

やりたい書き込んでいる途中経過としては、こんな感じです。

df =
ID A B C
A01 1 1 1
A02 2 2 2
A03
A04
A05
B01 11 11 11
B02 12 12 12
B03
B04
B05
C01 21 21 21
C02 22 22 22
C03
C04
C05

また、それぞれ読み込んだ後に3つのデータフレームを1つのまとめるのではだめで、
読み込んだデータは時間経過でリアルタイムグラフを考えています。
質問、説明があまり上手ではないかもしれませんが
何卒宜しくお願い致します。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

t_obara

2020/02/12 09:08

一番できないことは何なのでしょうか?複数のDataFrameをマージすることができないのですか?一番の課題は何で、そのためには確認したのはどんなコードで、その結果期待したのはこうで、実際はこうなったという状況をご提示いただくと回答が得られやすいかと思います。
meg_

2020/02/12 11:13

「1つのデータフレームにしたいのですが、うまくいきません。」とはどういう状況が不明ですが、メモリ共有が適切に行われていないのではないでしょうか?
Q_1986-kt

2020/02/13 01:20

質問がわかりずらく申し訳ありません。 実際に使っているコードは、Webのページをスクレイピングしてデータを読み込んでいるために 自分なりに、整理してわからない部分を書いたつもりだったのですが 1番の問題は、1つの元になるデータフレームに複数のデータ元からのデータを該当する同じIDの場所に 書き込むことです。なので複数のデータフレームをマージ(結合だとすると)とはちょつと違うかと 思います。もう少し、整理して考えてみます。 ご指摘ありがとうございます。
guest

回答4

0

実際のコードでの問題点もわかりました。
スクレイピングするための変数やURLなどExcelのファイルから読み込んでいました。

import time import sched import pandas as pd from datetime import datetime from concurrent import futures from selenium.webdriver import Chrome, ChromeOptions import xlwings as xw # 時間表示フォーマット datetime.now().strftime("%Y/%m/%d %H:%M:%S") # スケジューラー scheduler = sched.scheduler(time.time, time.sleep) wb = xw.Book('./Data/web_test.xlsx') sh = wb.sheets['Sheet1'] Start_time = str(sh.range('C1').value) + ' ' + str(sh.range('C2').value) Stop_time = str(sh.range('C1').value) + ' ' + str(sh.range('C3').value) url = sh.range('C4').value def test_1(): # ヘッドレスモードを有効にする(次の行をコメントアウトすると画面が表示される)。 options.add_argument('--headless') # ChromeのWebDriverオブジェクトを作成する。 driver = Chrome(options=options) # ページロードのタイムアウトを設定 driver.set_page_load_timeout(10) # 要素が見つかるまで5秒待つ設定 driver.implicitly_wait(5) # seconds # Webページの取得 driver.get(url) # 作業が終わるまで待つ time.sleep(2) # aタグを抽出 elem_list = driver.find_elements_by_tag_name("a") for elem in elem_list: # attributeの中からhrefを抽出して出力 url = elem.get_attribute("href") print(url)

こんな感じです。

これだと、Excelの値をよめなかったので、以下のように変更することで問題なく作業を行えました。

def test_1(): wb = xw.Book('./Data/web_test.xlsx') sh = wb.sheets['Sheet1'] url = sh.range('C4').value options = ChromeOptions() # ヘッドレスモードを有効にする(次の行をコメントアウトすると画面が表示される)。 options.add_argument('--headless') # ChromeのWebDriverオブジェクトを作成する。 driver = Chrome(options=options) # ページロードのタイムアウトを設定 driver.set_page_load_timeout(10) # 要素が見つかるまで5秒待つ設定 driver.implicitly_wait(5) # seconds # Webページの取得 driver.get(url) # 作業が終わるまで待つ time.sleep(2) # aタグを抽出 elem_list = driver.find_elements_by_tag_name("a") for elem in elem_list: # attributeの中からhrefを抽出して出力 url = elem.get_attribute("href") print(url)

これで、やりたいことができるようになりました。

初心者なので、まだまだ、あまりよくないコードの書き方や不明な点も多いのですが、
勉強しながらやってみます。

投稿2020/02/13 06:40

Q_1986-kt

総合スコア13

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

やってみたこと

hayataka2049さんに教えてもらった。ThreadPoolExecutor を使ってコードを書き換えてみました。

import time import sched import pandas as pd from datetime import datetime # import concurrent.futures from concurrent import futures # 時間表示フォーマット datetime.now().strftime("%Y/%m/%d %H:%M:%S") # スケジューラー scheduler = sched.scheduler(time.time, time.sleep) Start_t = '2020/02/13 11:01:00' df = pd.DataFrame({'ID': ['A01', 'A02', 'A03', 'A04', 'A05', 'B01', 'B02', 'B03', 'B04', 'B05', 'C01', 'C02', 'C03', 'C04', 'C05'], 'A': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', ''], 'B': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', ''], 'C': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '']}) df.set_index('ID', inplace=True) def test_1(): id_no = ['A01', 'A02', 'A03', 'A04', 'A05'] for j in id_no: df0 = pd.DataFrame({'ID': ['A01', 'A02', 'A03', 'A04', 'A05'], 'A': [1, 2, 3, 4, 5], 'B': [1, 2, 3, 4, 5], 'C': [1, 2, 3, 4, 5]}) df0.set_index('ID', inplace=True) df.loc[j] = df0.loc[j] print(df) time.sleep(5) # 途中経過を可視化するために少しづつ時間をずらしてdfを表示 def test_2(): id_no = ['B01', 'B02', 'B03', 'B04', 'B05'] for j in id_no: df1 = pd.DataFrame({'ID': ['B01', 'B02', 'B03', 'B04', 'B05'], 'A': [11, 12, 13, 14, 15], 'B': [11, 12, 13, 14, 15], 'C': [11, 12, 13, 14, 15]}) df1.set_index('ID', inplace=True) df.loc[j] = df1.loc[j] print(df) time.sleep(7) # 途中経過を可視化するために少しづつ時間をずらしてdfを表示 def test_3(): id_no = ['C01', 'C02', 'C03', 'C04', 'C05'] for j in id_no: df2 = pd.DataFrame({'ID': ['C01', 'C02', 'C03', 'C04', 'C05'], 'A': [21, 22, 23, 24, 25], 'B': [21, 22, 23, 24, 25], 'C': [21, 22, 23, 24, 25]}) df2.set_index('ID', inplace=True) df.loc[j] = df2.loc[j] print(df) time.sleep(2) # 途中経過を可視化するために少しづつ時間をずらしてdfを表示 def test(): # executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) with futures.ThreadPoolExecutor(max_workers=3) as executor: executor.submit(test_1) executor.submit(test_2) executor.submit(test_3) if __name__ == "__main__": Start_time1 = Start_t run1 = datetime.strptime(Start_time1, '%Y/%m/%d %H:%M:%S') run1 = int(time.mktime(run1.utctimetuple())) scheduler.enterabs(run1, 1, test) scheduler.run() print(df)

このコードでは、やりたいことはできました。

少し残念なこと

実際に使っているコードでスクレイピングしてデータを読み込むのはうまくいきません。
自分のコードにいろいろ問題があるようです。

1つ1つ順を追って確認をしてみます。
進捗は、随時更新したいと思います。

投稿2020/02/13 06:24

Q_1986-kt

総合スコア13

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

やってみたこと

hayataka2049さんに教えてもらった。ThreadPoolExecutor を使ってコードを書き換えてみました。

import time import sched import pandas as pd from datetime import datetime # import concurrent.futures from concurrent import futures # 時間表示フォーマット datetime.now().strftime("%Y/%m/%d %H:%M:%S") # スケジューラー scheduler = sched.scheduler(time.time, time.sleep) Start_t = '2020/02/13 11:01:00' df = pd.DataFrame({'ID': ['A01', 'A02', 'A03', 'A04', 'A05', 'B01', 'B02', 'B03', 'B04', 'B05', 'C01', 'C02', 'C03', 'C04', 'C05'], 'A': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', ''], 'B': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', ''], 'C': ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '']}) df.set_index('ID', inplace=True) def test_1(): id_no = ['A01', 'A02', 'A03', 'A04', 'A05'] for j in id_no: df0 = pd.DataFrame({'ID': ['A01', 'A02', 'A03', 'A04', 'A05'], 'A': [1, 2, 3, 4, 5], 'B': [1, 2, 3, 4, 5], 'C': [1, 2, 3, 4, 5]}) df0.set_index('ID', inplace=True) df.loc[j] = df0.loc[j] print(df) time.sleep(5) # 途中経過を可視化するために少しづつ時間をずらしてdfを表示 def test_2(): id_no = ['B01', 'B02', 'B03', 'B04', 'B05'] for j in id_no: df1 = pd.DataFrame({'ID': ['B01', 'B02', 'B03', 'B04', 'B05'], 'A': [11, 12, 13, 14, 15], 'B': [11, 12, 13, 14, 15], 'C': [11, 12, 13, 14, 15]}) df1.set_index('ID', inplace=True) df.loc[j] = df1.loc[j] print(df) time.sleep(7) # 途中経過を可視化するために少しづつ時間をずらしてdfを表示 def test_3(): id_no = ['C01', 'C02', 'C03', 'C04', 'C05'] for j in id_no: df2 = pd.DataFrame({'ID': ['C01', 'C02', 'C03', 'C04', 'C05'], 'A': [21, 22, 23, 24, 25], 'B': [21, 22, 23, 24, 25], 'C': [21, 22, 23, 24, 25]}) df2.set_index('ID', inplace=True) df.loc[j] = df2.loc[j] print(df) time.sleep(2) # 途中経過を可視化するために少しづつ時間をずらしてdfを表示 def test(): # executor = concurrent.futures.ProcessPoolExecutor(max_workers=3) with futures.ThreadPoolExecutor(max_workers=3) as executor: executor.submit(test_1) executor.submit(test_2) executor.submit(test_3) if __name__ == "__main__": Start_time1 = Start_t run1 = datetime.strptime(Start_time1, '%Y/%m/%d %H:%M:%S') run1 = int(time.mktime(run1.utctimetuple())) scheduler.enterabs(run1, 1, test) scheduler.run() print(df)

このコードでは、やりたいことはできました。

少し残念なこと

実際に使っているコードでスクレイピングしてデータを読み込むのはうまくいきません。
自分のコードにいろいろ問題があるようです。

1つ1つ順を追って確認をしてみます。
進捗は、随時更新したいと思います。

投稿2020/02/13 02:32

Q_1986-kt

総合スコア13

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

0

ベストアンサー

ProcessPoolExecutorはプロセス並列で処理を行うため、特別な共有処理を何かしら考えない限りは結果を呼び出し元のプロセスで受け取ることは出来ません。グローバル変数のdfも各プロセスごとにコピーされています。

今回の目的であればThreadPoolExecutorに置き換えれば問題なく実行できるかと思います。

ThreadPoolExecutor | concurrent.futures -- 並列タスク実行 — Python 3.8.2rc1 ドキュメント

投稿2020/02/12 12:05

編集2020/02/13 02:03
hayataka2049

総合スコア30935

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

Q_1986-kt

2020/02/13 01:43

hayataka2049さん、ありがとうございます。 試しに、上記のコードで試してみた結果、うまくいきました。 実際に使用しているコードでも確認してみます。
hayataka2049

2020/02/13 02:03

了解です。 @低評価された方へ 内容に問題があれば仰ってください。
Q_1986-kt

2020/02/13 06:44

実際のコードでも実行できました。 解決方法で書きましたが、最初は、Excelの読み込みに問題があって動きませんでしたが、 各関数ごとに読み込むことで無事に動かすことができました。ありがとうございました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.35%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問