ユーニックス総合研究所

  • home
  • archives
  • python-pararell-processing

とりあえず出来るPythonの並列・並行処理【Thread, Process】

  • 作成日: 2023-05-01
  • 更新日: 2023-12-25
  • カテゴリ: Python

Pythonの並列・並行処理

Pythonで並列処理をやる場合、基本となるのが

  • マルチスレッド
  • マルチプロセス

になります。
この2つの技術はPythonに限らず他のプログラミング言語でも同様に重要なものです。
最近はasync/awaitなどもありますが、とりあえずこの2つを押さえておいた方がいいでしょう。

この記事では簡単にPythonのマルチスレッドマルチプロセスの技術について解説していきます。

並列・並行処理の基本概念

プロセス」と「スレッド」とは概念的な単位です。

アプリケーションを実行ファイルやスクリプトなどから実行すると、まずLinux環境などでは「プロセス」が作られます。
このプロセスは「プロセスID」という整数がそれぞれ割り振られています。
Ubuntuなどではプロセスの一覧は「ps aux」などのコマンドで確認できます。

スレッドはそのプロセスの中に存在する処理単位です。
アプリケーションを実行するとプロセスが作られ、さらにそのプロセスの中にメインスレッドというスレッドが作られます。
実際のアプリケーションの処理はそのメインスレッド内で実行されます。

プロセスはスレッドを複数持つことができます。
これをマルチスレッドと言います。
マルチスレッドではプロセスの中で複数のスレッドが並行的に動きます。

たとえばゲームプログラミングなどではメインの処理はメインスレッドで、そしてBGMを鳴らすのは別のスレッドで並行的に行う、などが一般的です。
並行処理によって絵と音を同時に出せるので、こういった並行的な処理が必要な場合はまず候補としてマルチスレッドによる実装が挙げられることも多いかと思います。

マルチスレッドは同一プロセス内で実行されるので、プロセス内のデータには自由に(制限が加えられる場合もある)アクセスできます。
たとえばメインスレッドで定義した変数に別のスレッドからアクセスする、ということも可能になっています。

1つのアプリケーションのためにプロセスを複数作るというのも行われることがあります。
これをマルチプロセスと言います。
マルチプロセスではプロセスがそれぞれ独立して動くので、データをやり取りするときは「プロセス間通信」というものが必要です。

Bashなどのシェルを使っている人にはマルチプロセスは馴染みのあるものです。
たとえば「ls | grep myfile」などのコマンドラインもマルチプロセスな処理と見なすことができます。
このコマンドラインはそれぞれの「ls」と「grep」のコマンド(プロセス)をパイプというプロセス間通信で接続し、2つのプロセスを協調させて動作させています。

🦝 < 身近なマルチプロセス処理

スレッドによる並行処理

Pythonではスレッドを扱うには「threading」から必要なオブジェクトをインポートします。

たとえば「Thread」はスレッドを作成するためのクラスです。

from threading import Thread  

このThreadを使った並行処理のサンプルは以下になります。

from threading import Thread  


def worker(*args):  
    message = args[0]  # メインスレッドから渡された引数を取得する  
    print(message)  # hello  


def main():  
    thread = Thread(target=worker, args=('hello', ))  # スレッドの作成  
    thread.start()  # スレッドの開始  
    thread.join()  # スレッドの終了を待つ  


main()  

このプログラムを実行すると

hello  

という出力が出ます。

各スレッドでのループ処理と同期

これだけでは並行処理してるのか? となってしまいますのでより並行的な具体例を見てみます。

from threading import Thread  
import time  


loop = True  # 無限ループを制御するフラグ  


def worker():  
    # この関数はメインスレッドとは別のスレッドで実行される  
    global loop  

    while loop:  
        print('worker')  
        time.sleep(1)  


def main():  
    global loop  

    thread = Thread(target=worker)  # スレッドの作成  
    thread.start()  # スレッドの開始  

    # 以下のループはメインスレッドで実行される  
    while loop:  
        print('main thread')  

        try:  
            time.sleep(1)  
        except KeyboardInterrupt:  
            # Ctrl + Cなどが発生したらフラグを折って無限ループを中断する  
            loop = False  

    thread.join()  # スレッドの終了を待つ  


main()  

このプログラムを実行すると以下のような出力が得られます。

worker  
main thread  
main thread  
worker  
main thread  
worker  

このプログラムではメインスレッドとworkerのスレッドを別々に回し、それぞれループでprint()を実行します。
メインスレッド内でCtrl + Cなどのキーボードインタラプトが発生したらloopフラグを折ります。
loopフラグが折れるとメインスレッドのループもworkerのスレッドのループも終了します。

