検索
連載

いまさら聞けないKVSの常識をHbaseで身につけるビッグデータ処理の常識をJavaで身につける(3)(3/3 ページ)

Hadoopをはじめ、Java言語を使って構築されることが多い「ビッグデータ」処理のためのフレームワーク/ライブラリを紹介しながら、大量データを活用するための技術の常識を身に付けていく連載

PC用表示 関連情報
Share
Tweet
LINE
Hatena
前のページへ |       

JavaからHbaseを操作する

 次に、シェルでやったのと同じ処理をJavaでも記述してみます。準備に必要なのは、Eclipseのプロジェクトに「hbase-0.92.0.jar」「lib/*.jar」をクラスパスを設定するだけです。

 後は、mainメソッドを持ったクラスを作れば、JavaのコードでHbaseを操作できます。mainメソッドの中では、まずはHbaseに接続します。

Configuration config = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(config);

 特に設定を変更しない場合は、ローカルホストで動作しているHbaseに接続にいきます。別のマシン上のHbaseに接続するのであれば、「hbase-site.xml」で指定するか、プログラムで「config」に設定します。

config.set("hbase.zookeeper.quorum", "ホスト名");

 IPアドレスを直接指定した場合には、ホスト名とIPアドレスの正引きと逆引きが何度も行われるため、うまく接続できません。クライアントのhostsファイルにHbaseが動作しているサーバのIPアドレスとホスト名を設定しておくと確実です。

 次に、テーブルを作成します。

HTableDescriptor htd = new HTableDescriptor("t1");
htd.addFamily(new HColumnDescriptor("csv"));
admin.createTable(htd);

 テーブルを削除するには、シェルと同様にdisableにしてからdeleteします。

admin.disableTable(Bytes.toBytes("t1"));
admin.deleteTable(Bytes.toBytes("t1"));

 テーブルができたら、CSVファイルを読み込んで、Hbaseに登録してみます。今回は、タイトル行を修飾子に使用してテーブル上に、このように格納します。

 Key値は、「row」+行番号という文字列をbyte[]にしたものです。データもbyte[]にしたものを登録します。つまり、どんな型のものでもbyte[]にしてしまえば、Keyにもデータにもなれるのです。byte[]との変換にはユーティリティクラス(org.apache.hadoop.hbase.util.Bytes)を使用します。

HTable table = new HTable(config, Bytes.toBytes("t1"));    // テーブルを指定
 
// ファイルを読み込んで、登録
InputStream is = Test.class.getResourceAsStream("data.csv");
InputStreamReader in = new InputStreamReader(is, "SJIS");
BufferedReader br = new BufferedReader(in);
String line = br.readLine();    // 1行目はタイトル
String[] titles = line.split(",", -1);
int count = 0;
while ((line = br.readLine()) != null) {
    count++;
    Put p1 = new Put(Bytes.toBytes("row-" + count));    // Keyを指定
    String[] csvs = line.split(",", -1);
    for (int i = 0; i < csvs.length; i++) {
        p1.add(Bytes.toBytes("csv"), Bytes.toBytes(titles[i]), Bytes.toBytes(csvs[i]));
    }
    table.put(p1);  // 登録
}
br.close();
in.close();
is.close();

 登録したデータを取得してみます。KVSでは、基本的にはKeyを指定してアクセスしなければなりません。

HTable table = new HTable(config, Bytes.toBytes("t1"));    // テーブルを指定
Result result = table.get(new Get(Bytes.toBytes("row-1"))); // Keyを指定
 
if (result == null || result.getRow() == null){
    System.out.println("ROW not found");
}
NavigableMap<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("csv"));
if (map == null){
    System.out.println("ColumnFamily not found");
} else {
    for (Entry<byte[], byte[]> entry : map.entrySet()) {
        String key = Bytes.toString(entry.getKey());
        String val = Bytes.toString(entry.getValue());
        System.out.println (key + ":" + val);   // デバッグ出力
    }
}

 しかし、いくら「Keyを指定しなければならない」といわれても、ときには全件を取得したり、「csvの特定の要素が○○だったら」という条件で取得したりしたい場合もあります。

 そのような場合には、「フィルタ」という機能を使用すれば条件にあったデータだけを取得できます。この処理はRDBでのテーブルスキャンに相当するため、非常に時間がかかります。大量データを処理するためには、Hadoopの完全分散モードを利用して、処理を分散しなければなりません。

Scan scan = new Scan();
Filter filter = new DependentColumnFilter(
        Bytes.toBytes("csv"), 		// カラムファミリを指定
        Bytes.toBytes("name"), 		// 修飾子を指定
        false,
        CompareOp.EQUAL,
        new BinaryPrefixComparator(Bytes.toBytes("namae1"))
);
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
:

 完全一致だけではなく、前方一致などもできます。Filterの代わりにFilterListを使えば、条件に複数要素も指定できます。

Scan scan = new Scan();
Filter filter1 = new DependentColumnFilter(
        Bytes.toBytes("csv"), 
        Bytes.toBytes("name"), 
        false,
        CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("namae1"))
);
Filter filter2 = new DependentColumnFilter(
        Bytes.toBytes("csv"), 
        Bytes.toBytes("email"), 
        false,
        CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("email@email1"))
);
FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filter.addFilter(filter1);
filter.addFilter(filter2);
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);

コラム「Webブラウザ/HTTPでも操作できる」

今回はJavaのAPIを使ってHbaseにアクセスしましたが、「hbase rest start -p 8080」として、HTTPを使ってアクセスする方法もあります。データ形式には、XMLやJSONが指定できます。

    http://ホスト名:8080/t1/row-1/csv

「t1」がテーブル名、「row-1」がKey値、「csv」がカラムファミリ、とWebブラウザから指定すれば、中身がXMLで返ってきます。


HadoopとHbaseで行列計算をしてみよう

 KVSの最大のメリットは「分散」です。そこで、HadoopとHbaseを使って処理量が非常に多くなるアプリを作成し、分散処理を実装してみます。

 Hbase 0.92.0には「hadoop-core-1.0.0.jar」が含まれているので、Hadoopはバージョン1.0.0を使用します。

 HadoopからHbaseを使ったアプリを実行する簡単な方法は、「hadoop-1.0.0/lib」の下に「hbase-0.92.0.jar」「hbase-0.92.0/lib/*.jar」をすべてコピーし、hadoopコマンドを使って実行します。また、HadoopとHbaseは分散モードで起動します。

 以下のようなアプリを作成します。

  1. 元データはHbase上のテーブル「t2」に入っているものとする
  2. 1点に対して複数の値(double:0〜1.0)を持っている
  3. Hadoopで、それぞれの点間の距離(double:0〜1.0)を算出する
  4. 距離の近い順に別テーブル「t3」に格納

 このような大量の距離計算は、地図などのGISだけではなく、テキストマイニングなどでも頻繁に行われています。Hadoopの各ノードでは、各点のデータを何度も読み込み、大量の結果データを書き込むため、Hadoop+Hbaseによる分散処理が有効です。

 まず、事前にテーブル「t2」「t3」を作り、t2には「Math.random()」で100点分のデータを入れておきます。

Configuration config = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(config);
 
// テーブルt2を作成
HTableDescriptor htd = new HTableDescriptor("t2");
htd.addFamily(new HColumnDescriptor("data"));
admin.createTable(htd);
 
// テーブルt3を作成
htd = new HTableDescriptor("t3");
htd.addFamily(new HColumnDescriptor("data"));
admin.createTable(htd);
 
// ランダムデータを格納する
HTable table2 = new HTable(config, Bytes.toBytes("t2"));
for (int i = 1 ; i <= 100 ; i++){
    Put p1 = new Put(Bytes.toBytes("row-" + i));    // Key値を指定
    for (int j = 1; j <= 2; j++) {                    // 次元数を指定
        double d = Math.random();
        p1.add(Bytes.toBytes("data"), Bytes.toBytes("x"+j), Bytes.toBytes(d));
    }
    table2.put(p1); // 登録
}

 次に、HadoopのMap処理で、テーブルの中身を1行ずつ取り出してReduceに渡します。

@Override
protected void map(ImmutableBytesWritable row, Result result, Context context) throws IOException, InterruptedException {
    String rowid = Bytes.toString(row.get());
    StringBuffer buf = new StringBuffer();
    // 1行全部を1つの文字列に変換
    for (KeyValue keyvalue : result.raw()) {
        String qualifier = Bytes.toString(keyvalue.getBuffer(), 
keyvalue.getQualifierOffset(), keyvalue.getQualifierLength());
        double value = Bytes.toDouble(keyvalue.getBuffer(), 
keyvalue.getValueOffset());
        buf.append(qualifier + "=" + value + " ");
    }
    // 1行単位で、Reduceに渡す
    context.write(new Text(rowid), new Text(buf.toString()));
}

 Reduceでは、1行ずつに対して全データを総当たりで読み込みます。

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    for (Text value : values) {
        // Mapで文字列化したデータをdoubleに戻す
        String[] strs = value.toString().split(" ");
        String[] qualifiers = new String[strs.length];
        double[] p1 = new double[strs.length];
        for (int i = 0 ; i < strs.length ; i++){
            qualifiers[i] = strs[i].split("=")[0];
            p1[i] = Double.parseDouble(strs[i].split("=")[1]);
        }
        
        // 1行に対して、全レコードをぶつける
        Configuration config = context.getConfiguration();
        HTable table2 = new HTable(config, Bytes.toBytes("t2"));
        Scan scan = new Scan();
        ResultScanner scanner = table2.getScanner(scan);
        for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
            NavigableMap<byte[], byte[]> map = rr.getFamilyMap(Bytes.toBytes("data"));
            if (map != null){
                double[] p2 = new double[qualifiers.length];
                for (int i = 0 ; i < qualifiers.length ; i++){
                    p2[i] = Bytes.toDouble(map.get(Bytes.toBytes(qualifiers[i])));
                }
                // 2点間の距離を算出
                double d = distance(p1, p2);
                
            // ここで結果を’t3’に格納する
            }
        }
        scanner.close();
    }
}

 最後に、計算結果を「t3」に格納します。

// 結果を整形
String key2 = Bytes.toString(rr.getRow());
DecimalFormat df = new DecimalFormat("0.0000000000");
String newKey = df.format(d) + "/" + key.toString() + ":" + key2;
 
// HDFSに出力
  StringBuffer str = new StringBuffer();
  for (int i = 0 ; i < qualifiers.length ; i++){
      str.append("x" + (i+1) + "=" + p1[i] + " ");
      str.append("x" + (i+1) + "=" + p2[i] + "/");
  }
context.write(key, new Text(newKey + " " + str.toString()));
 
//テーブルに出力
HTable table3 = new HTable(config, Bytes.toBytes("t3"));
Put put = new Put(Bytes.toBytes(newKey));   // Keyを指定
put.add(Bytes.toBytes("data"), Bytes.toBytes("key1"), Bytes.toBytes(key.toString()));
put.add(Bytes.toBytes("data"), Bytes.toBytes("key2"), Bytes.toBytes(key2));
table3.put(put);    // 登録

 HbaseはKey値でソートして格納するので、算出した距離をKey値にすれば、単純に取り出しただけで「近い順」になってくれます。逆に「Double.MAX_VALUE-距離」をKey値にすれば「遠い順」に取り出せます。

 それでは、「hbase shell」で結果を見てみましょう。

hbase(main):002:0> count 't3'
Current count: 1000, row: 0.1131250133/row-91:row-14
Current count: 2000, row: 0.1434975286/row-72:row-11
Current count: 3000, row: 0.1691559387/row-48:row-41
Current count: 4000, row: 0.1933620176/row-60:row-26
Current count: 5000, row: 0.2184668296/row-62:row-44
Current count: 6000, row: 0.2445907168/row-7:row-5
Current count: 7000, row: 0.2733257841/row-52:row-15
Current count: 8000, row: 0.3113542664/row-8:row-59
Current count: 9000, row: 0.3675337066/row-52:row-18
Current count: 10000, row: 0.7355109677/row-83:row-63
10000 row(s) in 1.0410 seconds

 元データ「t2」が100件だったので、2乗の1万件が「t3」に登録されています。

 距離が近い順に格納できたのでしょうか。先頭から5件を取り出してみます。

hbase(main):003:0> scan 't3', {LIMIT => 5}
ROW                     COLUMN+CELL
 0.0000000000/row-100:r column=data:key1, timestamp=1329350527267, value=row-100
 0.0000000000/row-100:r column=data:key2, timestamp=1329350527267, value=row-100
 0.0000000000/row-10:ro column=data:key1, timestamp=1329350529985, value=row-10
 0.0000000000/row-10:ro column=data:key2, timestamp=1329350529985, value=row-10
 0.0000000000/row-11:ro column=data:key1, timestamp=1329350526165, value=row-11
 0.0000000000/row-11:ro column=data:key2, timestamp=1329350526165, value=row-11
 0.0000000000/row-12:ro column=data:key1, timestamp=1329350527514, value=row-12
 0.0000000000/row-12:ro column=data:key2, timestamp=1329350527514, value=row-12
 0.0000000000/row-13:ro column=data:key1, timestamp=1329350530359, value=row-13
 0.0000000000/row-13:ro column=data:key2, timestamp=1329350530359, value=row-13
5 row(s) in 0.0890 seconds

 同じ点同士の距離も計算してしまっているため、「row-100とrow-100の距離は0」が一番上に来ていますが、確かに近い順に取得できました。

 今回のように数値の小さい順に結果が欲しい場合には、その数値をKey値にしつつ、Key値がユニークであることも保証する必要があります。

 今回は、計算結果が0.0〜1.0だったので、単に文字化すると「0.00001→1.0E-5」となってしまって大小関係が入れ替わってしまうため、DecimalFormat("0.0000000000")で「0.00001→0.0000100000」となるようにしました。もし1以上であれば、文字列でソートしても期待している順番になるように、「DecimalFormat("0000000000")」とします。

 また、今回は最終的にKey値を「数値+比較した2点のKey値」とすることで、ユニークであることも確保しました。

Key値を何にするかが非常に重要

 KVSでは、Key値を何にするかが非常に重要です。RDBのプライマリキーには機械的に通し番号を振ることが多いですが、KVSのKey値はユニークであることに加え、データを使うときの取り出し方も考えておく必要があります。

 そこが、RDBとKVSの大きな違いかもしれません。

著者紹介

荒本 道隆(あらもと みちたか)

2011年度より、TIS−Hadoopersにて、TISとともに検証活動実施。社内では新しい技術を実験的に実システムへ導入し、評価と検証に取り組んでいる。また社外でも、主に先端IT活用推進コンソーシアムで活動中。


Copyright © ITmedia, Inc. All Rights Reserved.

前のページへ |       
ページトップに戻る