第5回は、Node.jsでストリーミング型のgRPCサービスを開発してみます。ここでは、ストリーミング型のうちクライアントからの複数リクエストとなるクライアントサイドストリーミングを実装してみます。
この記事は会員限定です。会員登録(無料)すると全てご覧いただけます。
本連載のサンプルコードをGitHubで公開しています。こちらからダウンロードしてみてください。
今回のテーマは、Node.js(JavaScript)によるクライアントサイドストリーミング型のgRPCサービスです。gRPCでは、4つの通信方式を使うことができますが、このうちクライアントサイドストリーミングとは、図1のように複数のリクエストに対して一つのレスポンスが返るという通信方式です。
同種のメッセージ、例えばユーザー情報といったものを大量にサーバに送信してデータベースに登録する必要があるとします。それらを一つの大きなメッセージとして送信するのではなく、個々のメッセージとして送信して、サーバからはトータルでの処理結果をメッセージとして受け取るというイメージです。
同種のメッセージをまとめて送るには、リクエストのメッセージにrepeatedキーワードを付与して配列にするという方法があります。ただし、この場合はあくまでも一つのメッセージであることには変わりがないので、全てのメッセージを送信し終わるまで(サーバではそれらを全て受け取るまで)プログラムの実行がブロックされてしまいます。メッセージのサイズによってはパフォーマンスに悪影響が出ることが考えられます。
クライアントサイドストリーミングを指定することで、個々のメッセージは非同期に送信されるようになります。大量のメッセージの送信でも処理待ちを最小限に抑えることで、パフォーマンスの悪化を防ぐことができます。
開発に必要な環境を準備しておきます。なお、あくまでも学習用としての位置付けなので、サーバとクライアントは同じホストに配置することにして、開発環境は共通とします。
本連載で共通で使用するコードエディタVisual Studio Code(以降、VSCode)と、汎用(はんよう)のgRPCクライアントであるEvans(インストールと利用方法については連載第2回を参照)。そしてNode.jsの実行環境をインストールしておいてください。
gRPCの公式サイトによると、gRPCの利用が可能なNode.jsの本稿作成時点におけるバージョンは8.13.0以降です。インストールされていないか、バージョンが古い場合には最新バージョンをインストールしてください。Node.jsのインストールは、macOS/Windowsともに公式サイトからインストーラを入手して実施してください。
gRPCサービスでは、プロトコル定義ファイルからクラスファイルなどの生成が必要です。連載第2回の.NETではビルドプロセスにて生成し、連載第4回のPythonではコマンドを明示的に実行してファイルを生成していました。Node.jsを使ってgRPCサービスを実装する場合には、動的生成と静的生成を選択することができます。このため、gRPC公式リポジトリにあるNode.jsのサンプルでも、それぞれに対応したものが用意されています。
動的な生成では、スクリプトに生成のためのコードを埋め込んでおくことで、実行時に自動的にファイルが生成されます。静的な生成では、Pythonと同様にgrpc-toolsのコマンドを明示的に実行してファイルを生成します。今回は、前者の動的な生成を採用することにします。
環境を整えたら、Node.jsでgRPCサーバとクライアントを実装していきます。プロジェクトを配置するフォルダを用意して(ここではatmarkit_grpc/nodejs)、以降はVSCodeのメニューから[ターミナル]−[新しいターミナル]を選択し、ターミナルを開いて作業します。ターミナル上でカレントフォルダをここに移動しておきます。
作成するアプリケーションのテーマはこれまでと同様に「書籍情報検索サービス『BookInfo』」です。今回作成するBookInfoは、2つの手続きを実装します。Search手続きはこれまでも実装してきたもので、Registerは新しい手続きとなります。
作業の内容は、既定のファイルを複製し、BookInfoサービスに合わせたクラス名などを修正していくというイメージです。大まかに、以下の手順でサービスを実装していきます。
Node.jsでgRPCアプリケーションを作成するとき、フォルダ構成の決まりは特にありません。そのため、全てのファイルを同じ階層に配置してもよいのですが、ここでは見通しをよくするために、ファイルの役割ごとにフォルダを分けることにします。
既定のファイルは、連載第4回と同様、gRPC公式サイトのGitHubリポジトリにあるサンプルを用いることにします。もちろん、ゼロから手順を踏んでコーディングしていっても問題ないのですが、ある程度定型的な処理となるので、その形を理解しながら作業を進めた方が習得も早いでしょう。あらかじめ用意されているサンプルのうち、全てのストリーミング方式を実装したroute_guideの動的生成版をひな型に利用することにします。
リポジトリにあるpackage.jsonファイルを作業フォルダに配置して、npm installコマンドを実行します。これにより、今回のgRPCサービスに必要なパッケージproto-loader、grpc-js、google-protobuf、async、lodash、minimistがインストールされます。
BookInfoサービスの既定のデータ(bookinfo.json)をDataフォルダを作成して配置します。本来は、データソースとしてデータベースやWebサービスを利用することになりますが、ここでは簡略化のためにJSONファイルを読み込んで検索するものとしています。内容は、連載第4回で紹介したものと同じです。これに加えて、クライアントから動的に登録するためのデータregister.jsonも配置します。これも同様の形式です。
プロトコル定義ファイルをProtosフォルダを作成して配置します。連載第4回で作成したbook.protoファイルを、Protosフォルダにコピーします。そして、以下のように修正しておきます。クライアントサイドストリーミングを使うRegister手続きとその引数と戻り値に必要なメッセージを追加しています。
…略… // Register手続きの引数と戻り値のためのメッセージを追加する message RegisterRequest { BookItem item = 1; } message RegisterResponse { int32 count = 1; } service BookInfo { rpc Search(SearchRequest) returns (stream SearchResponse); (1) // 登録手続きを引数にストリーミングを表すstreamを付与して追加する rpc Register(stream RegisterRequest) returns (RegisterResponse); (2) }
今回のテーマはクライアントサイドストリーミングですが、連載第4回と同様にSearch手続きもサーバサイドストリーミングで実装します。それが(1)です。(2)は、今回独自に実装するRegister手続きです。こちらは、引数の方にstreamキーワードが付与されており、これによりクライアントサイドストリーミングのためのコードが生成されるようになります。
サーバアプリケーションを作成していきます。ベースとするのは、リポジトリにあるroute_guide_server.jsファイルです。このファイルを、bookinfo_server.jsとしてServerフォルダに配置し、BookInfoサービスのために修正した内容が以下です。必要に応じてroute_guide_server.jsファイルも参照してください。
// 基本となるインポート const fs = require('fs'); (1) const _ = require('lodash'); // gRPCサービスのためのファイル生成 const PROTO_PATH = __dirname + '/../Protos/bookinfo.proto'; (2) const grpc = require('@grpc/grpc-js'); const protoLoader = require('@grpc/proto-loader'); const packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, // フィールドをCamelケースに変換しない longs: String, // long値をStringとして扱う enums: String, // enumをStringとして扱う defaults: true, // フィールド省略時はデフォルト値にする oneofs: true // oneofsフィールドを使うか }); const bookinfo = grpc.loadPackageDefinition(packageDefinition).bookinfo; // 書籍情報リストを保持する配列 let bookinfo_list = []; (3) // Search手続きに対応するsearch関数 function search(call) { // 処理内容は省略 // 参考:連載第4回のbookinfo_server.jsのsearch関数 } // Register手続きに対応するregister関数 function register(call, callback) { (4) let count = 0; call.on('data', (request) => { count += 1; bookinfo_list.push(request.item); }); call.on('end', () => { callback(null, { count: count }); }); } // サービスの登録 function getServer() { (5) const server = new grpc.Server(); server.addService(bookinfo.BookInfo.service, { search: search, register: register }); return server; } // サーバの開始 if (require.main === module) { (6) const server = getServer(); server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => { const DB_PATH = __dirname + '/../Data/bookinfo.json'; fs.readFile(DB_PATH, function(err, data) { if (err) throw err; bookinfo_list = JSON.parse(data); server.start(); }); }); }
(1)からのrequireは、ファイル入出力のためのfs、アンダースコア(_)でユーティリティー関数を呼び出せるLodashをそれぞれ取り込んでいます。
(2)からの文は、動的にgRPC関連ファイルを生成するための定型的な処理です。gRPC関連の2つのモジュールを読み込み、このうちproto-loaderモジュールのloadSyncメソッドでは指定されるプロトコル定義ファイルを読み込み、それに基づいてgrpc-jsモジュールのloadPackageDefinitionメソッドでメッセージ型やサービスのプロトタイプをオブジェクトとして生成しています。最終的に、変数bookinfoにgRPCサービス利用のためのオブジェクトが入ります。なお、loadSyncメソッドに指定されたオプションについては、コメントを参照してください。
(3)は、(6)で読み込む書籍情報リストを保持する配列です。
(4)からは、Register手続きに対応するregister関数で、クライアントサイドストリーミングに対応する手続きの定型的な内容となっています。引数は2個あり、一つは手続き呼び出しにおける情報が入ったcallオブジェクト、もう一つはストリーミング終了時に戻り値を返すためのコールバック関数です。簡略化すると、以下のようにcall.onメソッドによるイベント処理になっており、'data'がメッセージ受信、'end'がストリーミング終了に対応します。つまり、メッセージ受信時の処理はcall.on('data', …)に、ストリーミング終了時の処理はcall.on('end', …)に、それぞれ関数として記述すればよいわけです。
function register(call, callback) { call.on('data', (request) => { // メッセージ受信で処理する内容(requestがメッセージ) }); call.on('end', () => { // ストリーミング終了で処理する内容 // 通常は、渡されたコールバック関数callbackで戻り値を返す }); }
メッセージ受信時の処理内容としては、カウンタを増やしてメッセージの内容(itemフィールド)を配列に追加するだけです。ここでは省きましたが、できれば重複登録を避けたり、上書き登録をしたりするような処理を加えた方がよいでしょう。
(5)は、gRPCサーバオブジェクトを生成し、そこにサービスと手続きを登録する関数です。そして(6)では、スクリプトがNode.jsから呼び出された場合のみ実行する内容として、(5)でサーバオブジェクトを生成し、bookinfo.jsonを(3)の配列に読み込んで、サーバを開始します。
クライアントアプリケーションの作成に移る前に、できればサーバアプリケーションが正しく動作するか、Evansで検証しておきましょう。詳しい手順は、連載第2回を参照してください。ここでは、ダミーデータを2件登録してみた例を示します。
% evans --proto ./Protos/bookinfo.proto --host localhost --port 50051 repl …略… bookinfo.BookInfo@localhost:50051> call Register item::id (TYPE_INT32) => 11 1個目の登録 item::title (TYPE_STRING) => PHP …略… item::id (TYPE_INT32) => 12 2個目の登録 item::title (TYPE_STRING) => Java …略… item::id (TYPE_INT32) => 終了はCtrl+Dを入力 { "count": 2 2件登録したという結果が返る }
ここからは、クライアントアプリケーションを作成していきます。クライアントアプリケーションは2つ作成します。一つは、Search手続きを呼び出す検索アプリケーションbookinfo_search.js、もう一つはRegister手続きを呼び出す登録アプリケーションbookinfo_register.jsです。
ベースとする、リポジトリにあるroute_guide_client.jsファイルを、bookinfo_search.jsおよびとbookinfo_register.jsとしてClientフォルダに配置し、BookInfoサービスのために修正します。このうち、bookinfo_register.jsを以下に示します。必要に応じてroute_guide_client.jsファイルも参照してください。
// 基本となるインポート const async = require('async'); (1) const fs = require('fs'); const parseArgs = require('minimist'); const _ = require('lodash'); // gRPCサービスのためのファイル生成 const PROTO_PATH = __dirname + '/../Protos/bookinfo.proto'; (2) const grpc = require('@grpc/grpc-js'); const protoLoader = require('@grpc/proto-loader'); const packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); const bookinfo = grpc.loadPackageDefinition(packageDefinition).bookinfo; // クライアントオブジェクトの取得 const client = new bookinfo.BookInfo('localhost:50051', (3) grpc.credentials.createInsecure()); // Register手続きを呼び出す関数 function runRegister(callback) { (4) const argv = parseArgs(process.argv, { (5) string: 'file' }); const file = argv.file; if(file != undefined) { fs.readFile(file, (err, data) => { if (err) { callback(err); return; } const bookinfo_list = JSON.parse(data); // サーバのregister関数へのcallオブジェクトを取得する const call = client.register((error, response) => { (6) if (error) { callback(error); return; } callback(); console.log(`Registered ${response.count} item(s).`); }); // 項目(item)を単独で送信する関数 function itemSender(item) { (7) return (callback) => { console.log(`Register item: ${item.title}`); call.write({ item: item }); _.delay(callback, _.random(500, 1500)); }; } // 個々の項目を送信する関数の配列を作成 let item_senders = []; (8) for (let i = 0; i < bookinfo_list.length; i++) { item_senders[i] = itemSender(bookinfo_list[i]); } // 送信関数を順番に非同期で呼び出して、最後にストリーミングを終了 async.series(item_senders, () => { call.end(); }); (9) }); } } // エントリポイント if (require.main === module) { (10) async.series([ runRegister ]); }
(1)からのrequireでは、サーバで指定したもの以外では、非同期処理のためのasync、コマンドライン引数解析のためのminimistを、それぞれ取り込んでいます。
(2)からの文は、動的なgRPC関連ファイルを生成するための定型的な処理でサーバと同様です。
(3)では、変数clientにgRPCクライアントのオブジェクトを取得しています。このオブジェクトを通じて、サーバの関数を呼び出します。
(4)は、Register手続きを呼び出すための関数で、エントリポイントから非同期で呼び出されます。(5)以降では、コマンドライン引数から「--file」オプションで登録するJSONファイルが指定されているときのみ処理を継続するようにしています。
(6)〜(9)は、クライアントサイドストリーミングに対応した定型的な処理です。(6)で、サーバのregister関数を呼び出すためのcallオブジェクトを生成しています。この関数の引数はコールバック関数であり、エラー発生時やストリーミング処理が終了した時点で呼び出されます。エラー発生時には、引数errorにエラーオブジェクトがセットされているので、それを呼び出し側に返して処理を中断します。正常終了時には引数responseにレスポンスオブジェクトがセットされているので、この場合は登録件数countをコンソールに出力する処理を入れられるというわけです。
(7)は、ストリーミングで実際に送信するための関数になります。引数は、手続きの引数であるメッセージの各フィールドになります。この場合、フィールドは一つだけなので、引数も一つとなっています。この関数で実行しているcallオブジェクトのwriteメソッドが、個別のデータをサーバへ送信する部分となります。
(7)の関数は、(8)で送信データを引数に与えた関数オブジェクト配列として作成され、(9)で順番に非同期に呼び出されます。全ての呼び出しが終了したら、callオブジェクトのendメソッドによってストリーミングの終了がサーバに告知されます。
最後の(10)は、スクリプトがNode.jsから呼び出された場合のみ実行するためのエントリポイントです。(4)の関数runRegisterを非同期で呼び出します。
以下は、実行例です。2回の検索で、実行結果が登録内容を反映したもので異なっていることが分かります。
% node Client/bookinfo_search.js --keyword 齊藤 Search by 齊藤 Found item: Androidアプリ開発の教科書 第3版 Kotlin対応 % node Client/bookinfo_register.js --file ./Data/register.json Register item: Vue 3 フロントエンド開発の教科書 Register item: 独習C# 第5版 Register item: 速習 ASP.NET Core - Razor Pages編 Register item: これからはじめるVue.js 3実践入門 Register item: 基礎からしっかり学ぶC#の教科書 第3版 C# 10対応 Registered 5 item(s). % node Client/bookinfo_search.js --keyword 齊藤 Search by 齊藤 Found item: Androidアプリ開発の教科書 第3版 Kotlin対応 Found item: Vue 3 フロントエンド開発の教科書
bookinfo_search.jsは掲載を割愛します。登録処理に比べると(クライアントサイドストリーミングとサーバサイドストリーミングの差はあるにしろ)はるかにシンプルですので、配布サンプルを参照して読み解いてみてください(連載第4回も参考になります)。
今回は、Node.jsによる書籍情報検索サービスを、サーバサイドストリーミング方式とクライアントサイドストリーミング方式の手続きを持つgRPCサービスとして実装してみました。
次回は、KotlinとAndroid OSで、gRPCクライアントを開発してみます。
WINGSプロジェクト
有限会社 WINGSプロジェクトが運営する、テクニカル執筆コミュニティー(代表山田祥寛)。主にWeb開発分野の書籍/記事執筆、翻訳、講演等を幅広く手掛ける。2021年10月時点での登録メンバーは55人で、現在も執筆メンバーを募集中。興味のある方は、どしどし応募頂きたい。著書、記事多数。
・サーバーサイド技術の学び舎 - WINGS(https://wings.msn.to/)
・RSS(https://wings.msn.to/contents/rss.php)
・Twitter: @yyamada(https://twitter.com/yyamada)
・Facebook(https://www.facebook.com/WINGSProject)
Copyright © ITmedia, Inc. All Rights Reserved.