Lockを使った排他的制御

ではスレッドで排他的制御をやってみましょう。
排他的制御とは、データを保護する技術です。

マルチスレッドではメモリ上の同一の変数にアクセスできますが、並行処理では困ることがあります。
それは一方のスレッドと別のスレッドで同時に変数を読み込み/書き込みをした場合に、データの正確性が保証されないということです。

たとえばカウント変数があって、そのカウント変数を2つのスレッドでカウントするとします。
一方のスレッドでカウントしている間はそのカウント変数は独占的にカウントしたいわけです。
ですのでこういう時に排他的制御を行って別のスレッドがカウント変数をカウントできないようにします。

Pythonのthreadingでは排他的制御を行うためのクラスが存在し、その名も「Lock」と言います。
このLockを使ったサンプルは以下になります。

from threading import Thread, Lock  
import time  


count = 0  # カウント変数  
count_lock = Lock()  # カウント変数のLock  


def worker():  
    global count, count_lock  

    # 排他的制御の開始  
    with count_lock:  
        for _ in range(10):  
            count += 1  
            print('worker', count)  


def main():  
    global count, count_lock  

    thread = Thread(target=worker)  # スレッドの作成  
    thread.start()  # スレッドの開始  

    # 排他的制御の開始  
    with count_lock:  
        for _ in range(10):  
            count += 10  
            print('main', count)  

    thread.join()  # スレッドの終了を待つ  


main()  

このプログラムを実行すると以下のような結果になります。

worker 1  
worker 2  
worker 3  
worker 4  
worker 5  
worker 6  
worker 7  
worker 8  
worker 9  
worker 10  
main 20  
main 30  
main 40  
main 50  
main 60  
main 70  
main 80  
main 90  
main 100  
main 110  

マルチスレッドなのでcount_lockで排他的制御をしないと、メインスレッドとworkerが交互にデータを変更します。
試しに上記のコードからwith count_lock:のコードを削除して実行してみてください。
そうすると以下のような出力になると思います。

worker 1  
worker 12  
worker 13  
worker 14  
main 11  
main 25  
main 35  
worker 15  
worker 46  
worker 47  
worker 48  
main 45  
main 58  
worker 59  
worker 70  
main 69  
main 80  
main 90  
main 100  
main 110  

Queueを使ったスレッド間のデータ共有

Pythonにはqueueという同期キュークラスがあります。

この同期キューを使うとスレッド間でデータのやり取りが行えるようになります。
以下はマルチスレッドと同期キューを使ったサンプルです。

from threading import Thread  
from queue import Queue  


q = Queue()  


def worker():  
    while True:  
        item = q.get()  # キューからデータ(タスク)を取得  
        print('worker', item)  
        q.task_done()  # タスクの完了をキューに通知  
        if item == -1:  # データが-1なら無限ループを終了  
            break  


def main():  
    thread = Thread(target=worker)  # スレッドの作成  
    thread.start()  # スレッドの開始  

    for i in range(10):  
        q.put(i)  # キューにデータを追加  
    q.put(-1)  # 終了を通知するデータを追加  

    thread.join()  # スレッドの終了を待つ  


main()  

同期キューが威力を発揮するのはより多くのスレッドを起動させる時です。
キューにデータを追加しそれを取得する、というフォーマットに従うことで安全にスレッド間でデータのやり取りができるようになります。
上記のコードではスレッドは2つだけなのでいまいち効果を実感できませんが、興味がある人はスレッドを増やして実験してみてください。

プロセスによる並行処理

Pythonでマルチプロセスをやるには「multiprocessing」というモジュールが使えます。

このモジュールに「Process」というクラスがあるのでこれでプロセスを作成できます。

from multiprocessing import Process  
import os  


def worker(*args):  
    # 引数を取得  
    message = args[0]  
    print('worker', message)  

    # このスレッドの親のプロセスIDとこのスレッドのプロセスID  
    print('worker', os.getppid(), os.getpid())  


def main():  
    print('main', os.getpid())  # メインスレッドのプロセスID  
    process = Process(target=worker, args=('hello', ))  # プロセスの作成  
    process.start()  # プロセスをスタート  
    process.join()  # プロセスの終了を待つ  



if __name__ == '__main__':  # このif文を書いておく  
    main()  

Processの使い方はThreadの場合と同じ感じになっています。
インターフェースが同じだと学習コストが下がっていいですね。

os.getpid()は現在のプロセスのプロセスIDを取得します。
os.getppid()は現在のプロセスの親プロセスのプロセスIDを取得します。

