とりあえず出来るPythonの並列・並行処理【Thread, Process】
- 作成日: 2023-05-01
- 更新日: 2023-12-25
- カテゴリ: Python
Pythonの並列・並行処理
Pythonで並列処理をやる場合、基本となるのが
- マルチスレッド
- マルチプロセス
になります。
この2つの技術はPythonに限らず他のプログラミング言語でも同様に重要なものです。
最近はasync/awaitなどもありますが、とりあえずこの2つを押さえておいた方がいいでしょう。
この記事では簡単にPythonのマルチスレッド、マルチプロセスの技術について解説していきます。
並列・並行処理の基本概念
「プロセス」と「スレッド」とは概念的な単位です。
アプリケーションを実行ファイルやスクリプトなどから実行すると、まずLinux環境などでは「プロセス」が作られます。
このプロセスは「プロセスID」という整数がそれぞれ割り振られています。
Ubuntuなどではプロセスの一覧は「ps aux」などのコマンドで確認できます。
スレッドはそのプロセスの中に存在する処理単位です。
アプリケーションを実行するとプロセスが作られ、さらにそのプロセスの中にメインスレッドというスレッドが作られます。
実際のアプリケーションの処理はそのメインスレッド内で実行されます。
プロセスはスレッドを複数持つことができます。
これをマルチスレッドと言います。
マルチスレッドではプロセスの中で複数のスレッドが並行的に動きます。
たとえばゲームプログラミングなどではメインの処理はメインスレッドで、そしてBGMを鳴らすのは別のスレッドで並行的に行う、などが一般的です。
並行処理によって絵と音を同時に出せるので、こういった並行的な処理が必要な場合はまず候補としてマルチスレッドによる実装が挙げられることも多いかと思います。
マルチスレッドは同一プロセス内で実行されるので、プロセス内のデータには自由に(制限が加えられる場合もある)アクセスできます。
たとえばメインスレッドで定義した変数に別のスレッドからアクセスする、ということも可能になっています。
1つのアプリケーションのためにプロセスを複数作るというのも行われることがあります。
これをマルチプロセスと言います。
マルチプロセスではプロセスがそれぞれ独立して動くので、データをやり取りするときは「プロセス間通信」というものが必要です。
Bashなどのシェルを使っている人にはマルチプロセスは馴染みのあるものです。
たとえば「ls | grep myfile」などのコマンドラインもマルチプロセスな処理と見なすことができます。
このコマンドラインはそれぞれの「ls」と「grep」のコマンド(プロセス)をパイプというプロセス間通信で接続し、2つのプロセスを協調させて動作させています。
🦝 < 身近なマルチプロセス処理
スレッドによる並行処理
Pythonではスレッドを扱うには「threading」から必要なオブジェクトをインポートします。
たとえば「Thread」はスレッドを作成するためのクラスです。
from threading import Thread
このThread
を使った並行処理のサンプルは以下になります。
from threading import Thread
def worker(*args):
message = args[0] # メインスレッドから渡された引数を取得する
print(message) # hello
def main():
thread = Thread(target=worker, args=('hello', )) # スレッドの作成
thread.start() # スレッドの開始
thread.join() # スレッドの終了を待つ
main()
このプログラムを実行すると
hello
という出力が出ます。
各スレッドでのループ処理と同期
これだけでは並行処理してるのか? となってしまいますのでより並行的な具体例を見てみます。
from threading import Thread
import time
loop = True # 無限ループを制御するフラグ
def worker():
# この関数はメインスレッドとは別のスレッドで実行される
global loop
while loop:
print('worker')
time.sleep(1)
def main():
global loop
thread = Thread(target=worker) # スレッドの作成
thread.start() # スレッドの開始
# 以下のループはメインスレッドで実行される
while loop:
print('main thread')
try:
time.sleep(1)
except KeyboardInterrupt:
# Ctrl + Cなどが発生したらフラグを折って無限ループを中断する
loop = False
thread.join() # スレッドの終了を待つ
main()
このプログラムを実行すると以下のような出力が得られます。
worker
main thread
main thread
worker
main thread
worker
このプログラムではメインスレッドとworkerのスレッドを別々に回し、それぞれループでprint()を実行します。
メインスレッド内でCtrl + Cなどのキーボードインタラプトが発生したらloop
フラグを折ります。
loop
フラグが折れるとメインスレッドのループもworkerのスレッドのループも終了します。
Lockを使った排他的制御
ではスレッドで排他的制御をやってみましょう。
排他的制御とは、データを保護する技術です。
マルチスレッドではメモリ上の同一の変数にアクセスできますが、並行処理では困ることがあります。
それは一方のスレッドと別のスレッドで同時に変数を読み込み/書き込みをした場合に、データの正確性が保証されないということです。
たとえばカウント変数があって、そのカウント変数を2つのスレッドでカウントするとします。
一方のスレッドでカウントしている間はそのカウント変数は独占的にカウントしたいわけです。
ですのでこういう時に排他的制御を行って別のスレッドがカウント変数をカウントできないようにします。
Pythonのthreadingでは排他的制御を行うためのクラスが存在し、その名も「Lock」と言います。
このLockを使ったサンプルは以下になります。
from threading import Thread, Lock
import time
count = 0 # カウント変数
count_lock = Lock() # カウント変数のLock
def worker():
global count, count_lock
# 排他的制御の開始
with count_lock:
for _ in range(10):
count += 1
print('worker', count)
def main():
global count, count_lock
thread = Thread(target=worker) # スレッドの作成
thread.start() # スレッドの開始
# 排他的制御の開始
with count_lock:
for _ in range(10):
count += 10
print('main', count)
thread.join() # スレッドの終了を待つ
main()
このプログラムを実行すると以下のような結果になります。
worker 1
worker 2
worker 3
worker 4
worker 5
worker 6
worker 7
worker 8
worker 9
worker 10
main 20
main 30
main 40
main 50
main 60
main 70
main 80
main 90
main 100
main 110
マルチスレッドなのでcount_lock
で排他的制御をしないと、メインスレッドとworker
が交互にデータを変更します。
試しに上記のコードからwith count_lock:
のコードを削除して実行してみてください。
そうすると以下のような出力になると思います。
worker 1
worker 12
worker 13
worker 14
main 11
main 25
main 35
worker 15
worker 46
worker 47
worker 48
main 45
main 58
worker 59
worker 70
main 69
main 80
main 90
main 100
main 110
Queueを使ったスレッド間のデータ共有
Pythonにはqueue
という同期キュークラスがあります。
この同期キューを使うとスレッド間でデータのやり取りが行えるようになります。
以下はマルチスレッドと同期キューを使ったサンプルです。
from threading import Thread
from queue import Queue
q = Queue()
def worker():
while True:
item = q.get() # キューからデータ(タスク)を取得
print('worker', item)
q.task_done() # タスクの完了をキューに通知
if item == -1: # データが-1なら無限ループを終了
break
def main():
thread = Thread(target=worker) # スレッドの作成
thread.start() # スレッドの開始
for i in range(10):
q.put(i) # キューにデータを追加
q.put(-1) # 終了を通知するデータを追加
thread.join() # スレッドの終了を待つ
main()
同期キューが威力を発揮するのはより多くのスレッドを起動させる時です。
キューにデータを追加しそれを取得する、というフォーマットに従うことで安全にスレッド間でデータのやり取りができるようになります。
上記のコードではスレッドは2つだけなのでいまいち効果を実感できませんが、興味がある人はスレッドを増やして実験してみてください。
プロセスによる並行処理
Pythonでマルチプロセスをやるには「multiprocessing」というモジュールが使えます。
このモジュールに「Process」というクラスがあるのでこれでプロセスを作成できます。
from multiprocessing import Process
import os
def worker(*args):
# 引数を取得
message = args[0]
print('worker', message)
# このスレッドの親のプロセスIDとこのスレッドのプロセスID
print('worker', os.getppid(), os.getpid())
def main():
print('main', os.getpid()) # メインスレッドのプロセスID
process = Process(target=worker, args=('hello', )) # プロセスの作成
process.start() # プロセスをスタート
process.join() # プロセスの終了を待つ
if __name__ == '__main__': # このif文を書いておく
main()
Process
の使い方はThread
の場合と同じ感じになっています。
インターフェースが同じだと学習コストが下がっていいですね。
os.getpid()
は現在のプロセスのプロセスIDを取得します。
os.getppid()
は現在のプロセスの親プロセスのプロセスIDを取得します。
実はプロセスには親子関係というものがあります。
あるプロセスから別のプロセスを派生(フォーク)させた場合、派生元のプロセスを親、派生したプロセスを子と表現します。
C言語ではあるプロセスから別のプロセスを分岐させる関数にfork()
という関数があります。
このfork()
を実行すると子プロセスが分岐してプロセスの親子関係ができます。
ちなみにPythonにもos.fork()
がありますのでこれで実験してみてもいいかもしれません。
子プロセスはフォーク時の設定によっては親プロセスの環境変数などを継承することができます。
これによって親プロセスに設定された設定を子プロセスに引き継ぐことができます。
Queueを使ったプロセス間のデータ共有
multiprocessing
にもQueue
というクラスがあります。
このキューを使うとプロセス間でデータの共有が可能になります。
from multiprocessing import Process, Queue
def worker(q):
# qはグローバル変数ではなく引数であることに注意
while True:
item = q.get() # キューからデータを取得
print('worker', item)
if item == -1: # データが-1なら無限ループから抜ける
break
print('done worker')
def main():
q = Queue()
process = Process(target=worker, args=(q, )) # プロセスの作成
process.start() # プロセスをスタート
for i in range(10):
q.put(i) # キューにデータを入れる
q.put(-1) # 終了データを入れる
process.join() # プロセスの終了を待つ
if __name__ == '__main__': # このif文を書いておく
main()
プロセスのキューを使う場合に注意したいのがq
がグローバル変数ではなく引数になっている点です。
マルチプロセスでは各プロセスのデータは独立しています。
そのため一方のプロセス内で定義した変数にもう一方のプロセスからアクセスするということが基本出来ません。
そのためグローバル変数ではなく、プロセスに引数としてキューを渡します。
multiprocessing
のキューを使うとスレッドとqueue
を使ったような使い方がでデータを共有できます。
スレッドとプロセスでこういったある程度共通したインターフェースを提供するというのはライブラリ制作者の努力のたまものだと言えます。
🦝 < えらいぞライブラリ制作者
このキューを使うとプロセス間通信していることを忘れてしまいそうになるほど見事なものですが、やってることはプロセス間通信なのでそれは忘れないようにしてください。
バグが出たときはマルチプロセスであることを思い出してください。
Pipeを使ったプロセス間通信
multiprocessing
にはPipe
というパイプ(プロセス間通信用の技術)があります。
これを使うとプロセス間でパイプが行えます。
パイプというのは低レベルなプロセス間通信の実装をやるとわかるのですが、ファイルディスクリプタを使った通信技術です。
ファイルディスクリプタとはファイルのライブラリが内部で使っている低レベルなI/O用の整数のことです。
プロセスで読み込みと書き込みのファイルディスクリプタを開くと、そのストリームは自プロセスに繋がります。
その状態でフォークすると子プロセスとストリーム(データの流れ)を共有した形になります。
その状態で親の読み込みディスクリプタを閉じて、子の書き込みディスクリプタを閉じると、親の書き込みディスクリプタが子の読み込みディスクリプタに繋がっている状態のストリームができます。
この状態になると親がデータをディスクリプタに流すと子がそのデータをディスクリプタから受信できるようになります。
これがパイプで、コマンドラインのパイプを実現している技術です。なんか騙されたような仕組みでおもしろいですよね。
from multiprocessing import Process, Pipe
def worker(conn):
while True:
data = conn.recv() # パイプからデータを受信
print('worker', data)
if data == -1: # データが-1なら無限ループを終了
break
print('worker done')
def main():
parent_conn, child_conn = Pipe() # 親子間の通信で使うパイプを作成
process = Process(target=worker, args=(child_conn, )) # プロセスの作成
process.start() # プロセスをスタート
for i in range(10):
parent_conn.send(i) # 親から子にデータを流す
parent_conn.send(-1) # 終了データを流す
process.join() # プロセスの終了を待つ
if __name__ == '__main__': # このif文を書いておく
main()
パイプを使ってコードでコマンドラインを実現してみるのもいい勉強になるかもしれません。
それができればシェルの実装でパイプが実装できるはずです。
おわりに
今回はPythonの並列・並行処理について解説しました。
なにか参考になれば幸いです。
🦝 < Pythonの並行処理はシンプルで簡単
🦝 < ほんまやな