質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.50%
Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

2回答

786閲覧

並行処理:Aの処理が早く終わればBの処理を止める.Bの処理が早く終わればAの処理を止める

kt19906

総合スコア47

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

0クリップ

投稿2022/07/25 10:41

write_txt() :test.txtに不定期的に1,2,3を書き込む関数
check_file.over5s() : 5秒間test.txtが更新されなければ"5秒"と返すメソッド

write_txt()とcheck_file.over5s()を同時に動かすことでtest.txtが5秒間更新されなければ処理を停止するプログラムを書きたいと考えています.

下のコードは5秒間test.txtが更新されなければ"5秒"と出力され,逆に5秒以内にtest.txtが更新され続ければ"書き込み完了"と期待どおり出力されます.

しかし,"5秒"と出力された後もwrite_txt()は動き続けるため,ファイルが更新されてしまいます.
2つの処理を同時に行い,一方の処理が早く終了すれば,もう一方の処理を強制終了させるためにはどのようにすれば良いでしょうか?

python

1from concurrent.futures import ThreadPoolExecutor 2import concurrent 3import random,time,pathlib 4 5 6def write_txt(text): 7 for i in range(10): 8 with open("test.txt","a") as f: 9 f.write(f"{i}\n") 10 sleep_time = random.random()*10 11 time.sleep(sleep_time) 12 return text 13 14class check_file(): 15 def __init__(self): 16 self.file = pathlib.Path('test.txt') 17 self.update_time = self.file.stat().st_mtime 18 19 def over5s(self,text): 20 while True: 21 time.sleep(5) 22 if self.update_time != self.file.stat().st_mtime: 23 self.update_time = self.file.stat().st_mtime 24 else: 25 break 26 return text 27 28with open("test.txt","w") as f: 29 pass 30with ThreadPoolExecutor(max_workers=2) as executor: 31 file_checker = check_file() 32 results = [] 33 results.append(executor.submit(file_checker.over5s, "5秒")) 34 results.append(executor.submit(write_txt, "書き込み完了")) 35 concurrent.futures.wait(results, return_when=concurrent.futures.FIRST_COMPLETED) 36 for f in concurrent.futures.as_completed(results): 37 f_success = f.result() 38 print(f_success) 39

concurrent.futures.wait(results, return_when=concurrent.futures.FIRST_COMPLETED)でうまくいくかなと思ったのですが,うまくいきませんでした.
もう一方の処理を停止するコードも書かなきゃだめそうな事はわかるのですが,ドキュメントを調べてもよくやり方が分かりませんでした.

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答2

0

ベストアンサー

すでに回答が付いていますが、以前回答したやつも少しは役に立つと思うので、貼っておきます。

https://teratail.com/questions/ei3ctb6rr2thsy

投稿2022/07/25 13:12

TakaiY

総合スコア12657

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

kt19906

2022/07/25 13:55

回答ありがとうございます. 私の説明不足で申し訳ないのですが,実際にはwrite_txtのような処理をライブラリを使って1行のコードで行っており.添付して下さったURLのようにWhile文で逐一変数を確認することできません. そのため強制的に終了させる必要があると思うのですが,そのような方法はありますか? ライブラリを使ってあるデータの解析を行なっており,解析の進捗状況はtxtファイルに書き出されます.データは複数あるのでfor文を使って解析を行なっていますが,データによっては解析に時間がかかり,数時間経ってもファイルが更新されないことがあります.そのようなデータは無視し,次のループに移りたいと考えています.
TakaiY

2022/07/25 14:04

ライブラリの関数をそのままスレッド/プロセスで動かすのでなく、そのライブラリを使って計算する関数なりを作って動かせばいいでしょう。 そうすれば、その関数の中で状態は確認できます。 「数時間経ってもファイルが更新されないことがあります」というような場合に、強制的にプロセスを止めたりすれば、メモリが解放されなかったり、リソースを掴んだままだったりと、不都合が置きる可能性もありますが、そのあたりは大丈夫ですか?
kt19906

2022/07/25 14:17

メモリなどに関する知識があまりないのですが,大量にデータを流す予定なので,その間メモリが解放されなかったらよくないと思うので関数を作って動かしてみたいと思います. その場合は並行処理せずともファイル更新するまでの処理を関数化しその関数にタイムアウトを設ければよさそうですね
TakaiY

2022/07/25 14:20

と、書きましたが、その呼び出したライブラリ内で時間がかかっている場合は手が出ませんね。 基本的にthreadを強制的に終了させる方法は無いので、どうしてもというなら、process にして killするのがよさそうですね。 (やったことはありません)
kt19906

2022/07/25 15:48

ライブラリソースコードを見たところ,while文で逐次textファイルを更新していたので,while文の中身を関数化してtimeout_decoratorというライブラリでその関数のタイムアウトの時間を設定し解決しました. ありがとうございました.
guest

0

threading.Event を使ってみてはいかがでしょうか。
(処理の流れがわかりやすいように、print を追加しています)

python

1from concurrent.futures import ThreadPoolExecutor 2import concurrent 3import random,time,pathlib 4from threading import Event 5 6def write_txt(text,event): 7 for i in range(10): 8 with open("test.txt","a") as f: 9 f.write(f"{i}\n") 10 print(f"write_txt: ファイルに{i}を書き込みました。") 11 sleep_time = random.random()*10 12 print(f"write_txt: {sleep_time}秒待ちます。") 13 if event.wait(sleep_time): 14 print(f"write_txt: 処理終了の通知を受けました。終了します。") 15 break 16 return text 17 18class check_file(): 19 def __init__(self, event): 20 self.file = pathlib.Path('test.txt') 21 self.update_time = self.file.stat().st_mtime 22 self.event = event 23 24 def over5s(self,text): 25 while True: 26 print("check_file: 5秒待ちます。") 27 time.sleep(5) 28 print("check_file: 5秒経過。ファイルの更新時間を判定します。") 29 if self.update_time != self.file.stat().st_mtime: 30 print("check_file: 5秒以内にファイルが更新されていました。update_time を更新して、処理を継続します。") 31 self.update_time = self.file.stat().st_mtime 32 else: 33 print("check_file: 5秒以内にファイルが更新されていませんでした。write_fileに処理終了を通知します。") 34 self.event.set() 35 break 36 return text 37 38with open("test.txt","w") as f: 39 pass 40with ThreadPoolExecutor(max_workers=2) as executor: 41 event = Event() 42 file_checker = check_file(event) 43 results = [] 44 results.append(executor.submit(file_checker.over5s, "5秒")) 45 results.append(executor.submit(write_txt, "書き込み完了", event)) 46 47 for f in concurrent.futures.as_completed(results): 48 f_success = f.result() 49 print(f_success) 50
補足

event.wait(timeout)

timeout が与えられている場合、
timeout 秒経過するか、他のスレッドに set されるまで、スレッドをブロックします。
timeout 秒経過前に 他のスレッドに set された場合、True を返します。
setされる前にtimeout秒経過した場合、ブロックされていたスレッドを再開し、Falseを返します。

投稿2022/07/25 13:02

編集2022/07/25 13:08
退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

kt19906

2022/07/25 14:00 編集

回答ありがとうございます. 私の説明不足で申し訳ないのですが,TakaiYさんの方にも返信したように,write_txtの部分を実際にはライブラリを使って1行のコードで行っており.if event.wait(sleep_time)のように途中でeventを確認する作業を追加することができません. このような場合でも停止する方法はありますか? せっかく丁寧にコードを書いてくださったのに申し訳ありません!
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.50%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問