質問をすることでしか得られない、回答やアドバイスがある。

15分調べてもわからないことは、質問しよう!

ただいまの
回答率

87.92%

thread queueの最大数を設定して、その範囲で回す

解決済

回答 2

投稿

  • 評価
  • クリップ 1
  • VIEW 7,318

score 44

queueの最大数を2個として満杯の場合は待機、
空きができ次第queueを追加するような処理がしたいです。

from threading import Thread, active_count
from queue import Queue
import time

queue = Queue()


def main(q):
    i = q.get()
    print('スレッド ' + str(i) + ' 開始')
    time.sleep(i * 2)
    print('スレッド ' + str(i) + ' 終了')
    queue.task_done()

if __name__ == '__main__':
    for i in 10, 5, 2:
        th = Thread(target=main, args=(queue,))
        th.start()
        queue.put(i)
        if active_count() > 2:
            queue.join()

上記のコードを実行したら結果は以下になります。
スレッド 10 開始
スレッド 5 開始
スレッド 5 終了
スレッド 10 終了
スレッド 2 開始
スレッド 2 終了

queue.join()を使うとqueueが全て無くなるまで待機となるので目的と合致しません。
目的は以下のようにしたいです。
スレッド 10 開始
スレッド 5 開始
スレッド 5 終了
スレッド 2 開始
スレッド 2 終了
スレッド 10 終了

よろしくお願いします。

  • 気になる質問をクリップする

    クリップした質問は、後からいつでもマイページで確認できます。

    またクリップした質問に回答があった際、通知やメールを受け取ることができます。

    クリップを取り消します

  • 良い質問の評価を上げる

    以下のような質問は評価を上げましょう

    • 質問内容が明確
    • 自分も答えを知りたい
    • 質問者以外のユーザにも役立つ

    評価が高い質問は、TOPページの「注目」タブのフィードに表示されやすくなります。

    質問の評価を上げたことを取り消します

  • 評価を下げられる数の上限に達しました

    評価を下げることができません

    • 1日5回まで評価を下げられます
    • 1日に1ユーザに対して2回まで評価を下げられます

    質問の評価を下げる

    teratailでは下記のような質問を「具体的に困っていることがない質問」、「サイトポリシーに違反する質問」と定義し、推奨していません。

    • プログラミングに関係のない質問
    • やってほしいことだけを記載した丸投げの質問
    • 問題・課題が含まれていない質問
    • 意図的に内容が抹消された質問
    • 過去に投稿した質問と同じ内容の質問
    • 広告と受け取られるような投稿

    評価が下がると、TOPページの「アクティブ」「注目」タブのフィードに表示されにくくなります。

    質問の評価を下げたことを取り消します

    この機能は開放されていません

    評価を下げる条件を満たしてません

    評価を下げる理由を選択してください

    詳細な説明はこちら

    上記に当てはまらず、質問内容が明確になっていない質問には「情報の追加・修正依頼」機能からコメントをしてください。

    質問の評価を下げる機能の利用条件

    この機能を利用するためには、以下の事項を行う必要があります。

回答 2

checkベストアンサー

+1

queue.Queueクラスのコンストラクタに最大容量maxsizeを指定できます。キューが一杯の間はput操作がブロックされ、get操作によりキューに空きが出来ると処理が進むようになります。


追記: 「スレッド処理が完了するまで」を排他制御したい場合、下記のような実装方法があり得ます。tokensキューに"実行許可トークン"を2つ入れておき、各スレッドの処理開始直前で同トークンを取得、終了時にトークンを返却することで、同時に発行されるトークン数=並列実行数を制限することができます。また、トークン返却(tq.put(token))は確実に行わないと並列実行数が減少(0になればデッドロック)してしまうので、try~finallyで囲ってあります。

from threading import Thread, active_count
from queue import Queue
import time

queue = Queue()
tokens = Queue()

def main(tq, q):
    i = q.get()
    token = tq.get() # get token
    try:
        print('スレッド ' + str(i) + ' 開始')
        time.sleep(i)
        print('スレッド ' + str(i) + ' 終了')
    finally:
        tq.put(token) # return token

if __name__ == '__main__':
    [tokens.put(x) for x in range(2)]
    for i in 10, 5, 2:
        th = Thread(target=main, args=(tokens, queue))
        th.start()
        queue.put(i)

