Implementations of Common Reinforcement Learning Algorithms

Broadly speaking, reinforcement learning is a computational approach by which a machine achieves a goal through interaction with its environment.

One round of interaction between the machine and the environment proceeds as follows: given the current state of the environment, the machine makes an action decision and applies that action to the environment; the environment changes accordingly and returns the corresponding reward signal together with the next state to the machine.

This interaction repeats round after round, and the machine's goal is to maximize the expected cumulative reward collected over the whole process.
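One common way to formalize this objective (assuming a discounted setting with discount factor $\gamma \in [0, 1)$, which is also what the implementations below use) is

$\max_{\pi} \ \mathbb{E}_{\pi}\left[\sum_{t=0}^{\infty} \gamma^{t} r_t\right]$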

Reinforcement learning uses the concept of an agent to refer to the decision-making machine. Compared with the "model" of supervised learning, the "agent" of reinforcement learning emphasizes that the machine not only perceives information about its surroundings but can also change that environment directly through its decisions, rather than merely emitting predictions.

N-Step-Sarsa

N-step Sarsa: suited to settings with a finite state space, a finite action space, and low stochasticity. It chooses actions by estimating the value of each action in the current state; it is an on-policy method.

Update rule:
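Consistent with the implementation below, the n-step Sarsa update can be written as

$G_t = r_t + \gamma r_{t+1} + \cdots + \gamma^{n-1} r_{t+n-1} + \gamma^{n} Q(s_{t+n}, a_{t+n})$

$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ G_t - Q(s_t, a_t) \right]$

where $a_{t+n}$ is the action actually chosen by the current $\epsilon$-greedy policy in $s_{t+n}$, which is what makes the method on-policy.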

import numpy as np


class N_Step_Sarsa:
    def __init__(self, n_step, epsilon, alpha, gamma, n_state, n_action):
        self.Q_table = np.zeros([n_state, n_action])  # tabular action-value estimates
        self.n_action = n_action
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount factor
        self.epsilon = epsilon  # exploration rate for epsilon-greedy
        self.n_step = n_step    # number of steps before bootstrapping
        self.state_list = []    # sliding window of the last n_step transitions
        self.action_list = []
        self.reward_list = []

    def sample(self, state, pre=False):
        # Epsilon-greedy action selection; pre=True disables exploration (evaluation mode).
        if np.random.random() < self.epsilon and not pre:
            action = np.random.randint(self.n_action)
        else:
            action = np.argmax(self.Q_table[state])
        return action

    def update(self, s, a, r, s_t, a_t, done):
        self.state_list.append(s)
        self.action_list.append(a)
        self.reward_list.append(r)
        if len(self.state_list) == self.n_step:
            # n-step return: bootstrap from Q(s_t, a_t), then add rewards backwards.
            G = self.Q_table[s_t, a_t]
            for i in reversed(range(self.n_step)):
                G = self.gamma * G + self.reward_list[i]
                # If the episode ends inside the window, the remaining state-action
                # pairs will never see a full n-step return, so update them now.
                if done and i > 0:
                    s = self.state_list[i]
                    a = self.action_list[i]
                    self.Q_table[s, a] += self.alpha * (G - self.Q_table[s, a])
            # Update the oldest state-action pair with the full n-step return.
            s = self.state_list.pop(0)
            a = self.action_list.pop(0)
            self.reward_list.pop(0)
            self.Q_table[s, a] += self.alpha * (G - self.Q_table[s, a])
        if done:
            self.state_list = []
            self.action_list = []
            self.reward_list = []
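A minimal training-loop sketch showing how the class above is driven (assuming a Gymnasium-style discrete environment such as CliffWalking-v0; the environment name and hyperparameters here are illustrative):

import gymnasium as gym

env = gym.make("CliffWalking-v0")
agent = N_Step_Sarsa(n_step=5, epsilon=0.1, alpha=0.1, gamma=0.9,
                     n_state=env.observation_space.n,
                     n_action=env.action_space.n)

for episode in range(500):
    state, _ = env.reset()
    action = agent.sample(state)
    done = False
    while not done:
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        next_action = agent.sample(next_state)
        # On-policy: the next action actually taken is the one used in the update.
        agent.update(state, action, reward, next_state, next_action, done)
        state, action = next_state, next_action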

Q-learning

Q-learning: suited to settings with a finite state space, a finite action space, and low stochasticity. It chooses actions by estimating the value of each action in the current state; it is an off-policy method.

Update rule:
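Consistent with the implementation below, the Q-learning update can be written as

$Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right]$

Because the TD target uses the greedy action $\max_{a'} Q(s', a')$ rather than the action the behavior policy actually takes next, Q-learning is off-policy.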

import numpy as np


class QLearning:
    def __init__(self, epsilon, alpha, gamma, n_state, n_action):
        self.Q_table = np.zeros([n_state, n_action])  # tabular action-value estimates
        self.n_action = n_action
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount factor
        self.epsilon = epsilon  # exploration rate for epsilon-greedy

    def sample(self, state, pre=False):
        # Epsilon-greedy action selection; pre=True disables exploration (evaluation mode).
        if np.random.random() < self.epsilon and not pre:
            action = np.random.randint(self.n_action)
        else:
            action = np.argmax(self.Q_table[state])
        return action

    def update(self, s, a, r, s_t):
        # Off-policy TD error: the target bootstraps from the greedy next action.
        td_error = r + self.gamma * self.Q_table[s_t].max() - self.Q_table[s, a]
        self.Q_table[s, a] += self.alpha * td_error

DQN

DQN: suited to settings with an infinite (e.g. continuous) state space, a finite action space, and low stochasticity. It chooses actions by estimating the value of each action in the current state; it is an off-policy method.

Update rule:

Train the Q-network so that its estimate $Q_\theta(s, a)$ approaches the TD target $r + \gamma \max_{a'} Q_\theta(s', a')$, i.e. minimize the loss

$L(\theta) = \frac{1}{N} \sum_i \left( r_i + \gamma \max_{a'} Q_\theta(s'_i, a') - Q_\theta(s_i, a_i) \right)^2$

Because the TD target itself drifts as the model trains, and because the data collected from a single interaction is far too little to train on, DQN uses two tricks to speed up and stabilize fitting: experience replay and a target network.

  • Experience replay: maintain a replay buffer, store every (state, action, reward, next state) transition sampled from the environment in it, and when training the Q-network draw random mini-batches from this buffer.
  • Target network: the Q-network keeps changing during training, so the TD target keeps moving as well; it helps to temporarily freeze the Q-network used inside the TD target. To implement this idea, two Q-networks are maintained: an online network updated every step, and a target network that is only synchronized with it periodically.
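The DQN implementation below receives a network constructor `net` as an argument; a minimal sketch of such a Q-network (a hypothetical two-layer MLP, not part of the original code) might look like this:

import torch
import torch.nn.functional as F

class Qnet(torch.nn.Module):
    # Hypothetical Q-network: maps a state vector to one Q-value per discrete action.
    def __init__(self, state_dim=4, hidden_dim=128, action_dim=2):
        super().__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)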
import collections
import random

import numpy as np
import torch
import torch.nn.functional as F


class ReplayBuffer:
    def __init__(self, capacity):
        # FIFO buffer that keeps at most `capacity` transitions.
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):
        return len(self.buffer)


class DQN:
    def __init__(self, net, learning_rate, gamma, epsilon, update_step, action_dim, device):
        self.action_dim = action_dim
        self.q_net = net().to(device)          # online Q-network, updated every step
        self.target_q_net = net().to(device)   # target Q-network, synchronized periodically
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
        self.gamma = gamma
        self.epsilon = epsilon
        self.update_step = update_step         # how often to copy q_net into target_q_net
        self.step = 0
        self.device = device

    def sample(self, state, pre=False):
        # Epsilon-greedy action selection; pre=True disables exploration (evaluation mode).
        if np.random.random() < self.epsilon and not pre:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            action = self.q_net(state).argmax().item()
        return action

    def update(self, s, a, r, s_t, dones):
        states = torch.tensor(s, dtype=torch.float).to(self.device)
        actions = torch.tensor(a, dtype=torch.int64).view(-1, 1).to(self.device)
        rewards = torch.tensor(r, dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(s_t, dtype=torch.float).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float).view(-1, 1).to(self.device)

        # TD target uses the frozen target network; (1 - dones) removes the bootstrap
        # term for terminal transitions.
        q_values = self.q_net(states).gather(1, actions)
        max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)

        dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        # Periodically synchronize the target network with the online network.
        if self.step % self.update_step == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.step += 1
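A minimal sketch of how the replay buffer, the agent, and the environment fit together (assuming a Gymnasium environment such as CartPole-v1 and the hypothetical Qnet above; all hyperparameters are illustrative):

import gymnasium as gym

env = gym.make("CartPole-v1")
buffer = ReplayBuffer(capacity=10000)
agent = DQN(net=Qnet, learning_rate=2e-3, gamma=0.98, epsilon=0.01,
            update_step=10, action_dim=env.action_space.n, device="cpu")

for episode in range(200):
    state, _ = env.reset()
    done = False
    while not done:
        action = agent.sample(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        buffer.add(state, action, reward, next_state, done)
        state = next_state
        # Off-policy: train on random mini-batches once the buffer is warm.
        if buffer.size() > 500:
            s, a, r, s_t, d = buffer.sample(batch_size=64)
            agent.update(s, a, r, s_t, d)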

REINFORCE

REINFORCE: suited to settings with an infinite (e.g. continuous) state space, a finite action space, and high stochasticity. It chooses actions by predicting, for the current state, a probability distribution over the actions; it is an on-policy method.

Update rule:

REINFORCE performs gradient ascent on the expected return $J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_t \gamma^t r_t\right]$. A short algebraic derivation (the policy-gradient theorem) shows that the parameter update rule is

$\theta \leftarrow \theta + \alpha \sum_t G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t)$

where $G_t$ is the discounted return from step $t$ onward, exactly as accumulated in the code below.
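As with DQN, the class below receives a network constructor `net`; here the network must output a probability distribution over actions. A hypothetical sketch (not part of the original code):

import torch
import torch.nn.functional as F

class PolicyNet(torch.nn.Module):
    # Hypothetical policy network: maps a state vector to action probabilities.
    def __init__(self, state_dim=4, hidden_dim=128, action_dim=2):
        super().__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)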

import torch


class REINFORCE:
    def __init__(self, net, learning_rate, gamma, action_dim, device):
        self.policy_net = net().to(device)  # outputs a probability distribution over actions
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        self.gamma = gamma
        self.device = device

    def sample(self, state, pre=False):
        # Sample an action from the predicted categorical distribution.
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.policy_net(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, s, a, r):
        # s, a, r are the full state/action/reward sequences of one episode.
        state_list, action_list, reward_list = s, a, r

        G = 0
        self.optimizer.zero_grad()
        # Walk the episode backwards, accumulating the discounted return G_t and
        # the policy-gradient loss -log pi(a_t | s_t) * G_t at each step.
        for i in reversed(range(len(reward_list))):
            reward = reward_list[i]
            state = torch.tensor([state_list[i]], dtype=torch.float).to(self.device)
            action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
            log_prob = torch.log(self.policy_net(state).gather(1, action))
            G = self.gamma * G + reward
            loss = -log_prob * G
            loss.backward()  # gradients accumulate across the whole episode
        self.optimizer.step()
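Because REINFORCE is on-policy and Monte-Carlo, one full episode is collected with the current policy and then passed to update as whole lists. A sketch (reusing the hypothetical PolicyNet above; environment and hyperparameters are illustrative):

import gymnasium as gym

env = gym.make("CartPole-v1")
agent = REINFORCE(net=PolicyNet, learning_rate=1e-3, gamma=0.98,
                  action_dim=env.action_space.n, device="cpu")

for episode in range(500):
    states, actions, rewards = [], [], []
    state, _ = env.reset()
    done = False
    while not done:
        action = agent.sample(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
    agent.update(states, actions, rewards)  # one gradient step per episode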

DDPG

DDPG: suited to settings with an infinite (e.g. continuous) state space, a continuous action space, and high stochasticity. It chooses actions by learning both a policy that maps states to actions and a critic that estimates the value of each state-action pair; it is an off-policy method.

Update rules:
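Consistent with the implementation below, DDPG trains a critic $Q_\omega$ and a deterministic actor $\mu_\theta$, each with a slowly updated target copy ($Q_{\omega^-}$, $\mu_{\theta^-}$):

Critic: minimize $L(\omega) = \frac{1}{N} \sum_i \left( r_i + \gamma\, Q_{\omega^-}\big(s'_i, \mu_{\theta^-}(s'_i)\big)(1 - d_i) - Q_\omega(s_i, a_i) \right)^2$

Actor: maximize $\frac{1}{N} \sum_i Q_\omega\big(s_i, \mu_\theta(s_i)\big)$, i.e. perform gradient ascent on $\theta$

Target networks (soft update): $\omega^- \leftarrow \tau \omega + (1 - \tau)\, \omega^-$, $\quad \theta^- \leftarrow \tau \theta + (1 - \tau)\, \theta^-$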

import collections
import random

import numpy as np
import torch
import torch.nn.functional as F


class ReplayBuffer:
    def __init__(self, capacity):
        # FIFO buffer that keeps at most `capacity` transitions.
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):
        return len(self.buffer)


class DDPG:
    def __init__(self, PolicyNet, QValueNet, sigma, actor_lr, critic_lr, tau, gamma, action_dim, device):
        self.actor = PolicyNet().to(device)    # deterministic policy mu(s)
        self.critic = QValueNet().to(device)   # action-value critic Q(s, a)
        self.target_actor = PolicyNet().to(device)
        self.target_critic = QValueNet().to(device)
        # Start the target networks from the same weights as the online networks.
        self.target_critic.load_state_dict(self.critic.state_dict())
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.sigma = sigma    # std of the Gaussian exploration noise
        self.tau = tau        # soft-update coefficient for the target networks
        self.action_dim = action_dim
        self.device = device

    def sample(self, state, pre=False):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        action = self.actor(state).item()
        # Add Gaussian noise for exploration; pre=True returns the deterministic action.
        action = action if pre else action + self.sigma * np.random.randn(self.action_dim)
        return action

    def soft_update(self, net, target_net):
        # Polyak averaging: target <- (1 - tau) * target + tau * online.
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau)

    def update(self, s, a, r, s_t, dones):
        states = torch.tensor(s, dtype=torch.float).to(self.device)
        actions = torch.tensor(a, dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(r, dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(s_t, dtype=torch.float).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float).view(-1, 1).to(self.device)

        # Critic update: regress Q(s, a) towards the TD target built from the target networks.
        next_q_values = self.target_critic(next_states, self.target_actor(next_states))
        q_targets = rewards + self.gamma * next_q_values * (1 - dones)

        critic_loss = torch.mean(F.mse_loss(self.critic(states, actions), q_targets))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor update: gradient ascent on Q(s, mu(s)), i.e. minimize its negative mean.
        actor_loss = -torch.mean(self.critic(states, self.actor(states)))
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Let the target networks slowly track the online networks.
        self.soft_update(self.actor, self.target_actor)
        self.soft_update(self.critic, self.target_critic)
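The DDPG class receives PolicyNet and QValueNet constructors as arguments; hypothetical sketches for a one-dimensional continuous action bounded to [-action_bound, action_bound] (not part of the original code) could look like this:

import torch
import torch.nn.functional as F

class PolicyNet(torch.nn.Module):
    # Hypothetical deterministic actor: state -> bounded continuous action.
    def __init__(self, state_dim=3, hidden_dim=64, action_dim=1, action_bound=2.0):
        super().__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)
        self.action_bound = action_bound

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return torch.tanh(self.fc2(x)) * self.action_bound

class QValueNet(torch.nn.Module):
    # Hypothetical critic: (state, action) -> scalar Q-value.
    def __init__(self, state_dim=3, hidden_dim=64, action_dim=1):
        super().__init__()
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x, a):
        x = torch.cat([x, a], dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)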

Summary

Suited to        Sarsa       Q-learning   DQN          REINFORCE   DDPG
State space      finite      finite       infinite     infinite    infinite
Action space     finite      finite       finite       finite      infinite
Stochasticity    low         low          low          high        high
Decision type    on-policy   off-policy   off-policy   on-policy   off-policy