質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
スクレイピング

スクレイピングとは、公開されているWebサイトからページ内の情報を抽出する技術です。

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

1回答

9657閲覧

Python3でrequestsを非同期処理させてどんどんリクエストを投げたい

KohnoseLami

総合スコア17

スクレイピング

スクレイピングとは、公開されているWebサイトからページ内の情報を抽出する技術です。

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

非同期処理

非同期処理とは一部のコードを別々のスレッドで実行させる手法です。アプリケーションのパフォーマンスを向上させる目的でこの手法を用います。

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

1クリップ

投稿2020/07/01 00:18

編集2020/07/01 00:26

前提・実現したいこと

現在from concurrent.futures import ThreadPoolExecutorを使用し、スレッド処理にて高速requestsさせているのですがサイト自体を読み込むのに時間がかかりスレッドで複数実行して待機するよりも一つで何個もどんどん投げたほうが効率が良くなるかなと思い書き換えようと思っております。過去にも同様の質問がteratailにされており拝見させていただいたところ本質的な回答はされておらずスレッド処理を推奨していたので質問させていただきます。

この過去の質問と同様の状況でしてかなり情報が散らばっていたり昔の書式で合ったり自分なりにしっかりと調べたつもりでわからなかったので質問させていただいております。ここからは自分なりに調べてなんとなく動作が理解できているところを書かせていただきます。

できるだけ、今のコードを編集せずAsyncioを実装してみようと思い調べたところrequestsはそれ自体がnon blockingではないためrun_in_executor()を使用することで並列的に?requestsでも高速で処理ができるという記事を見ました。
さらにget_event_loop() でイベントループを取得し、run_until_complete()で Futureの完了を待ち、 結果を取得してイベントループをclose()などといったひとつひとつの動作はなんとなくりかいできたのですが書式などがいまいちよくわかりません。

もし、requestsでは非同期処理をすることはできない、やそれよりもaiohttpを使ったほうが速いなども教えていただけると嬉しいです。

また、async/awaitなどの使い方もわかりやすいサイトなどがあれば教えていただけると助かります。
いまいちそこの時点でこんがらがってしまい理解できてない気がするので...

最後に、grequestsも試しましたがなぜか簡単な一リクエストも実行されずお手上げでした。

該当のソースコード

Python3

