第4回は、Pythonでストリーミング型のgRPCサービスを開発してみます。ここでは、幾つかあるストリーミング型のうち、サーバからの複数レスポンスとなるサーバサイドストリーミングを実装してみます。
この記事は会員限定です。会員登録(無料)すると全てご覧いただけます。
本連載のサンプルコードをGitHubで公開しています。こちらからダウンロードしてみてください。
今回のテーマは、Pythonによるサーバサイドストリーミング型のgRPCサービスです。そのために、まずはサーバサイドストリーミングについておさらいしましょう。gRPCでは、以下の4つの通信方式を使うことができます。
このうち、サーバサイドストリーミングとは、図1のように一つのリクエストに対して複数のレスポンスが返る通信方式です。例えば、一つの検索リクエストに対して複数の検索結果が返ってくるイメージです。
複数の検索結果を受け取るには、レスポンスのメッセージにrepeatedキーワードを付与して配列にする方法があります。ただし、この場合はあくまでも一つのメッセージであることには変わりがないので、全てのメッセージを受け取るまでプログラムの実行がブロックされてしまいます。メッセージのサイズによってはパフォーマンスに悪影響が出ることが考えられます。
ストリーミングを指定することで、個々のメッセージは非同期に受け取るようになります。大量のメッセージの受け取りでも処理待ちを最小限に抑えることで、パフォーマンスの悪化を防ぐことができます。
開発に必要な環境を準備しておきます。macOSを基本に解説しますが、Windowsにも必要に応じて触れます。なお、あくまでも学習用としての位置付けなので、サーバとクライアントは同じホストに配置することにして、開発環境は共通とします。
本連載で共通で使用するコードエディタVisual Studio Code(以降、VSCode)と、汎用(はんよう)のgRPCクライアントであるEvansをインストールしておいてください。VSCodeでは、拡張機能「Python」のインストールもおすすめします。Evansのインストールと利用方法は、連載第2回を参照してください。
Pythonの実行環境を用意します。gRPCの公式サイトによると、gRPCの利用が可能なPythonインタープリタとpip(パッケージマネジャー)の本稿作成時点におけるバージョンは以下の通りです。
これらがインストールされていないか、バージョンが古い場合には最新バージョンをインストールしてください。バージョンは、それぞれ以下のように確認できます(python3コマンドかpythonコマンド。以降も同様)。
% python3 --version Python 3.10.8 % python3 -m pip --version pip 22.2.2 from /usr/local/lib/python3.10/site-packages/pip (python 3.10)
Pythonのインストールは、macOSではHomebrewの利用をおすすめします(pipも同時にインストールされます)。Windowsでは、pythonコマンドによるMicrosoft Storeからのインストールを利用できます(ただしWindows 10以降)。PowerShellなどで「python」コマンドを実行すると、Microsoft Storeアプリが起動してPython 3.10の入手画面になるので、[入手]ボタンのクリックでインストールできます(こちらでもpipが同時にインストールされます)。
インストールしたら、念のために上記コマンドでPythonとpipのバージョンを確認してください。
Pythonとpipが準備できたら、gRPCのためのライブラリとツールをインストールします。以下のコマンドを実行します。
% python3 -m pip install grpcio % python3 -m pip install grpcio-tools
これにて、grpcio、protobuf、grpcio-toolsの3つのパッケージがインストールされます。ここでプロジェクトを配置するフォルダを用意して(ここではatmarkit_grpc/pythonとしました)、以降はVSCodeのメニューから[ターミナル]−[新しいターミナル]を選択し、ターミナルを開いて作業します。ターミナル上でカレントフォルダをここに移動しておきます。
環境が整ったところで、PythonでgRPCサーバとクライアントを実装していきます。作成するアプリケーションのテーマは連載第3回と同様に「書籍情報検索サービス『BookInfo』」です。BookInfoは、静的に用意されたリストからキーワード検索し、結果をストリーミングで返すだけのシンプルなものですが、Pythonを用いて独自のgRPCサービスを実装したいときの手順として参考になるでしょう。
作業の内容は、既定のファイルを複製し、BookInfoサービスに合わせてクラス名などを修正していくイメージです。大まかに、以下の手順でサービスを実装していきます。
PythonでgRPCアプリケーションを作成するとき、フォルダ構成の決まりは特にありません。そのため、全てのファイルを同じ階層に配置してもよいのですが、ここでは見通しをよくするために、ファイルの役割ごとにフォルダを分けることにします。
既定のファイルは、gRPC公式サイトのGitHubリポジトリにあるサンプルを用いることにします。もちろん、ゼロから手順を踏んでコーディングしていっても問題ありませんが、ある程度定型的な処理となるので、その形を理解しながら作業を進めた方が習得も早いでしょう。あらかじめ用意されているサンプルのうち、サーバサイドストリーミングを実装したhellostreamingworldをひな型に利用することにします。このサービスは、連載第2回で紹介した.NETとC#によるGreeterサービスと同じ機能を、サーバサイドストリーミングで実装したものです。
BookInfoサービスで利用するデータをDataフォルダを作成して配置します。本来は、データソースとしてデータベースやWebサービスを利用しますが、ここでは簡略化のためにJSONファイルを読み込んで検索するものとしています。以下は、書籍情報を記述したJSONファイルの内容の一部です。
[ { "id": 1, "title": "Ruby on Rails 7 ポケットリファレンス", "author": "山内直", "publisher": "技術評論社", "isbn": "978-4-297-13062-6", "price": 3200, "type": "BOOK" }, …略… ]
プロトコル定義ファイルをProtosフォルダを作成して配置します。連載第1回で作成したbook.protoファイルを、bookinfo.protoファイルとしてProtosフォルダにコピーします。そして、以下のように修正しておきます。変更の多くは、クラス名やフィールド名の衝突や混同を避けるためですが、サーバサイドストリーミングに必要な重要な修正もあります。
// パッケージ名を変更する:AtmarkIt⇒bookinfo package bookinfo; …略… // 名前の衝突を避けるためBook⇒BookItemとメッセージ名を変更する message BookItem { …略… } …略… message SearchResponse { int32 status_code = 1; BookItem item = 2; // メッセージ名BookItemに合わせて変更 } // 名前の衝突を避けるためBookService⇒BookInfoと変更する service BookInfo { // 手続きの戻り値にストリーミングを表すstreamを付与する rpc Search(SearchRequest) returns (stream SearchResponse); (1) }
重要なのは(1)です。ここでは、戻り値のSearchResponseにstreamキーワードを付与しています。これにより、戻り値がストリーミングによって返されることが指定されます。このキーワードがないと、ただのUnary通信となり、生成されるファイルの内容が大きく変わってしまうので、必ず指定するようにします。
ここからは、サーバアプリケーションのための作業です。Protosフォルダと同様にServerフォルダを作成して、プロトコル定義ファイルをもとに、メッセージやサービスのコードを生成します。生成は、grpc-toolsのprotocユーティリティーを使います。以下のようにコマンドを実行します。
% python3 -m grpc_tools.protoc --proto_path=./Protos --python_out=./Server --pyi_out=./Server --grpc_python_out=./Server ./Protos/bookinfo.proto
オプションがたくさんありますが、全て指定が必要です。--proto_pathはプロトコル定義ファイルの置かれるパスを指定しますが、この場合は同階層のProtosフォルダです。--python_out、--pyi_out、--grpc_python_outの3つのオプションは、生成されるファイルの出力先フォルダの指定です。この場合はServerフォルダを指定します。出力先がカレントディレクトリでも省略できません。プロトコル定義ファイルであるbookinfo.protoを入力に、以下の3つのファイルが生成されます。
サービスの処理を実装する際には、これらで定義されているクラスなどを継承したクラスを作成し、手続きに相当するメソッドを必要に応じてオーバライドしていきます。なお、連載第2回と第3回でも見てきたように、自動生成されるファイルは内容を変更しませんし、極論すれば中身を見る必要もありません。定義されたクラスなどをそのまま利用すればいいだけです。
なお、ここで作成したファイルはクライアントアプリケーションでもそのまま使用します。また、プロトコル定義ファイルを変更した場合には、上記のコマンドでファイルの生成をやり直す必要があります。
続けて、サーバアプリケーションを作成していきます。ベースとするのは、リポジトリにあるasync_greeter_server.pyファイルです。このファイルを、bookinfo_server.pyとしてServerフォルダに配置し、BookInfoサービスのために修正した内容が以下です。必要に応じてasync_greeter_server.pyファイルも参照してください。
# 基本となるインポート import asyncio (1) import logging import json (2) # gRPCサービスのためのインポート import grpc (3) from bookinfo_pb2 import BookItem (4) from bookinfo_pb2 import SearchResponse from bookinfo_pb2 import SearchRequest from bookinfo_pb2_grpc import MultiBookInfoServicer from bookinfo_pb2_grpc import add_MultiBookInfoServicer_to_server # JSONファイルから読み込んでリストを取得する USER_DB = "../Data/bookinfo.json" (5) with open(USER_DB, mode="r") as fp: books = json.load(fp) # BookInfoサービスのためのクラス class BookInfo(MultiBookInfoServicer): (6) # Search手続きのオーバライド async def Search(self, request: SearchRequest, (7) context: grpc.aio.ServicerContext) -> SearchResponse: logging.info("Serving Search request %s", request) text = request.text (8) # 全てのリスト要素に対して検索を実行 for i in books: # 書名、著者名、出版社名にキーワードが含まれれば検索成功 title = i["title"] author = i["author"] publisher = i["publisher"] if text in title or text in author or text in publisher: # リストからBookItemオブジェクトを生成 bookitem = BookItem(title=i["title"], author=i["author"], publisher=i["publisher"], isbn=i["isbn"], price=i["price"], type=i["type"]) # SearchResponseオブジェクトをyieldで返す yield SearchResponse(status_code = 1, item = bookitem) (9) # サーバ処理本体 async def serve() -> None: (10) server = grpc.aio.server() add_MultiBookInfoServicer_to_server(BookInfo(), server) listen_addr = "[::]:50051" server.add_insecure_port(listen_addr) logging.info("Starting server on %s", listen_addr) await server.start() await server.wait_for_termination() if __name__ == "__main__": (11) logging.basicConfig(level=logging.INFO) asyncio.run(serve())
(1)のimport文は、基本となるものです。ストリーミングでは非同期入出力を扱いますのでasyncioモジュールを、ここではJSONデータを扱うのでjsonモジュールを、それぞれインポートしています。
(3)のimport文は、gRPCサービスに必要なものです。自動生成したモジュールは、ここでインポートします。(4)は既定では存在しませんが、BookItemメッセージを扱うために個別にインポートしています。
(5)では、データファイルを読み込んでいます。jsonモジュールのloadメソッドによって、リストとして取り込んでいます。
(6)では、自動生成されたクラスMultiBookInfoServicerを継承して、サーバアプリケーション用に新たにクラスを定義しています。ストリーミングを扱うクラスは、このように「Multi」と付けられた名前が自動的に付与されます。
(7)は、プロトコル定義ファイルで定義されているSearch手続きに相当するメソッドの定義です。asyncキーワードが付与されていることから分かるように、非同期で実行される関数となっています。もし、このSearchメソッドの定義がない場合、親クラス(MultiBookInfoServicer)の方のSearchメソッドによって、実装がない旨の例外が発生します。
(8)以降は、検索キーワードを取得してJSONデータの各要素を巡回し、書名(title)、著者名(author)、出版社名(publisher)のいずれかに含まれれば検索成功として、レスポンスであるSearchResponseオブジェクトを生成して返しています。このとき、(9)のようにyieldキーワードによってオブジェクトが返されていることから分かるように、Searchメソッドは非同期ジェネレータとなっています。
(10)は、サーバ処理の本体に相当するserveメソッドです。これも、asyncキーワードが付与されている通り非同期で実行される関数です。ほぼ定型となる処理になっており、grpc.aio.serverオブジェクトを生成し、BookInfoサービスを登録します。そして、リスンするアドレスとポートを設定し、サーバを開始します。
(11)は、スクリプトのメインです。ロギングを開始し、(10)で定義されたserve関数を非同期で実行します。
クライアントアプリケーションの作成に移る前に、サーバアプリケーションが正しく動作するか、Evansで検証しましょう。サーバの起動は、もう一つターミナルを開き、Serverフォルダで以下のコマンドを実行します。起動したサーバは、[Ctrl]+[C]の入力によって停止します。
% python3 bookinfo_server.py
Evansの操作手順は、連載第2回を参照してください。ここでは、Evansを起動するコマンドのみ紹介します。
% evans --proto ./Protos/bookinfo.proto --host localhost --port 50051 repl
ここからは、クライアントアプリケーションを作成していきます。このアプリケーションは、コマンドラインパラメーターにて検索語を指定し、サーバのSearch手続きを呼び出してその結果を表示するものとします。
クライアントアプリケーションの作成に先立ち、サーバアプリケーションで生成したインタフェースファイルなどをClientフォルダにコピーしておいてください。このように、サーバとクライアントで生成物を完全に共有できます。
ベースとするのは、リポジトリにあるasync_greeter_client.pyファイルです。このファイルを、bookinfo_client.pyとしてClientフォルダに配置し、BookInfoサービスのために修正した内容が以下です。必要に応じてasync_greeter_client.pyファイルも参照してください。
# 基本となるインポート import asyncio (1) import logging import sys (2) # gRPCサービスのためのインポート import grpc (3) import bookinfo_pb2 import bookinfo_pb2_grpc # 処理のメイン async def run(keyword) -> None: (4) async with grpc.aio.insecure_channel("localhost:50051") as channel: (5) stub = bookinfo_pb2_grpc.MultiBookInfoStub(channel) (6) # 非同期イテレータによる検索 async for response in stub.Search( (7) bookinfo_pb2.SearchRequest(text=keyword)): print("Client received from async generator: " + response.item.title) # 直接読み出しによる検索 stream = stub.Search( (8) bookinfo_pb2.SearchRequest(text=keyword)) while True: response = await stream.read() (9) if response == grpc.aio.EOF: break print("Client received from direct read: " + response.item.title) if __name__ == "__main__": (10) logging.basicConfig() if len(sys.argv) >= 2: (11) asyncio.run(run(sys.argv[1]))
(1)のimport文は、基本となるものです。ストリーミングでは非同期入出力を扱いますのでasyncioモジュールを、ここではコマンドラインパラメーターを扱うのでsysモジュールを、それぞれインポートしています。
(3)のimport文は、gRPCサービスに必要なものです。自動生成したモジュールは、ここでインポートします。
(4)は、クライアントの処理のメインとなる関数です。asyncキーワードが付与されている通り、非同期で実行される関数です。(5)でチャネルを生成し、(6)でスタブ(クライアントのこと)を生成しています。
以降、(7)と(8)はそれぞれSearch手続きで検索し結果を表示しますが、(7)は非同期イテレータによる検索、(8)は直接読み出しによる検索と2つの方法が既定で含まれているので、そのまま利用しています。
(7)の非同期イテレータによる検索では、スタブからSearchメソッドを引数にSearchRequestオブジェクトを与えて呼び出し、結果を表示しています。この場合、メソッドから返り値がある限り、その結果が表示されます。
(8)の直接読み出しによる検索では、Searchメソッドの戻り値をいったん保持した後、(9)で各要素を順番に取り出してEOF判定されるまで表示します。
(10)はスクリプトのメインです。ロギングを開始し、(4)で定義されたrun関数を非同期で実行します。(11)のように、引数が2個以上、すなわち検索キーワードが指定されているときのみ実行します。
以下は、実行例です。サーバアプリケーションを起動しておくのを忘れないでください。
% python3 bookinfo_client.py 山 Client received from async generator: Ruby on Rails 7 ポケットリファレンス Client received from async generator: 改訂3版JavaScript本格入門 Client received from async generator: 速習 React 第2版 Client received from direct read: Ruby on Rails 7 ポケットリファレンス Client received from direct read: 改訂3版JavaScript本格入門 Client received from direct read: 速習 React 第2版
今回は、Pythonによる書籍情報検索サービスをサーバサイドストリーミング方式のgRPCサービスとして実装してみました。
次回は、Node.js環境でJavaScriptによる書籍情報検索サービスをクライアントサイドストリーミング方式のgRPCサービスとして実装してみます。
WINGSプロジェクト
有限会社 WINGSプロジェクトが運営する、テクニカル執筆コミュニティー(代表山田祥寛)。主にWeb開発分野の書籍/記事執筆、翻訳、講演等を幅広く手掛ける。2021年10月時点での登録メンバーは55人で、現在も執筆メンバーを募集中。興味のある方は、どしどし応募頂きたい。著書、記事多数。
・サーバーサイド技術の学び舎 - WINGS(https://wings.msn.to/)
・RSS(https://wings.msn.to/contents/rss.php)
・Twitter: @yyamada(https://twitter.com/yyamada)
・Facebook(https://www.facebook.com/WINGSProject)
Copyright © ITmedia, Inc. All Rights Reserved.