欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫

程序员文章站 2022-03-03 18:45:07
DQN算法及案例:如何使用DQN实现走迷宫算法原理案例详解算法原理在了解DQN之前,我们需要知道值函数近似这一思想。值函数近似:在数据较大情况下,根据s&a去查询Q值会比较困难。值函数近似是输入s和a,近似地计算得到Q值,在DQN中使用的是神经网络完成这一步骤,简单情况下使用线性函数也行。DQN:融合神经网络和Q值。Q值难以记录,可以通过神经网络输入s、a,然后输出Q值进行策略的选择。前面提到的算法训练目标是Q矩阵,DQN训练目标训练一个神经网络,可以通过这个网络来计算得到Q值并选择最优动作...

DQN算法及案例:如何使用DQN实现走迷宫

算法原理

在了解DQN之前,我们需要知道值函数近似这一思想。
值函数近似:在数据较大情况下,根据s&a去查询Q值会比较困难。值函数近似是输入s和a,近似地计算得到Q值,在DQN中使用的是神经网络完成这一步骤,简单情况下使用线性函数也行。

DQN:融合神经网络和Q值。Q值难以记录,可以通过神经网络输入s、a,然后输出Q值进行策略的选择。前面提到的算法训练目标是Q矩阵,DQN训练目标训练一个神经网络,可以通过这个网络来计算得到Q值并选择最优动作。

那么简单而言,相比于Q-Learning,DQN做的改进是使用了卷积神经网络来逼近行为值函数。

DQN之所以能有效地进行,主要归功于它的两大利器:
(1)Experience replay(经验回放):神经网络的训练样本独立,而 RL 中的前后状态相关,所以需要做一点改动。因为 Q-Learning 是一个离线学习算法,所以在每次 DQN 更新时,可以随机选取一些过去的状态来进行学习。这种方式打乱了状态之间的相关性,可以使神经网络更有效率。

(2)Fixed Q-targets(固定 Q-目标):Fixed Q-targets 也是一种打乱相关性的机理, 如果使用 fixed Q-targets, 我们就会在 DQN 中使用到两个结构相同但参数不同的神经网络, 预测 Q 估计 的神经网络具备最新的参数ω, 而预测 Q 现实 的神经网络使用的参数则是隔段时间更新的ω−。需要注意的是:
Q_eval 根据喂的数据(s,a)来直接计算Q值。
Q_target =reward+ γmaxQ(St+1,a′;ω−)。(Q(St+1,a′;ω−)取St+1状态下选取a’动作所能达到的最大值,属于Q_learning的更新方式,也可以用其它方法计算Q现实)

首先初始化Memory D,它的容量为N;
初始化Q网络,随机生成权重ω;
初始化target Q网络,权重为ω−=ω;

循环遍历episode =1, 2, …, M:
	初始化initial state S1;
	循环遍历step =1,2,…, T:
		用ϵ−greedy策略生成action at:以ϵ概率选择随机action,或选择at=maxaQ(St,a;ω);
		执行action at,接收reward rt及新的state St+1;
		将transition样本 (St,at,rt,St+1)存入D中;
		从D中随机抽取一个minibatch的transitions (Sj,aj,rj,Sj+1);
			经验回放,每次存入D的样本都是过去的样本,每次更新不会影响下一个S。
		如果 j+1步是terminal的话,令yj=rj;否则,令 yj=rj+γmaxa′Q(St+1,a′;ω−);
			此时yj为真实值,Q为预测值。
		对(yj−Q(St,aj;ω))2关于ω使用梯度下降法进行更新;
		每隔C steps更新target Q网络,ω−=ω。
	End For;
End For.

案例详解

案例使用莫烦Python中DQN算法案例。
贴一下github地址:https://morvanzhou.github.io/tutorials/
我们的训练目标是下面这样一个迷宫游戏。强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫
从左上角红色方块出发,走到黑色方块视为失败,走到黄色圆圈视为成功。
有关训练的结果也会在最后展示。

代码主要由3个py文件组成。
如图所示,maze_env.py是环境文件,写了一个可以用于强化学习的环境——小迷宫。
RL_brain.py是主要算法的细节实现文件。
run_this.py是主程序。
强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫
下面贴完整代码。有关代码的解释以及运行效果在之后。

"""
this is maze_env.py
Reinforcement learning maze example.

Red rectangle:          explorer.
Black rectangles:       hells       [reward = -1].
Yellow bin circle:      paradise    [reward = +1].
All other states:       ground      [reward = 0].

This script is the environment part of this example.
The RL is in RL_brain.py.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/
"""
import numpy as np
import time
import sys
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

