Viva! JXTA
「JXTAがもたらすネットワークの変革」




(2)シェル・コマンドの作成からアーキテクチャに触れる

JXTAネットワークプログラミング

 JXTAでは、どのようにネットワーク上で通信を行うのかを見てみましょう。以下に見るサンプルは、ネットワーク上であるマシンからほかのマシンに文字列を送信するJXTAプログラミングです。普通のTCP/IPのソケットプログラミングでしたら、メッセージを受け取るサーバ側が特定のポート番号でソケットを開いて待機し、メッセージを送るクライアント側がサーバのIPアドレスとポート番号を指定してソケットを開きサーバにコネクションを張ります。細かい話は省略しますが、ここではサーバとクライアントとの間でサービスに利用されるIPアドレスとポート番号の組が共有されていることが本質的な部分です。ところが、JXTAでは、サーバとクライアントの通信にIPアドレスとポート番号を指定することはありません。

Advertisementとそのpublish/discovery

 それでは、両者の間でどのようなやりとりが行われるのでしょうか? ここでJXTAプログラミングにとって大事な「Advertisement」という概念が必要です。Advertisementというのは「広告」ですよね。以下では、このAdvertisementに「告知」という訳を与えています。「告知」というのは、実体としてはXMLのドキュメントです。詳しいことは別の機会に説明しますが、peer、group、service、pipeといったJXTAのリソースたちは、「告知」によってそのさまざまな特徴が記述されるとともに一意な名前が付けられます。今回の例では、サーバ・サービスとパイプ・サービスという2つのリソースに「サービス告知」と「パイプ告知」の2つの「告知」が使われています。

 JXTAでは、サーバとクライアントが次のようなやりとりを行います。まず、サーバが通信に利用されるリソースの「告知」を生成しネットワーク上に「公開(publish)」します。クライアントは、公開されたリソースの「告知」を「探索・発見(discovery)」し「獲得」します。「告知」の記述性と一意性は、サービスの告知やパイプの告知を獲得したクライアント側が、ターゲットであるサービスやパイプにアクセスすることを可能とします。

 以下の具体例でいえば、サーバは、サーバ側のサービスに“Maruyama-1”という名前を付けて、その「告知」を公開します。クライアントは、この“Maruyama-1”という名前を目印に、サーバのサービスを見つけ出します。

LocalとRemote、二重のpublish/discovery

 サーバ側が「告知」を「公開」し、クライアント側が「告知」を「獲得」するというのがJXTAプログラミングの基本パターンですが、1つ注意してもらいたいことがあります。サーバ側の「告知公開」もクライアント側の「告知獲得」も自分のマシン上とネットワーク上とで二重に行われるということです。

サーバ側の「告知公開」
discovery.publish(serviceAdv, Discovery.ADV); // ローカル・キャッシュへの公開。
discovery.remotePublish(serviceAdv, Discovery.ADV); // ネットワーク上での公開。

クライアント側の「告知獲得」
enum = discovery.getLocalAdvertisements(Discovery.ADV, "Name", "Maruyama-1");
discovery.getRemoteAdvertisements(null, Discovery.ADV, "Name", "Maruyama-1",1);

 「告知」の情報は、ローカル・マシン上のデータベースにキャッシュとして蓄積されます。クライアント側の探索の場合、求める「告知」がローカルに見つからなければ、ネットワーク上で「サービス告知」の探索を始めます。リモートの探索は非同期的に行われるので、どれくらい時間がかかるかは分かりません。重要なことは、リモートの探索結果はローカル・キャッシュに必ず反映されるということです。

JXTAシェル・コマンドの作成

 今回は十分に説明できませんが、JXTAではShellが提供されていて、PtoPアプリケーションをShell上で動く形で簡単に作成できます。今回ご紹介するサンプルもシェル・コマンドとして作られています。

 シェル・コマンド作成のポイントは2つあります。1つは、net.jxta.impl.shell.ShellAppクラスを拡大して、コマンド名と同じ名前のクラスを作成することです。コマンドの処理の本体は、このクラス内のstartAppメソッドに記述すればいいのです。あと1つのポイントは、このクラスの属するパッケージ名です。net.jxta.impl.shell.bin.以下に、このコマンドの名前を付けてパッケージ名とします。

 ウィンドウ系の場合でしたら、JXTAをインストールしたディレクトリを%JXTA_DIR%とすれば、%JXTA_DIR%\shell\shellディレクトリにjxta.exeコマンドがあると思います(NTでは、このままではうまく動きません。最後にNT用のバッチ・コマンドを記しておきます)。今回の例では、このディレクトリに次のような深いディレクトリを作りソースファイルを置くことになります。

%JXTA_DIR%\shell\shell\net\jxta\impl\shell\bin\receiver\receiver.java
%JXTA_DIR%\shell\shell\net\jxta\impl\shell\bin\sender\sender.java

 これをコンパイルして、次のようなクラス・ファイルを作ります。