1import requests 2import json 3import time 4import re 5import os 6import sys 7from bs4 import BeautifulSoup 8from concurrent.futures import ThreadPoolExecutor 9 10if os.path.exists("list.txt"): 11 pass 12else: 13 print("\nエラー:list.txtが存在しません") 14 time.sleep(3) 15 sys.exit() 16 17args = sys.argv 18if len(args) == 2: 19 input_file = args[1] 20else: 21 input_file = "list.txt" 22 23keywords_list = [ 24 "", 25 "", 26 "", 27 "", 28] 29proxies = { 30 "http": "", 31 "https": "", 32} 33ua = "" 34headers = {"User-Agent": ua} 35header = {"Authorization": "Bearer " + ""} 36count = 0 37 38logs = [] 39 40while True: 41 42 start = time.time() 43 44 def check_url(target_url, headers=headers, proxies=proxies, retry=3): 45 46 for i in range(retry): 47 48 try: 49 start = time.time() 50 51         #ここ↓のリクエストが時間がかかるので非同期処理をさせたい 52 53 req = requests.get( 54 target_url, headers=headers, proxies=proxies, allow_redirects=False 55 ) 56 logs.append(str(req.status_code) + "\t" + target_url) 57 target_urlll = re.sub("(.*)(?=/)|/|(?=?)(.*)", "", target_url) 58 print(str(req.status_code) + "\t" + target_urlll) 59 60 if req.status_code == 404: 61 html = BeautifulSoup(req.text, "html.parser") 62 title = html.find("title").text 63 body = html.find("body").text 64 65 for keyword in keywords_list: 66 if keyword in body: 67 68 #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください 69 70 target_url = re.sub( 71 "(.*)(?=/)|/|(?=?)(.*)", "", target_url 72 ) 73 74 r = requests.get( 75 "" 76 + str(target_url) 77 ) 78 jsondata = json.loads(r.text) 79 result = jsondata[""] 80 81 if result == True: 82 print("" + str(target_url)) 83 message = "" + str(target_url) 84 payload = {"message": message} 85 requ = requests.post( 86 "", 87 headers=header, 88 params=payload, 89 ) 90 91 elif "" in r.text: 92 print("" + str(target_url)) 93 94                  #ここまで 95 96 break 97 98 elif req.status_code == 200: 99 html = BeautifulSoup(req.text, "html.parser") 100 # print(str(html)) 101 title = html.find("title").text 102 body = html.find("body").text 103 104 for keyword in keywords_list: 105 if keyword in body: 106 107            #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください 108 109 target_url = re.sub( 110 "(.*)(?=/)|/|(?=?)(.*)", "", target_url 111 ) 112 113 r = requests.get( 114 "" 115 + str(target_url) 116 ) 117 jsondata = json.loads(r.text) 118 result = jsondata[""] 119 120 if result == True: 121 print("" + str(target_url)) 122 message = "" + str(target_url) 123 payload = {"message": message} 124 requ = requests.post( 125 "", 126 headers=header, 127 params=payload, 128 ) 129 130 elif "" in r.text: 131 print("" + str(target_url)) 132 133                 #ここまで 134 135 break 136 137 return 138 except requests.exceptions.ConnectTimeout: 139 logs.append("TIMEOUT" + "\t" + target_url) 140 time.sleep(10) 141 142 except requests.exceptions.ConnectionError: 143 logs.append("ERROR" + "\t" + target_url) 144 time.sleep(10) 145 146 except requests.exceptions.ChunkedEncodingError: 147 logs.append("ERROR" + "\t" + target_url) 148 time.sleep(10) 149 150 with open("list.txt") as f: 151 urls = f.read().splitlines() 152 153 threads = [] 154 155 with ThreadPoolExecutor(max_workers=()) as pool: 156 threads = [res for res in pool.map(check_url, urls)] 157 158 with open("log.txt", "w") as f: 159 f.write("\n".join(logs)) 160 161 elapsed_time = time.time() - start 162 163 count = count + 1 164 165 print("\nelapsed_time:{0}".format(elapsed_time) + "[sec]\n") 166 print(str(count) + "回目") 167

補足情報(FW/ツールのバージョンなど)

ここにより詳細な情報を記載してください。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

Penpen7

2020/07/01 04:22

どんどんリクエストを飛ばして相手のサーバーへの負荷は大丈夫なのですか?
guest

回答1

0

ベストアンサー

ほぼ完成しているように見えます

修正が必要な箇所

1
ThreadPoolExecutor() の引数 max_workers に int 型の値を設定します:

python

1 with ThreadPoolExecutor(max_workers=()) as pool:

python

1 # ↓ たとえば 1 度に 2 つのリクエストを同時に行う場合 2 with ThreadPoolExecutor(max_workers=2) as pool:

2
少なくとも、同じリソースへの繰り返しアクセスは 1 秒以上間隔を空けた方が良いでしょう:

python

1 print(str(count) + "回目")

python

1 print(str(count) + "回目") 2 time.sleep(1)

スレッドから非同期への変更

次の箇所を変更します:

python

1 with ThreadPoolExecutor(max_workers=2) as pool: 2 threads = [res for res in pool.map(check_url, urls)]

python

1 async def asynchronous_process(urls): 2 loop = asyncio.get_event_loop() 3 futures = [loop.run_in_executor(None, check_url, url) for url in urls] 4 await asyncio.gather(*futures) 5 asyncio.run(asynchronous_process(urls))

参考: Answer: How could I use requests in asyncio?

検証

次のように行いました:

python