UNIT = 40   # pixels
MAZE_H = 4  # grid height
MAZE_W = 4  # grid width


class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.n_features = 2
        self.title('maze')
        self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))
        self._build_maze()

    def _build_maze(self):
        self.canvas = tk.Canvas(self, bg='white',
                           height=MAZE_H * UNIT,
                           width=MAZE_W * UNIT)

        # create grids
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # create origin
        origin = np.array([20, 20])

        # hell
        hell1_center = origin + np.array([UNIT * 2, UNIT])
        self.hell1 = self.canvas.create_rectangle(
            hell1_center[0] - 15, hell1_center[1] - 15,
            hell1_center[0] + 15, hell1_center[1] + 15,
            fill='black')
        # hell
        # hell2_center = origin + np.array([UNIT, UNIT * 2])
        # self.hell2 = self.canvas.create_rectangle(
        #     hell2_center[0] - 15, hell2_center[1] - 15,
        #     hell2_center[0] + 15, hell2_center[1] + 15,
        #     fill='black')

        # create oval
        oval_center = origin + UNIT * 2
        self.oval = self.canvas.create_oval(
            oval_center[0] - 15, oval_center[1] - 15,
            oval_center[0] + 15, oval_center[1] + 15,
            fill='yellow')

        # create red rect
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')

        # pack all
        self.canvas.pack()

    def reset(self):
        self.update()
        time.sleep(0.1)
        self.canvas.delete(self.rect)
        origin = np.array([20, 20])
        self.rect = self.canvas.create_rectangle(
            origin[0] - 15, origin[1] - 15,
            origin[0] + 15, origin[1] + 15,
            fill='red')
        # return observation
        return (np.array(self.canvas.coords(self.rect)[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)

    def step(self, action):
        s = self.canvas.coords(self.rect)
        base_action = np.array([0, 0])
        if action == 0:   # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:   # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:   # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:   # left
            if s[0] > UNIT:
                base_action[0] -= UNIT

        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent

        next_coords = self.canvas.coords(self.rect)  # next state

        # reward function
        if next_coords == self.canvas.coords(self.oval):
            reward = 1
            done = True
        elif next_coords in [self.canvas.coords(self.hell1)]:
            reward = -1
            done = True
        else:
            reward = 0
            done = False
        s_ = (np.array(next_coords[:2]) - np.array(self.canvas.coords(self.oval)[:2]))/(MAZE_H*UNIT)
        return s_, reward, done

    def render(self):
        # time.sleep(0.01)
        self.update()



"""
this is RL_brain.py
This part of code is the DQN brain, which is a brain of the agent.
All decisions are made in here.
Using Tensorflow to build the neural network.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/

Using:
Tensorflow: 1.0
gym: 0.7.3
"""

import numpy as np
import pandas as pd
# import tensorflow as tf
# 为了解决版本问题,采取了以下方式导入tensorflow 1.x 版本。
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

np.random.seed(1)
tf.set_random_seed(1)


# Deep Q Network off-policy
class DeepQNetwork:
    def __init__(
            self,
            n_actions,
            n_features,
            learning_rate=0.01,
            reward_decay=0.9, # R折减率
            e_greedy=0.9,
            replace_target_iter=300, # 300次迭代更新一次Q_target
            memory_size=500,
            batch_size=32,
            e_greedy_increment=None,
            output_graph=False,
    ):
        self.n_actions = n_actions
        self.n_features = n_features
        self.lr = learning_rate
        self.gamma = reward_decay # γ
        self.epsilon_max = e_greedy
        self.replace_target_iter = replace_target_iter
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.epsilon_increment = e_greedy_increment
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max

        # total learning step
        self.learn_step_counter = 0

        # initialize zero memory [s, a, r, s_]
        self.memory = np.zeros((self.memory_size, n_features * 2 + 2))

        # consist of [target_net, evaluate_net]
        self._build_net()
        t_params = tf.get_collection('target_net_params')
        e_params = tf.get_collection('eval_net_params')
        self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)]

        self.sess = tf.Session()

        if output_graph:
            # $ tensorboard --logdir=logs
            # tf.train.SummaryWriter soon be deprecated, use following
            tf.summary.FileWriter("logs/", self.sess.graph)

        self.sess.run(tf.global_variables_initializer())
        self.cost_his = []

    def _build_net(self):
        # ------------------ build evaluate_net ------------------
        self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s')  # input
        self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target')  # for calculating loss
        with tf.variable_scope('eval_net'):
            # c_names(collections_names) are the collections to store variables
            c_names, n_l1, w_initializer, b_initializer = \
                ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES], 10, \
                tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)  # config of layers

            # first layer. collections is used later when assign to target net
            # tf中矩阵加法, nxm + 1xm , 直接在nxm的矩阵每行加上1xm矩阵中的对应元素。
            # relu函数:x = 0 if x<0 else x=x
            with tf.variable_scope('l1'):
                w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
                b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
                l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1)

            # second layer. collections is used later when assign to target net
            with tf.variable_scope('l2'):
                w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
                b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
                self.q_eval = tf.matmul(l1, w2) + b2

        with tf.variable_scope('loss'):
            self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
        with tf.variable_scope('train'):
            self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)

        # ------------------ build target_net ------------------
        self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_')    # input
        with tf.variable_scope('target_net'):
            # c_names(collections_names) are the collections to store variables
            c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]

            # first layer. collections is used later when assign to target net
            with tf.variable_scope('l1'):
                w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names)
                b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names)
                l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1)

            # second layer. collections is used later when assign to target net
            with tf.variable_scope('l2'):
                w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names)
                b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names)
                self.q_next = tf.matmul(l1, w2) + b2

    def store_transition(self, s, a, r, s_):
        if not hasattr(self, 'memory_counter'):
            self.memory_counter = 0

        transition = np.hstack((s, [a, r], s_))

        # replace the old memory with new memory
        index = self.memory_counter % self.memory_size
        self.memory[index, :] = transition

        self.memory_counter += 1

    def choose_action(self, observation):
        # to have batch dimension when feed into tf placeholder
        observation = observation[np.newaxis, :]

        if np.random.uniform() < self.epsilon:
            # forward feed the observation and get q value for every actions
            actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation})
            action = np.argmax(actions_value)
        else:
            action = np.random.randint(0, self.n_actions)
        return action

    def learn(self):
        # check to replace target parameters
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.sess.run(self.replace_target_op)
            print('\ntarget_params_replaced\n')

        # sample batch memory from all memory
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
        batch_memory = self.memory[sample_index, :]

        q_next, q_eval = self.sess.run(
            [self.q_next, self.q_eval],
            feed_dict={
                self.s_: batch_memory[:, -self.n_features:],  # fixed params
                self.s: batch_memory[:, :self.n_features],  # newest params
            })

        # change q_target w.r.t q_eval's action
        q_target = q_eval.copy()

        batch_index = np.arange(self.batch_size, dtype=np.int32)
        eval_act_index = batch_memory[:, self.n_features].astype(int)
        reward = batch_memory[:, self.n_features + 1]

        q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)

        """
        For example in this batch I have 2 samples and 3 actions:
        q_eval =
        [[1, 2, 3],
         [4, 5, 6]]

        q_target = q_eval =
        [[1, 2, 3],
         [4, 5, 6]]

        Then change q_target with the real q_target value w.r.t the q_eval's action.
        For example in:
            sample 0, I took action 0, and the max q_target value is -1;
            sample 1, I took action 2, and the max q_target value is -2:
        q_target =
        [[-1, 2, 3],
         [4, 5, -2]]

        So the (q_target - q_eval) becomes:
        [[(-1)-(1), 0, 0],
         [0, 0, (-2)-(6)]]

        We then backpropagate this error w.r.t the corresponding action to network,
        leave other action as error=0 cause we didn't choose it.
        """

        # train eval network
        _, self.cost = self.sess.run([self._train_op, self.loss],
                                     feed_dict={self.s: batch_memory[:, :self.n_features],
                                                self.q_target: q_target})
        self.cost_his.append(self.cost)

        # increasing epsilon
        self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
        self.learn_step_counter += 1

    def plot_cost(self):
        import matplotlib.pyplot as plt
        plt.plot(np.arange(len(self.cost_his)), self.cost_his)
        plt.ylabel('Cost')
        plt.xlabel('training steps')
        plt.show()