%JXTA_DIR%\shell\shell\net\jxta\impl\shell\bin\receiver\receiver.class
%JXTA_DIR%\shell\shell\net\jxta\impl\shell\bin\sender\sender.class

 これだけで、Shellの内部から、新しいコマンドを呼び出すことができます。

 以下に、サーバ側のソースとクライアント側のソースを紹介します。

サーバ側ソース receiver.java
package net.jxta.impl.shell.bin.receiver;

import net.jxta.impl.shell.ShellApp;
import net.jxta.discovery.Discovery;
import net.jxta.peergroup.PeerGroupID;
import net.jxta.document.AdvertisementFactory;
import net.jxta.protocol.ServiceAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.pipe.Pipe;
import net.jxta.pipe.PipeID;
import net.jxta.pipe.InputPipe;
import net.jxta.endpoint.Message;
import java.io.InputStream;


public class receiver extends ShellApp {
  

  private Discovery discovery; // 探索サービス
  private Pipe pipes; // パイプ・サービス
  private InputPipe msgPipe; // 入力用パイプ・サービス
  private Message message; // メッセージ
  private PeerGroupID gid; // ピア・グループID


  public receiver() {
  }


  public int startApp (String[] args) {
    try {

 // パイプ・サービス告知を生成して、告知に必要なパラメータを設定する。
    PipeAdvertisement pipeAdv = (PipeAdvertisement)
    AdvertisementFactory.newAdvertisement("jxta:PipeAdvertisement");
    // パイプ・サービスの名前の設定
    pipeAdv.setName("Pipe Maruyama-1");
 // ネット・ピア・グループのIDはユニークであることが保証されているので
 // それを使って、ユニークなパイプIDを生成し、設定する。
    pipeAdv.setPipeID(new PipeID(group.getID()));


 // サービス告知を生成して、告知に必要なパラメータを設定する。
    ServiceAdvertisement serviceAdv = (ServiceAdvertisement)
    AdvertisementFactory.newAdvertisement
       ("jxta:ServiceAdvertisement");
    // サービスの名前の設定
    serviceAdv.setName("Maruyama-1");
    // サービスのプロバイダ名の設定
    serviceAdv.setProvider("wakhok.ac.jp");
    // サービスのバージョン設定
    serviceAdv.setVersion("Version 1.0");
 // サービス告知の中にパイプ・サービス告知を設定する。
    serviceAdv.setPipe(pipeAdv);



 // サービス告知ができたので、これをローカル・キャッシュと
 // ネットワーク上に公開する。
    discovery = group.getDiscovery();
    // ローカル・キャッシュへの公開。
    discovery.publish(serviceAdv, Discovery.ADV);
     // ネットワーク上での公開。
    discovery.remotePublish(serviceAdv, Discovery.ADV);

 // ピア・グループから、パイプ・サービスを獲得し、先に作った
 // パイプ・サービス告知を利用して、入力用のパイプを生成する。
    println("Service advertisement was published" );

    pipes = group.getPipe();
    msgPipe = pipes.createInputPipe(pipeAdv);

    } catch (Exception ex) {
      ex.printStackTrace();
      System.err.println("receiver: Error publishing the service");
    }


    // パイプからの入力の間、繰り返す。
    while (true) {
      
println("Waiting ......");
    try {

    // メッセージがくるのを待つ。
    message = msgPipe.waitForMessage();
    } catch (Exception e) {
      System.err.println("Error listening");
      msgPipe.close(); return 1;
    }


  // メッセージの中身を取り出すためにストリームを作る。
  // そのとき、サーバとクライアントはタグ名を共有しなければ

  //ならない。今回は、"HelloTag"というタグである。
    try {
    // メッセージからタグ部分を取り出す。
    InputStream info = message.pop ("HelloTag");
    if (info != null) {
      byte[] msgbuffer = new byte [info.available()];
      int n = info.read (msgbuffer);
      println("receive message: " + new String(msgbuffer));
      info.close();
    } else
      System.err.println("receiver: no tag");
    } catch (Exception e) {
      System.err.println("receiver: error ");
    }

   }
 }

  public void stopApp () {
  }

}


