ユーニックス総合研究所

  • home
  • archives
  • python-queue

Pythonのqueue.Queueの使い方をわかりやすく解説

  • 作成日: 2021-06-24
  • 更新日: 2023-12-24
  • カテゴリ: Python

Pythonのqueue.Queueの使い方

Pythonにはマルチスレッドで安全にデータを取り扱う仕組みの1つとして同期キューが備わっています。
これはqueueモジュール内のオブジェクトを利用することで可能です。

結論から言うとPythonの同期キューは↓のように使います。

import queue  
import threading  


q = queue.Queue()  # 同期FIFOキューの作成  


def worker():  
    """  
    キューのアイテムを消費するワーカー  
    """  
    while True:  
        item = q.get()  # キューからアイテムを取得  
        print('item is', item)  # アイテムを使用  
        q.task_done()  # アイテムの使用完了をキューに知らせる  


thread = threading.Thread(target=worker, daemon=True)  # デーモンスレッド作成  
thread.start()  # スレッドを開始  

# キューにアイテムを追加  
q.put('nyan')   
q.put('wan')  
q.put('pilolo')  

q.join()  # キューのアイテムが消費されるのを待つ  

この記事ではPythonの同期キューについて具体的に↓を見ていきます。

  • キューとは?
  • Queueの主要なメソッド
  • ワーカーにアイテムを消費させる
  • 生産者と消費者の例

キューとは?

キュー(Queue)とはデータ構造の一種です。
FIFO(First In, First Out)という特徴を持つデータ構造で、先に入れたものが先に取り出されます。

キューの例としてわかりやすいのがプリンタキューです。
プリンタに印刷を依頼するとき、印刷物をキューの末尾にプッシュします。
そしてプリンタはそのキューを監視し、キューに印刷物がプッシュされたキューの先頭から印刷物を取り出します。
プリンタは印刷物を印刷して消費します。

プリンタキューにはプリンタが印刷している間も印刷物が末尾にプッシュされていきます。
しかしプリンタはキューの先頭から常に印刷物を取り出すので、プリンタの使用者はお願いした順番通りの印刷結果を受け取ることができます。

キューは配列やリストなどと肩を並べる比較的にポピュラーなデータ構造です。
非同期プログラミングと相性が良いデータ構造のため、マルチスレッドプログラミングなどで利用されることがあります。

同期FIFOキューとはマルチスレッドプログラミングで同期的に利用できるキューのことを言います。
これはPythonではqueueモジュールのQueueが相当します。

Queueの主要なメソッド

queue.Queueの主要なメソッドを紹介します。

put()

Queue.put()はアイテムをキューに追加します。

put()は↓のような構造になっています。

Queue.put(item, block=True, timeout=None)  

第1引数のitemはキューに追加するアイテムです。
第2引数のblockTrueで第3引数のtimeoutNoneのとき、キューにアイテムを追加できるまでブロックします。

blockTruetimeoutに秒数を指定した場合は、キューはアイテムを追加できない場合はその秒数だけブロックします。
最終的にアイテムを追加できなかった場合は例外Fullを送出します。

blockFalseの場合は、キューにアイテムを追加できる場合はただちにアイテムを追加します。
追加できない場合は例外Fullを送出します。

追加できる/できないというのは、キューが他のスレッドに使用されているかどうかに依存します。
他のスレッドがキューを操作している場合は、キューへの操作は基本的にブロックされます。
このブロックの期間を制限したい場合はtimeoutに秒数を指定するといいでしょう(例外Fullのキャッチを忘れずに)。

get()

Queue.get()はキューからアイテムを取り除いて取得します。

get()は↓のような構造になっています。

Queue.get(block=True, timeout=None)  

第1引数のblockTrueで第2引数のtimeoutNoneの場合、アイテムが取り出せるまでブロックします。

blockTruetimeoutに秒数を指定した場合はアイテムを取り出せるまでtimeout秒間ブロックします。
最終的にアイテムを取り出せなかった場合は例外Emptyを送出します。

blockFalseの場合はただちにアイテムを取り出しますが、取り出せなかった場合は例外Emptyを送出します。

join()

Queue.join()はキュー内のアイテムがすべて取り出され処理されるまでブロックします。

join()は↓のような構造になっています。

Queue.join()  

このjoin()を呼び出すとキューのアイテムがすべて取り出され消費されるまで処理がブロックされます。
つまりキューのすべてのアイテムがget()で取り出されてtask_done()されたらブロックが解除されます。

task_done()

Queue.task_done()はキューのタスク(アイテムを使った処理)が完了したときに呼び出します。

task_done()は↓のような構造になっています。

Queue.task_done()  

