RNN in Practice: Writing Articles

Downloading the Article

Download the article below, or find another one of your own:
https://pan.baidu.com/s/1-dZd1oKZSawCN0R7LQWz1g

Importing the Environment

import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn
import random
import time
from collections import Counter

start_time = time.time()
tf.reset_default_graph()  # clear any graph left over from a previous run
train_file = 'words.txt'
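This is TensorFlow 1.x graph-style code (tf.placeholder, tf.contrib, Session). If you only have TensorFlow 2.x installed, a rough workaround, offered here as an untested sketch, is the v1 compatibility layer; tf.contrib was removed in 2.x, so the contrib import has to go and the tf.contrib.rnn.BasicLSTMCell call further below would become tf.nn.rnn_cell.BasicLSTMCell:

# Sketch only: running this TF1-style graph code under TensorFlow 2.x.
# Drop "from tensorflow.contrib import rnn" and swap tf.contrib.rnn.BasicLSTMCell
# for tf.nn.rnn_cell.BasicLSTMCell before trying this.
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()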

Simple Time Formatting

def str_time(sec):
    if sec < 60:
        return str(sec) + " sec"
    elif sec < (60 * 60):
        return str(sec / 60) + " min"
    else:
        return str(sec / (60 * 60)) + " hour"
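For example, on Python 3 the helper formats durations like this:

print(str_time(42))    # 42 sec
print(str_time(90))    # 1.5 min
print(str_time(5400))  # 1.5 hour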

Processing Chinese Characters

def get_char(txt_file):
    labels = str()
    with open(file=txt_file, mode='rb') as f:
        for label in f:
            # accumulate every line; plain assignment would keep only the last one
            labels += label.decode("utf-8")
    return labels
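As a quick check (assuming words.txt from the download above sits next to the script), the whole file comes back as a single string of characters:

sample = get_char('words.txt')
print(len(sample), sample[:20])  # total character count and a short preview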

Processing Multiple Chinese Files

def readfile(files):
    labels = list()
    for txt_file in files:
        target = get_char(txt_file)
        labels.append(target)
    return labels
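readfile() simply maps get_char() over a list of paths; the second filename here is hypothetical, for illustration only:

# 'more_words.txt' is a hypothetical second corpus
texts = readfile(['words.txt', 'more_words.txt'])
print([len(t) for t in texts])  # one character count per file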

Converting Text to Vectors

def char_vector(files, num_map, label=None):
    word_size = len(num_map)
    # characters missing from the map fall back to index word_size
    vector = lambda word: num_map.get(word, word_size)
    if files:
        label = get_char(files)
    labels_vector = list(map(vector, label))
    return labels_vector
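A tiny illustration with a hypothetical two-character vocabulary:

demo_map = {'a': 0, 'b': 1}  # hypothetical vocabulary
print(char_vector(None, demo_map, 'aba'))  # [0, 1, 0]
print(char_vector(None, demo_map, 'abc'))  # [0, 1, 2] -- unknown 'c' gets index len(demo_map)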

Sample Preprocessing

print("Loading training data...")
train_data = get_char(train_file)
print("Total characters:", len(train_data))

counter = Counter(train_data)
words = sorted(counter)
words_size = len(words)
words_num_map = dict(zip(words, range(words_size)))

print("Vocabulary size:", words_size)
word_label = char_vector(train_file, words_num_map)
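On a toy corpus, the Counter/sorted/zip pipeline builds the character-to-index map like this:

# toy corpus, for illustration
toy = "天天向上"
toy_words = sorted(Counter(toy))          # ['上', '向', '天']
toy_map = dict(zip(toy_words, range(len(toy_words))))
print(toy_map)                            # {'上': 0, '向': 1, '天': 2}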

Hyperparameter Settings

learning_rate = 0.001
epochs = 100000
display_step = 1000
n_input = 4  # feed 4 characters at a time; predict the 5th

# hidden-layer sizes
n_hidden1 = 256
n_hidden2 = 512
n_hidden3 = 512
keep_prob = 0.8
layer_num = 3
batch_size = 1

# placeholders for X and Y
x = tf.placeholder("float", [None, n_input, 1])
y = tf.placeholder("float", [None, words_size])

# initial values for the weights and biases
weights = {
    'in': tf.Variable(tf.random_normal([n_input, n_hidden1])),
    'out': tf.Variable(tf.random_normal([n_hidden2, words_size]))
}
biases = {
    # shape (n_hidden1,)
    'in': tf.Variable(tf.constant(0.1, shape=[n_hidden1, ])),
    # shape (words_size,)
    'out': tf.Variable(tf.constant(0.1, shape=[words_size, ]))
}
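Note that weights['in'] and biases['in'] (as well as n_hidden3) are declared but never touched by RNN() below. Purely as a hypothetical sketch, an input projection is how such 'in' parameters are typically used; nothing in the original wires this up:

# Hypothetical sketch only -- not part of the model as written.
# Project the flattened 4-character window up to n_hidden1 features.
x_flat = tf.reshape(x, [-1, n_input])                      # (batch, n_input)
x_proj = tf.matmul(x_flat, weights['in']) + biases['in']   # (batch, n_hidden1)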

Defining the Network Structure


def lstm_call():
    cell = tf.nn.rnn_cell.LSTMCell(num_units=n_hidden1, reuse=tf.get_variable_scope().reuse)
    return tf.nn.rnn_cell.DropoutWrapper(cell, output_keep_prob=keep_prob)

def RNN(x, weights, biases):
    x = tf.reshape(x, [batch_size, n_input, 1])  # (1, 4, 1), i.e. batch = 1
    # rnn
    cell = tf.contrib.rnn.BasicLSTMCell(n_hidden2)
    init_state = cell.zero_state(batch_size, dtype=tf.float32)
    # final_state has shape batch * n_hidden --> 1 * 512
    # outputs has shape batch * n_input(time_step) * n_hidden --> 1 * 4 * 512
    outputs, final_state = tf.nn.dynamic_rnn(cell, x, initial_state=init_state, time_major=False)

    # print("before unstack, output shape:", outputs.shape)  # (1, 4, 512) = (batch, time_step, cell_n_hidden)
    # unstack reorders the dimensions
    outputs = tf.unstack(tf.transpose(outputs, [1, 0, 2]))
    # outputs is now a list of per-time-step tensors
    # print("outputs[-1] shape:", outputs[-1].shape)  # list of length 4; outputs[-1] has shape (1, 512)
    results = tf.matmul(outputs[-1], weights['out']) + biases['out']
    # (1, words_size): one logit per character in the vocabulary (112 in the author's run)
    return results
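lstm_call() and layer_num are defined above but never used: RNN() builds a single BasicLSTMCell. As a hedged sketch, the dropout-wrapped cells could instead be stacked into a multi-layer network; since those cells have n_hidden1 units, the output projection must be sized accordingly:

# Sketch only: a stacked-LSTM variant of RNN() using lstm_call() and layer_num.
# Assumption: a new output matrix sized for the n_hidden1-unit cells.
w_out = tf.Variable(tf.random_normal([n_hidden1, words_size]))
b_out = tf.Variable(tf.constant(0.1, shape=[words_size, ]))

def multi_layer_rnn(x):
    x = tf.reshape(x, [batch_size, n_input, 1])
    # stack layer_num dropout-wrapped LSTM cells
    cells = tf.nn.rnn_cell.MultiRNNCell([lstm_call() for _ in range(layer_num)])
    init_state = cells.zero_state(batch_size, dtype=tf.float32)
    outputs, _ = tf.nn.dynamic_rnn(cells, x, initial_state=init_state, time_major=False)
    last = outputs[:, -1, :]  # (batch, n_hidden1), the final time step
    return tf.matmul(last, w_out) + b_out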

Computing the Loss and Initializing the Optimizer

predicted = RNN(x, weights, biases)
# loss and optimizer
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=predicted, labels=y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)

# model evaluation
correct_pred = tf.equal(tf.argmax(predicted, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
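For intuition, with a one-hot label the softmax cross-entropy reduces to -log of the probability assigned to the true class; a quick NumPy check on toy logits:

# toy logits and a one-hot label (pure NumPy, for intuition)
logits = np.array([2.0, 1.0, 0.1])
probs = np.exp(logits) / np.sum(np.exp(logits))
label = np.array([1.0, 0.0, 0.0])
print(-np.sum(label * np.log(probs)))  # ~0.417, i.e. -log(p_true)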

# model checkpointing
save_dir = "model/"
saver = tf.train.Saver(max_to_keep=1)

# initialize all variables
init = tf.global_variables_initializer()

Training and Testing the Model

with tf.Session() as sess:
    sess.run(init)

    # Each step takes four consecutive character vectors as input and the
    # fifth character vector as the label used to compute the loss
    offset = random.randint(0, n_input + 1)
    end_offset = n_input + 1
    step = 0
    loss_total = 0.
    acc_total = 0.

    # restore the model, if one exists, and continue training
    model = tf.train.latest_checkpoint(save_dir)
    print("model-ckpt:", model)
    start_epoch = 0
    if model:
        saver.restore(sess, model)
        ind = model.rfind("-")  # the step count follows the last '-'
        start_epoch = int(model[ind + 1:])
        print(start_epoch)
        step = start_epoch

    while step < epochs:

        # pick a new random position once we run past the end of the text
        if offset > (len(train_data) - end_offset):
            offset = random.randint(0, n_input + 1)

        # take the four character indices at this position as input
        in_words = [[word_label[word]] for word in range(offset, offset + n_input)]
        in_words = np.reshape(np.array(in_words), [-1, n_input, 1])

        # the fifth character becomes a one-hot label
        out_onehot = np.zeros([words_size], dtype=float)
        out_onehot[word_label[offset + n_input]] = 1.0
        out_onehot = np.reshape(out_onehot, [1, -1])

        _, acc, loss_val, onehot_pred = sess.run([optimizer, accuracy, loss, predicted],
                                                 feed_dict={x: in_words, y: out_onehot})
        loss_total += loss_val
        acc_total += acc
        if (step + 1) % display_step == 0:
            print("Iter= " + str(step + 1) +
                  ", Average Loss= " + "{:.6f}".format(loss_total / display_step) +
                  ", Average Accuracy= " + "{:.2f}%".format(100 * acc_total / display_step))

            acc_total = 0.
            loss_total = 0.
            in2 = [words[word_label[i]] for i in range(offset, offset + n_input)]
            out2 = words[word_label[offset + n_input]]
            # np.argmax avoids adding a new op to the graph on every call
            out_pred = words[int(np.argmax(onehot_pred))]
            print("%s - [%s] vs [%s]" % (in2, out2, out_pred))
            saver.save(sess, save_dir + "CharRNN.ckpt", global_step=step)
        # advance past the label character as well, so it is used only for prediction
        offset += (n_input + 1)
        step += 1

    print("Finished!")
    saver.save(sess, save_dir + "CharRNN.ckpt", global_step=step)
    print("Elapsed time: ", str_time(time.time() - start_time))

    # test the model
    while True:
        prompt = "Enter %s characters: " % n_input
        sentence = input(prompt)
        input_word = sentence.strip()

        if len(input_word) != n_input:
            print("You entered", len(input_word), "characters; please enter %s" % n_input)
            continue
        try:
            input_word = char_vector(None, words_num_map, input_word)

            for i in range(100):
                keys = np.reshape(np.array(input_word), [-1, n_input, 1])
                onehot_pred = sess.run(predicted, feed_dict={x: keys})
                onehot_pred_index = int(np.argmax(onehot_pred))
                # append the predicted character and slide the window forward
                sentence = "%s%s" % (sentence, words[onehot_pred_index])
                input_word = input_word[1:]
                input_word.append(onehot_pred_index)
            print(sentence)

        except:
            print("I haven't learned that character yet")