5.3.2. 创建强化学习环境#

为了成功实施强化学习,我们需要定义强化学习的另一个重要模块:环境(Environment)。强化学习的环境可以是一个网格,其中每个状态对应于二维网格上的一个图块,智能体可以采取的唯一动作是在网格向上、向下、向左或向右移动。智能体的目标是找到以最直接的方式通往目标方块的方法。

假设我们有一个 10 × 10 的网格,钢筋起始位置在左方,钢筋目标位置在右方。我们可以采用以下代码实现上述步骤。

../../_images/5-10.png
图 5-10 多智能体强化学习环境


首先让我们导入需要的库。

# 导入需要的库
import numpy as np
import time
import sys
import pandas as pd

if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk

下面这段代码定义了一个名为迷宫(Maze)的类,它继承了tkinter库中的tk类,并重写了其中的一些方法。在类的初始化函数__init__方法中,初始化了一些基本设置,如窗口大小、标题等,包括每个格子的像素数(UNIT),迷宫的高度(MAZE_H)和宽度(MAZE_W)。_ build_maze方法用来创建迷宫的布局,并在其中创建了迷宫的网格,包括智能体的起点(create_rectangle)、终点(create_oval)和障碍物(create_rectangle),然后使用pack方法将其全部打包在一起。

# 定义每个方格的像素点
UNIT = 20
# 定义迷宫高度
MAZE_H = 10
# 定义迷宫宽度
MAZE_W = 10