tokensキューによる効果はセマフォ(threading.Semaphore)動作そのものなので、セマフォを用いて実装する方法もあります。セマフォ版の方がシンプルかと思います。

from threading import Thread, Semaphore
from queue import Queue
import time

queue = Queue()
sem = Semaphore(2)

def main(sem, q):
    i = q.get()
    with sem:
        print('スレッド ' + str(i) + ' 開始')
        time.sleep(i)
        print('スレッド ' + str(i) + ' 終了')

if __name__ == '__main__':
    for i in 10, 5, 2:
        th = Thread(target=main, args=(sem, queue))
        th.start()
        queue.put(i)

投稿

編集

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2016/09/29 13:19

    ```python
    queue = Queue(maxsize=2)
    ```
    こうしても全く機能していないようなんです。
    結果がこれです。
    スレッド 10 開始
    スレッド 5 開始
    スレッド 2 開始
    スレッド 2 終了
    スレッド 5 終了
    スレッド 10 終了

    キャンセル

  • 2016/09/29 13:27 編集

    スレッドの"開始"~"終了"までが排他制御対象となると、Queue(maxsize=2)だけでは所望の結果が得られないですね。回答追記しましたので確認下さい。

    キャンセル

  • 2016/09/29 14:26

    ありがとうございます。
    期待通りの処理結果が得られました。

    ただ、Semaphoreを使わないと行列待ちの処理ができないとなると
    queueとは何のために存在しているのか分からなくなってきました。

    キャンセル

  • 2016/09/29 18:33

    Queueは「スレッド間での安全なデータ授受」に対して責任を持ちます。その仕様上、データをgetした時点でQueueクラスの責務は完了しますから、今回のようにデータ取り出し後に関する要望に対しては機能不十分となります。

    一方のSemaphoreが管理するのは、特定リソース(今回はコード区間)への同時アクセス数制御ですから、Queueクラスとは異なる目的を持った制御機構です。

    今回の要望に対しては、QueueとSemaphoreの両方を組合せる必要があります。

    キャンセル

0

私はこんな感じの管理クラスを作成していました。

class ThreadManager(object):

    def __init__(self, max_thread_count, interval_count):
        self.__max_thread_count = max_thread_count
        self.__interval_count = interval_count
        self.__thread_list = []

    def add_thread_object(self, thread):
        self.__thread_list.append(thread)

    def terminate_all_thread(self):
        [x.join() for x in self.__thread_list]
        self.__thread_list = []

    def wait_until_can_create_new_thread(self):
        while True:
            self.__check_move()
            if self.__can_create_new_process():
                break
            else:
                time.sleep(self.__interval_count)

    def __check_move(self):
        tmp_list = []
        for thread in self.__thread_list:
            if thread.isAlive() is True:
                tmp_list.append(thread)
        self.__thread_list = tmp_list

    def __can_create_new_process(self):
        if len(self.__thread_list) < self.__max_thread_count:
            return True
        return False

    def join_all_thread(self):
        [x.join() for x in self.__thread_list]

クラス生成時、最大スレッド数と最大スレッド起動時の待機時間を設定してあげて、
あとはよしなに追加しておけば、自動で待機などなどをしてくれます。

もっといい方法あるかもしれませんが。

投稿

  • 回答の評価を上げる

    以下のような回答は評価を上げましょう

    • 正しい回答
    • わかりやすい回答
    • ためになる回答

    評価が高い回答ほどページの上位に表示されます。

  • 回答の評価を下げる

    下記のような回答は推奨されていません。

    • 間違っている回答
    • 質問の回答になっていない投稿
    • スパムや攻撃的な表現を用いた投稿

    評価を下げる際はその理由を明確に伝え、適切な回答に修正してもらいましょう。

  • 2016/09/29 19:02

    (単一スレッドからしかこのクラスを操作しない前提でしょうか?スレッドセーフを考慮していないため、スレッド間で共有されるデータがあるとまともに動作しませんね。)

    キャンセル

15分調べてもわからないことは、teratailで質問しよう!

  • ただいまの回答率 87.92%
  • 質問をまとめることで、思考を整理して素早く解決
  • テンプレート機能で、簡単に質問をまとめられる

関連した質問

同じタグがついた質問を見る