質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

89.10%

Python3でrequestsを非同期処理させてどんどんリクエストを投げたい

解決済

回答 1

投稿 編集

  • 評価
  • クリップ 1
  • VIEW 194

KohnoseLami

score 9

前提・実現したいこと

現在from concurrent.futures import ThreadPoolExecutorを使用し、スレッド処理にて高速requestsさせているのですがサイト自体を読み込むのに時間がかかりスレッドで複数実行して待機するよりも一つで何個もどんどん投げたほうが効率が良くなるかなと思い書き換えようと思っております。過去にも同様の質問がteratailにされており拝見させていただいたところ本質的な回答はされておらずスレッド処理を推奨していたので質問させていただきます。

この過去の質問と同様の状況でしてかなり情報が散らばっていたり昔の書式で合ったり自分なりにしっかりと調べたつもりでわからなかったので質問させていただいております。ここからは自分なりに調べてなんとなく動作が理解できているところを書かせていただきます。

できるだけ、今のコードを編集せずAsyncioを実装してみようと思い調べたところrequestsはそれ自体がnon blockingではないためrun_in_executor()を使用することで並列的に?requestsでも高速で処理ができるという記事を見ました。
さらにget_event_loop() でイベントループを取得し、run_until_complete()で Futureの完了を待ち、 結果を取得してイベントループをclose()などといったひとつひとつの動作はなんとなくりかいできたのですが書式などがいまいちよくわかりません。

もし、requestsでは非同期処理をすることはできない、やそれよりもaiohttpを使ったほうが速いなども教えていただけると嬉しいです。

また、async/awaitなどの使い方もわかりやすいサイトなどがあれば教えていただけると助かります。
いまいちそこの時点でこんがらがってしまい理解できてない気がするので...

最後に、grequestsも試しましたがなぜか簡単な一リクエストも実行されずお手上げでした。

該当のソースコード

import requests
import json
import time
import re
import os
import sys
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor

if os.path.exists("list.txt"):
    pass
else:
    print("\nエラー:list.txtが存在しません")
    time.sleep(3)
    sys.exit()

args = sys.argv
if len(args) == 2:
    input_file = args[1]
else:
    input_file = "list.txt"

keywords_list = [
    "",
    "",
    "",
    "",
]
proxies = {
    "http": "",
    "https": "",
}
ua = ""
headers = {"User-Agent": ua}
header = {"Authorization": "Bearer " + ""}
count = 0

logs = []

while True:

    start = time.time()

    def check_url(target_url, headers=headers, proxies=proxies, retry=3):

        for i in range(retry):

            try:
                start = time.time()

         #ここ↓のリクエストが時間がかかるので非同期処理をさせたい

                req = requests.get(
                    target_url, headers=headers, proxies=proxies, allow_redirects=False
                )
                logs.append(str(req.status_code) + "\t" + target_url)
                target_urlll = re.sub("(.*)(?=/)|/|(?=\?)(.*)", "", target_url)
                print(str(req.status_code) + "\t" + target_urlll)

                if req.status_code == 404:
                    html = BeautifulSoup(req.text, "html.parser")
                    title = html.find("title").text
                    body = html.find("body").text

                    for keyword in keywords_list:
                        if keyword in body:

                        #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください

                            target_url = re.sub(
                                "(.*)(?=/)|/|(?=\?)(.*)", "", target_url
                            )

                            r = requests.get(
                                ""
                                + str(target_url)
                            )
                            jsondata = json.loads(r.text)
                            result = jsondata[""]

                            if result == True:
                                print("" + str(target_url))
                                message = "" + str(target_url)
                                payload = {"message": message}
                                requ = requests.post(
                                    "",
                                    headers=header,
                                    params=payload,
                                )

                            elif "" in r.text:
                                print("" + str(target_url))

                  #ここまで

                                break

                elif req.status_code == 200:
                    html = BeautifulSoup(req.text, "html.parser")
                    # print(str(html))
                    title = html.find("title").text
                    body = html.find("body").text

                    for keyword in keywords_list:
                        if keyword in body:

            #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください

                            target_url = re.sub(
                                "(.*)(?=/)|/|(?=\?)(.*)", "", target_url
                            )

                            r = requests.get(
                                ""
                                + str(target_url)
                            )
                            jsondata = json.loads(r.text)
                            result = jsondata[""]

                            if result == True:
                                print("" + str(target_url))
                                message = "" + str(target_url)
                                payload = {"message": message}
                                requ = requests.post(
                                    "",
                                    headers=header,
                                    params=payload,
                                )

                            elif "" in r.text:
                                print("" + str(target_url))

                 #ここまで

                                break

                return
            except requests.exceptions.ConnectTimeout:
                logs.append("TIMEOUT" + "\t" + target_url)
                time.sleep(10)

            except requests.exceptions.ConnectionError:
                logs.append("ERROR" + "\t" + target_url)
                time.sleep(10)

            except requests.exceptions.ChunkedEncodingError: 
                logs.append("ERROR" + "\t" + target_url)
                time.sleep(10)

    with open("list.txt") as f:
        urls = f.read().splitlines()

    threads = []

    with ThreadPoolExecutor(max_workers=()) as pool:
        threads = [res for res in pool.map(check_url, urls)]

    with open("log.txt", "w") as f:
        f.write("\n".join(logs))

    elapsed_time = time.time() - start

    count = count + 1

    print("\nelapsed_time:{0}".format(elapsed_time) + "[sec]\n")
    print(str(count) + "回目")

