ひとりでのアプリ開発 - fineの備忘録 -

ひとりでアプリ開発をするなかで起こったことや学んだことを書き溜めていきます

Python - 並列処理・並行処理

初めに

 Pythonにおける並列処理、並行処理の違いとこれらを実装するモジュール concurrent.futures の使い方についてまとめます。

並列処理・並行処理とは

プロセスとスレッド

プロセス プログラムの実行単位
スレッド 1プロセス中に1つ以上のスレッドが含まれる
スレッドがCPUで実行される単位となる

 つまり、プログラムを実行するときに実行されるのがプロセスであり、プロセスの中にスレッドが含まれていることが分かります。

GIL (Global Interpreter Lock)

 GIL (Global Interpreter Lock) とは、複数のスレッドで処理を行う場合でも、同時に実行できるスレッドを1つに制限する排他ロックのことです。

 Python(CPython)には、GILが存在しています。

qiita.com

逐次処理・並列処理・並行処理の違い

 処理の仕方には逐次処理、並列処理、並行処理があります。

処理 説明
逐次処理 上から順番に処理を実行
前の処理が完了したら次の処理が始まる
並列処理 複数のプロセスで同時に処理を行う
並行処理 単一のプロセスで処理の待ち時間に別の処理を行う

 並列処理と並行処理の違いは次のようになります。

  • 並列処理:複数の作業者が複数の処理を行う
  • 並行処理:単一の作業者が処理の待ち時間に別の処理を行い、見かけ上では複数の処理を同時に実行するように見える
マルチプロセス・マルチスレッド

マルチプロセス 複数のプロセスを使う
マルチスレッド 複数のスレッドを使う
プロセスは1つ

 Pythonにおいては、GILがあるためマルチプロセスとマルチスレッドでは処理の仕方に違いがでます。

  • マルチプロセス:並列処理
  • マルチスレッド:並行処理
CPU負荷、I/O負荷

 マルチプロセスとマルチスレッドの使い分けとして、CPU負荷、I/O負荷を考えます。

CPU負荷 CPU(中央演算処理装置)にかかる負荷
CPUはプログラムの実行や計算処理を行う
I/O負荷 input/Output処理の負荷
I/O処理はハードディスクやネットワークなどとのデータの読み書きを行う処理

CPU負荷が高い処理の例

  • 計算量が多い処理
  • 数学計算
  • 画像処理
  • 動画処理
  • 機械学習
  • 暗号化

I/O負荷が高い処理の例

  • データの読み書き
  • ファイル操作
  • ネットワーク通信
  • データベースアクセス
  • Webブラウジング
  • ストリーミング

 CPU負荷が高い処理はマルチプロセスを実装し、I/O負荷が高い処理は読み書きの待ち時間が長いためマルチスレッドを実装します。

並列処理・並行処理の実装

使用モジュール

 Pythonで並列処理・並行処理を実装するには、次のモジュールを使っていました。

処理 モジュール
並列処理(マルチプロセス) multiprocessing
並行処理(マルチスレッド) threading

 Python3.2以降では、マルチプロセスとマルチスレッドを統一的に扱える concurrent.futures モジュールを使うことができるようになりました。

concurrent.futures モジュールの基本的な使い方

 次のコードを記述し、モジュールをインポートします。

import concurrent.futures

 マルチスレッドを実装するためのThreadPoolExecutorとマルチプロセスを実装するためのProcessPoolExecutorがあり、これらの Executor オブジェクトを用いて並行処理、並列処理を実装します。

並行処理

 並行処理を実装するには、ThreadPoolExecutorを用います。

import concurrent.futures  # concurrrent.futuresモジュールのインポート
import time

# 適当な関数の定義
# 引数を処理にかかる時間としている
def task_1(x):
    print("task_1開始")
    time.sleep(x)
    print("task_1終了。かかった時間:" + str(x))

def task_2(y):
    print("task_2開始")
    time.sleep(y)
    print("task_2終了。かかった時間:" + str(y))

def main():
    print("開始")
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # マルチスレッド処理
        executor.submit(task_1, 5)
        executor.submit(task_2, 2)
        executor.submit(task_2, 2)
        executor.submit(task_1, 3)

if __name__ == "__main__":
    main()

(実行結果:max_workers=2

 concurrent.futures.ThreadPoolExecutor(max_workers=2)の引数で指定しているmax_workersは非同期実行に使う最大スレッド数です。

 上記のコードをmax_workers=4に変えて実行してみます。

(実行結果:max_workers=4

 上記では、submit()を使って関数を実行していました。Executorオブジェクトで関数を実行する方法として、submit()map()があります。

関数 説明
submit() タスクをexecutorに送信し、タスクの完了後に結果を取得できる。
第1引数は実行する関数、第2引数以降は指定した関数の引数を指定する。
指定した関数にreturnがあれば、戻り値をFutureオブジェクトで返される。
map() イテレータ中の各要素に対して関数を適用し、結果もイテレータで返す。第1引数は実行する関数、第2引数はイテレータ

import concurrent.futures
import time

def task(x):
  time.sleep(x)
  return x

def main():
  # submit
  with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    result = future.result()
    print(result)

  # map
  with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(task, [1, 2, 3])
    print(list(results))

if __name__ == "__main__":
  main()

(実行結果)


並列処理

 並列処理を実装するには、ProcessPoolExecutorを用います。

 ProcessPoolExecutorThreadPoolExecutorもExecutorオブジェクトであるため、並行処理で用いたThreadPoolExecutor部分をProcessPoolExecutorに変更するだけで並行処理を並列処理に変更することができます。
 

import concurrent.futures  # concurrrent.futuresモジュールのインポート
import time

# 適当な関数の定義
# 引数を処理にかかる時間としている
def task_1(x):
    print("task_1開始")
    time.sleep(x)
    print("task_1終了。かかった時間:" + str(x))

def task_2(y):
    print("task_2開始")
    time.sleep(y)
    print("task_2終了。かかった時間:" + str(y))

def main():
    print("開始")
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # マルチプロセス処理
        executor.submit(task_1, 5)
        executor.submit(task_2, 2)
        executor.submit(task_2, 2)
        executor.submit(task_1, 3)

if __name__ == "__main__":
    main()

(実行結果)

 マルチスレッドの場合と見かけ上、同じに見えますが、タスクマネージャーを確認するとプロセス数が異なることがわかります。

マルチスレッド マルチプロセス