実はプロセスには親子関係というものがあります。
あるプロセスから別のプロセスを派生(フォーク)させた場合、派生元のプロセスを親、派生したプロセスを子と表現します。
C言語ではあるプロセスから別のプロセスを分岐させる関数にfork()という関数があります。
このfork()を実行すると子プロセスが分岐してプロセスの親子関係ができます。
ちなみにPythonにもos.fork()がありますのでこれで実験してみてもいいかもしれません。

子プロセスはフォーク時の設定によっては親プロセスの環境変数などを継承することができます。
これによって親プロセスに設定された設定を子プロセスに引き継ぐことができます。

Queueを使ったプロセス間のデータ共有

multiprocessingにもQueueというクラスがあります。
このキューを使うとプロセス間でデータの共有が可能になります。

from multiprocessing import Process, Queue  


def worker(q):  
    # qはグローバル変数ではなく引数であることに注意  
    while True:  
        item = q.get()  # キューからデータを取得  
        print('worker', item)  
        if item == -1:  # データが-1なら無限ループから抜ける  
            break  

    print('done worker')  


def main():  
    q = Queue()  
    process = Process(target=worker, args=(q, ))  # プロセスの作成  
    process.start()  # プロセスをスタート  

    for i in range(10):  
        q.put(i)  # キューにデータを入れる  
    q.put(-1)  # 終了データを入れる  

    process.join()  # プロセスの終了を待つ  



if __name__ == '__main__':  # このif文を書いておく  
    main()  

プロセスのキューを使う場合に注意したいのがqがグローバル変数ではなく引数になっている点です。
マルチプロセスでは各プロセスのデータは独立しています。
そのため一方のプロセス内で定義した変数にもう一方のプロセスからアクセスするということが基本出来ません。
そのためグローバル変数ではなく、プロセスに引数としてキューを渡します。

multiprocessingのキューを使うとスレッドとqueueを使ったような使い方がでデータを共有できます。
スレッドとプロセスでこういったある程度共通したインターフェースを提供するというのはライブラリ制作者の努力のたまものだと言えます。

🦝 < えらいぞライブラリ制作者

このキューを使うとプロセス間通信していることを忘れてしまいそうになるほど見事なものですが、やってることはプロセス間通信なのでそれは忘れないようにしてください。
バグが出たときはマルチプロセスであることを思い出してください。

Pipeを使ったプロセス間通信

multiprocessingにはPipeというパイプ(プロセス間通信用の技術)があります。
これを使うとプロセス間でパイプが行えます。

パイプというのは低レベルなプロセス間通信の実装をやるとわかるのですが、ファイルディスクリプタを使った通信技術です。
ファイルディスクリプタとはファイルのライブラリが内部で使っている低レベルなI/O用の整数のことです。
プロセスで読み込みと書き込みのファイルディスクリプタを開くと、そのストリームは自プロセスに繋がります。
その状態でフォークすると子プロセスとストリーム(データの流れ)を共有した形になります。
その状態で親の読み込みディスクリプタを閉じて、子の書き込みディスクリプタを閉じると、親の書き込みディスクリプタが子の読み込みディスクリプタに繋がっている状態のストリームができます。
この状態になると親がデータをディスクリプタに流すと子がそのデータをディスクリプタから受信できるようになります。
これがパイプで、コマンドラインのパイプを実現している技術です。なんか騙されたような仕組みでおもしろいですよね。

from multiprocessing import Process, Pipe  


def worker(conn):  
    while True:  
        data = conn.recv()  # パイプからデータを受信  
        print('worker', data)  
        if data == -1:  # データが-1なら無限ループを終了  
            break  

    print('worker done')  


def main():  
    parent_conn, child_conn = Pipe()  # 親子間の通信で使うパイプを作成  
    process = Process(target=worker, args=(child_conn, ))  # プロセスの作成  
    process.start()  # プロセスをスタート  

    for i in range(10):  
        parent_conn.send(i)  # 親から子にデータを流す  
    parent_conn.send(-1)  # 終了データを流す  

    process.join()  # プロセスの終了を待つ  



if __name__ == '__main__':  # このif文を書いておく  
    main()  

パイプを使ってコードでコマンドラインを実現してみるのもいい勉強になるかもしれません。
それができればシェルの実装でパイプが実装できるはずです。

おわりに

今回はPythonの並列・並行処理について解説しました。
なにか参考になれば幸いです。

🦝 < Pythonの並行処理はシンプルで簡単

🦝 < ほんまやな