'''
this is run_this.py
'''
from maze_env import Maze
from RL_brain import DeepQNetwork

def run_maze():
    step = 0
    for episode in range(300):
        # initial observation
        observation = env.reset()
        count_step = 0
        while True:
            # fresh env 刷新环境
            env.render()
            count_step = count_step+1


            # RL choose action based on observation
            action = RL.choose_action(observation)

            # RL take action and get next observation and reward
            observation_, reward, done = env.step(action)
            # 目前done只能显示游戏结束,如果对done值形式改变一下可以反应游戏胜利/失败。

            RL.store_transition(observation, action, reward, observation_)

            if (step > 200) and (step % 5 == 0):
                # 前面步骤Q网络没更新好,此时学习不具意义。在运行一段时间后,可以进行神经网络的更新。
                RL.learn()

            # swap observation
            observation = observation_

            # break while loop when end of this episode
            if done:
                print("本次所尝试的步数:",count_step)
                break
            step += 1

    # end of game
    print('game over')
    env.destroy()


if __name__ == "__main__":
    # tensorboard --logdir=logs 网页查看结构
    # maze game
    env = Maze()
    RL = DeepQNetwork(env.n_actions, env.n_features,
                      learning_rate=0.01,
                      reward_decay=0.9,
                      e_greedy=0.99,
                      replace_target_iter=200,
                      memory_size=2000,
                      output_graph=True,
                      e_greedy_increment=0.01
                      )
    env.after(100, run_maze)
    env.mainloop()
    RL.plot_cost()

