「文書生成」チャレンジの後編。ネットワークにLSTM、ライブラリにKeras+TensorFlowを採用し、さらに精度を改善していく。最後に、全然関係ない入力文章から、江戸川乱歩風文書が生成されるかを試す。
ご注意:本記事は、@IT/Deep Insider編集部(デジタルアドバンテージ社)が「deepinsider.jp」というサイトから、内容を改変することなく、そのまま「@IT」へと転載したものです。このため用字用語の統一ルールなどは@ITのそれとは一致しません。あらかじめご了承ください。
時系列データの処理に適しているLSTM(Long Short-Term Memory)を応用して、文書生成に挑戦してみようというのが、前回の主旨である。文書を構成する各単語にインデックスを付与すれば、それは単なる数字の羅列である。株価予測のときと同じように、入力の \(n\) 個(前回記事では40)の数字から、次の数字を予測するよう、ニューラルネットを学習させる。これで、理屈の上では、いくらでも文章を紡ぎ出せるはずであった。
しかしながら、世の中はそんなに甘くない。単純にLSTMレイヤーを積んだだけでは、全然ダメだった。そこで、ニューラルネットを「単語出現頻度を予測するもの」と「頻度グループごとに単語を予測するもの」の2段構えにした。単語出現頻度予測の方は、分類数が語彙数から頻度区分け数(前回記事では7つ)に減少するため、学習が容易になって精度が向上すると考えた。また、頻度グループごとの単語予測は、各単語の出現パターンが似てくるため、こちらも予測精度が向上すると期待した。
この目論見はある程度的中し、比較的日本語っぽい文章の生成に成功した、というのが、前回の状況である。
ニューラルネットの単語予測の改善により、より日本語らしい文書生成の実現を目指す。ニューラルネット全体の予測精度は、最終的に96%程度まで改善した。このニューラルネットを使って、江戸川乱歩とは全然関係ない入力文章から、江戸川乱歩風文書が生成されるかどうかを見てみる。
本稿ではKeras(バックエンド:TensorFlow)を使って実装する。また、TensorFlowやKerasはインストール済みを前提に論を進める。
なお、データ量が多いため、前回同様、本稿に記載するコードを実行する際には、GPUマシンかクラウドのGPUサービス利用を推奨する。
また、前回作成した「model_words_」で始まる名前のファイル7個や単語分類用ニューラルネットは今回も使用するので注意してほしい。
前回は7種類に分類したが、分類数を減らせば精度向上が期待できる。しかしそれでは、後続の単語予測ニューラルネットの精度が下がってしまうので、2択分類のニューラルネットを多段(今回の事例では3段)に配置することで、精度向上を図る。word2vecの記事で紹介した、階層化Softmaxに似た発想である。以下にそのイメージを示す。
以下にリストを示す。リスト1からリスト3までは、前回と同じである。
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import csv
import pandas as pd
import random
import numpy.random as nr
import sys
import h5py
import keras
import math
from __future__ import print_function
from keras.layers.core import Activation
from keras.layers.core import Dense
from keras.layers.core import Dropout
from keras.layers.core import Flatten
from keras.layers.core import Masking
from keras.models import Sequential
from keras.layers import Input
from keras.models import Model
from keras.layers.recurrent import SimpleRNN
from keras.layers.recurrent import LSTM
from keras.layers.embeddings import Embedding
from keras.callbacks import EarlyStopping
from keras.callbacks import ReduceLROnPlateau
from keras.layers.normalization import BatchNormalization
from keras.initializers import glorot_uniform
from keras.initializers import uniform
from keras.initializers import orthogonal
from keras.initializers import TruncatedNormal
from keras.optimizers import RMSprop
from keras import regularizers
from keras.constraints import maxnorm, non_neg
from keras.utils.data_utils import get_file
from keras.utils import np_utils
# 元データ
df1 = csv.reader(open('rampo_separate.csv', 'r'))
df2 = csv.reader(open('rampo_separate2.csv', 'r'))
df3 = csv.reader(open('rampo_separate3.csv', 'r'))
data1 = [ v for v in df1]
data2 = [ v for v in df2]
data3 = [ v for v in df3]
mat1 = np.array(data1)
mat2 = np.array(data2)
mat3 = np.array(data3)
mat = np.r_[mat1[:, 0], mat2[:, 0], mat3[:, 0]]
print(mat.shape)
words = sorted(list(set(mat)))
cnt = np.zeros(len(words))
print('total words:', len(words))
word_indices = dict((w, i) for i, w in enumerate(words)) # 単語をキーにインデックス検索
indices_word = dict((i, w) for i, w in enumerate(words)) # インデックスをキーに単語を検索
# 単語の出現数をカウント
for j in range (0, len(mat)):
cnt[word_indices[mat[j]]] += 1
# 出現頻度の少ない単語を未知語「UNK」で置き換え
words_unk = [] # 未知語一覧
for k in range(0, len(words)):
if cnt[k] <= 3:
words_unk.append(words[k])
words[k] = 'UNK'
print('低頻度語数:', len(words_unk)) # words_unkはUNKに変換された単語のリスト
words = sorted(list(set(words)))
print('total words:', len(words))
word_indices = dict((w, i) for i, w in enumerate(words)) # 単語をキーにインデックス検索
indices_word = dict((i, w) for i, w in enumerate(words)) # インデックスをキーに単語を検索
以下のリスト4は、頻度の2択分類訓練データ作成処理である。リスト中の以下の変数の値を変えて、表2のように6パターン実行する。まずはパターン0を実行しておく。
パターン | 学習対象頻度 | 分類しきい値 | n_lower | n_split | n_upper |
---|---|---|---|---|---|
パターン0 | 全単語 | 300 | 0 | 300 | 400000 |
パターン1 | 300未満 | 28 | 0 | 28 | 300 |
パターン2 | 28未満 | 10 | 0 | 10 | 28 |
パターン3 | 28〜300 | 100 | 28 | 100 | 300 |
パターン4 | 300以上 | 2000 | 300 | 2000 | 400000 |
パターン5 | 2000以上 | 15000 | 2000 | 15000 | 400000 |
maxlen = 40 # 入力語数
n_upper = 400000 # 学習対象単語の出現頻度上限
n_split = 300 # 分類しきい値
n_lower = 0 # 学習対象単語の出現頻度下限
mat_urtext = np.zeros((len(mat), 1), dtype=int)
for i in range(0, len(mat)):
#row = np.zeros(len(words), dtype=np.float32)
if mat[i] in word_indices : # 出現頻度の低い単語のインデックスをUNKのそれに置き換え
if word_indices[mat[i]] != 0 : # 0パディング対策
mat_urtext[i, 0] = word_indices[mat[i]]
else:
mat_urtext[i, 0] = len(words)
else:
mat_urtext[i, 0] = word_indices['UNK']
print(mat_urtext.shape)
# 単語の出現数をもう一度カウント:UNK置き換えでwords_indeicesが変わっているため
cnt = np.zeros(len(words)+1)
for j in range (0, len(mat)):
cnt[mat_urtext[j, 0]] += 1
print(cnt.shape)
len_seq = len(mat_urtext)-maxlen
data = []
target = []
for i in range(0, len_seq):
# 答えの単語の出現頻度がn_lower以上でかつn_upper 未満の場合を学習対象にする
if cnt[mat_urtext[i+maxlen, :]] >= n_lower and cnt[mat_urtext[i+maxlen, :]] <= n_upper:
data.append(mat_urtext[i:i+maxlen, :])
#target.append(mat_urtext[i+maxlen, :])
# 出現頻度に応じてラベルの値を設定
if cnt[mat_urtext[i+maxlen, :]] < n_split : # 頻度がn_split未満なら0
target.append(0)
else: # 頻度がn_split以上なら1
target.append(1)
x = np.array(data).reshape(len(data), maxlen, 1)
t = np.array(target).reshape(len(data), 1)
z = list(zip(x, t))
nr.seed(12345)
nr.shuffle(z) # シャッフル
x, t = zip(*z)
x = np.array(x).reshape(len(data), maxlen, 1)
t = np.array(t).reshape(len(data), 1)
print(x.shape, t.shape)
x_train = x # 元データを訓練用と評価用に分割しない
t_train = t
print(x_train.shape, t_train.shape)
ニューラルネット本体は以下のとおり。前回のリスト5-1の単語分類用ニューラルネットと少し異なるので注意してほしい。具体的には、分類が2択なので、活性化関数をsigmoidに変更してある。また、dropoutを使用しない方が精度が向上したので、dropout無しにしてある。
class Prediction :
def __init__(self, maxlen, n_hidden, input_dim, vec_dim, output_dim):
self.maxlen = maxlen
self.n_hidden = n_hidden
self.input_dim = input_dim
self.output_dim = output_dim
self.vec_dim = vec_dim
def create_model(self):
model = Sequential()
print('#3')
model.add(Embedding(self.input_dim, self.vec_dim, input_length=self.maxlen, trainable=True,
embeddings_initializer=uniform(seed=20170719)))
model.add(BatchNormalization(axis=-1))
print('#4')
model.add(Masking(mask_value=0, input_shape=(self.maxlen, self.vec_dim)))
model.add(LSTM(self.n_hidden, batch_input_shape=(None, self.maxlen, self.vec_dim),
kernel_initializer=glorot_uniform(seed=20170719),
recurrent_initializer=orthogonal(gain=1.0, seed=20170719)
))
print('#5')
model.add(BatchNormalization(axis=-1))
print('#6')
model.add(Dense(self.output_dim, activation='sigmoid', use_bias=True,
kernel_initializer=glorot_uniform(seed=20170719)))
model.compile(loss="binary_crossentropy", optimizer="RMSprop", metrics=['binary_accuracy'])
return model
# 学習
def train(self, x_train, t_train, batch_size, epochs, emb_param) :
early_stopping = EarlyStopping(monitor='loss', patience=4, verbose=1)
print('#2', t_train.shape)
model = self.create_model()
print('#7')
model.fit(x_train, t_train, batch_size=batch_size, epochs=epochs, verbose=1,
shuffle=True, callbacks=[early_stopping], validation_split=0.0)
return model
メイン処理は以下のとおり。ポイントは出力次元output_dimが1になっているところである。
n_pattern = 0
vec_dim = 400
epochs = 31
batch_size = 200
input_dim = len(words)+1
output_dim = 1
n_hidden = int(vec_dim*1.5) # 隠れ層の次元
prediction = Prediction(maxlen, n_hidden, input_dim, vec_dim, output_dim)
emb_param = 'param_classify_by_freq_'+str(n_pattern)+'_'+str(n_lower)+'_'+str(n_split)+'_'+str(n_upper)+'.hdf5' # パラメーター名
print(emb_param)
row = x_train.shape[0]
x_train = x_train.reshape(row, maxlen)
model = prediction.train(x_train, t_train, batch_size, epochs, emb_param)
model.save_weights(emb_param) # 学習済みパラメーターセーブ
score = model.evaluate(x_train, t_train, batch_size=batch_size, verbose=1)
print("score:", score)
パラメーターファイルは、分類パターンに応じて、以下のようにそれぞれ別名を付与する。
パターン | 学習対象頻度 | パラメーターファイル名 |
---|---|---|
パターン0 | 全単語 | param_classify_by_freq_0_0_300_400000.hdf5 |
パターン1 | 300未満 | param_classify_by_freq_1_0_28_300.hdf5 |
パターン2 | 28未満 | param_classify_by_freq_2_0_10_28.hdf5 |
パターン3 | 28〜300 | param_classify_by_freq_3_28_100_300.hdf5 |
パターン4 | 300以上 | param_classify_by_freq_4_300_2000_400000.hdf5 |
パターン5 | 2000以上 | param_classify_by_freq_5_2000_15000_400000.hdf5 |
表2 頻度分類パラメーターファイル名 |
ここまでに示したリスト1〜リスト6-1を実行すると、表2に示したパターン0が完了している状態だ。残りの5つのパターン、つまりパターン1〜パターン5もここで実行する。
これにはまず、n_lower/n_split/n_upperの値を前掲の表2に示したものに置き換えて、リスト4-1を再実行する。
次に、n_patternの値をパターン名の「1」〜「5」に置き換えて、リスト6-1を再実行すればよい。
この2つの再実行をパターン5まで繰り返すことで、6つのパラメーターファイルを生成する。
これにより、各ニューラルネットの正解率は98%以上に改善した。単語出現頻度分類全体では、ニューラルネットが3段になっているので、86%以上の正解率となる。単語予測のほうは90%以上の正解率だったので、全体としての正解率は78%ということになる。
生成したパラメーターファイルを使って、文書生成を実行してみる。
まず、ニューラルネット定義である。分類用と、単語推定用の2種類用意する。なお、その前に、本稿のリスト1〜リスト3と前回のリスト4を実行して訓練データを作成しておく必要がある。
# 頻度分類用
class Prediction_freq :
def __init__(self, maxlen, n_hidden, input_dim, vec_dim, output_dim):
self.maxlen = maxlen
self.n_hidden = n_hidden
self.input_dim = input_dim
self.output_dim = output_dim
self.vec_dim = vec_dim
#self.t_dim = t_dim
def create_model(self):
model = Sequential()
print('#3')
model.add(Embedding(self.input_dim, self.vec_dim, input_length=self.maxlen))
model.add(BatchNormalization(axis=-1))
print('#4')
model.add(Masking(mask_value=0, input_shape=(self.maxlen, self.vec_dim)))
model.add(LSTM(self.n_hidden, batch_input_shape=(None, self.maxlen, self.vec_dim)))
print('#5')
model.add(BatchNormalization(axis=-1))
print('#6')
model.add(Dense(self.output_dim, activation='sigmoid'))
return model
# 単語推定用
class Prediction_words :
def __init__(self, maxlen, n_hidden, input_dim, vec_dim, output_dim):
self.maxlen = maxlen
self.n_hidden = n_hidden
self.input_dim = input_dim
self.vec_dim = vec_dim
self.output_dim = output_dim
def create_model(self):
model = Sequential()
print('#3')
model.add(Embedding(self.input_dim, self.vec_dim, input_length=self.maxlen, trainable=True,
embeddings_initializer=uniform(seed=20170719)))
model.add(BatchNormalization(axis=-1))
print('#4')
model.add(Masking(mask_value=0, input_shape=(self.maxlen, self.vec_dim)))
model.add(LSTM(self.n_hidden, batch_input_shape=(None, self.maxlen, self.vec_dim)))
print('#5')
model.add(BatchNormalization(axis=-1))
print('#6')
model.add(Dense(self.output_dim, activation='softmax'))
return model
次にパラメーターのロードである。パラメーターファイルの種類と数が変わっているので、それに応じて変更してある。
vec_dim = 400
epochs = 100
batch_size = 200
input_dim = len(words)+1
unk_dim = len(words_unk)+1
output_dim = input_dim
n_sigmoid = 1
n_hidden = int(vec_dim*1.5) # 隠れ層の次元
# 頻度分類用
prediction_freq = Prediction_freq(maxlen, n_hidden, input_dim, vec_dim, n_sigmoid)
print('頻度分類用ニューラルネット_0活性化')
model_classify_freq_0 = prediction_freq.create_model()
print('頻度分類用ニューラルネット_1活性化')
model_classify_freq_1 = prediction_freq.create_model()
print('頻度分類用ニューラルネット_2活性化')
model_classify_freq_2 = prediction_freq.create_model()
print('頻度分類用ニューラルネット_3活性化')
model_classify_freq_3 = prediction_freq.create_model()
print('頻度分類用ニューラルネット_4活性化')
model_classify_freq_4 = prediction_freq.create_model()
print('頻度分類用ニューラルネット_5活性化')
model_classify_freq_5 = prediction_freq.create_model()
print()
# 単語予測用
prediction_words = Prediction_words(maxlen, n_hidden, input_dim, vec_dim, output_dim)
print('単語分類用ニューラルネット(0_10)活性化')
model_words_0_10 = prediction_words.create_model()
print('単語分類用ニューラルネット(10-28)活性化')
model_words_10_28 = prediction_words.create_model()
print('単語分類用ニューラルネット(28-100)活性化')
model_words_28_100 = prediction_words.create_model()
print('単語分類用ニューラルネット(100-300)活性化')
model_words_100_300 = prediction_words.create_model()
print('単語分類用ニューラルネット(300-2000)活性化')
model_words_300_2000 = prediction_words.create_model()
print('単語分類用ニューラルネット(2000-15000)活性化')
model_words_2000_15000 = prediction_words.create_model()
print('単語分類用ニューラルネット(15000-400000)活性化')
model_words_15000_400000 = prediction_words.create_model()
print()
# パラメーターロード
print('頻度分類用ニューラルネット_0パラメーターロード')
model_classify_freq_0.load_weights('param_classify_by_freq_0_0_300_400000.hdf5')
print('頻度分類用ニューラルネット_1パラメーターロード')
model_classify_freq_1.load_weights('param_classify_by_freq_1_0_28_300.hdf5')
print('頻度分類用ニューラルネット_2パラメーターロード')
model_classify_freq_2.load_weights('param_classify_by_freq_2_0_10_28.hdf5')
print('頻度分類用ニューラルネット_3パラメーターロード')
model_classify_freq_3.load_weights('param_classify_by_freq_3_28_100_300.hdf5')
print('頻度分類用ニューラルネット_4パラメーターロード')
model_classify_freq_4.load_weights('param_classify_by_freq_4_300_2000_400000.hdf5')
print('頻度分類用ニューラルネット_5パラメーターロード')
model_classify_freq_5.load_weights('param_classify_by_freq_5_2000_15000_400000.hdf5')
print()
print('単語分類用ニューラルネット(0-10)パラメーターロード')
model_words_0_10.load_weights('param_words_0_0_10.hdf5')
print('単語分類用ニューラルネット(10-28)パラメーターロード')
model_words_10_28.load_weights('param_words_1_10_28.hdf5')
print('単語分類用ニューラルネット(28-100)パラメーターロード')
model_words_28_100.load_weights('param_words_2_28_100.hdf5')
print('単語分類用ニューラルネット(100-300)パラメーターロード')
model_words_100_300.load_weights('param_words_3_100_300.hdf5')
print('単語分類用ニューラルネット(300-2000)パラメーターロード')
model_words_300_2000.load_weights('param_words_4_300_2000.hdf5')
print('単語分類用ニューラルネット(2000-15000)パラメーターロード')
model_words_2000_15000.load_weights('param_words_5_2000_15000.hdf5')
print('単語分類用ニューラルネット(15000-400000)パラメーターロード')
model_words_15000_400000.load_weights('param_words_6_15000_400000.hdf5')
print()
文書生成のメイン処理である。分類判定が多段のため、分岐が複雑になっている。
n_init = 6000
# 単語
x_validation = x_train[n_init, :, :]
x_validation = x_validation.T
row = x_validation.shape[0] # 評価データ数
x_validation = x_validation.reshape(row, maxlen)
text_gen = '' # 生成テキスト
for i in range(0, maxlen) :
text_gen += indices_word[x_validation[0, i]]
print(text_gen)
print()
# 正解データ
text_correct = ''
for j in range(0, 4) :
x_correct = x_train[n_init+j*maxlen, :, :]
x_correct = x_correct.T
x_correct = x_correct.reshape(row, maxlen)
for i in range(0, maxlen) :
text_correct += indices_word[x_correct[0, i]]
print('正解')
print(text_correct)
print()
# 応答文生成
for k in range (0, 100) :
# 単語予測
# 300
ret_0 = model_classify_freq_0.predict(x_validation, batch_size=batch_size, verbose=0) # 評価結果
ret_0 = ret_0.reshape(row, n_sigmoid)
flag_0 = ret_0[0, 0]
# 最大値インデックス
if flag_0 < 0.5 : # 300未満
ret_1 = model_classify_freq_1.predict(x_validation, batch_size=batch_size, verbose=0) # 評価結果
ret_1 = ret_1.reshape(row, n_sigmoid)
flag_1 = ret_1[0, 0]
if flag_1 < 0.5 : # 28未満
ret_2 = model_classify_freq_2.predict(x_validation, batch_size=batch_size, verbose=0) # 評価結果
ret_2 = ret_2.reshape(row, n_sigmoid)
flag_2 = ret_2[0, 0]
if flag_2< 0.5 : # 10未満
pred_freq = 0
ret = model_words_0_10.predict(x_validation, batch_size=batch_size, verbose=0)
else : # 10以上28未満
pred_freq = 1
ret = model_words_10_28.predict(x_validation, batch_size=batch_size, verbose=0)
else : # 28以上
ret_3 = model_classify_freq_3.predict(x_validation, batch_size=batch_size, verbose=0) # 評価結果
ret_3 = ret_3.reshape(row, n_sigmoid)
flag_3 = ret_3[0, 0]
if flag_3 <0.5 : # 28以上100未満
pred_freq = 2
ret = model_words_28_100.predict(x_validation, batch_size=batch_size, verbose=0)
else : # 100以上300未満
pred_freq = 3
ret = model_words_100_300.predict(x_validation, batch_size=batch_size, verbose=0)
else : # 300以上
ret_4 = model_classify_freq_4.predict(x_validation, batch_size=batch_size, verbose=0) # 評価結果
ret_4 = ret_4.reshape(row, n_sigmoid)
flag_4 = ret_4[0, 0]
if flag_4 <0.5 : # 300以上2000未満
pred_freq = 4
ret = model_words_300_2000.predict(x_validation, batch_size=batch_size, verbose=0)
else : # 2000以上
ret_5 = model_classify_freq_5.predict(x_validation, batch_size=batch_size, verbose=0) # 評価結果
ret_5 = ret_5.reshape(row, n_sigmoid)
flag_5 = ret_5[0, 0]
if flag_5 < 0.5 : # 2000以上15000未満
pred_freq = 5
ret = model_words_2000_15000.predict(x_validation, batch_size=batch_size, verbose=0)
else : # 15000以上
pred_freq = 6
ret = model_words_15000_400000.predict(x_validation, batch_size=batch_size, verbose=0)
ret_word = ret.argmax(1)[0]
print(pred_freq, '\t', indices_word[ret_word])
text_gen += indices_word[ret_word] # 生成文字を追加
x_validation[0, 0:maxlen-1] = x_validation[0, 1:maxlen]
x_validation[0, maxlen-1] = ret_word # 1文字シフト
print()
print(text_gen)
これらのリストを、以下の順で実行する。
結果は以下のとおり。前回よりマシになったかというと、微妙なところである。
「はございますまいか。考えて見れば、この世界の、人目につかぬ隅々では、どの様にUNK、恐ろしい事柄が、行われているか、ほんとうに想像の外《ほか》で
ございます。無論始めの終りは差入屋の方で、犯人の煙草について、差入屋《UNK》という様な差入屋の手が都合のよい――何でも政治上の秘密な運動を行うせるかどうか、実際三島駅に在UNK帰るというのだ。それはどんな問題だというのか、蕗屋の点は考の種類の老婆だったのでしょう。あの旭屋の外に過ぎないので、いやあなめには、いやあなめ」
Copyright© Digital Advantage Corp. All Rights Reserved.