Python 3.14新機能:同一プロセスで複数のインタプリターを実行可能 concurrent.interpretersモジュールとは?:Python最新情報キャッチアップ
Python 3.14では真の並列処理を実現するフリースレッド版Pythonとはまた別の仕組みが導入されています。それがconcurrent.interpretersモジュールです。その概要と基本的な使い方を紹介します。
バージョン3.13までのPythonではC APIを使うことで同一プロセスで複数のインタプリターを実行できた。Python 3.14では、この機構をPythonのコードからも使えるように、標準ライブラリに新たにconcurrent.interpretersモジュールが追加された。以下では、その概要と基本的な使い方を紹介する。なお、concurrent.interpretersモジュールはPEP 734で提案されたものだが、その公式なドキュメントは「concurrent.interpreters - Multiple interpreters in the same process」となる。
concurrent.interpreters.Interpreterオブジェクトとは
ここで言う「インタプリター(concurrent.interpreters.Interpreterオブジェクト)」とはPythonのコードを実行するのに必要なさまざまな情報(コンテキスト)を持つオブジェクトのことだ。例えば、globals関数やlocals関数で取得可能な名前空間やそこに存在するオブジェクト、スレッドの状態、それからGILなどがそうである。個々のインタプリターが独自にGILを持つということは、複数のインタプリターが同一プロセスに存在する場合、GILもその数だけ存在するということだ。つまり、これはフリースレッド版Pythonとはまた別の形で真の並列処理をPythonで実現するための機構といえる(これについては後ほど、スレッドによる並列処理のコードをインタプリターによる並列処理のコードに書き換えてみる)。
また、インタプリターは同一プロセスに複数存在可能だが、それらは互いに隔離され、インタプリター間でメモリを共有することはない。つまり、インタプリター間で競合やデッドロックなどの心配をする必要はない。インタプリター間で情報を交換するにはconcurrent.interpreters.Queueオブジェクトを使用する。
それから、インタプリターはあくまでもPythonコードを実行するための環境であって、特定のPythonスレッド(threading.Threadオブジェクトやその基盤となるOS固有のスレッド)とは独立している。そのため、インタプリターを使って並列処理を実際に行うにはプログラマーの側でそうしたコードを書くことになる。
どうもHPかわさきです。
原稿をあらかた書き終わったところで、これを書いています。書き始めたときにはこんなに長くなるとは思っていなかったのですが、想像以上に書くことが多かったです(Wikipedia風に表現すると「独自研究」と注釈が付きそうなところもあります)。
concurrent.interpretersモジュールはまだ最初の足掛かりができた程度のもので、まだサードパーティーのモジュールでの対応も進んでいませんし、モジュール自身が持つ制約もそれなりにあるようです。このモジュールが広く使われるようになるかどうかは、モジュールの改善とサードパーティーによる広範なサポートが必要になるでしょう。
というわけで、読んでも無駄かもしれませんが、興味のある方にはぜひご覧いただいて、実際にコードを書いてみてくださいませ。
インタプリターの作成と使用
concurrent.interpretersモジュールでは次のような関数やクラスが定義されている。
- list_all関数:現在のプロセスに存在する全てのInterpreterオブジェクトを列挙する
- get_current関数:現在のPythonコードを実行しているInterpreterオブジェクトを取得する
- get_main関数:メインのInterpreterオブジェクトを取得する。メインのInterpreterオブジェクトとはREPL起動時またはPythonスクリプトをコマンドラインで実行した際に作成されるもの
- create関数:新規にInterpreterオブジェクトを作成する
- create_queue関数:Interpreterオブジェクト間で通信するのに使うキューを作成する
- Interpreterクラス:Pythonコードを実行するために必要な情報を含むオブジェクト
- Queueクラス:Interpreterオブジェクト間で通信するのに使用する
まずは幾つかの関数を使ってみよう。
from concurrent.interpreters import list_all, get_current, get_main, create
interp_list = list_all() # 現在のプロセスに存在するインタプリターの取得
print(interp_list) # [Interpreter(0)]
interp = get_main() # メインのInterpreterオブジェクトを取得
print(interp) # Interpreter(0)
interp = get_current() # 現在のInterpreterオブジェクトを取得
print(interp) # Interpreter(0)
list_all関数は呼び出し時点で現在Pythonを実行しているプロセス内に存在するInterpreterオブジェクトを一覧できる。新規に実行したREPL環境でこのコードを実行すると、その環境を実行した際に作成された「メインのInterpreterオブジェクト」だけが存在しているので「Interpreter(0)」だけを要素とするリストが返される。
get_main関数はメインのInterpreterオブジェクトを取得する。上でも述べた通り、これは「Interpreter(0)」だ。get_current関数はPythonコードを現在実行しているInterpreterオブジェクトを取得する。この例ではInterpreterオブジェクトは1つしかないので、この関数の戻り値も「Interpreter(0)」になっている。
macOSでこのコードを実行した結果を以下に示す。
次にcreate関数でInterpreterオブジェクトを新規に作成してみる。
interp = create() # Interpreterオブジェクトを新規に作成
interp_list = list_all() # 現在のプロセスに存在するインタプリターの取得
print(interp_list) # [Interpreter(0), Interpreter(1)]
create関数で新規にInterpreterオブジェクトを作成すると、list_all関数はそのオブジェクトを含むリストを返すようになる。
Interpreterオブジェクトには次のような属性やメソッドがある(一部抜粋)。
- id属性:Interpreterオブジェクトを特定するID
- is_runningメソッド:そのInterpreterオブジェクトでコードを実行中ならTrue。そうでなければFalse
- closeメソッド:Interpreterオブジェクトを削除する
- prepare_mainメソッド:マッピングもしくはキーワード引数(name=value)の形式でInterpreterオブジェクトの名前空間に特定のオブジェクトをバインドする。後述のexecメソッドで何らかのオブジェクトを使いたいときに使用する(そのオブジェクトはpickle化されてInterpreterオブジェクトの名前空間に渡された後に非pickle化される)
- execメソッド:Pythonコードを記述した文字列を渡すと、そのコードがInterpreterオブジェクトで実行される(現在のスレッドを使用)。グローバルな名前空間やローカルな名前空間はInterpreterオブジェクトに独自なものなので、必要なオブジェクトは文字列として記述したコード内で用意するか、prepare_mainメソッドでバインドする必要がある点に注意。実質的には実行先のInterpreterオブジェクトの__main__名前空間でexec関数を実行するものと考えられる。戻り値はない点には注意
- callメソッド:呼び出し可能オブジェクト(関数など)をInterpreterオブジェクトで実行する(現在のスレッドを使用)、呼び出し可能オブジェクトは「ステートレス」である必要がある(自由変数をキャプチャーしたり、グローバル変数にアクセスしたりはできない)。また、渡せる引数は共有可能オブジェクト(多くの場合はpickle化が可能なオブジェクト)でなければならない。呼び出し可能オブジェクトを呼び出した結果がcallメソッドの戻り値となる
- call_in_threadメソッド:新規にスレッドを作成して、そのスレッドの中でInterpreterオブジェクトを使って呼び出し可能オブジェクトの実行を行う
execメソッドとcallメソッド、call_in_threadメソッドではあらゆるPythonコードを実行できるわけではない点に注意が必要だ。まずはexecメソッドを使ってみよう。
# 今は2つのインタプリターが存在する
print(list_all()) # [Interpreter(0), Interpreter(1)]
# codeにPythonコードを記述しておく
# get_current関数を呼び出せるように関数内でimportもする
code = '''def func():
from concurrent.interpreters import get_current
print(f'hello from {get_current()}')
func()
'''
interp.exec(code) # 先ほど作成したインタプリターでcodeを実行
ここでは先ほど作成したInterpreterオブジェクト(interp)のexecメソッドにcodeを渡し、そこに(文字列として)書かれているPythonコードを実行している。codeの内容は以下の通り(codeにはこのコードをトリプルクオートで囲んで文字列化したものが代入されている)。
def func():
from concurrent.interpreters import get_current
print(f'hello from {get_current()}')
func()
func関数はget_current関数でこのコードを実行しているInterpreterオブジェクトを取得して、それを含んだメッセージをprint関数で表示している。このコードを記述しているトップレベルで既にget_current関数をインポートしているのに、func関数内でもローカルにこれをインポートしているのは、実行先のInterpreterオブジェクトの名前空間と現在のコードを記述している名前空間が別々のものであり、実行先の名前空間でもこの関数を使えるようにセットアップする必要があるからだ。
実行すると次のような結果になる。
このようにインタプリターはそれぞれに独自の名前空間を持つ点には注意が必要だ。上の例では文字列として記述したコード内でセットアップしているが、以下のようにprepare_mainメソッドで実行先のInterpreterオブジェクトの__main__にセットアップすることも可能だ。
interp.prepare_main({'foo': 'FOO'}, bar='BAR')
code = '''print(f'{foo=}, {bar=}')'''
interp.exec(code)
これを実行すると次のようになる。
execメソッドで何かを実行する場合には必要な情報は上で見たように、そのインタプリター内で用意してやる必要がある。
次にcallメソッドを見てみよう。
def func2(msg):
from concurrent.interpreters import get_current
ip = get_current()
print(f'{msg} from {ip}')
interp.call(func2, 'hello')
func2関数は上で見たfunc関数と同様にローカルにget_current関数をインポートして、パラメーターmsgに受け取った文字列とget_current関数の戻り値をprint関数で表示するものだ。上のcallメソッド呼び出しではこの関数と引数として文字列'hello'を渡している。実行結果を以下に示す。
func2関数はステートレス(何らかの状態に依存しない)、つまり自由変数をキャプチャーしたりグローバル変数にアクセスしたりしていないので呼び出せる。しかし、以下に示すfunc3関数は実行できない。
interp.exec('print(foo)') # FOO
def func3():
print(f'{foo=}')
interp.call(func3) # NotShareableError
この例ではまず「interp.exec('print(foo)')」を実行して、先ほどprepare_mainメソッドを使ってinterpオブジェクトの名前空間にバインドした変数fooが存在することを確認している。func3関数ではこれにアクセスしようとしているが、これはグローバル変数であり、そうしたオブジェクトにアクセスする関数はcallメソッドでは実行できないことからNotShareableError例外が発生している。
execメソッドは「任意のコードの実行」に、callメソッドは「関数などの実行」に使うのが基本的な使い分けの指針となるでしょう。ただし、それぞれに必要なことや制約があることには留意してください。また、抜け道的な使い方としては次のようなコードが考えられます(が、これがよい方法かどうかはよく考えましょう。知らんけど)。
interp.prepare_main(foo='FOO')
def func4():
import __main__
print(f'foo={__main__.foo}') # foo=FOO
interp.call(func4)
call_in_threadメソッドは新規にスレッドを作成して、そのスレッド内でInterpreterオブジェクトを使って呼び出し可能オブジェクトを(非同期に)実行する。非同期に実行する意味はあまりないが、func2関数を実行していたコードをcall_in_threadメソッドを使って書き直すと次のようになる。
def func2(msg):
from concurrent.interpreters import get_current
ip = get_current()
print(f'{msg} from {ip}')
t = interp.call_in_thread(func2, 'hello')
t.join()
なお、call_in_threadメソッドでは呼び出した関数が返す値は捨てられる。そのため、スレッドを新規に作成して、非同期に処理を行い、その戻り値を使いたいときにはconcurrent.interpreters.Queueクラスを使う必要がある(後述)。ここで、これまでに使ってきたInterpreterオブジェクトは削除しておこう。
interp.close()
Interpreterオブジェクトを使った非同期処理
Interpreterオブジェクトは特定のスレッドとは結び付いていない。execメソッドやcallメソッドは現在のスレッドを使って実行されるが、call_in_threadメソッドは新規にスレッドを作成し、その上で実行される。ここではcall_in_threadメソッドを使って、非同期に処理を実行してみよう。
その前に以下のコードを見てほしい。
import threading
from time import time, thread_time
def do_some_work(n):
print(f'thread #{n} start')
result = 0
t = thread_time() # スレッドの経過時間を計測する
while thread_time() - t < 10: # 10秒間はCPUに仕事をさせる
result += 1
print(f'thread #{n} end. {result=}')
def main():
threads = []
for n in range(5):
t = threading.Thread(target=do_some_work, args=(n,))
threads.append(t)
st = time()
for t in threads:
t.start()
for t in threads:
t.join()
ed = time()
print(f'time: {ed - st}')
if __name__ == '__main__':
main()
これはthreadingモジュールを使って、5つのスレッドに仕事をさせるコードだ。do_some_work関数の内部ではスレッド内の経過時間が10秒を過ぎるまではCPUに仕事をさせて、10秒が経過したら関数を終了している。main関数ではスレッドを5個作成して、それらにdo_some_work関数を実行させている。フリースレッドモードではないPythonでは並列処理にならないが、フリースレッド版Pythonでは並列に処理される。
以下にフリースレッド版ではないPython 3.14とフリースレッド版のPython 3.14でこのコードを実行した結果を示す。
フリースレッドではないpython3.14コマンドでは実行が終わるまでに50秒以上がかかっているが、フリースレッド版のpython3.14tコマンドでは10秒程度で実行が終わっている点に注目してほしい。これはフリースレッド版のPythonではGILによる「同時に実行できるPythonスレッドは1つだけ」という制約がなくなって、CPUコアを十分に活用できるようになっているからだ。
では、上のコードをconcurrent.interpretersモジュールを使って書き直すとどうなるかを試してみよう。ここではInterpreterオブジェクトを5個作成して、それらのcall_in_threadメソッドを使ってdo_some_work関数を並列に実行することにする。do_some_work関数は基本的に変わらない(thread_time関数を関数内でローカルにインポートするようにしたのと、print関数で出力する文字列を変更したくらいだ)。
import threading
from time import time
from concurrent.interpreters import create
def do_some_work(n):
from time import thread_time # 関数でローカルにインポート
print(f'interp #{n} start')
result = 0
t = thread_time() # スレッドの経過時間を計測する
while thread_time() - t < 10: # 10秒間はCPUに仕事をさせる
result += 1
print(f'interp #{n} end. {result=}')
def main():
interps = [create() for _ in range(5)]
threads = []
st = time()
for n, interp in enumerate(interps):
t = interp.call_in_thread(do_some_work, n)
threads.append(t)
for t in threads:
t.join()
ed = time()
print(f'time: {ed - st}')
for interp in interps:
interp.close()
if __name__ == '__main__':
main()
これをフリースレッド版ではないpython3.14コマンドで実行した結果を以下に示す。
これが可能なのは、GILがInterpreterオブジェクトごとに存在しているからだ。複数のスレッドでInterpreterオブジェクトを用いてコードを実行すれば、GILもその数だけ存在するため、それぞれのスレッドで実行されるコードが他のスレッドをブロックすることはない。また、Interpreterオブジェクトは基本的に他のInterpreterオブジェクトとは隔離されているので、お互いが管理しているオブジェクトを破壊することもない。
では、Interpreterオブジェクト間で情報をやりとりするにはどうすればよいのだろう。
Interpreterオブジェクト間での情報のやりとり
concurrent.interpretersモジュールにはQueueクラスと、そのファクトリであるcreate_queue関数がある。これを使うことで、Interpreterオブジェクト間で情報をやりとりできる。例えば、上で見たdo_some_work関数が値を返すものだったとしよう。
def do_some_work(n):
from time import thread_time # 関数でローカルにインポート
print(f'interp #{n} start')
result = 0
t = thread_time() # スレッドの経過時間を計測する
while thread_time() - t < 10: # 10秒間はCPUに仕事をさせる
result += 1
print(f'interp #{n} end. {result=}')
return result
これをcall_in_threadメソッドで実行すると、その戻り値は捨てられてしまう。そこでconcurrent.interpreters.Queueクラスを使って、この結果をキューに置くようにしてみよう。
def do_some_work(n, q):
from time import thread_time # 関数でローカルにインポート
print(f'interp #{n} start')
result = 0
t = thread_time() # スレッドの経過時間を計測する
while thread_time() - t < 10: # 10秒間はCPUに仕事をさせる
result += 1
print(f'interp #{n} end. {result=}')
q.put((f'interp #{n}', result))
Queueクラスにはputメソッドがあるので、これを使ってキューに加算結果(とインタプリターを識別する文字列)を置くだけだ。
また、呼び出し側のコードは次のようになる。
from concurrent.interpreters import create_queue
def main():
interps = [create() for _ in range(5)]
q = create_queue()
threads = []
st = time()
for n, interp in enumerate(interps):
t = interp.call_in_thread(do_some_work, n, q)
threads.append(t)
for t in threads:
t.join()
ed = time()
print(f'time: {ed - st}')
results = []
while not q.empty():
results.append(q.get())
print(results)
for interp in interps:
interp.close()
main()
concurrent.interpreters.Queueオブジェクトはcreate_queue関数で作成する。後はcall_in_threadメソッドでdo_some_work関数を実行する際に作成したキューを渡してやる。全てのスレッドの実行が終わったら、キューから全ての要素をgetメソッドで取り出している。これを実行すると次のようになる。
InterpreterPoolExecutor
ここまではconcurrent.interpretersモジュールが提供するクラスや関数を使ってきたが、そこで行われていた処理をもっと抽象化するInterpreterPoolExecutorというクラスがconcurrent.futuresモジュールで定義されている。詳しくは説明しないが、これを使うことで先ほどのコードは以下のように書き替えられる。
def do_some_work(n):
from time import thread_time # 関数でローカルにインポート
print(f'interp #{n} start')
result = 0
t = thread_time() # スレッドの経過時間を計測する
while thread_time() - t < 10: # 10秒間はCPUに仕事をさせる
result += 1
print(f'interp #{n} end. {result=}')
return (f'interp #{n}', result)
def main():
st = time()
with InterpreterPoolExecutor(max_workers=5) as ex:
futures = [ex.submit(do_some_work, n) for n in range(5)]
results = [fut.result() for fut in as_completed(futures)]
ed = time()
print(f'time: {ed - st}')
print(results)
「説明が足りないぞ!」「ちゃんとしろっ!」という声が聞こえてきそうですが、もう疲れちゃったので説明はしません(キリッ)。見りゃ分かるでしょ(分かってくれるとうれしいです)。
共有可能オブジェクト
これまでに見てきたように、Interpreterオブジェクトを用いて数行のコードや関数を実行したり、それらに引数を渡したりする場合には内部でpickleが使われる。そのため、pickle化や非pickle化が可能なオブジェクトでなければInterpreterオブジェクトに渡すことができない。こうしたpickle化可能なオブジェクトのことを「共有可能オブジェクト」と呼ぶ。concurrent.interpreters.Queueでやりとりできるのも共有可能オブジェクトだけだ。
このような共有可能オブジェクトとしては次のようなものが挙げられる。
- None
- True/False
- バイト列
- 文字列
- 整数
- 浮動小数点数
- タプル
また、共有可能オブジェクトはその受け渡しの際にpickle化/非pickle化されるということは、それぞれのInterpreterオブジェクトでは同じ値を持つ別々のオブジェクトがコピーとして受け渡されるということでもある(ただし、memoryviewなど一部例外もあるがここでは取り上げない)。
Copyright© Digital Advantage Corp. All Rights Reserved.