maze_env.py:
是迷宫环境文件。主要是有关迷宫设置、显示的代码。定义了黄色圆圈(成功)的reward = 1,红色方块(失败)的reward = -1。(可以试着调整奖励与惩罚,让模型达到更好地训练效果)
有关环境文件的就不多说了。

RL_brain.py:
主要代码实现细节。是重中之重。

_build_net:  ——>用于建立主要网络,即一个Q_eval网络和Q_target网络。
其中Q_target网络与Q_eval网络具有相同结构。
--------------------------------evaluate_net------------------------------------------------
输入数据:s -> n rows;2 columns。 
q_target用以计算损失:q_target -> n rows;4 colummns。
评估函数:eval_net ->
layer1: s [nx2] × w1[2x10] + b1[1x10] = l1[nx10]
通过relu函数激活。
layer2: l1[nx10] × w2[10x4] + b2[1x4] = l2[nx4] -> q_eval
loss函数:loss -> q_target 与 q_eval 的MSE(均方误差)。
train函数:使用RMSProp优化器(类似SGD的一种方法),设定学习率最小化误差函数来实现训练。
--------------------------------target_net------------------------------------------------
网络结构同evaluate_net,同样layer1 and layer2,target网络只是用来隔段时间copy一下evaluate网络的。
choose_action:
根据状态s选择一个动作a。
以概率epsilon 选择此时价值最高的动作,剩下的概率选择随机动作。
store_transition:
用来存储transition,属于经验回放。
对还未经过本轮train的[state,action,reward,state_next] 进行存储。
设定中可存储500个该种数据[500×6](6列数据中,s:a:r:s_=2:1:1:2)
到来的最新的数据会覆盖掉最旧的数据。(始终有500个数据用于经验回放)
learn:——>单步训练的方法。
训练数据。
首先要判断每隔self.replace_target_iter个世代,将target_net中的网络参
数换成eval_net的参数,然后进行正常的训练步骤。tf.assign(t, e)即是将e
的参数传给t。

在经验回放的数据集memory中,随机地抽取一个数据batch_memory([s,a,r,s_])。
得到q_next(input为后两列)和q_eval(input为前两列)。
q_next/eval矩阵规模为n×4,分别用来计算s_的Q值和s的Q值。
注:Q值为s状态下选择a动作的值,V值为处于s状态下的值。

使用Q-Learning中的Q值更新方法来更新Q现实并计算损失。

可以选择根据情况可以选择持续增加的epsilon值来控制减少探索的比例。
(在e_greedy_increment设置为一个数时,会使epsilon从0递加这个数,
最终到设定的epsilon_max)

run_this.py:
运行主程序的地方。运行之后走迷宫会自动进行,并输出cost值。
强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫
在经历300个世代的训练后,每走一步会计算一次loss损失。
这里可能大家会有疑问:损失一直很小,而且看起来像是提升了一样是什么意思,为了最小化误差,不是应该loss值图像慢慢变小的吗?
损失值代表的是Q_target计算的值与Q_eval计算的值间的差别造成的损失,这两者从一开始而言,都是一起缓慢更新成型的,每隔一定时间Q_target参数会更新成与Q_eval一样,所以总体并不存在什么很大的误差。

在网络还没有完全训练好的时候,结束游戏(成功或失败)所走的步数基本是这样的:
强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫

在设置e = 0.90:尝试步数大多为3~8不等,因为就算训练好了网络,每走一步总有0.1概率用来探索。
走3步的情况是由于探索而失败的情况。
强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫

在设置e_increment = 0.01 , e_max = 0.99后(递增的e-greedy)最后走迷宫所尝试步数基本都为4(最快的方式)。因为基本上都是选择Q值最大的方式走。
强化学习之DQN:算法思想、案例及详解,使用DQN算法实现自动走迷宫

本文地址:https://blog.csdn.net/weixin_44371912/article/details/109250263