task_done()は、キューからアイテムをget()で取り出し、その後にtask_done()を呼び出すことで、そのアイテムを使ったタスクが完了したことをキューに教えます。
join()が呼び出されていた場合、join()はキューのすべてのアイテムのタスクが完了したらブロックを解放します。
これはつまりキュー内のすべてのアイテムをget()で取り出しtask_done()したら、ということになります。

キュー内のアイテム数より多くtask_done()が呼ばれた場合は例外ValueErrorが送出されます。

ワーカーにアイテムを消費させる

簡単なマルチスレッドプログラミングにおけるキューの使い方を↓に示します。

import queue  
import threading  


q = queue.Queue()  # 同期FIFOキューの作成  


def worker():  
    """  
    キューのアイテムを消費するワーカー  
    """  
    while True:  
        item = q.get()  # キューからアイテムを取得  
        print('item is', item)  # アイテムを使用  
        q.task_done()  # アイテムの使用完了をキューに知らせる  


thread = threading.Thread(target=worker, daemon=True)  # デーモンスレッド作成  
thread.start()  # スレッドを開始  

# キューにアイテムを追加  
q.put('nyan')   
q.put('wan')  
q.put('pilolo')  

q.join()  # キューのアイテムが消費されるのを待つ  

↑のコードを実行すると↓のような結果になります。

item is nyan  
item is wan  
item is pilolo  

↑のコード内の変数qはキューですが、これは複数のスレッドから参照されます。

threading.Thread()でワーカースレッドを作成しています。ワーカースレッドはキューのアイテムを消費するスレッドです。
デーモンスレッドとは、プログラムが終了すると自動的に回収されるスレッドのことで、このスレッドはjoin()する必要がありません。ここではデーモンスレッドを使っています。
デーモンスレッドを作成してstart()してスレッドを開始します。するとworker()がそのスレッド内で呼び出されます。
worker内の無限ループではqを参照しています。
get()でアイテムを取得していますが、timeoutを指定していないので、アイテムが存在しない場合はブロックされます。
アイテムを取得出来たらtask_done()を呼び出し、アイテムを消費したことをキューに知らせます。

q.put()でキューにアイテム(ただの文字列)を追加しています。今回は3つだけ追加しています。
そのあとにq.join()でブロックしています。このブロックはキュー内のすべてのアイテムが消費されたら(task_done()されたら)解放されます。

生産者と消費者の例

もう少し複雑なマルチスレッドプログラミングの例を紹介します。

import queue  
import threading  
import time  


q = queue.Queue()  # 同期FIFOキューを作成  


def consumer():  
    """  
    消費者  
    """  
    while True:  
        item = q.get()  # キューからアイテムを取得  
        if item == 'done':  # 完了の合図  
            break  # ループから抜ける  
        print('consumer: got', item)  # アイテムを消費  
    print('consumer done')  


def producer():  
    """  
    生産者  
    """  
    s = ''  
    while True:  
        time.sleep(1)  
        s += 'a'  # アイテムを作成  
        q.put(s)  # アイテムをキューに追加  
        if len(s) >= 5:  # アイテムの長さがオーバーしたら  
            q.put('done')  # 完了を通知  
            break  # ループから抜ける  
    print('producer done')  


consumer_thread = threading.Thread(target=consumer)  
producer_thread = threading.Thread(target=producer)  
consumer_thread.start()  # スレッドを開始  
producer_thread.start()  # 同上  
consumer_thread.join()  # スレッドの終了を待機  
producer_thread.join()  # 同上  

↑のコードを実行すると↓のような結果になります。

consumer: got a  
consumer: got aa  
consumer: got aaa  
consumer: got aaaa  
producer done  
consumer: got aaaaa  
consumer done  

consumerとは「消費者」と言う意味で、producerは「生産者」という意味になります。
↑のコードはキューに対してproducerがアイテムを生産して、consumerがそれを消費しています。

producerは1秒経過するごとにアイテムを作成してキューに追加します。
consumerget()でキューからアイテムを絶えず取得します。

producerは変数sの長さが5以上になったらqueuedoneというアイテムを追加します。
consumerdoneというアイテムを取得したらスレッドを終了させます。producerも同様です。

このパターンを応用するとプリンタキューも作れるかと思います。
producerがジョブ(印刷物)を追加する側で、consumerが印刷を行う側ですね。

おわりに

今回はPythonのqueue.Queueについて見てみました。
同期FIFOキューを使えるようになるといろいろ応用範囲が広がると思います。
ゲームプログラミングなどにも使えるかもしれませんね。

🦝 < 先入れ先出し

🐭 < マイペースなデータ構造