補足情報(FW/ツールのバージョンなど)

ここにより詳細な情報を記載してください。

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

質問への追記・修正、ベストアンサー選択の依頼

  • Penpen7

    2020/07/01 13:22

    どんどんリクエストを飛ばして相手のサーバーへの負荷は大丈夫なのですか?

    キャンセル

回答 1

checkベストアンサー

+5

ほぼ完成しているように見えます

修正が必要な箇所

1
ThreadPoolExecutor() の引数 max_workers に int 型の値を設定します:

    with ThreadPoolExecutor(max_workers=()) as pool:

    # ↓ たとえば 1 度に 2 つのリクエストを同時に行う場合
    with ThreadPoolExecutor(max_workers=2) as pool:

2
少なくとも、同じリソースへの繰り返しアクセスは 1 秒以上間隔を空けた方が良いでしょう:

    print(str(count) + "回目")

    print(str(count) + "回目")
    time.sleep(1)

スレッドから非同期への変更

次の箇所を変更します:

    with ThreadPoolExecutor(max_workers=2) as pool:
        threads = [res for res in pool.map(check_url, urls)]

    async def asynchronous_process(urls):
        loop = asyncio.get_event_loop()
        futures = [loop.run_in_executor(None, check_url, url) for url in urls]
        await asyncio.gather(*futures)
    asyncio.run(asynchronous_process(urls))

参考: Answer: How could I use requests in asyncio? 

検証

次のように行いました:

import asyncio
import requests
import json
import time
import re
import os
import sys
from bs4 import BeautifulSoup
# from concurrent.futures import ThreadPoolExecutor

if os.path.exists("list.txt"):
    pass
else:
    print("\nエラー:list.txtが存在しません")
    time.sleep(3)
    sys.exit()

args = sys.argv
if len(args) == 2:
    input_file = args[1]
else:
    input_file = "list.txt"

keywords_list = [
    "",
    "",
    "",
    "",
]
proxies = {
    "http": "",
    "https": "",
}
ua = ""
headers = {"User-Agent": ua}
header = {"Authorization": "Bearer " + ""}
count = 0

logs = []

while True:

    start = time.time()

    def check_url(target_url, headers=headers, proxies=proxies, retry=3):
        for i in range(retry):

            try:
                start = time.time()

                #ここ↓のリクエストが時間がかかるので非同期処理をさせたい
                req = requests.get(
                    target_url, headers=headers, proxies=proxies, allow_redirects=False
                )
                logs.append(str(req.status_code) + "\t" + target_url)
                target_urlll = re.sub("(.*)(?=/)|/|(?=\?)(.*)", "", target_url)
                print(target_url)
                print(str(req.status_code) + "\t" + target_urlll)

                # if req.status_code == 404:
                #     html = BeautifulSoup(req.text, "html.parser")
                #     title = html.find("title").text
                #     body = html.find("body").text

                #     for keyword in keywords_list:
                #         if keyword in body:

                #         #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください

                #             target_url = re.sub(
                #                 "(.*)(?=/)|/|(?=\?)(.*)", "", target_url
                #             )

                #             r = requests.get(
                #                 ""
                #                 + str(target_url)
                #             )
                #             jsondata = json.loads(r.text)
                #             result = jsondata[""]

                #             if result == True:
                #                 print("" + str(target_url))
                #                 message = "" + str(target_url)
                #                 payload = {"message": message}
                #                 requ = requests.post(
                #                     "",
                #                     headers=header,
                #                     params=payload,
                #                 )

                #             elif "" in r.text:
                #                 print("" + str(target_url))

                #                 #ここまで

                #                 break

                # elif req.status_code == 200:
                #     html = BeautifulSoup(req.text, "html.parser")
                #     # print(str(html))
                #     title = html.find("title").text
                #     body = html.find("body").text

                #     for keyword in keywords_list:
                #         if keyword in body:

                #         #ここからのリクエストは非同期処理をさせなくてよいので気にしないでください

                #             target_url = re.sub(
                #                 "(.*)(?=/)|/|(?=\?)(.*)", "", target_url
                #             )

                #             r = requests.get(
                #                 ""
                #                 + str(target_url)
                #             )
                #             jsondata = json.loads(r.text)
                #             result = jsondata[""]

                #             if result == True:
                #                 print("" + str(target_url))
                #                 message = "" + str(target_url)
                #                 payload = {"message": message}
                #                 requ = requests.post(
                #                     "",
                #                     headers=header,
                #                     params=payload,
                #                 )

                #             elif "" in r.text:
                #                 print("" + str(target_url))

                #                 #ここまで

                #                 break

                return
            except requests.exceptions.ConnectTimeout:
                logs.append("TIMEOUT" + "\t" + target_url)
                time.sleep(10)

            except requests.exceptions.ConnectionError:
                logs.append("ERROR" + "\t" + target_url)
                time.sleep(10)

            except requests.exceptions.ChunkedEncodingError: 
                logs.append("ERROR" + "\t" + target_url)
                time.sleep(10)

    with open("list.txt") as f:
        urls = f.read().splitlines()

    # threads = []

    # with ThreadPoolExecutor(max_workers=()) as pool:
    #     threads = [res for res in pool.map(check_url, urls)]
    async def asynchronous_process(urls):
        loop = asyncio.get_event_loop()
        futures = [loop.run_in_executor(None, check_url, url) for url in urls]
        await asyncio.gather(*futures)
    asyncio.run(asynchronous_process(urls))

    with open("log.txt", "w") as f:
        f.write("\n".join(logs))

    elapsed_time = time.time() - start

    count = count + 1

    print("\nelapsed_time:{0}".format(elapsed_time) + "[sec]\n")
    print(str(count) + "回目")
    time.sleep(1)