1import asyncio 2import requests 3import json 4import time 5import re 6import os 7import sys 8from bs4 import BeautifulSoup 9# from concurrent.futures import ThreadPoolExecutor 10 11if os.path.exists("list.txt"): 12 pass 13else: 14 print("\nエラー:list.txtが存在しません") 15 time.sleep(3) 16 sys.exit() 17 18args = sys.argv 19if len(args) == 2: 20 input_file = args[1] 21else: 22 input_file = "list.txt" 23 24keywords_list = [ 25 "", 26 "", 27 "", 28 "", 29] 30proxies = { 31 "http": "", 32 "https": "", 33} 34ua = "" 35headers = {"User-Agent": ua} 36header = {"Authorization": "Bearer " + ""} 37count = 0 38 39logs = [] 40 41while True: 42 43 start = time.time() 44 45 def check_url(target_url, headers=headers, proxies=proxies, retry=3): 46 for i in range(retry): 47 48 try: 49 start = time.time() 50 51 #ここ↓のリクエストが時間がかかるので非同期処理をさせたい 52 req = requests.get( 53 target_url, headers=headers, proxies=proxies, allow_redirects=False 54 ) 55 logs.append(str(req.status_code) + "\t" + target_url) 56 target_urlll = re.sub("(.*)(?=/)|/|(?=?)(.*)", "", target_url) 57 print(target_url) 58 print(str(req.status_code) + "\t" + target_urlll) 59 60 # if req.status_code == 404: 61 # html = BeautifulSoup(req.text, "html.parser") 62 # title = html.find("title").text 63 # body = html.find("body").text 64 65 # for keyword in keywords_list: 66 # if keyword in body: 67 68 # #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください 69 70 # target_url = re.sub( 71 # "(.*)(?=/)|/|(?=?)(.*)", "", target_url 72 # ) 73 74 # r = requests.get( 75 # "" 76 # + str(target_url) 77 # ) 78 # jsondata = json.loads(r.text) 79 # result = jsondata[""] 80 81 # if result == True: 82 # print("" + str(target_url)) 83 # message = "" + str(target_url) 84 # payload = {"message": message} 85 # requ = requests.post( 86 # "", 87 # headers=header, 88 # params=payload, 89 # ) 90 91 # elif "" in r.text: 92 # print("" + str(target_url)) 93 94 # #ここまで 95 96 # break 97 98 # elif req.status_code == 200: 99 # html = BeautifulSoup(req.text, "html.parser") 100 # # print(str(html)) 101 # title = html.find("title").text 102 # body = html.find("body").text 103 104 # for keyword in keywords_list: 105 # if keyword in body: 106 107 # #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください 108 109 # target_url = re.sub( 110 # "(.*)(?=/)|/|(?=?)(.*)", "", target_url 111 # ) 112 113 # r = requests.get( 114 # "" 115 # + str(target_url) 116 # ) 117 # jsondata = json.loads(r.text) 118 # result = jsondata[""] 119 120 # if result == True: 121 # print("" + str(target_url)) 122 # message = "" + str(target_url) 123 # payload = {"message": message} 124 # requ = requests.post( 125 # "", 126 # headers=header, 127 # params=payload, 128 # ) 129 130 # elif "" in r.text: 131 # print("" + str(target_url)) 132 133 # #ここまで 134 135 # break 136 137 return 138 except requests.exceptions.ConnectTimeout: 139 logs.append("TIMEOUT" + "\t" + target_url) 140 time.sleep(10) 141 142 except requests.exceptions.ConnectionError: 143 logs.append("ERROR" + "\t" + target_url) 144 time.sleep(10) 145 146 except requests.exceptions.ChunkedEncodingError: 147 logs.append("ERROR" + "\t" + target_url) 148 time.sleep(10) 149 150 with open("list.txt") as f: 151 urls = f.read().splitlines() 152 153 # threads = [] 154 155 # with ThreadPoolExecutor(max_workers=()) as pool: 156 # threads = [res for res in pool.map(check_url, urls)] 157 async def asynchronous_process(urls): 158 loop = asyncio.get_event_loop() 159 futures = [loop.run_in_executor(None, check_url, url) for url in urls] 160 await asyncio.gather(*futures) 161 asyncio.run(asynchronous_process(urls)) 162 163 with open("log.txt", "w") as f: 164 f.write("\n".join(logs)) 165 166 elapsed_time = time.time() - start 167 168 count = count + 1 169 170 print("\nelapsed_time:{0}".format(elapsed_time) + "[sec]\n") 171 print(str(count) + "回目") 172 time.sleep(1)

