Pythonのqueue.Queueの使い方をわかりやすく解説
- 作成日: 2021-06-24
- 更新日: 2023-12-24
- カテゴリ: Python
Pythonのqueue.Queueの使い方
Pythonにはマルチスレッドで安全にデータを取り扱う仕組みの1つとして同期キューが備わっています。
これはqueueモジュール内のオブジェクトを利用することで可能です。
結論から言うとPythonの同期キューは↓のように使います。
import queue
import threading
q = queue.Queue() # 同期FIFOキューの作成
def worker():
"""
キューのアイテムを消費するワーカー
"""
while True:
item = q.get() # キューからアイテムを取得
print('item is', item) # アイテムを使用
q.task_done() # アイテムの使用完了をキューに知らせる
thread = threading.Thread(target=worker, daemon=True) # デーモンスレッド作成
thread.start() # スレッドを開始
# キューにアイテムを追加
q.put('nyan')
q.put('wan')
q.put('pilolo')
q.join() # キューのアイテムが消費されるのを待つ
この記事ではPythonの同期キューについて具体的に↓を見ていきます。
- キューとは?
- Queueの主要なメソッド
- ワーカーにアイテムを消費させる
- 生産者と消費者の例
キューとは?
キュー(Queue)とはデータ構造の一種です。
FIFO(First In, First Out)という特徴を持つデータ構造で、先に入れたものが先に取り出されます。
キューの例としてわかりやすいのがプリンタキューです。
プリンタに印刷を依頼するとき、印刷物をキューの末尾にプッシュします。
そしてプリンタはそのキューを監視し、キューに印刷物がプッシュされたキューの先頭から印刷物を取り出します。
プリンタは印刷物を印刷して消費します。
プリンタキューにはプリンタが印刷している間も印刷物が末尾にプッシュされていきます。
しかしプリンタはキューの先頭から常に印刷物を取り出すので、プリンタの使用者はお願いした順番通りの印刷結果を受け取ることができます。
キューは配列やリストなどと肩を並べる比較的にポピュラーなデータ構造です。
非同期プログラミングと相性が良いデータ構造のため、マルチスレッドプログラミングなどで利用されることがあります。
同期FIFOキューとはマルチスレッドプログラミングで同期的に利用できるキューのことを言います。
これはPythonではqueue
モジュールのQueue
が相当します。
Queueの主要なメソッド
queue.Queue
の主要なメソッドを紹介します。
put()
Queue.put()
はアイテムをキューに追加します。
put()
は↓のような構造になっています。
Queue.put(item, block=True, timeout=None)
第1引数のitem
はキューに追加するアイテムです。
第2引数のblock
がTrue
で第3引数のtimeout
がNone
のとき、キューにアイテムを追加できるまでブロックします。
block
がTrue
でtimeout
に秒数を指定した場合は、キューはアイテムを追加できない場合はその秒数だけブロックします。
最終的にアイテムを追加できなかった場合は例外Full
を送出します。
block
がFalse
の場合は、キューにアイテムを追加できる場合はただちにアイテムを追加します。
追加できない場合は例外Full
を送出します。
追加できる/できないというのは、キューが他のスレッドに使用されているかどうかに依存します。
他のスレッドがキューを操作している場合は、キューへの操作は基本的にブロックされます。
このブロックの期間を制限したい場合はtimeout
に秒数を指定するといいでしょう(例外Full
のキャッチを忘れずに)。
get()
Queue.get()
はキューからアイテムを取り除いて取得します。
get()
は↓のような構造になっています。
Queue.get(block=True, timeout=None)
第1引数のblock
がTrue
で第2引数のtimeout
がNone
の場合、アイテムが取り出せるまでブロックします。
block
がTrue
でtimeout
に秒数を指定した場合はアイテムを取り出せるまでtimeout
秒間ブロックします。
最終的にアイテムを取り出せなかった場合は例外Empty
を送出します。
block
がFalse
の場合はただちにアイテムを取り出しますが、取り出せなかった場合は例外Empty
を送出します。
join()
Queue.join()
はキュー内のアイテムがすべて取り出され処理されるまでブロックします。
join()
は↓のような構造になっています。
Queue.join()
このjoin()
を呼び出すとキューのアイテムがすべて取り出され消費されるまで処理がブロックされます。
つまりキューのすべてのアイテムがget()
で取り出されてtask_done()
されたらブロックが解除されます。
task_done()
Queue.task_done()
はキューのタスク(アイテムを使った処理)が完了したときに呼び出します。
task_done()
は↓のような構造になっています。
Queue.task_done()
task_done()
は、キューからアイテムをget()
で取り出し、その後にtask_done()
を呼び出すことで、そのアイテムを使ったタスクが完了したことをキューに教えます。
join()
が呼び出されていた場合、join()
はキューのすべてのアイテムのタスクが完了したらブロックを解放します。
これはつまりキュー内のすべてのアイテムをget()
で取り出しtask_done()
したら、ということになります。
キュー内のアイテム数より多くtask_done()
が呼ばれた場合は例外ValueError
が送出されます。
ワーカーにアイテムを消費させる
簡単なマルチスレッドプログラミングにおけるキューの使い方を↓に示します。
import queue
import threading
q = queue.Queue() # 同期FIFOキューの作成
def worker():
"""
キューのアイテムを消費するワーカー
"""
while True:
item = q.get() # キューからアイテムを取得
print('item is', item) # アイテムを使用
q.task_done() # アイテムの使用完了をキューに知らせる
thread = threading.Thread(target=worker, daemon=True) # デーモンスレッド作成
thread.start() # スレッドを開始
# キューにアイテムを追加
q.put('nyan')
q.put('wan')
q.put('pilolo')
q.join() # キューのアイテムが消費されるのを待つ
↑のコードを実行すると↓のような結果になります。
item is nyan
item is wan
item is pilolo
↑のコード内の変数q
はキューですが、これは複数のスレッドから参照されます。
threading.Thread()
でワーカースレッドを作成しています。ワーカースレッドはキューのアイテムを消費するスレッドです。
デーモンスレッドとは、プログラムが終了すると自動的に回収されるスレッドのことで、このスレッドはjoin()
する必要がありません。ここではデーモンスレッドを使っています。
デーモンスレッドを作成してstart()
してスレッドを開始します。するとworker()
がそのスレッド内で呼び出されます。
worker
内の無限ループではq
を参照しています。
get()
でアイテムを取得していますが、timeout
を指定していないので、アイテムが存在しない場合はブロックされます。
アイテムを取得出来たらtask_done()
を呼び出し、アイテムを消費したことをキューに知らせます。
q.put()
でキューにアイテム(ただの文字列)を追加しています。今回は3つだけ追加しています。
そのあとにq.join()
でブロックしています。このブロックはキュー内のすべてのアイテムが消費されたら(task_done()
されたら)解放されます。
生産者と消費者の例
もう少し複雑なマルチスレッドプログラミングの例を紹介します。
import queue
import threading
import time
q = queue.Queue() # 同期FIFOキューを作成
def consumer():
"""
消費者
"""
while True:
item = q.get() # キューからアイテムを取得
if item == 'done': # 完了の合図
break # ループから抜ける
print('consumer: got', item) # アイテムを消費
print('consumer done')
def producer():
"""
生産者
"""
s = ''
while True:
time.sleep(1)
s += 'a' # アイテムを作成
q.put(s) # アイテムをキューに追加
if len(s) >= 5: # アイテムの長さがオーバーしたら
q.put('done') # 完了を通知
break # ループから抜ける
print('producer done')
consumer_thread = threading.Thread(target=consumer)
producer_thread = threading.Thread(target=producer)
consumer_thread.start() # スレッドを開始
producer_thread.start() # 同上
consumer_thread.join() # スレッドの終了を待機
producer_thread.join() # 同上
↑のコードを実行すると↓のような結果になります。
consumer: got a
consumer: got aa
consumer: got aaa
consumer: got aaaa
producer done
consumer: got aaaaa
consumer done
consumer
とは「消費者」と言う意味で、producer
は「生産者」という意味になります。
↑のコードはキューに対してproducer
がアイテムを生産して、consumer
がそれを消費しています。
producer
は1秒経過するごとにアイテムを作成してキューに追加します。
consumer
はget()
でキューからアイテムを絶えず取得します。
producer
は変数s
の長さが5
以上になったらqueue
にdone
というアイテムを追加します。
consumer
はdone
というアイテムを取得したらスレッドを終了させます。producer
も同様です。
このパターンを応用するとプリンタキューも作れるかと思います。
producer
がジョブ(印刷物)を追加する側で、consumer
が印刷を行う側ですね。
おわりに
今回はPythonのqueue.Queue
について見てみました。
同期FIFOキューを使えるようになるといろいろ応用範囲が広がると思います。
ゲームプログラミングなどにも使えるかもしれませんね。
🦝 < 先入れ先出し
🐭 < マイペースなデータ構造