Pythonはバージョン3.4で非同期I/O処理を行うためのasyncioパッケージが導入された。その後、バージョン3.5でasync/await構文による非同期イテレータを記述できるようになり(PEP 492)、バージョン3.6ではasync/await構文による非同期ジェネレータがサポートされるようになった(PEP 525)。以下では、asyncioパッケージを使用して、バージョン3.5での非同期イテレータとバージョン3.6の非同期ジェネレータの利用について簡単に見ていこう。
まず「非同期」が付かないイテレータ(あるいはイテラブル=反復可能オブジェクト)とジェネレータについて簡単におさらいをしておく。Pythonのドキュメントではイテラブル(反復可能なオブジェクト)を「構成要素を一度に一つずつ返すことができるオブジェクト」と、イテレータを「データの流れを表現するオブジェクト」としている。
前者はリストなどPythonに組み込みのデータ型や以下に示すような何らかの連続的なデータを格納し、それらを1つずつ取り出すことができるオブジェクトのことであり、後者は「現在、イテラブルに含まれているどの要素を扱っているかを認識し、要素を取り出したり、要素がなければそのことを(例外を通じて)通知したりできるオブジェクト」のことだ。実際にはイテラブルが「イテレータを取り出す__iter__メソッド」と「次の要素を取得する__next__メソッド」を提供し、イテレータとしても振る舞うことが多い。
次に示すコードはそのように振る舞う単純なイテレータ(イテラブル)の例だ。
class MyIterable():
def __init__(self, num):
self.data = [x for x in range(num)]
self.idx = -1
def __next__(self):
self.idx += 1
if self.idx >= len(self.data):
raise StopIteration
else:
return self.data[self.idx]
def __iter__(self):
return self
for item in MyIterable(3):
print(item)
__init__メソッドではnumパラメーターの値に応じてデータを用意し、現在の扱っている要素を示すインデックス値を初期化している(処理の都合でここでは「-1」としている)。__next__メソッドは要素の取り出しを行うためのメソッドであり、for文などでは内部的にこのメソッドが呼び出されている。ここでは、インデックス値をインクリメントした上で、まだデータがあればそれを返送し、なければ例外を発生している。__iter__メソッドは反復処理が可能なオブジェクトが必要なところで自動的に呼び出されるメソッドであり、ここでは__next__メソッドによる反復処理が可能である自分自身を返送している。
これに対して、ジェネレータは「イテレータを生成する関数」だと考えられる。以下に簡単な例を示す。これは上と同様な処理を行う「ジェネレータ-イテレータ」を生成する。
def MyGenerator(num):
for item in range(num):
yield item
for item in MyGenerator(3):
print(item)
まず目に付くのはジェネレータは非常にコードがシンプルなことだ。また、全てのデータを最初に確保していないところも先ほどのイテレータとは異なる(この特性はメモリ負荷を削減することにもつながる)。次に、「return」で値を返すのではなく、「yield」で値を返しているところが異なる。もう1つ大きな違いは__next__メソッドでは繰り返しの中の一度の処理を記述していたが、ジェネレータが表現するのはデータを反復的に返送していく処理の全体を記述しているところだ。
ジェネレータでは「yield」により値を返送する際には、同時に制御も呼び出し側にいったん戻され、その後、ジェネレータが生成したイテレータに制御が移ると、以前に実行したコードの直後から実行が再開される(上のコードであれば、for文で次のループが開始されるということだ)。必要な時点で必要な計算を行い、その結果を戻すといった場合にジェネレータは役に立つ。
こうした構造を模して、非同期に反復的な処理を行えるようにするための機構がasyncioパッケージとPython 3.5/3.6で追加されたasync/await処理ということになる。
ただし、Python 3.5でサポートされたのはあくまでも非同期イテレータまでだった。ちょっとサンプルとして難しいコードになってしまったが、非同期イテレータを使用したサンプルコードを以下に示す。これはコンソールで数値を入力してもらい、その値だけスリープした後にコンソールに出力を行い、再度入力を待つという動作をする(反復的に値を取り出す処理であるかどうかは難しいが、ユーザーが入力した値を返しているので許してほしい。また、エラー処理は省略している)。
import asyncio
import datetime
class MyAsyncIterable():
def __aiter__(self):
return self
async def __anext__(self):
time = input('input wait time (0 to break): ')
time = int(time)
if time == 0:
raise StopAsyncIteration()
await mysleep(time)
return time
async def mysleep(time):
to = datetime.datetime.now() + datetime.timedelta(seconds=time)
while True:
await asyncio.sleep(1)
print('*', sep='', end='', flush=True)
if datetime.datetime.now() > to:
break
print('')
async def main():
aiter = MyAsyncIterable()
async for msg in aiter:
print('sleep time: {', msg, '} sec(s)', sep='')
#running = True
#while running:
# try:
# msg = await aiter.__anext__()
# except StopAsyncIteration as e:
# print('catch StopAsyncIteration')
# running = False
# else:
# print(f'sleep time: {msg} sec(s)')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
ここで注目してほしいのは、MyAsyncIterableクラスとmain関数だ。mysleep関数については「1秒ごとにアスタリスクをコンソールに出力しながら、指定された秒数だけ待機する関数」だと思ってほしい(詳しい説明は割愛)。また、コード末尾の3行は非同期I/Oを行う際の定型処理だ。イベントループを取得して、そこに実行したい処理を投げ込んで、最後にループをクローズしている。
そこでまずはMyAsyncIterableクラスを見てみよう。
class MyAsyncIterable():
def __aiter__(self):
return self
async def __anext__(self):
time = input('input wait time (0 to break): ')
time = int(time)
if time == 0:
raise StopAsyncIteration()
await mysleep(time)
return time
最初に見たシンプルなMyIterableクラスでは__init__メソッドでデータの初期化などを行っていたが、ここではデータを保持していないので__init__メソッドはない。あるのは__aiter__メソッドと__anext__メソッドの2つだ。前者の名前は「__iter__」に、後者の名前は「__next__」に非同期(asynchronous)を意味する「a」をそれぞれ付加したものだ。
やっていることも同様であり、__aiter__メソッドはイテレータとして自身を返送している(__aiter__メソッドの仕様はPython 3.5.2で変更されているため、このコードはそれより前のバージョンでは動作しない。詳細は「PEP 492 - コルーチン、 async と await 構文」の「注釈」を参照されたい)。また、__anext__メソッドでは「反復処理の中の1回の処理」を記述している。
注意したいのは、関数定義が「async def」で始まっているところと、内部でawaitによる処理の待機(「await mysleep(time)」で指定された秒数だけ待機する関数呼び出しが終了するのを待機している)を行っているところだ。「await」は「async def」で定義される関数内でのみ使用可能であり、他の部分では構文エラーとなる。待機秒数として「0」が入力された場合には「StopAsyncIteration」例外を送出しているが、これはmain関数での「async for」文で反復処理の終了を判断するために使われている)。
main関数は次のようになっている。
async def main():
aiter = MyAsyncIterable()
async for msg in aiter:
print('sleep time: {', msg, '} sec(s)', sep='')
#running = True
#while running:
# try:
# msg = await aiter.__anext__()
# except StopAsyncIteration as e:
# print('catch StopAsyncIteration')
# running = False
# else:
# print(f'sleep time: {msg} sec(s)')
main関数では「async for」文でループ処理を行っている。非同期イテレータ/ジェネレータを利用して反復処理を行うには通常のfor文ではなく、async for文を使用する必要がある。また、「await」と同じく、これをasync for文はasync defで定義される関数内でしか使えないことにも注意しよう。
コメントアウトしてあるのは、その上のasync forループと同等な処理だ。async forループと下のコードでコメントイン/コメントアウトを入れ替えて実行してみると、StopAsyncIterationが発生した時点でループが終了するのが分かるはずだ。
実際に実行した様子を以下に示す。
> python gen35.py
input wait time (0 to break): 3
***
sleep time: {3} sec(s)
input wait time (0 to break): 0
> python gen35.py
input wait time (0 to break): 1
*
sleep time: 1 sec(s)
input wait time (0 to break): 0
catch StopAsyncIteration
非同期イテレータの特性をまとめると次のようになる。
そして、非同期ジェネレータはこうした特性のうち、__aiter__メソッド/__anext__メソッドの定義が必要になる非同期イテレータの実装を隠してくれる便利なジェネレータだといえる。
上で見た非同期イテレータと同等な処理を行う非同期ジェネレータのコードを以下に示す。
import asyncio
import datetime
async def MyAsyncGenerator():
while True:
time = input('input wait time (0 to break): ')
time = int(time)
if time == 0:
break
await mysleep(time)
yield time
async def mysleep(time):
# …… 省略 ……
async def main():
agen = MyAsyncGenerator()
async for msg in agen:
print(f'sleep time: {msg} sec(s)')
#running = True
#while running:
# try:
# msg = await agen.__anext__()
# except StopAsyncIteration as e:
# print('catch StopAsyncIteration')
# running = False
# else:
# print(f'sleep time: {msg} sec(s)')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
先ほどはクラスとしてMyAsyncIterableを定義していたが、MyAsyncGeneratorは非同期ジェネレータ関数として「async def」を利用して定義している。単純なジェネレータ関数と同様、反復処理の全体をまとめて記述している点に注意しよう(ここではMyAsyncIterable.__anext__メソッドと同じ処理を「while True:」による無限ループにくるんでいる)。内部での処理はMyAsyncIterable.__anext__メソッドとほぼ同じだが、「return」ではなく「yield」で値(と制御)を戻している点と、ユーザーが「0」を入力し、反復処理を終了する際にStopAsyncIteration例外を送出していないのが異なる。非同期ジェネレータではStopAsyncIteration例外がラップされるので、自前でこの例外を送出する必要はない。
main関数での処理は変数名、クラスのインスタンス生成/ジェネレータ関数呼び出しが異なるだけでやっていることは同じだ。コメントアウトの内容も同等なので、main関数で行う処理を入れ替えて実行し、本当にStopAsyncIteration例外が発生しているかを確認してみてほしい。
Python 3.xでの非同期処理に関しては、他にも待機可能オブジェクトやasyncioパッケージ関連の話題など、語るべきことが多数あるが別項に譲ることにする。
最後に非同期ジェネレータに関連して、非同期処理を内包表記で使用する簡単なサンプルも紹介しておこう(PEP 530)。これは2秒待機した後に、数値を返送するジェネレータとそれを利用するコードだ。コードは見れば分かる通りなので、説明は割愛する。
import asyncio
from random import randint
async def SimpleAsyncGenerator(num):
for item in range(num):
await asyncio.sleep(2)
yield item
async def main():
result = [x async for x in SimpleAsyncGenerator(3)]
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
本稿では、Python 3.6で追加された機能の中から、文字列補間(f文字列)、数値リテラル内でのアンダースコアの利用、拡張された型注釈機能、非同期プログラミングについて紹介した。機会があれば、本稿で取り上げられなかったものもまた紹介していこう。
Copyright© Digital Advantage Corp. All Rights Reserved.