list.txt:

text

1https://www.google.com/ 2https://www.yahoo.co.jp/ 3https://www.bing.com/ 4https://www.amazon.co.jp/ 5https://www.facebook.com/ 6https://www.instagram.com/ 7https://twitter.com/

実行結果:

console

1$ pipenv run python test.py 2https://www.yahoo.co.jp/ 3200 4https://www.facebook.com/ 5302 6https://www.google.com/ 7200 8https://www.bing.com/ 9200 10https://www.instagram.com/ 11200 12https://twitter.com/ 13200 14https://www.amazon.co.jp/ 15200 16 17elapsed_time:0.526411771774292[sec] 18 191回目

レスポンスの早さの違いによって
実行結果の出力順が list.txt の順序と入れ替わっているのがわかります

aiohttp を使った方法

以前にほぼ同様のことを実現するためのパッケージを作成しました

yukihiko-shinoda/parallel-html-scraper
parallelhtmlscraper · PyPI

(README.md を記述していなくて恐縮です)

パッケージとして使えば再発明の必要がなくなりますし、
パッケージのコードの中身を読んでいただくと、
どのように実装すべきかが理解いただけると思います

ライブラリーの利用例:

python

1from bs4 import BeautifulSoup 2 3from parallelhtmlscraper.html_analyzer import HtmlAnalyzer 4from parallelhtmlscraper.parallel_html_scraper import ParallelHtmlScraper 5 6class AnalyzerForTest(HtmlAnalyzer): 7 async def execute(self, soup: BeautifulSoup) -> str: 8 return soup.find('title').text 9 10host_google = 'https://www.google.co.jp' 11path_and_content = [ 12 '/webhp?tab=rw', # Google 検索 13 '/imghp?hl=ja&tab=wi&ogbl', # Google 画像検索 14 '/shopping?hl=ja&source=og&tab=wf', # Google ショッピング 15 '/save', # コレクション 16 'https://www.google.co.jp/maps', # Google マップ 17 'https://www.google.co.jp/drive/apps.html', # Google ドライブ 18 'https://www.google.co.jp/mail/help/intl/ja/about.html?vm=r', # GMail 19] 20 21list_response = ParallelHtmlScraper.execute(host_google, path_and_content, AnalyzerForTest()) 22print(list_response)

実行結果:

python

1$ pipenv run python test.py 2['Google 画像検索', ' Google マップ ', '\n Gmail - Google のメール\n ', 'Google ショッピング', 'Google', '\n Google ドライブ\n ', 'コレクション']

このパッケージを実装していたときは、
とにかく公式ドキュメントばかりを読んでいた記憶があります

コルーチンと Task — Python 3.8.4rc1 ドキュメント

投稿2020/07/01 04:45

編集2020/08/10 06:03
y_shinoda

総合スコア3272

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

KohnoseLami

2020/07/01 17:02

すみません! 私が記載しているコードは既に常用していて完成しているのですがそれでは速度やスレッド依存な処理を辞めたいなと思いCPU使用率100%まで使用してくれる非同期処理でどんどんリクエストをしたいといった感じです。 今のこのコードをできるだけ変更せずにconcurrent.futuresの並列アクセスからこれをはぶきAsyncioなどを使ったコードに変更したいです。 やはりrequestsのままAsyncioを使用するのは厳しいですかね?... 厳しいようであればaiohttpを考えてみようと思います。
y_shinoda

2020/07/01 21:53

失礼しました、質問を読み違えていました・・ 回答を修正しました、 「スレッドから非同期への変更」という項目を追加し、 requests のまま asyncio を使って処理するように変更しました 検証結果についても更新してあります
KohnoseLami

2020/07/02 03:45

ありがとうございます! 無事こちらでも動作確認いたしましてとてもまとまっており、わからなかった詳しい仕組みなど書式についても理解することができました! とても丁寧に回答していただいたのでベストアンサーさせていただきました!
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問