クライアント側ソース sender.java
package net.jxta.impl.shell.bin.sender;
import net.jxta.impl.shell.ShellApp;
import net.jxta.discovery.Discovery;
import net.jxta.peergroup.PeerGroup;
import net.jxta.document.AdvertisementFactory;
import net.jxta.endpoint.Message;
import net.jxta.pipe.Pipe;
import net.jxta.pipe.PipeID;
import net.jxta.pipe.OutputPipe;
import net.jxta.id.ID;
import net.jxta.protocol.ServiceAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Enumeration;


  public class sender extends ShellApp {

    private Discovery discovery; // 探索サービス
    private Pipe pipes; // パイプ・サービス
    private OutputPipe msgPipe; // 出力用パイプ
    private Message message; // メッセージ

    public sender() {
    }

    public int startApp (String[] args) {
      try {
  // Shellコマンドの引数を連結してメッセージ文字列を作成する。
        String strings = "";
        if ((args == null) || (args.length < 1)) {
          strings = "Hello World!";
        } else {
          for( int i=0; i<args.length; i++ )
          strings += args[i] + " ";
        }


  // サービス告知を見つけ出す探索サービスを、ピア・グループから獲得する。
        discovery = group.getDiscovery();


  // ほかのピアのサービス告知を見つけるまでループ。
        println(
           "searching for the Maruyama-1 Service advertisement");
        Enumeration enum = null;
        while (true) {
          try {


 // まず、ローカル・キャッシュでサービス告知を探索する。
 // 名前は、"Maruyama-1"。
            enum = discovery.getLocalAdvertisements(
                Discovery.ADV, "Name", "Maruyama-1");

 // ローカルに見つかれば成功。ループを抜ける.
            if ((enum != null) && enum.hasMoreElements()) break;


 // ローカルに見つからなければ、ネットワーク上でサービス告知を
 // 探索する。リモートの探索は、非同期的に行われるので、
 //
どれくらい時間がかかるかは分からない。
 // 適当に休んでいてよい。リモートの探索結果は、
 // ローカル・キャッシュに必ず反映される。
            discovery.getRemoteAdvertisements(
                null, Discovery.ADV, "Name", "Maruyama-1",1);
            try { Thread.sleep(2000); } catch (Exception e){}

          } catch (Exception e){}
        }

 // 発見されたサービス告知の中から、パイプ・サービスの告知を取り出す。
        ServiceAdvertisement serviceAdv =
            (ServiceAdvertisement) enum.nextElement();
        PipeAdvertisement pipeAdv = serviceAdv.getPipe();


 // ピア・グループから、パイプ・サービスを獲得し、
 // 先に取り出したパイプ・サービスの告知を利用して、
 // ほかのピアに接続している「出力パイプ」と
 // 空のメッセージを生成する。
        pipes = group.getPipe();
        msgPipe = pipes.createOutputPipe(
            pipeAdv, Pipe.NonBlocking, 5000);
        message = pipes.createMessage();


 // メッセージ用文字列をストリームに変え、
 // "HelloTag"というタグを付けてメッセージ
 // に押し込んで、パイプに送り出す。
        ByteArrayInputStream info =
        new ByteArrayInputStream (strings.getBytes());
        message.push ("HelloTag", info);
        msgPipe.send (message);
        println("Message: \"" + strings + "\" sent");


      } catch (Exception ex) {
        ex.printStackTrace();
        System.err.println("Error sending message");
      }
    return ShellApp.appNoError;
  }


  public void stopApp () {
  }

}


 実行結果は以下のようになります。

サーバ側実行例
JXTA>Shell -s
JXTA>receiver
Service advertisement was published
Waiting ......
receive message: Hello World!
Waiting ......
receive message: Hello World!
Waiting ......
receive message: Maruyama Fujio
Waiting ......
receive message: wakhok.ac.jp
Waiting ......

クライアント側実行例
JXTA>sender
searching for the Maruyama-1 Service advertisement
Message: "Hello World!" sent
JXTA>sender
searching for the Maruyama-1 Service advertisement
Message: "Hello World!" sent
JXTA>send Maruyama Fujio
send: cannot access Maruyama
JXTA>sender Maruyama Fujio
searching for the Maruyama-1 Service advertisement
Message: "Maruyama Fujio " sent
JXTA>sender wakhok.ac.jp
searching for the Maruyama-1 Service advertisement
Message: "wakhok.ac.jp " sent
JXTA>

 また、NTの環境で実行する場合は、以下を参考にしてください。

NT用シェル・コマンド
set JXTA_HOME=C:\p2p\jxta
set cl=%JXTA_HOME%\shell\lib\jxta.jar;%JXTA_HOME%\shell\lib
\jxtashell.jar;%JXTA_HOME%\shell\lib\instantp2p.jar;%JXTA_HOME%
\shell\lib\cms.jar;.
java -classpath %cl% net.jxta.impl.peergroup.Boot

 次の機会にJXTAをより詳しくご紹介するまで、是非、ご自分で試しておいてください。

 

Index
  (1)PtoPとは何か?
 「対等」なネットワーク
 サーバとクライアントの分離
 技術的な必然としてのPtoP
 「対応」なネットワークへ
 ネットワークの変化
 ギルダー測とブロードバンド時代の到来
 再び、高速・常時接続ネットワークへ
 ネットワーク接続の「直接性」について
 四角新しい動的なネットワーク像へ
(2)シェル・コマンドの作成からアーキテクチャに触れる
 JXTAネットワークプログラミング
 Advertisementとそのpublish/discovery
 LocalとRemote、二重のpublsih/discovery
 JXTAシェル・コマンドの作成

 



Java Agile フォーラム 新着記事
@ITメールマガジン 新着情報やスタッフのコラムがメールで届きます(無料)

注目のテーマ

Java Agile 記事ランキング

本日 月間