Building a DeepQNet (DQN) framework with TensorFlow 2
The DQN workflow is illustrated in the following figure:
Image source: https://zhuanlan.zhihu.com/p/70009692.
In short, an eval network chooses actions and is trained on minibatches sampled from a replay memory, while a target network, whose weights are periodically copied from the eval network, provides the TD targets. Following Morvan (莫烦)'s machine learning tutorials, I summarize below the steps I took to build my DQN.
- Initializing the hyper-parameters.
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np


class DeepQNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.9,
            replace_target_iter=300,
            e_greedy=0.9,
            e_greedy_increment=0.009,
            memory_size=500,
            batch_size=32
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = replace_target_iter
        self.epsilon_increment = e_greedy_increment
        self.memory_size = memory_size
        self.batch_size = batch_size
        # start epsilon low and let it grow towards epsilon_max;
        # starting at epsilon_max would make e_greedy_increment useless
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
        self._built_model()
        self.memory_counter = 0      # number of transitions stored so far
        self.learn_step_counter = 0  # number of learning steps (used to decide when to sync target_net)
        self.cost_his = []           # record every cost so it can be plotted at the end
        # each row stores one transition: s, a, r, s_  ->  n_features * 2 + 2 columns
        self.memory = np.zeros((self.memory_size, n_features * 2 + 2))
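To get a concrete feel for the memory layout, with n_features = 4 each row has 4 * 2 + 2 = 10 columns: the 4 entries of s, then a and r, then the 4 entries of s_. A tiny standalone sketch (the sizes are illustrative, not part of the class above):

import numpy as np

n_features, memory_size = 4, 500                     # illustrative values
memory = np.zeros((memory_size, n_features * 2 + 2))
print(memory.shape)                                  # (500, 10): [s(4), a, r, s_(4)] per row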
- Building the networks
    def _built_model(self):
        # eval network: estimates Q(s, a) with the current parameters
        EvalModel = tf.keras.Sequential(
            [
                layers.Dense(self.n_features, activation='relu', input_shape=(self.n_features,)),
                layers.Dense(self.n_actions, activation=None)
            ]
        )
        # Q-learning fits continuous Q values, so the loss is mean squared error,
        # not a classification loss
        EvalModel.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=self.lr),
                          loss='mse')
        self.EvalModel = EvalModel
        # target network: same architecture; its weights are only updated by copying from the eval net
        TargetModel = tf.keras.Sequential(
            [
                layers.Dense(self.n_features, activation='relu', input_shape=(self.n_features,)),
                layers.Dense(self.n_actions, activation=None)
            ]
        )
        TargetModel.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=self.lr),
                            loss='mse')
        self.TargetModel = TargetModel
DQN needs two networks. Here both are created with the Keras Sequential API, which follows a fairly fixed pattern. They are compiled with a mean-squared-error loss because learn() later regresses the eval net's Q values onto the TD targets; the target network is never trained directly, its weights are only copied over from the eval network (a de-duplicated variant is sketched below).
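Since the two networks are identical, the duplicated Sequential code can be avoided with a small helper. This is only a sketch of an alternative, not the structure used above; the hidden size of 10 and the sizes 4/2 are arbitrary illustrative choices:

import tensorflow as tf
from tensorflow.keras import layers

def _build_net(n_features, n_actions, lr):
    # one hidden layer followed by a linear output of n_actions Q values
    model = tf.keras.Sequential([
        layers.Dense(10, activation='relu', input_shape=(n_features,)),
        layers.Dense(n_actions, activation=None)
    ])
    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=lr), loss='mse')
    return model

# build both nets and start them from the same weights
eval_net = _build_net(4, 2, 0.01)
target_net = _build_net(4, 2, 0.01)
target_net.set_weights(eval_net.get_weights())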
- Updating the replay memory
    def store_transition(self, s, a, r, s_):
        if not hasattr(self, 'memory_counter'):
            self.memory_counter = 0
        transition = np.hstack((s, [a, r], s_))         # stack one transition horizontally: (s, a, r, s_)
        index = self.memory_counter % self.memory_size  # ring-buffer trick: overwrite the oldest record once full
        self.memory[index, :] = transition
        self.memory_counter += 1                        # one more transition stored
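For illustration, one interaction step could be stored like this; the state values, action, and reward are made up, and the sizes (4 features, 2 actions) are only example numbers:

RL = DeepQNetwork(n_actions=2, n_features=4)   # illustrative sizes
s  = np.array([0.1, 0.0, -0.05, 0.2])          # current observation
a  = 1                                         # chosen action index
r  = 1.0                                       # reward from the environment
s_ = np.array([0.1, 0.2, -0.06, 0.0])          # next observation
RL.store_transition(s, a, r, s_)               # stored as one row: [s, a, r, s_]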
- Action selection (epsilon-greedy)
    def choose_action(self, observation):
        observation = observation[np.newaxis, :]  # add a batch dimension: shape (1, n_features)
        if np.random.uniform() < self.epsilon:
            action_value = self.EvalModel.predict(observation)  # Q values for every action in this state
            action = np.argmax(action_value)                    # greedy: pick the action with the largest Q value
        else:
            action = np.random.randint(0, self.n_actions)       # explore: pick a random action
        return action
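A minimal usage sketch, again with illustrative sizes; how often the greedy branch is taken depends on the current value of epsilon:

RL = DeepQNetwork(n_actions=2, n_features=4)     # illustrative sizes
observation = np.array([0.1, 0.0, -0.05, 0.2])   # shape (n_features,)
action = RL.choose_action(observation)           # an integer in [0, n_actions)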
- The learning step
    def learn(self):
        # periodically copy the eval-net weights into the target net
        # (assigning self.EvalModel directly would make both names point to the same model)
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.TargetModel.set_weights(self.EvalModel.get_weights())
            print('Replace the target\n')
        # sample batch_size transitions from the replay memory
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
        batch_memory = self.memory[sample_index, :]
        # q_next: Q(s_, ·) from the target net; q_eval: Q(s, ·) from the eval net
        q_next = self.TargetModel.predict(batch_memory[:, -self.n_features:])
        q_eval = self.EvalModel.predict(batch_memory[:, :self.n_features])
        # build the TD target: only the column of the action actually taken is changed
        q_target = q_eval.copy()
        batch_index = np.arange(self.batch_size, dtype=np.int32)
        eval_act_index = batch_memory[:, self.n_features].astype(int)
        reward = batch_memory[:, self.n_features + 1]
        q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)
        # train the eval net to regress its Q values onto the full q_target matrix
        self.cost = self.EvalModel.train_on_batch(batch_memory[:, :self.n_features], q_target)
        self.cost_his.append(self.cost)
        # increase epsilon so the agent explores less as it learns more
        self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
        self.learn_step_counter += 1
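Putting the pieces together, a minimal training loop might look like the sketch below. It assumes an OpenAI Gym style environment ('CartPole-v0' is just an example) with the classic reset/step API that returns a single observation and a four-value step tuple; the warm-up threshold of 200 stored transitions and learning every 5 steps follow Morvan's tutorial style but are otherwise arbitrary:

import gym

env = gym.make('CartPole-v0')                         # assumed example environment
RL = DeepQNetwork(n_actions=env.action_space.n,
                  n_features=env.observation_space.shape[0])

total_steps = 0
for episode in range(100):
    observation = env.reset()                          # classic Gym API: returns the observation only
    while True:
        action = RL.choose_action(observation)
        observation_, reward, done, info = env.step(action)
        RL.store_transition(observation, action, reward, observation_)
        # start learning once enough transitions have been collected
        if total_steps > 200 and total_steps % 5 == 0:
            RL.learn()
        observation = observation_
        total_steps += 1
        if done:
            break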