前景介绍
上期文章介绍TensorFlow入门基础篇,本意就是给介绍强化学习做一篇前置。
本期我们将尝试利用深度强化学习来让神经网络学习自动地玩一款经典的吃豆人小游戏。让我们愉快地开始吧~
吃豆人小游戏的介绍与pygame实现方式参见:Pygame吃豆人小游戏制作
本文实现的版本与上文中实现的版本略有不同,算是上文中实现版本的改进版。
开发工具
Python版本3.7.7
相关模块:
- pygame(1.9.6)
- pytorch(0.4.1)
- torchvision(0.2.2)
- opencv-python
- numpy
- matplotlib
- 以及一些python自带的模块。
原理简介
一、DQN 简介
DQN,即Deep Q Network。
1.1 强化学习(RL)
要介绍DQN,就得先介绍强化学习。强化学习,说白了,就是让AI通过不断地试错来学习,直接上图说明:
先解释图中比较关键的单词吧:
- agent:智能体
- observation:观察
- reward:奖励
- action:动作
- environment:环境
上图中,大脑代表智能体,地球代表环境,其中只有一部分可被大脑观察到,奖励为环境提供给大脑的反馈,动作为智能体根据环境和奖励做出的行动。强化学习的最终目的,就是找到一个好的策略,可以让自己根据这个策略做出的行动获得的奖励最多。
举个例子,现在我们有一个机器手臂(agent),我们想让它实现抓娃娃这个任务。那么机器手臂周围的物体就是环境(environment),而机器手臂可以通过例如摄像头来观察(observation)当前的环境,不过由于只是一个摄像头,所以机器手臂只能观察到环境的一部分。现在,机器手臂根据当前观察到的环境开始了抓娃娃的行动(policy),如果机器手臂离娃娃变近了,那么当前行动带来的环境变化是有利于我们利用机器手臂去抓娃娃的,这时机器手臂获得的奖励(reward)应当是正的,反之就应当是负的。
一般地,我们假设agent所处的状态s为当前观察到的环境,agent根据当前观察到的环境做出的行动为a,从s到a的过程则可称为一个策略π。也即:a=π(s)或者π(a|s)。
1.2 马尔可夫过程(MDP)
马尔可夫过程,即MDP(Markov Decision Process)。其基本假设为“未来只取决于当前”。数学上表示为:
其中,P为概率,St代表某时刻的状态。当然,这里的状态代表的是整个环境,而非observation到的环境。慢着,我们不是在讨论DQN吗,咋扯到了MDP?是这样的,强化学习问题都可以模型化为MDP问题。Why?一个简单的解释就是如果预测模型把之前所有的状态都考虑进来去估计下一个状态,那建立的这个模型也未免太大了。如果将强化学习问题模型化为MDP问题,那么agent只需要根据现在的状态来预测未来的状态,而知道了未来,agent也就有可能找到最好的行动方式了。换而言之,对于每个未来状态,agent都有一个最佳行动与之对应。
1.3 价值函数(Value Function)
接下来,我们考虑这样一个问题:如何定量地描述状态或者说状态的好坏,从而根据状态好坏来确定接下来的行动?首先,我们假设t时刻的状态将获得的回报Gt为:
其中,R代表reward;λ代表折扣因子(discount factor),一般小于1,以体现越是未来所给的reward,对现在的影响越小。上面那个式子看上去可以很好地刻画t时刻状态的好坏,未来的回报越大,这个状态显然越好嘛。然而,上面那个式子存在一个致命的问题:我们必须等到未来所有的时间全部结束之后才能计算出Gt。此时,我们就需要引入一个概念,即价值函数(value function),以更好地刻画t时刻状态未来的潜在价值。其数学形式表示为:
其含义为状态s对未来reward的期望,reward的期望越高,价值自然也就越大。由此,我们就可以通过估计价值函数来间接优化策略π了,即我们知道了每一种状态的优劣,也就知道该如何做出决策了。
当然我们也可以直接优化策略π,我们这里只关注间接优化策略π是因为DQN是基于该思想的。
1.4 贝尔曼方程(Bellman Equation)
贝尔曼方程,也称“动态规划方程”。此处引入贝尔曼方程,是为了估算价值函数的需要。我们把1.3中给出的价值函数展开:
也就是说,当前状态的价值和当前的reward以及下一状态的价值有关。换句话说,价值函数是可以通过迭代来求解的。
1.5 动作价值函数(Action-Value Function)
现在,我们来考虑动作(action),对于t时刻的状态,我们一般有很多种动作可以选择,每个动作之后的t+1时刻的状态是不同的。显然,如果知道了每个动作的价值,那么我们就可以选择价值最大的那个动作去执行了,这就是动作价值函数。其数学形式为:
这里的r为reward,表示在状态s时执行完动作a后得到的reward。π为策略,代表该动作价值函数为在策略π下的动作价值函数。这很好理解,因为对于每个动作,都需要由策略根据当前的状态生成。由于动作价值函数更加直观,应用方便,因此我们一般使用动作价值函数而非价值函数。
1.6 最优价值函数(Optimal Value Function)
如前所述,我们只要找到最优的价值函数,自然也就找到了最优策略。(当然最优策略的求解方法不止这一种,因为DQN是基于此思想的,所以我们只关注该求解方法。)数学形式上很好定义最优动作价值函数:
也就是最优的动作价值函数就是所有策略下的动作价值函数的最大值。显然,最优动作价值函数具有唯一性。应用贝尔曼方程,易得:
显然,当a’取得最大Q值时,Q值为最优值。
基于Bellman方程有两种最基本的算法,策略迭代和价值迭代。策略迭代本质上就是使用当前策略获得新的样本,然后根据新的样本估计当前策略的价值,从而更新当前策略;而价值迭代更新的是价值,最后收敛得到的是当前状态下的最优价值。我们将介绍的DQN是基于价值迭代算法的。
1.7 Q-Learning
Q-Learning的思想基于价值迭代,直观地理解就是每次利用新得到的reward和原本的Q值来更新现在的Q值。其数学形式表示为:
这里并没有直接将估计的Q值作为新的Q值,而是采用类似梯度下降的方式,每次朝target迈近一小步,而步长取决于α,这有利于减少估计误差造成的影响,类似于随机梯度下降,最后将收敛到最优Q值。
具体而言,Q-Learning算法表述如下:
在上面的算法中,我们需要某个策略来生成动作(action),一般而言,我们可以选取以下两种策略:
- 随机生成一个动作;
- 根据当前的Q值计算出一个最优动作。
第二种策略也称为贪心策略,数学形式表示为:
举例而言,假设我们现在在走迷宫,我向上走的Q值为2,向下走的Q值为3,向左走的Q值为5,向右走的Q值为1,那么我们就向左走。
一般而言,我们称第一种策略(随机行动)为exploration;称第二种策略(贪心)为exploitation;将两种策略结合起来就是ϵ-greedy策略,即以概率ϵ进行exploration,以概率1-ϵ进行exploitation,且ϵ一般是一个很小的值。
现在我们来考虑另外一个问题,那就是算法中Q[s, a]如何存储的问题,显然我们可以建立一个二维的表格,类似这样:
但这存在维度灾难问题,比如输入为90*90像素的图片,对于8bit图像,每个像素点都有256种选择,那么总状态数就有256^(8100)种。显然,我们有必要对状态的维度进行压缩,因为我们不可能通过表格来存储如此多的状态,这时就需要引入价值函数近似这个概念。简单而言,价值函数近似就是用一个函数来表示Q(s, a),即:
这样,我们就可以不用去考虑s的表格存储问题了,每次只需要输入当前的状态s和行动a,通过简单的矩阵运算就可以直接得到所需的Q值了。当然,这个近似是存在误差的,因为我们并不知道Q值的实际分布情况。我们再仔细考虑一下,发现动作一般都是低维数据,其实没有必要一起输入到近似函数f中,于是我们有:
即只把状态作为输入,输出值为每个动作的Q值,即输出值是一个向量:
1.8 DQN
前面我们说到,我们可以使用近似函数f来计算Q值。显然,这个函数就可以用神经网络来近似啊!!!换句话说,就是用神经网络来表示Q值,那么这样的网络我们就可以称其为Q-Network了。
那么问题又来了,神经网络都是需要训练的呀,Q网络自然也不例外,于是现在的问题变成了:如何为Q网络提供有标签的样本进行训练呢?重新考察Q值更新公式:
我们发现,Q值的更新依靠的是目标Q值这部分:
因此,我们直接把它当作标签不就行了么,因为我们的最终目标就是要让Q值趋近于目标Q值呀。因此,Q网络的训练损失函数就是:
接下来,我们来看看DQN的算法描述:
这里我们就讲讲原版本的吧(即NeurIPS 2013版本)。算法看起来很简单,其本质其实就是反复实验,并存储实验数据,当实验数据足够多时,就从中随机采样数据,利用梯度下降算法训练Q网络。换句话说,在DQN中,增强学习Q-Learning算法和深度学习的SGD训练是同步进行的,也即通过Q-Learning获取训练样本,然后对神经网络进行训练。
这里涉及到一个问题,那就是为什么需要先存储足够多的实验数据才能开始网络的训练?其实很简单,由于采集的样本是一个时间序列,样本之间具有连续性,如果每次一得到样本就更新Q网络,那么受样本分布的影响,训练效果是不会好的(深度学习一般要求训练样本满足独立同分布)。因此,一个很直接的想法就是把样本先存起来,当样本足够多时,再对其进行随机采样。这个方法就是所谓的Experience Reply。
DQN玩吃豆人
-
游戏介绍:
参见:Pygame吃豆人小游戏制作 -
逐步实现DQN:
(1)游戏实现
首先,当然是实现吃豆人小游戏啦!这个开篇就讲了,实现思路和之前的差不多,做了一些简单的改进。完整源代码在相关文件里的gameAPI文件夹下。gameAPI提供了三个可调用的函数:- nextFrame:用于模型训练和模型测试,玩家不可控制Pacman,由电脑自动操作,函数将返回游戏每帧的数据
- reset:用于游戏重置
这里稍微讲下nextFrame的实现,nextFrame返回了每帧游戏的画面以及自己设计的reward数据等模型训练必要的数据,代码实现如下:
def nextFrame(self, action=None):
if action is None:
action = random.choice(self.actions)
pygame.event.pump()
pressed_keys = pygame.key.get_pressed()
if pressed_keys[pygame.K_q]:
sys.exit(-1)
pygame.quit()
is_win = False
is_gameover = False
reward = 0
self.pacman_sprites.update(action, self.wall_sprites, None)
for pacman in self.pacman_sprites:
food_eaten = pygame.sprite.spritecollide(pacman, self.food_sprites, True)
capsule_eaten = pygame.sprite.spritecollide(pacman, self.capsule_sprites, True)
nonscared_ghost_sprites = pygame.sprite.Group()
dead_ghost_sprites = pygame.sprite.Group()
for ghost in self.ghost_sprites:
if ghost.is_scared:
if pygame.sprite.spritecollide(ghost, self.pacman_sprites, False):
reward += 6
dead_ghost_sprites.add(ghost)
else:
nonscared_ghost_sprites.add(ghost)
for ghost in dead_ghost_sprites:
ghost.reset()
del dead_ghost_sprites
reward += len(food_eaten) * 2
reward += len(capsule_eaten) * 3
if len(capsule_eaten) > 0:
for ghost in self.ghost_sprites:
ghost.is_scared = True
self.ghost_sprites.update(self.wall_sprites, None, self.config.ghost_action_method, self.pacman_sprites)
self.screen.fill(self.config.BLACK)
self.wall_sprites.draw(self.screen)
self.food_sprites.draw(self.screen)
self.capsule_sprites.draw(self.screen)
self.pacman_sprites.draw(self.screen)
self.ghost_sprites.draw(self.screen)
# get frame
frame = pygame.surfarray.array3d(pygame.display.get_surface())
frame = cv2.transpose(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
self.config.frame_size = frame.shape[0]//4, frame.shape[1]//4, frame.shape[2]
frame = cv2.resize(frame, self.config.frame_size[:2])
# show the score
self.score += reward
text = self.font.render('SCORE: %s' % self.score, True, self.config.WHITE)
self.screen.blit(text, (2, 2))
pygame.display.update()
# judge whether game over
if len(self.food_sprites) == 0 and len(self.capsule_sprites) == 0:
is_win = True
is_gameover = True
reward = 10
if pygame.sprite.groupcollide(self.pacman_sprites, nonscared_ghost_sprites, False, False):
is_win = False
is_gameover = True
reward = -15
if reward == 0:
reward = -2
return frame, is_win, is_gameover, reward, action
(2)模型实现
网络模型采用的是resnet18,就把最后的fc层输出从1000改成了4。具体而言,代码实现如下:
'''DQN'''
class DQNet(nn.Module):
def __init__(self, config, **kwargs):
super(DQNet, self).__init__()
self.resnet18 = torchvision.models.resnet18()
self.resnet18.conv1 = nn.Conv2d(in_channels=config.num_continuous_frames*3, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False)
self.resnet18.fc = nn.Linear(in_features=512, out_features=4)
def forward(self, x):
x = self.resnet18(x)
return x
损失函数如何定义已经在前面DQN的原理介绍部分详细说明了,这里就不再多说了,其代码实现如下:
q_t = self.dqn_net(images_input_torch)
q_t = torch.max(q_t, dim=1)[0]
loss = self.mse_loss(torch.Tensor(rewards).type(FloatTensor) + (1 - torch.Tensor(is_gameovers).type(FloatTensor)) * (0.95 * q_t),
(self.dqn_net(images_prev_input_torch) * torch.Tensor(actions).type(FloatTensor)).sum(1))
(3)config.py文件说明
config.py文件里是一些预定义的参数,主要包括模型训练和模型测试以及游戏实现所需要的参数,默认训练次数为十万帧,每一万帧存储一次,如下所示:
'''训练'''
batch_size = 32 # 批次大小
max_explore_iterations = 5000 # 最大迭代大小
max_memory_size = 100000 # 最大内存大小
max_train_iterations = 1000000 # 最大训练次数
save_interval = 10000 # 训练pkl保存间隔
save_dir = 'model_saved' # 训练pkl保存路径
frame_size = None # 框架尺寸根据布局自动计算
num_continuous_frames = 1 # 连续帧
logfile = 'train.log' # 日志文件保存位置
use_cuda = torch.cuda.is_available() # 使用cuda检测存在性
eps_start = 1.0 # 开始探索点
eps_end = 0.1 # 结束探索点
eps_num_steps = 10000 # 探索步数
'''测试'''
weightspath = os.path.join(save_dir, str(max_train_iterations)+'.pkl') # 调用训练集
全部代码
因为代码量过大以及之前写过吃豆人的代码,在这里不重新写入
百度网盘: 百度网盘链接 ,密码: bvb6
此次代码为DQN的完整代码:
import os
import sys
import time
import torch
import random
import numpy as np
import torch.nn as nn
from collections import deque
'''DQN'''
class DQNet(nn.Module):
def __init__(self, config, **kwargs):
super(DQNet, self).__init__()
self.conv1 = nn.Conv2d(in_channels=config.num_element_types*config.num_continuous_frames, out_channels=16, kernel_size=3, stride=1, padding=1)
self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
self.fc1 = nn.Linear(in_features=config.frame_size[0]*config.frame_size[1]*32, out_features=256)
self.fc2 = nn.Linear(in_features=256, out_features=4)
self.relu = nn.ReLU(inplace=True)
self.__initWeights()
def forward(self, x):
x = self.conv1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.relu(x).view(x.size(0), -1)
x = self.fc1(x)
x = self.relu(x)
x = self.fc2(x)
return x
def __initWeights(self):
nn.init.normal_(self.conv1.weight, std=0.01)
nn.init.normal_(self.conv2.weight, std=0.01)
nn.init.normal_(self.fc1.weight, std=0.01)
nn.init.normal_(self.fc2.weight, std=0.01)
nn.init.constant_(self.conv1.bias, 0.1)
nn.init.constant_(self.conv2.bias, 0.1)
nn.init.constant_(self.fc1.bias, 0.1)
nn.init.constant_(self.fc2.bias, 0.1)
'''agent实现'''
class DQNAgent():
def __init__(self, game_pacman_agent, dqn_net, config, **kwargs):
self.game_pacman_agent = game_pacman_agent
self.dqn_net = dqn_net
self.config = config
self.game_memories = deque()
self.mse_loss = nn.MSELoss(reduction='elementwise_mean')
'''训练'''
def train(self):
# 准备阶段
if not os.path.exists(self.config.save_dir):
os.mkdir(self.config.save_dir)
if self.config.use_cuda:
self.dqn_net = self.dqn_net.cuda()
FloatTensor = torch.cuda.FloatTensor if self.config.use_cuda else torch.FloatTensor
# 开始训练
frames = []
optimizer = torch.optim.Adam(self.dqn_net.parameters())
num_iter = 0
image = None
image_prev = None
action_pred = None
score_best = 0
num_games = 0
num_wins = 0
while True:
if len(self.game_memories) > self.config.max_memory_size:
self.game_memories.popleft()
frame, is_win, is_gameover, reward, action = self.game_pacman_agent.nextFrame(action=action_pred)
score_best = max(self.game_pacman_agent.score, score_best)
if is_gameover:
self.game_pacman_agent.reset()
if len(self.game_memories) >= self.config.max_explore_iterations:
num_games += 1
num_wins += int(is_win)
frames.append(frame)
if len(frames) == self.config.num_continuous_frames:
image_prev = image
image = np.concatenate(frames, -1)
exprience = (image, image_prev, reward, self.formatAction(action, outformat='networkformat'), is_gameover)
frames.pop(0)
if image_prev is not None:
self.game_memories.append(exprience)
# 探索
if len(self.game_memories) < self.config.max_explore_iterations:
self.__logging('[状态]: explore, [内存]: %d' % len(self.game_memories), self.config.logfile)
# 训练
else:
num_iter += 1
images_input = []
images_prev_input = []
is_gameovers = []
actions = []
rewards = []
for each in random.sample(self.game_memories, self.config.batch_size):
image_input = each[0].astype(np.float32)
image_input.resize((1, *image_input.shape))
images_input.append(image_input)
image_prev_input = each[1].astype(np.float32)
image_prev_input.resize((1, *image_prev_input.shape))
images_prev_input.append(image_prev_input)
rewards.append(each[2])
actions.append(each[3])
is_gameovers.append(each[4])
images_input_torch = torch.from_numpy(np.concatenate(images_input, 0)).permute(0, 3, 1, 2).type(FloatTensor)
images_prev_input_torch = torch.from_numpy(np.concatenate(images_prev_input, 0)).permute(0, 3, 1, 2).type(FloatTensor)
# 损失函数
optimizer.zero_grad()
q_t = self.dqn_net(images_input_torch).detach()
q_t = torch.max(q_t, dim=1)[0]
loss = self.mse_loss(torch.Tensor(rewards).type(FloatTensor) + (1 - torch.Tensor(is_gameovers).type(FloatTensor)) * (0.95 * q_t),
(self.dqn_net(images_prev_input_torch) * torch.Tensor(actions).type(FloatTensor)).sum(1))
loss.backward()
optimizer.step()
# 做下步决定
prob = max(self.config.eps_start-(self.config.eps_start-self.config.eps_end)/self.config.eps_num_steps*num_iter, self.config.eps_end)
if random.random() > prob:
with torch.no_grad():
self.dqn_net.eval()
image_input = image.astype(np.float32)
image_input.resize((1, *image_input.shape))
image_input_torch = torch.from_numpy(image_input).permute(0, 3, 1, 2).type(FloatTensor)
action_pred = self.dqn_net(image_input_torch).view(-1).tolist()
action_pred = self.formatAction(action_pred, outformat='oriactionformat')
self.dqn_net.train()
else:
action_pred = None
self.__logging('[状态]: training, [重复]: %d, [损失值]: %.3f, [行动]: %s, [最高分]: %d, [训练程度]: %d/%d' % (num_iter, loss.item(), str(action_pred), score_best, num_wins, num_games), self.config.logfile)
if num_iter % self.config.save_interval == 0 or num_iter == self.config.max_train_iterations:
torch.save(self.dqn_net.state_dict(), os.path.join(self.config.save_dir, '%s.pkl' % num_iter))
if num_iter == self.config.max_train_iterations:
self.__logging('训练完成', self.config.logfile)
sys.exit(-1)
'''训练'''
def test(self):
if self.config.use_cuda:
self.dqn_net = self.dqn_net.cuda()
self.dqn_net.eval()
FloatTensor = torch.cuda.FloatTensor if self.config.use_cuda else torch.FloatTensor
frames = []
action_pred = None
while True:
frame, is_win, is_gameover, reward, action = self.game_pacman_agent.nextFrame(action=action_pred)
if is_gameover:
self.game_pacman_agent.reset()
frames.append(frame)
if len(frames) == self.config.num_continuous_frames:
image = np.concatenate(frames, -1)
if random.random() > self.config.eps_end:
with torch.no_grad():
image_input = image.astype(np.float32)
image_input.resize((1, *image_input.shape))
image_input_torch = torch.from_numpy(image_input).permute(0, 3, 1, 2).type(FloatTensor)
action_pred = self.dqn_net(image_input_torch).view(-1).tolist()
action_pred = self.formatAction(action_pred, outformat='oriactionformat')
else:
action_pred = None
frames.pop(0)
print('[行动]: %s' % str(action_pred))
def formatAction(self, action, outformat='networkformat'):
if outformat == 'networkformat':
if action == [-1, 0]:
return [1, 0, 0, 0]
elif action == [1, 0]:
return [0, 1, 0, 0]
elif action == [0, -1]:
return [0, 0, 1, 0]
elif action == [0, 1]:
return [0, 0, 0, 1]
elif outformat == 'oriactionformat':
idx = action.index(max(action))
if idx == 0:
return [-1, 0]
elif idx == 1:
return [1, 0]
elif idx == 2:
return [0, -1]
elif idx == 3:
return [0, 1]
def __logging(self, message, savefile=None):
content = '%s %s' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), message)
if savefile:
f = open(savefile, 'a')
f.write(content + '\n')
f.close()
print(content)
运行代码
模型训练: 根据自己的需要修改config.py文件(可以不修改),然后运行train.py文件即可。
模型测试: 根据自己的需要修改config.py文件(可以不修改),然后运行demo.py文件即可。