list.txt:

https://www.google.com/
https://www.yahoo.co.jp/
https://www.bing.com/
https://www.amazon.co.jp/
https://www.facebook.com/
https://www.instagram.com/
https://twitter.com/

実行結果:

$ pipenv run python test.py
https://www.yahoo.co.jp/
200     
https://www.facebook.com/
302     
https://www.google.com/
200     
https://www.bing.com/
200     
https://www.instagram.com/
200     
https://twitter.com/
200     
https://www.amazon.co.jp/
200     

elapsed_time:0.526411771774292[sec]

1回目

レスポンスの早さの違いによって
実行結果の出力順が list.txt の順序と入れ替わっているのがわかります

aiohttp を使った方法

以前にほぼ同様のことを実現するためのパッケージを作成しました

yukihiko-shinoda/parallel-html-scraper
parallelhtmlscraper · PyPI

(README.md を記述していなくて恐縮です)

パッケージとして使えば再発明の必要がなくなりますし、
パッケージのコードの中身を読んでいただくと、
どのように実装すべきかが理解いただけると思います

ライブラリーの利用例:

from bs4 import BeautifulSoup

from parallelhtmlscraper.html_analyzer import HtmlAnalyzer
from parallelhtmlscraper.parallel_html_scraper import ParallelHtmlScraper

class AnalyzerForTest(HtmlAnalyzer):
    async def execute(self, soup: BeautifulSoup) -> str:
        return soup.find('title').text

host_google = 'https://www.google.co.jp'
path_and_content = [
    '/webhp?tab=rw',                                              # Google 検索
    '/imghp?hl=ja&tab=wi&ogbl',                                   # Google 画像検索
    '/shopping?hl=ja&source=og&tab=wf',                           # Google ショッピング
    '/save',                                                      # コレクション
    'https://www.google.co.jp/maps',                              # Google マップ
    'https://www.google.co.jp/drive/apps.html',                   # Google ドライブ
    'https://www.google.co.jp/mail/help/intl/ja/about.html?vm=r', # GMail
]

list_response = ParallelHtmlScraper.execute(f'{host_google}', path_and_content.keys(), AnalyzerForTest())
print(list_response)

実行結果:

$ python test.py
['Google', 'Google 画像検索', 'Google ショッピング', 'コレクション', 'Google マップ', '\n        Google ドライブ\n    ', '\n        Gmail - Google のメール\n    ']

このパッケージを実装していたときは、
とにかく公式ドキュメントばかりを読んでいた記憶があります

コルーチンと Task — Python 3.8.4rc1 ドキュメント

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2020/07/02 02:02

    すみません!
    私が記載しているコードは既に常用していて完成しているのですがそれでは速度やスレッド依存な処理を辞めたいなと思いCPU使用率100%まで使用してくれる非同期処理でどんどんリクエストをしたいといった感じです。

    今のこのコードをできるだけ変更せずにconcurrent.futuresの並列アクセスからこれをはぶきAsyncioなどを使ったコードに変更したいです。
    やはりrequestsのままAsyncioを使用するのは厳しいですかね?...
    厳しいようであればaiohttpを考えてみようと思います。

    キャンセル

  • 2020/07/02 06:53

    失礼しました、質問を読み違えていました・・

    回答を修正しました、
    「スレッドから非同期への変更」という項目を追加し、
    requests のまま asyncio を使って処理するように変更しました
    検証結果についても更新してあります

    キャンセル

  • 2020/07/02 12:45

    ありがとうございます!
    無事こちらでも動作確認いたしましてとてもまとまっており、わからなかった詳しい仕組みなど書式についても理解することができました!
    とても丁寧に回答していただいたのでベストアンサーさせていただきました!

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 89.10%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる