検索
連載

Python 3.14新機能:同一プロセスで複数のインタプリターを実行可能 concurrent.interpretersモジュールとは?Python最新情報キャッチアップ

Python 3.14では真の並列処理を実現するフリースレッド版Pythonとはまた別の仕組みが導入されています。それがconcurrent.interpretersモジュールです。その概要と基本的な使い方を紹介します。

PC用表示 関連情報
Share
Tweet
LINE
Hatena
「Python最新情報キャッチアップ」のインデックス

連載目次

 バージョン3.13までのPythonではC APIを使うことで同一プロセスで複数のインタプリターを実行できた。Python 3.14では、この機構をPythonのコードからも使えるように、標準ライブラリに新たにconcurrent.interpretersモジュールが追加された。以下では、その概要と基本的な使い方を紹介する。なお、concurrent.interpretersモジュールはPEP 734で提案されたものだが、その公式なドキュメントは「concurrent.interpreters - Multiple interpreters in the same process」となる。

concurrent.interpreters.Interpreterオブジェクトとは

 ここで言う「インタプリター(concurrent.interpreters.Interpreterオブジェクト)」とはPythonのコードを実行するのに必要なさまざまな情報(コンテキスト)を持つオブジェクトのことだ。例えば、globals関数やlocals関数で取得可能な名前空間やそこに存在するオブジェクト、スレッドの状態、それからGILなどがそうである。個々のインタプリターが独自にGILを持つということは、複数のインタプリターが同一プロセスに存在する場合、GILもその数だけ存在するということだ。つまり、これはフリースレッド版Pythonとはまた別の形で真の並列処理をPythonで実現するための機構といえる(これについては後ほど、スレッドによる並列処理のコードをインタプリターによる並列処理のコードに書き換えてみる)。

 また、インタプリターは同一プロセスに複数存在可能だが、それらは互いに隔離され、インタプリター間でメモリを共有することはない。つまり、インタプリター間で競合やデッドロックなどの心配をする必要はない。インタプリター間で情報を交換するにはconcurrent.interpreters.Queueオブジェクトを使用する。

 それから、インタプリターはあくまでもPythonコードを実行するための環境であって、特定のPythonスレッド(threading.Threadオブジェクトやその基盤となるOS固有のスレッド)とは独立している。そのため、インタプリターを使って並列処理を実際に行うにはプログラマーの側でそうしたコードを書くことになる。


かわさき

 どうもHPかわさきです。

 原稿をあらかた書き終わったところで、これを書いています。書き始めたときにはこんなに長くなるとは思っていなかったのですが、想像以上に書くことが多かったです(Wikipedia風に表現すると「独自研究」と注釈が付きそうなところもあります)。

 concurrent.interpretersモジュールはまだ最初の足掛かりができた程度のもので、まだサードパーティーのモジュールでの対応も進んでいませんし、モジュール自身が持つ制約もそれなりにあるようです。このモジュールが広く使われるようになるかどうかは、モジュールの改善とサードパーティーによる広範なサポートが必要になるでしょう。

 というわけで、読んでも無駄かもしれませんが、興味のある方にはぜひご覧いただいて、実際にコードを書いてみてくださいませ。


インタプリターの作成と使用

 concurrent.interpretersモジュールでは次のような関数やクラスが定義されている。

  • list_all関数:現在のプロセスに存在する全てのInterpreterオブジェクトを列挙する
  • get_current関数:現在のPythonコードを実行しているInterpreterオブジェクトを取得する
  • get_main関数:メインのInterpreterオブジェクトを取得する。メインのInterpreterオブジェクトとはREPL起動時またはPythonスクリプトをコマンドラインで実行した際に作成されるもの
  • create関数:新規にInterpreterオブジェクトを作成する
  • create_queue関数:Interpreterオブジェクト間で通信するのに使うキューを作成する
  • Interpreterクラス:Pythonコードを実行するために必要な情報を含むオブジェクト
  • Queueクラス:Interpreterオブジェクト間で通信するのに使用する

 まずは幾つかの関数を使ってみよう。

from concurrent.interpreters import list_all, get_current, get_main, create

interp_list = list_all()  # 現在のプロセスに存在するインタプリターの取得
print(interp_list)  # [Interpreter(0)]

interp = get_main()  # メインのInterpreterオブジェクトを取得
print(interp)  # Interpreter(0)

interp = get_current()  # 現在のInterpreterオブジェクトを取得
print(interp)  # Interpreter(0)

Interpreterオブジェクトの取得

 list_all関数は呼び出し時点で現在Pythonを実行しているプロセス内に存在するInterpreterオブジェクトを一覧できる。新規に実行したREPL環境でこのコードを実行すると、その環境を実行した際に作成された「メインのInterpreterオブジェクト」だけが存在しているので「Interpreter(0)」だけを要素とするリストが返される。

 get_main関数はメインのInterpreterオブジェクトを取得する。上でも述べた通り、これは「Interpreter(0)」だ。get_current関数はPythonコードを現在実行しているInterpreterオブジェクトを取得する。この例ではInterpreterオブジェクトは1つしかないので、この関数の戻り値も「Interpreter(0)」になっている。

 macOSでこのコードを実行した結果を以下に示す。

concurrent.interpretersモジュールの関数を呼び出した結果
concurrent.interpretersモジュールの関数を呼び出した結果

 次にcreate関数でInterpreterオブジェクトを新規に作成してみる。

interp = create()  # Interpreterオブジェクトを新規に作成
interp_list = list_all()  # 現在のプロセスに存在するインタプリターの取得
print(interp_list)  # [Interpreter(0), Interpreter(1)]

concurrent.interpretersモジュールの関数を使ったコードの例

 create関数で新規にInterpreterオブジェクトを作成すると、list_all関数はそのオブジェクトを含むリストを返すようになる。

Interpreterオブジェクトを作成した結果
Interpreterオブジェクトを作成した結果

 Interpreterオブジェクトには次のような属性やメソッドがある(一部抜粋)。

  • id属性:Interpreterオブジェクトを特定するID
  • is_runningメソッド:そのInterpreterオブジェクトでコードを実行中ならTrue。そうでなければFalse
  • closeメソッド:Interpreterオブジェクトを削除する
  • prepare_mainメソッド:マッピングもしくはキーワード引数(name=value)の形式でInterpreterオブジェクトの名前空間に特定のオブジェクトをバインドする。後述のexecメソッドで何らかのオブジェクトを使いたいときに使用する(そのオブジェクトはpickle化されてInterpreterオブジェクトの名前空間に渡された後に非pickle化される)
  • execメソッド:Pythonコードを記述した文字列を渡すと、そのコードがInterpreterオブジェクトで実行される(現在のスレッドを使用)。グローバルな名前空間やローカルな名前空間はInterpreterオブジェクトに独自なものなので、必要なオブジェクトは文字列として記述したコード内で用意するか、prepare_mainメソッドでバインドする必要がある点に注意。実質的には実行先のInterpreterオブジェクトの__main__名前空間でexec関数を実行するものと考えられる。戻り値はない点には注意
  • callメソッド:呼び出し可能オブジェクト(関数など)をInterpreterオブジェクトで実行する(現在のスレッドを使用)、呼び出し可能オブジェクトは「ステートレス」である必要がある(自由変数をキャプチャーしたり、グローバル変数にアクセスしたりはできない)。また、渡せる引数は共有可能オブジェクト(多くの場合はpickle化が可能なオブジェクト)でなければならない。呼び出し可能オブジェクトを呼び出した結果がcallメソッドの戻り値となる
  • call_in_threadメソッド:新規にスレッドを作成して、そのスレッドの中でInterpreterオブジェクトを使って呼び出し可能オブジェクトの実行を行う

 execメソッドとcallメソッド、call_in_threadメソッドではあらゆるPythonコードを実行できるわけではない点に注意が必要だ。まずはexecメソッドを使ってみよう。

# 今は2つのインタプリターが存在する
print(list_all())  # [Interpreter(0), Interpreter(1)]

# codeにPythonコードを記述しておく
# get_current関数を呼び出せるように関数内でimportもする
code = '''def func():
    from concurrent.interpreters import get_current
    print(f'hello from {get_current()}')

func()
'''

interp.exec(code)  # 先ほど作成したインタプリターでcodeを実行

execメソッドの実行例

 ここでは先ほど作成したInterpreterオブジェクト(interp)のexecメソッドにcodeを渡し、そこに(文字列として)書かれているPythonコードを実行している。codeの内容は以下の通り(codeにはこのコードをトリプルクオートで囲んで文字列化したものが代入されている)。

def func():
    from concurrent.interpreters import get_current
    print(f'hello from {get_current()}')

func()

この関数を実行するInterpreterオブジェクトを含んだメッセージを表示する

 func関数はget_current関数でこのコードを実行しているInterpreterオブジェクトを取得して、それを含んだメッセージをprint関数で表示している。このコードを記述しているトップレベルで既にget_current関数をインポートしているのに、func関数内でもローカルにこれをインポートしているのは、実行先のInterpreterオブジェクトの名前空間と現在のコードを記述している名前空間が別々のものであり、実行先の名前空間でもこの関数を使えるようにセットアップする必要があるからだ。

 実行すると次のような結果になる。

文字列として記述したコードをexecメソッドで実行
文字列として記述したコードをexecメソッドで実行

 このようにインタプリターはそれぞれに独自の名前空間を持つ点には注意が必要だ。上の例では文字列として記述したコード内でセットアップしているが、以下のようにprepare_mainメソッドで実行先のInterpreterオブジェクトの__main__にセットアップすることも可能だ。

interp.prepare_main({'foo': 'FOO'}, bar='BAR')
code = '''print(f'{foo=}, {bar=}')'''
interp.exec(code)

実行先のInterpreterオブジェクトの名前空間にfooとbarをセットアップする

 これを実行すると次のようになる。

実行先のインタプリターの名前空間にバインドされたfooとbarの値が表示された
実行先のインタプリターの名前空間にバインドされたfooとbarの値が表示された

 execメソッドで何かを実行する場合には必要な情報は上で見たように、そのインタプリター内で用意してやる必要がある。

 次にcallメソッドを見てみよう。

def func2(msg):
    from concurrent.interpreters import get_current
    ip = get_current()
    print(f'{msg} from {ip}')

interp.call(func2, 'hello')

func2関数をcallメソッドで呼び出す

 func2関数は上で見たfunc関数と同様にローカルにget_current関数をインポートして、パラメーターmsgに受け取った文字列とget_current関数の戻り値をprint関数で表示するものだ。上のcallメソッド呼び出しではこの関数と引数として文字列'hello'を渡している。実行結果を以下に示す。

func2関数は実行できた
func2関数は実行できた

 func2関数はステートレス(何らかの状態に依存しない)、つまり自由変数をキャプチャーしたりグローバル変数にアクセスしたりしていないので呼び出せる。しかし、以下に示すfunc3関数は実行できない。

interp.exec('print(foo)'# FOO

def func3():
    print(f'{foo=}')

interp.call(func3)  # NotShareableError

実行先のInterpreterオブジェクトにはprepare_mainメソッドでfooをバインドしているが、例外が発生する

 この例ではまず「interp.exec('print(foo)')」を実行して、先ほどprepare_mainメソッドを使ってinterpオブジェクトの名前空間にバインドした変数fooが存在することを確認している。func3関数ではこれにアクセスしようとしているが、これはグローバル変数であり、そうしたオブジェクトにアクセスする関数はcallメソッドでは実行できないことからNotShareableError例外が発生している。


かわさき

 execメソッドは「任意のコードの実行」に、callメソッドは「関数などの実行」に使うのが基本的な使い分けの指針となるでしょう。ただし、それぞれに必要なことや制約があることには留意してください。また、抜け道的な使い方としては次のようなコードが考えられます(が、これがよい方法かどうかはよく考えましょう。知らんけど)。

interp.prepare_main(foo='FOO')

def func4():
    import __main__
    print(f'foo={__main__.foo}'# foo=FOO

interp.call(func4)

__main__をインポートすることで実行先のInterpreterオブジェクトでセットアップされたグローバル変数にアクセスする


 call_in_threadメソッドは新規にスレッドを作成して、そのスレッド内でInterpreterオブジェクトを使って呼び出し可能オブジェクトを(非同期に)実行する。非同期に実行する意味はあまりないが、func2関数を実行していたコードをcall_in_threadメソッドを使って書き直すと次のようになる。

def func2(msg):
    from concurrent.interpreters import get_current
    ip = get_current()
    print(f'{msg} from {ip}')

t = interp.call_in_thread(func2, 'hello')
t.join()

新規スレッドでfunc2関数を実行

 なお、call_in_threadメソッドでは呼び出した関数が返す値は捨てられる。そのため、スレッドを新規に作成して、非同期に処理を行い、その戻り値を使いたいときにはconcurrent.interpreters.Queueクラスを使う必要がある(後述)。ここで、これまでに使ってきたInterpreterオブジェクトは削除しておこう。

interp.close()

使い終わったInterpreterオブジェクトを削除

Interpreterオブジェクトを使った非同期処理

 Interpreterオブジェクトは特定のスレッドとは結び付いていない。execメソッドやcallメソッドは現在のスレッドを使って実行されるが、call_in_threadメソッドは新規にスレッドを作成し、その上で実行される。ここではcall_in_threadメソッドを使って、非同期に処理を実行してみよう。

 その前に以下のコードを見てほしい。

import threading
from time import time, thread_time

def do_some_work(n):
    print(f'thread #{n} start')
    result = 0
    t = thread_time()  # スレッドの経過時間を計測する
    while thread_time() - t < 10# 10秒間はCPUに仕事をさせる
        result += 1
    print(f'thread #{n} end. {result=}')

def main():
    threads = []
    for n in range(5):
        t = threading.Thread(target=do_some_work, args=(n,))
        threads.append(t)

    st = time()

    for t in threads:
        t.start()

    for t in threads:
        t.join()

    ed = time()

    print(f'time: {ed - st}')

if __name__ == '__main__':
    main()

use_thread.py

 これはthreadingモジュールを使って、5つのスレッドに仕事をさせるコードだ。do_some_work関数の内部ではスレッド内の経過時間が10秒を過ぎるまではCPUに仕事をさせて、10秒が経過したら関数を終了している。main関数ではスレッドを5個作成して、それらにdo_some_work関数を実行させている。フリースレッドモードではないPythonでは並列処理にならないが、フリースレッド版Pythonでは並列に処理される。

 以下にフリースレッド版ではないPython 3.14とフリースレッド版のPython 3.14でこのコードを実行した結果を示す。

python3.14コマンドでは50秒以上の実行時間がかかり、python3.14tコマンドでは10秒程度で実行が終わっている
python3.14コマンドでは50秒以上の実行時間がかかり、python3.14tコマンドでは10秒程度で実行が終わっている

 フリースレッドではないpython3.14コマンドでは実行が終わるまでに50秒以上がかかっているが、フリースレッド版のpython3.14tコマンドでは10秒程度で実行が終わっている点に注目してほしい。これはフリースレッド版のPythonではGILによる「同時に実行できるPythonスレッドは1つだけ」という制約がなくなって、CPUコアを十分に活用できるようになっているからだ。

 では、上のコードをconcurrent.interpretersモジュールを使って書き直すとどうなるかを試してみよう。ここではInterpreterオブジェクトを5個作成して、それらのcall_in_threadメソッドを使ってdo_some_work関数を並列に実行することにする。do_some_work関数は基本的に変わらない(thread_time関数を関数内でローカルにインポートするようにしたのと、print関数で出力する文字列を変更したくらいだ)。

import threading
from time import time
from concurrent.interpreters import create

def do_some_work(n):
    from time import thread_time  # 関数でローカルにインポート
    print(f'interp #{n} start')
    result = 0
    t = thread_time()  # スレッドの経過時間を計測する
    while thread_time() - t < 10# 10秒間はCPUに仕事をさせる
        result += 1
    print(f'interp #{n} end. {result=}')

def main():
    interps = [create() for _ in range(5)]

    threads = []
    st = time()
    for n, interp in enumerate(interps):
        t = interp.call_in_thread(do_some_work, n)
        threads.append(t)

    for t in threads:
        t.join()

    ed = time()

    print(f'time: {ed - st}')

    for interp in interps:
        interp.close()


if __name__ == '__main__':
    main()

use_interp.py

 これをフリースレッド版ではないpython3.14コマンドで実行した結果を以下に示す。

フリースレッド版ではないpython3.14コマンドでも処理が並列化された
フリースレッド版ではないpython3.14コマンドでも処理が並列化された

 これが可能なのは、GILがInterpreterオブジェクトごとに存在しているからだ。複数のスレッドでInterpreterオブジェクトを用いてコードを実行すれば、GILもその数だけ存在するため、それぞれのスレッドで実行されるコードが他のスレッドをブロックすることはない。また、Interpreterオブジェクトは基本的に他のInterpreterオブジェクトとは隔離されているので、お互いが管理しているオブジェクトを破壊することもない。

 では、Interpreterオブジェクト間で情報をやりとりするにはどうすればよいのだろう。

Interpreterオブジェクト間での情報のやりとり

 concurrent.interpretersモジュールにはQueueクラスと、そのファクトリであるcreate_queue関数がある。これを使うことで、Interpreterオブジェクト間で情報をやりとりできる。例えば、上で見たdo_some_work関数が値を返すものだったとしよう。

def do_some_work(n):
    from time import thread_time  # 関数でローカルにインポート
    print(f'interp #{n} start')
    result = 0
    t = thread_time()  # スレッドの経過時間を計測する
    while thread_time() - t < 10# 10秒間はCPUに仕事をさせる
        result += 1
    print(f'interp #{n} end. {result=}')
    return result

加算を続けた結果を返す

 これをcall_in_threadメソッドで実行すると、その戻り値は捨てられてしまう。そこでconcurrent.interpreters.Queueクラスを使って、この結果をキューに置くようにしてみよう。

def do_some_work(n, q):
    from time import thread_time  # 関数でローカルにインポート
    print(f'interp #{n} start')
    result = 0
    t = thread_time()  # スレッドの経過時間を計測する
    while thread_time() - t < 10# 10秒間はCPUに仕事をさせる
        result += 1
    print(f'interp #{n} end. {result=}')
    q.put((f'interp #{n}', result))

concurrent.interpreters.Queueクラスを使うように修正したdo_some_work関数

 Queueクラスにはputメソッドがあるので、これを使ってキューに加算結果(とインタプリターを識別する文字列)を置くだけだ。

 また、呼び出し側のコードは次のようになる。

from concurrent.interpreters import create_queue

def main():
    interps = [create() for _ in range(5)]
    q = create_queue()

    threads = []
    st = time()
    for n, interp in enumerate(interps):
        t = interp.call_in_thread(do_some_work, n, q)
        threads.append(t)

    for t in threads:
        t.join()

    ed = time()

    print(f'time: {ed - st}')
    results = []
    while not q.empty():
        results.append(q.get())

    print(results)

    for interp in interps:
        interp.close()

main()

create_queue関数でconcurrent.interpreters.Queueオブジェクトを作成し、それをdo_some_work関数に渡し、最後に結果をgetメソッドで取得する

 concurrent.interpreters.Queueオブジェクトはcreate_queue関数で作成する。後はcall_in_threadメソッドでdo_some_work関数を実行する際に作成したキューを渡してやる。全てのスレッドの実行が終わったら、キューから全ての要素をgetメソッドで取り出している。これを実行すると次のようになる。

実行結果
実行結果

InterpreterPoolExecutor

 ここまではconcurrent.interpretersモジュールが提供するクラスや関数を使ってきたが、そこで行われていた処理をもっと抽象化するInterpreterPoolExecutorというクラスがconcurrent.futuresモジュールで定義されている。詳しくは説明しないが、これを使うことで先ほどのコードは以下のように書き替えられる。

def do_some_work(n):
    from time import thread_time  # 関数でローカルにインポート
    print(f'interp #{n} start')
    result = 0
    t = thread_time()  # スレッドの経過時間を計測する
    while thread_time() - t < 10# 10秒間はCPUに仕事をさせる
        result += 1
    print(f'interp #{n} end. {result=}')
    return (f'interp #{n}', result)

def main():
    st = time()

    with InterpreterPoolExecutor(max_workers=5as ex:
        futures = [ex.submit(do_some_work, n) for n in range(5)]
        results = [fut.result() for fut in as_completed(futures)]

    ed = time()

    print(f'time: {ed - st}')
    print(results)

InterpreterPoolExecutorクラスを使ったコードの例


かわさき

 「説明が足りないぞ!」「ちゃんとしろっ!」という声が聞こえてきそうですが、もう疲れちゃったので説明はしません(キリッ)。見りゃ分かるでしょ(分かってくれるとうれしいです)。


共有可能オブジェクト

 これまでに見てきたように、Interpreterオブジェクトを用いて数行のコードや関数を実行したり、それらに引数を渡したりする場合には内部でpickleが使われる。そのため、pickle化や非pickle化が可能なオブジェクトでなければInterpreterオブジェクトに渡すことができない。こうしたpickle化可能なオブジェクトのことを「共有可能オブジェクト」と呼ぶ。concurrent.interpreters.Queueでやりとりできるのも共有可能オブジェクトだけだ。

 このような共有可能オブジェクトとしては次のようなものが挙げられる。

  • None
  • True/False
  • バイト列
  • 文字列
  • 整数
  • 浮動小数点数
  • タプル

 また、共有可能オブジェクトはその受け渡しの際にpickle化/非pickle化されるということは、それぞれのInterpreterオブジェクトでは同じ値を持つ別々のオブジェクトがコピーとして受け渡されるということでもある(ただし、memoryviewなど一部例外もあるがここでは取り上げない)。

「Python最新情報キャッチアップ」のインデックス

Python最新情報キャッチアップ

Copyright© Digital Advantage Corp. All Rights Reserved.

[an error occurred while processing this directive]
ページトップに戻る