5.3.2. 创建强化学习环境#
为了成功实施强化学习,我们需要定义强化学习的另一个重要模块:环境(Environment)。强化学习的环境可以是一个网格,其中每个状态对应于二维网格上的一个图块,智能体可以采取的唯一动作是在网格向上、向下、向左或向右移动。智能体的目标是找到以最直接的方式通往目标方块的方法。
假设我们有一个 10 × 10 的网格,钢筋起始位置在左方,钢筋目标位置在右方。我们可以采用以下代码实现上述步骤。
图 5-10 多智能体强化学习环境
首先让我们导入需要的库。
# 导入需要的库
import numpy as np
import time
import sys
import pandas as pd
if sys.version_info.major == 2:
import Tkinter as tk
else:
import tkinter as tk
下面这段代码定义了一个名为迷宫(Maze)的类,它继承了tkinter库中的tk类,并重写了其中的一些方法。在类的初始化函数__init__方法中,初始化了一些基本设置,如窗口大小、标题等,包括每个格子的像素数(UNIT),迷宫的高度(MAZE_H)和宽度(MAZE_W)。_ build_maze方法用来创建迷宫的布局,并在其中创建了迷宫的网格,包括智能体的起点(create_rectangle)、终点(create_oval)和障碍物(create_rectangle),然后使用pack方法将其全部打包在一起。
# 定义每个方格的像素点
UNIT = 20
# 定义迷宫高度
MAZE_H = 10
# 定义迷宫宽度
MAZE_W = 10
# 继承tkinter库中的tk类,创建迷宫类
class Maze(tk.Tk, object):
def __init__(self):
super(Maze, self).__init__()
# 定义智能体动作:上、下、右
self.action_space = ['u', 'd', 'r']
self.n_actions = len(self.action_space)
self.title('Rebar')
# 定义环境的尺寸
self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT))
self._build_maze()
def _build_maze(self):
# 定义环境的高度和宽度
self.canvas = tk.Canvas(self, bg='white',
height=MAZE_H * UNIT,
width=MAZE_W * UNIT)
# 创建环境网格
for c in range(0, MAZE_W * UNIT, UNIT):
x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
self.canvas.create_line(x0, y0, x1, y1)
for r in range(0, MAZE_H * UNIT, UNIT):
x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
self.canvas.create_line(x0, y0, x1, y1)
# 创建环境起点
origin = np.array([0, 0])
# 创建环境中的障碍物1
hell1_center = np.array([60, 140]) + 10
self.hell1 = self.canvas.create_rectangle(
hell1_center[0] - 10, hell1_center[1] - 10,
hell1_center[0] + 10, hell1_center[1] + 10,
fill='black')
# 创建环境中的障碍物2
hell2_center = np.array([60, 40]) + 10
self.hell2 = self.canvas.create_rectangle(
hell2_center[0] - 10, hell2_center[1] - 10,
hell2_center[0] + 10, hell2_center[1] + 10,
fill='black')
# 创建环境中的障碍物3
hell3_center = np.array([120, 40]) + 10
self.hell3 = self.canvas.create_rectangle(
hell3_center[0] - 10, hell3_center[1] - 10,
hell3_center[0] + 10, hell3_center[1] + 10,
fill='black')
# 创建环境中的目标点1
oval_center1 = np.array([180, 40]) + 10
self.oval1 = self.canvas.create_oval(
oval_center1[0] - 10, oval_center1[1] - 10,
oval_center1[0] + 10, oval_center1[1] + 10,
fill='yellow')
# 创建环境中的目标点2
oval_center2 = np.array([180, 40]) + 10
self.oval2 = self.canvas.create_oval(
oval_center2[0] - 10, oval_center2[1] - 10 + 40,
oval_center2[0] + 10, oval_center2[1] + 10 + 40,
fill='yellow')
# 创建环境中的目标点3
oval_center3 = np.array([180, 120]) + 10
self.oval3 = self.canvas.create_oval(
oval_center3[0] - 10, oval_center3[1] - 10 + 20,
oval_center3[0] + 10, oval_center3[1] + 10 + 20,
fill='yellow')
# 创建环境中智能体的起点1
self.rect1 = self.canvas.create_rectangle(
origin[0] + 10 - 10, origin[1] + 30 - 10,
origin[0] + 10 + 10, origin[1] + 30 + 10,
fill='red')
# 创建环境中智能体的起点2
self.rect2 = self.canvas.create_rectangle(
origin[0] + 10 - 10, origin[1] + 90 - 10,
origin[0] + 10 + 10, origin[1] + 90 + 10,
fill='red')
# 创建环境中智能体的起点3
self.rect3 = self.canvas.create_rectangle(
origin[0] + 10 - 10, origin[1] + 170 - 10,
origin[0] + 10 + 10, origin[1] + 170 + 10,
fill='red')
# pack()函数在水平和垂直框中排列所创建的网格和构件
self.canvas.pack()
# 创建智能体轨迹记录器
self.track1 = np.array([])
self.sumtrack1 = pd.DataFrame([])
self.track2 = np.array([])
self.sumtrack2 = pd.DataFrame([])
self.track3 = np.array([])
self.sumtrack3 = pd.DataFrame([])
# 创建智能体1在环境中的重置函数
def reset1(self, episode, n):
self.update()
time.sleep(0)
# 使用"self.canvas.delete(self.rect1)"方法删除之前创建的矩形,使用"create_rectangle"方法创建一个新矩形,并将其填充为红色。
self.canvas.delete(self.rect1)
origin = np.array([0, 0])
self.rect1 = self.canvas.create_rectangle(
origin[0] + 10 - 10, origin[1] + 30 - 10,
origin[0] + 10 + 10, origin[1] + 30 + 10,
fill='red')
# 检查episode编号是否大于或等于n-10,self.track1的大小不等于零。如果两个条件都成立,代码进入一个for循环,在self.track1数组的每4个元素处创建填充白色的矩形。
if episode >= (n - 10):
if self.track1.size != 0:
for i in range(self.track1.size):
if i % 4 == 0:
self.trackrec1 = self.canvas.create_rectangle(
self.track1[i] - 10 + 10, self.track1[i + 1] - 10 + 10,
self.track1[i] + 10 + 10, self.track1[i + 1] + 10 + 10,
fill='white')
# 使用self.track1数组创建一个DataFrame,将智能体的轨迹存入Dataframe中,并保存在"out1.csv"的CSV文件中,并重置self.track1数组
track1 = pd.DataFrame(self.track1)
self.sumtrack1 = pd.concat([track1, self.sumtrack1], axis=1, ignore_index=True)
self.sumtrack1.to_csv('out1.csv')
self.track1 = np.array([])
# return observation
return self.canvas.coords(self.rect1)
# 创建智能体2的重置函数,与智能体1的重置函数类似,这里不赘述
def reset2(self, episode, n):
return self.canvas.coords(self.rect2)
# 创建智能体3的重置函数,与智能体1的重置函数类似,这里不赘述
def reset3(self, episode, n):
return self.canvas.coords(self.rect3)
# 创建智能体1动作函数
def step1(self, action, episode, n):
# 定义了三个变量s1, s2, s3分别存储三个智能体的坐标
s1 = self.canvas.coords(self.rect1)
s2 = self.canvas.coords(self.rect2)
s3 = self.canvas.coords(self.rect3)
base_action1 = np.array([0, 0])
if s1 == self.canvas.coords(self.oval1):
s1_ = 'terminal'
reward1 = 0
done1 = True
return s1_, reward1, done1
else:
# 智能体向上移动
if action == 0: # up
if s1[1] > UNIT:
base_action1[1] -= UNIT
if episode >= n - 10:
# print(s1)
self.track1 = np.append(self.track1, s1)
# 智能体向下移动
elif action == 1: # down
if s1[1] < (MAZE_H - 1) * UNIT:
base_action1[1] += UNIT
if episode >= n - 10:
self.track1 = np.append(self.track1, s1)
# 智能体向右移动
elif action == 2: # right
if s1[0] < (MAZE_W - 1) * UNIT:
base_action1[0] += UNIT
if episode >= n - 10:
self.track1 = np.append(self.track1, s1)
if episode >= n - 10:
self.trackrec1 = self.canvas.create_rectangle(
s1[0] - 10 + 10, s1[1] - 10 + 10,
s1[0] + 10 + 10, s1[1] + 10 + 10,
fill='green')
self.canvas.move(self.rect1, base_action1[0], base_action1[1]) # move agent
s1_ = self.canvas.coords(self.rect1) # next state
# 如果移动后智能体到达终点,那么给予reward1=1, done1=True, s1_='terminal'
if s1_ == self.canvas.coords(self.oval1):
reward1 = 1
done1 = True
s1_ = 'terminal'
# 如果移动后智能体碰到障碍物,那么给予reward1=-1, done1=True, s1_='terminal'
elif s1_ in [self.canvas.coords(self.hell1),
self.canvas.coords(self.hell2),
self.canvas.coords(self.hell3),
self.canvas.coords(self.oval2),
self.canvas.coords(self.oval3),
self.canvas.coords(self.rect2),
self.canvas.coords(self.rect3)]:
reward1 = -1
done1 = True
s1_ = 'terminal'
# 如果移动后智能体之间距离小于30,那么给予reward1=-1, done1=True, s1_='terminal'
elif abs(s1_[1] - s2[1]) <= 30 or abs(s1_[1] - s3[1]) <= 30:
# print(s1_[1])
# print(s2[1])
reward1 = -1
done1 = True
s1_ = 'terminal'
# 如果移动后智能体采取弯折的动作,那么给予reward1=-0.3, done1=False
elif action == 0:
reward1 = -0.3
done1 = False
elif action == 1:
reward1 = -0.3
done1 = False
else:
reward1 = 0
done1 = False
# 最后返回智能体的下一个状态s1_, reward1, done1。
return s1_, reward1, done1
# 创建智能体2动作函数,与智能体1动作函数类似,这里不赘述
def step2(self, action, episode, n):
return s2_, reward2, done2
# 创建智能体3动作函数,与智能体1动作函数类似,这里不赘述
def step3(self, action, episode, n):
return s3_, reward3, done3
# 创建环境渲染函数
def render(self):
time.sleep(0.1)
self.update()
# 创建环境更新函数
def update():
for t in range(10):
s1 = env.reset1()
s2 = env.reset2()
s3 = env.reset3()
while True:
env.render()
a = 1
s1, r1, done1 = env.step1(a)
s2, r2, done2 = env.step2(a)
s3, r3, done3 = env.step3(a)
if done1 and done2 and done3:
break