質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

新規登録して質問してみよう
ただいま回答率
85.48%
Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

Q&A

解決済

2回答

10230閲覧

thread queueの最大数を設定して、その範囲で回す

horik

総合スコア44

Python 3.x

Python 3はPythonプログラミング言語の最新バージョンであり、2008年12月3日にリリースされました。

0グッド

1クリップ

投稿2016/09/29 04:06

queueの最大数を2個として満杯の場合は待機、
空きができ次第queueを追加するような処理がしたいです。

python

1from threading import Thread, active_count 2from queue import Queue 3import time 4 5queue = Queue() 6 7 8def main(q): 9 i = q.get() 10 print('スレッド ' + str(i) + ' 開始') 11 time.sleep(i * 2) 12 print('スレッド ' + str(i) + ' 終了') 13 queue.task_done() 14 15if __name__ == '__main__': 16 for i in 10, 5, 2: 17 th = Thread(target=main, args=(queue,)) 18 th.start() 19 queue.put(i) 20 if active_count() > 2: 21 queue.join()

上記のコードを実行したら結果は以下になります。
スレッド 10 開始
スレッド 5 開始
スレッド 5 終了
スレッド 10 終了
スレッド 2 開始
スレッド 2 終了

queue.join()を使うとqueueが全て無くなるまで待機となるので目的と合致しません。
目的は以下のようにしたいです。
スレッド 10 開始
スレッド 5 開始
スレッド 5 終了
スレッド 2 開始
スレッド 2 終了
スレッド 10 終了

よろしくお願いします。

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

guest

回答2

0

ベストアンサー

queue.Queueクラスのコンストラクタに最大容量maxsizeを指定できます。キューが一杯の間はput操作がブロックされ、get操作によりキューに空きが出来ると処理が進むようになります。


追記: 「スレッド処理が完了するまで」を排他制御したい場合、下記のような実装方法があり得ます。tokensキューに"実行許可トークン"を2つ入れておき、各スレッドの処理開始直前で同トークンを取得、終了時にトークンを返却することで、同時に発行されるトークン数=並列実行数を制限することができます。また、トークン返却(tq.put(token))は確実に行わないと並列実行数が減少(0になればデッドロック)してしまうので、try~finallyで囲ってあります。

python

1from threading import Thread, active_count 2from queue import Queue 3import time 4 5queue = Queue() 6tokens = Queue() 7 8def main(tq, q): 9 i = q.get() 10 token = tq.get() # get token 11 try: 12 print('スレッド ' + str(i) + ' 開始') 13 time.sleep(i) 14 print('スレッド ' + str(i) + ' 終了') 15 finally: 16 tq.put(token) # return token 17 18if __name__ == '__main__': 19 [tokens.put(x) for x in range(2)] 20 for i in 10, 5, 2: 21 th = Thread(target=main, args=(tokens, queue)) 22 th.start() 23 queue.put(i)

tokensキューによる効果はセマフォ(threading.Semaphore)動作そのものなので、セマフォを用いて実装する方法もあります。セマフォ版の方がシンプルかと思います。

python

1from threading import Thread, Semaphore 2from queue import Queue 3import time 4 5queue = Queue() 6sem = Semaphore(2) 7 8def main(sem, q): 9 i = q.get() 10 with sem: 11 print('スレッド ' + str(i) + ' 開始') 12 time.sleep(i) 13 print('スレッド ' + str(i) + ' 終了') 14 15if __name__ == '__main__': 16 for i in 10, 5, 2: 17 th = Thread(target=main, args=(sem, queue)) 18 th.start() 19 queue.put(i)

投稿2016/09/29 04:14

編集2016/09/29 04:38
yohhoy

総合スコア6191

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

horik

2016/09/29 04:19

```python queue = Queue(maxsize=2) ``` こうしても全く機能していないようなんです。 結果がこれです。 スレッド 10 開始 スレッド 5 開始 スレッド 2 開始 スレッド 2 終了 スレッド 5 終了 スレッド 10 終了
yohhoy

2016/09/29 04:28 編集

スレッドの"開始"~"終了"までが排他制御対象となると、Queue(maxsize=2)だけでは所望の結果が得られないですね。回答追記しましたので確認下さい。
horik

2016/09/29 05:26

ありがとうございます。 期待通りの処理結果が得られました。 ただ、Semaphoreを使わないと行列待ちの処理ができないとなると queueとは何のために存在しているのか分からなくなってきました。
yohhoy

2016/09/29 09:33

Queueは「スレッド間での安全なデータ授受」に対して責任を持ちます。その仕様上、データをgetした時点でQueueクラスの責務は完了しますから、今回のようにデータ取り出し後に関する要望に対しては機能不十分となります。 一方のSemaphoreが管理するのは、特定リソース(今回はコード区間)への同時アクセス数制御ですから、Queueクラスとは異なる目的を持った制御機構です。 今回の要望に対しては、QueueとSemaphoreの両方を組合せる必要があります。
guest

0

私はこんな感じの管理クラスを作成していました。

Python

1class ThreadManager(object): 2 3 def __init__(self, max_thread_count, interval_count): 4 self.__max_thread_count = max_thread_count 5 self.__interval_count = interval_count 6 self.__thread_list = [] 7 8 def add_thread_object(self, thread): 9 self.__thread_list.append(thread) 10 11 def terminate_all_thread(self): 12 [x.join() for x in self.__thread_list] 13 self.__thread_list = [] 14 15 def wait_until_can_create_new_thread(self): 16 while True: 17 self.__check_move() 18 if self.__can_create_new_process(): 19 break 20 else: 21 time.sleep(self.__interval_count) 22 23 def __check_move(self): 24 tmp_list = [] 25 for thread in self.__thread_list: 26 if thread.isAlive() is True: 27 tmp_list.append(thread) 28 self.__thread_list = tmp_list 29 30 def __can_create_new_process(self): 31 if len(self.__thread_list) < self.__max_thread_count: 32 return True 33 return False 34 35 def join_all_thread(self): 36 [x.join() for x in self.__thread_list]

クラス生成時、最大スレッド数と最大スレッド起動時の待機時間を設定してあげて、
あとはよしなに追加しておけば、自動で待機などなどをしてくれます。

もっといい方法あるかもしれませんが。

投稿2016/09/29 04:37

Akira-Taniguchi

総合スコア10

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

yohhoy

2016/09/29 10:02

(単一スレッドからしかこのクラスを操作しない前提でしょうか?スレッドセーフを考慮していないため、スレッド間で共有されるデータがあるとまともに動作しませんね。)
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.48%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問