# 继承tkinter库中的tk类,创建迷宫类
class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        # 定义智能体动作:上、下、右
        self.action_space = ['u', 'd', 'r']
        self.n_actions = len(self.action_space)
        self.title('Rebar')
        # 定义环境的尺寸
        self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT))
        self._build_maze()

    def _build_maze(self):
        # 定义环境的高度和宽度
        self.canvas = tk.Canvas(self, bg='white',
                                height=MAZE_H * UNIT,
                                width=MAZE_W * UNIT)

        # 创建环境网格
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)

        # 创建环境起点
        origin = np.array([0, 0])

        # 创建环境中的障碍物1
        hell1_center = np.array([60, 140]) + 10
        self.hell1 = self.canvas.create_rectangle(
            hell1_center[0] - 10, hell1_center[1] - 10,
            hell1_center[0] + 10, hell1_center[1] + 10,
            fill='black')
        # 创建环境中的障碍物2
        hell2_center = np.array([60, 40]) + 10
        self.hell2 = self.canvas.create_rectangle(
            hell2_center[0] - 10, hell2_center[1] - 10,
            hell2_center[0] + 10, hell2_center[1] + 10,
            fill='black')
        # 创建环境中的障碍物3
        hell3_center = np.array([120, 40]) + 10
        self.hell3 = self.canvas.create_rectangle(
            hell3_center[0] - 10, hell3_center[1] - 10,
            hell3_center[0] + 10, hell3_center[1] + 10,
            fill='black')

        # 创建环境中的目标点1
        oval_center1 = np.array([180, 40]) + 10
        self.oval1 = self.canvas.create_oval(
            oval_center1[0] - 10, oval_center1[1] - 10,
            oval_center1[0] + 10, oval_center1[1] + 10,
            fill='yellow')

        # 创建环境中的目标点2
        oval_center2 = np.array([180, 40]) + 10
        self.oval2 = self.canvas.create_oval(
            oval_center2[0] - 10, oval_center2[1] - 10 + 40,
            oval_center2[0] + 10, oval_center2[1] + 10 + 40,
            fill='yellow')

        # 创建环境中的目标点3
        oval_center3 = np.array([180, 120]) + 10
        self.oval3 = self.canvas.create_oval(
            oval_center3[0] - 10, oval_center3[1] - 10 + 20,
            oval_center3[0] + 10, oval_center3[1] + 10 + 20,
            fill='yellow')

        # 创建环境中智能体的起点1
        self.rect1 = self.canvas.create_rectangle(
            origin[0] + 10 - 10, origin[1] + 30 - 10,
            origin[0] + 10 + 10, origin[1] + 30 + 10,
            fill='red')

        # 创建环境中智能体的起点2
        self.rect2 = self.canvas.create_rectangle(
            origin[0] + 10 - 10, origin[1] + 90 - 10,
            origin[0] + 10 + 10, origin[1] + 90 + 10,
            fill='red')

        # 创建环境中智能体的起点3
        self.rect3 = self.canvas.create_rectangle(
            origin[0] + 10 - 10, origin[1] + 170 - 10,
            origin[0] + 10 + 10, origin[1] + 170 + 10,
            fill='red')

        # pack()函数在水平和垂直框中排列所创建的网格和构件
        self.canvas.pack()

        # 创建智能体轨迹记录器
        self.track1 = np.array([])
        self.sumtrack1 = pd.DataFrame([])
        self.track2 = np.array([])
        self.sumtrack2 = pd.DataFrame([])
        self.track3 = np.array([])
        self.sumtrack3 = pd.DataFrame([])

    # 创建智能体1在环境中的重置函数
    def reset1(self, episode, n):
        self.update()
        time.sleep(0)
        # 使用"self.canvas.delete(self.rect1)"方法删除之前创建的矩形,使用"create_rectangle"方法创建一个新矩形,并将其填充为红色。
        self.canvas.delete(self.rect1)
        origin = np.array([0, 0])
        self.rect1 = self.canvas.create_rectangle(
            origin[0] + 10 - 10, origin[1] + 30 - 10,
            origin[0] + 10 + 10, origin[1] + 30 + 10,
            fill='red')
        # 检查episode编号是否大于或等于n-10,self.track1的大小不等于零。如果两个条件都成立,代码进入一个for循环,在self.track1数组的每4个元素处创建填充白色的矩形。
        if episode >= (n - 10):
            if self.track1.size != 0:
                for i in range(self.track1.size):
                    if i % 4 == 0:
                        self.trackrec1 = self.canvas.create_rectangle(
                            self.track1[i] - 10 + 10, self.track1[i + 1] - 10 + 10,
                            self.track1[i] + 10 + 10, self.track1[i + 1] + 10 + 10,
                            fill='white')

        # 使用self.track1数组创建一个DataFrame,将智能体的轨迹存入Dataframe中,并保存在"out1.csv"的CSV文件中,并重置self.track1数组
        track1 = pd.DataFrame(self.track1)
        self.sumtrack1 = pd.concat([track1, self.sumtrack1], axis=1, ignore_index=True)
        self.sumtrack1.to_csv('out1.csv')
        self.track1 = np.array([])
        # return observation
        return self.canvas.coords(self.rect1)

    # 创建智能体2的重置函数,与智能体1的重置函数类似,这里不赘述
    def reset2(self, episode, n):
        return self.canvas.coords(self.rect2)

    # 创建智能体3的重置函数,与智能体1的重置函数类似,这里不赘述
    def reset3(self, episode, n):
        return self.canvas.coords(self.rect3)

    # 创建智能体1动作函数
    def step1(self, action, episode, n):
        # 定义了三个变量s1, s2, s3分别存储三个智能体的坐标
        s1 = self.canvas.coords(self.rect1)
        s2 = self.canvas.coords(self.rect2)
        s3 = self.canvas.coords(self.rect3)

        base_action1 = np.array([0, 0])
        if s1 == self.canvas.coords(self.oval1):
            s1_ = 'terminal'
            reward1 = 0
            done1 = True
            return s1_, reward1, done1
        else:

            # 智能体向上移动
            if action == 0:  # up
                if s1[1] > UNIT:
                    base_action1[1] -= UNIT
                    if episode >= n - 10:
                        # print(s1)
                        self.track1 = np.append(self.track1, s1)

            # 智能体向下移动
            elif action == 1:  # down
                if s1[1] < (MAZE_H - 1) * UNIT:
                    base_action1[1] += UNIT
                    if episode >= n - 10:
                        self.track1 = np.append(self.track1, s1)

            # 智能体向右移动
            elif action == 2:  # right
                if s1[0] < (MAZE_W - 1) * UNIT:
                    base_action1[0] += UNIT
                    if episode >= n - 10:
                        self.track1 = np.append(self.track1, s1)

            if episode >= n - 10:
                self.trackrec1 = self.canvas.create_rectangle(
                    s1[0] - 10 + 10, s1[1] - 10 + 10,
                    s1[0] + 10 + 10, s1[1] + 10 + 10,
                    fill='green')
            self.canvas.move(self.rect1, base_action1[0], base_action1[1])  # move agent

            s1_ = self.canvas.coords(self.rect1)  # next state

            # 如果移动后智能体到达终点,那么给予reward1=1, done1=True, s1_='terminal'
            if s1_ == self.canvas.coords(self.oval1):
                reward1 = 1
                done1 = True
                s1_ = 'terminal'

            # 如果移动后智能体碰到障碍物,那么给予reward1=-1, done1=True, s1_='terminal'
            elif s1_ in [self.canvas.coords(self.hell1),
                         self.canvas.coords(self.hell2),
                         self.canvas.coords(self.hell3),
                         self.canvas.coords(self.oval2),
                         self.canvas.coords(self.oval3),
                         self.canvas.coords(self.rect2),
                         self.canvas.coords(self.rect3)]:
                reward1 = -1
                done1 = True
                s1_ = 'terminal'

            # 如果移动后智能体之间距离小于30,那么给予reward1=-1, done1=True, s1_='terminal'
            elif abs(s1_[1] - s2[1]) <= 30 or abs(s1_[1] - s3[1]) <= 30:
                # print(s1_[1])
                # print(s2[1])
                reward1 = -1
                done1 = True
                s1_ = 'terminal'

            # 如果移动后智能体采取弯折的动作,那么给予reward1=-0.3, done1=False
            elif action == 0:
                reward1 = -0.3
                done1 = False
            elif action == 1:
                reward1 = -0.3
                done1 = False
            else:
                reward1 = 0
                done1 = False
            # 最后返回智能体的下一个状态s1_, reward1, done1。
            return s1_, reward1, done1

    # 创建智能体2动作函数,与智能体1动作函数类似,这里不赘述
    def step2(self, action, episode, n):
        return s2_, reward2, done2

    # 创建智能体3动作函数,与智能体1动作函数类似,这里不赘述
    def step3(self, action, episode, n):
        return s3_, reward3, done3

    # 创建环境渲染函数
    def render(self):
        time.sleep(0.1)
        self.update()


# 创建环境更新函数
def update():
    for t in range(10):
        s1 = env.reset1()
        s2 = env.reset2()
        s3 = env.reset3()
        while True:
            env.render()
            a = 1
            s1, r1, done1 = env.step1(a)
            s2, r2, done2 = env.step2(a)
            s3, r3, done3 = env.step3(a)
            if done1 and done2 and done3:
                break