import tensorflow as tf
from tensorflow.contrib.layers import fully_connected
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets(r"G:\A_深度学习1\tensorflow\rnn\MNIST_data")
# 参数
batch_size = 100 # mini-batch批大小
n_steps = 28 # 时间步数(序列长度)
n_inputs = 28 # 输入数据长度
n_neurons = 100 # 隐藏状态,神经元个数
n_outputs = 10 # 输出10分类
learning_rate = 0.001 # 学习率
n_epochs = 3 # 训练大周期
# 输入输出占位符
X = tf.placeholder(tf.float32, [None, n_steps, n_inputs]) # 三维数据(?, 28, 28)
y = tf.placeholder(tf.int32, [None]) # 一维输出,标签0-9
# 模型使用最简单的BasicRNNcell
basic_cell = tf.contrib.rnn.BasicRNNCell(num_units=n_neurons) # 定义RNN基础单元
outputs, states = tf.nn.dynamic_rnn(basic_cell, X, dtype=tf.float32) # outputs(?, 28, 100) states(?, 100)
# logits = fully_connected(states, n_outputs, activation_fn=None)
logits = fully_connected(outputs[:, -1], n_outputs, activation_fn=None) # 用最后一个时间步的输出
# 代价或损失函数
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=y))
training_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
# 计算准确率,只有等于y才是对的,其它都错
correct = tf.nn.in_top_k(logits, y, 1) # 每个样本的预测结果的前k个最大的数里面是否包含targets预测中的标签
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for epoch in range(n_epochs):
for iteration in range(mnist.train.num_examples // batch_size): # 55000
X_batch, y_batch = mnist.train.next_batch(batch_size)
X_batch = X_batch.reshape((-1, n_steps, n_inputs)) # (?, 28, 28)
sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
acc_train = accuracy.eval(feed_dict={X: X_batch, y: y_batch})
acc_test = accuracy.eval(feed_dict={X: mnist.test.images.reshape((-1, n_steps, n_inputs)),
y: mnist.test.labels})
print(epoch, "Train accuracy:", acc_train, "Test accuracy:", acc_test)
''' 这里解析一下tensorflow.nn.dynamic_rnn(cell,inputs,initial_state=None,dtype=None,time_major=False) 返回值有两个outputs,states.outputs的形状是[batch_size,time_step,cell.output_dim] 这个三维张量的第一个维度是batch_size,我们来看二三维度,相当于每一个batch_size对应于一个二维矩阵, 二维矩阵的每一行长度是rnn的输出维度,代表在当前的batch_size中的当前的time_step的特征。 将outputs取转置并且在第一个维度上取最后一个元素[-1,batch_size,:],也就得到了每一个batch_size中所有数据的最后一个time_step,也就是 最后一个时刻的输出[batch_size,output_dim], 在语言模型中,time_step就是一个句子,所以这个张量[batch_size,output_dim]通常作为下一个时刻的输入,因为这个张量的每一行代表的意义是在 读取了整个句子后所得到的特征,这是很有意义的。如果将outputs取了转置后[time_step,batch_size,output_dim]对这个张量取[1,batch_size,output_dim] 那么现在得到的张量仅仅是在读取了句子中第一个单词后获得的输出,这样的张量如果作为下一层的输入那么模型的效果将惨不忍睹。 现在来讨论states,如果cell是LSTM类型的cell,那么states是一个有两个元素的元祖,为什么有两个元素呢,因为LSTM类型的cell有两个状态 一个是cell state代表该神经元的细胞状态,另一个是hidden state代表该神经元的隐藏状态。 而且这两个状态都是最后一个时刻的神经元的状态,所以states的形状与time_step无关 这两个状态张量的形状都是[batch_size,output_dim] states[0]是cell的状态,states[1]是hidden的状态 所以states[1]与上面提到的提取每一个time_step的最后一个时刻得到的张量[batch_size,output_dim]是一样的,因为它们都是每一个batch_size中 每一个数据的最后一个时刻的特征提取。 '''
封装成类
import tensorflow as tf
import tensorflow.contrib as con
from tensorflow.contrib.layers import fully_connected
from tensorflow.examples.tutorials.mnist import input_data
class myRnnModel():
def __init__(self):
self.mnist = input_data.read_data_sets(r'G:\A_深度学习1\tensorflow\rnn\MNIST_data')
self.epochs = 3 # 迭代总批次
self.batch_size = 100 # mini-batch 大小
self.n_steps = 28 # 时间步数 序列长度
self.n_inputs = 28 # 数据长度
self.n_neurons = 100 # 隐藏状态 神经元个数
self.n_outputs = 10 # 10分类
def train_get_ready(self):
# 三维 一维
self.X, self.y = tf.placeholder('float', shape=[None, self.n_steps, self.n_inputs]), tf.placeholder(tf.int32, shape=[None])
# 定义基础rnn单元
self.basic_cell = con.rnn.BasicRNNCell(num_units=self.batch_size)
# 定义动态rnn
self.outputs, self.states = tf.nn.dynamic_rnn(self.basic_cell, self.X, dtype=tf.float32)
self.logots = fully_connected(self.outputs[:, -1], 10, activation_fn=None) # 用最后一个时间步的输出
def cost_optimizer(self):
self.cost = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.logots, labels=self.y))
self.optimizer = tf.train.AdamOptimizer(0.001).minimize(self.cost)
self.correct = tf.nn.in_top_k(self.logots, self.y, 1)
self.acc = tf.reduce_mean(tf.cast(self.correct, 'float'))
def train(self):
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for e in range(self.epochs):
self.total = int(self.mnist.train.num_examples / self.batch_size)
for i in range(self.total):
x_batch, y_batch = self.mnist.train.next_batch(self.batch_size)
x_batch = x_batch.reshape(-1, self.n_steps, self.n_inputs)
sess.run(self.optimizer, feed_dict={self.X: x_batch, self.y: y_batch})
print(
f'e: {e + 1}, train acc: {sess.run(self.acc, feed_dict={self.X: x_batch, self.y: y_batch})}, test acc: {sess.run(self.acc, feed_dict={self.X: self.mnist.test.images.reshape(-1, self.n_steps, self.n_inputs), self.y: self.mnist.test.labels})}')
rnn = myRnnModel()
rnn.train_get_ready()
rnn.cost_optimizer()
rnn.train()