news 2026/2/16 8:46:20

强化学习QAC求最优策略的代码实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
强化学习QAC求最优策略的代码实现

理论基础:

注意:

1. get_policy_by_state_value_net() 是额外写的一个基于Q的贪心策略,不属于QAC算法,得到的策略不一定是最优的,与get_policy_by_policy_net表现一致是偶然现象。

2. 图片中的伪代码并没有说要生成多条episode,但这个为了保证每个(s,a)pair都能被访问到,会生成多条episode。

代码可运行:

import numpy as np import torch from torch import nn from env import GridWorldEnv from utils import drow_policy class QAC(object): def __init__(self, env: GridWorldEnv, gamma=0.9, lr_actor=1e-2, lr_critic=1e-2): self.env = env self.action_space_size = self.env.num_actions self.state_space_size = self.env.num_states self.gamma = gamma self.pnet = nn.Sequential( # policy_net nn.Linear(2, 16), # s -> Π(a|s) nn.ReLU(), nn.Linear(16, self.action_space_size) ) self.qnet = nn.Sequential( # q_value_net nn.Linear(2, 16), # s -> q[s,a] nn.ReLU(), nn.Linear(16, self.action_space_size) ) self.value_optimizer = torch.optim.Adam(self.qnet.parameters(), lr=lr_critic) self.policy_optimizer = torch.optim.Adam(self.pnet.parameters(), lr=lr_actor) self.policy = np.zeros((self.state_space_size, self.action_space_size)) self.q_value = np.zeros((self.state_space_size, self.action_space_size)) def decode_state(self, state): ''' :param state: int :return: 归一化后的元组 ''' i = state // self.env.size j = state % self.env.size return torch.tensor((i / (self.env.size - 1), j / (self.env.size - 1)), dtype=torch.float32) def generate_action(self, state): ''' :param state: tuple :return: int,float ''' logits = self.pnet(state) action_probs = torch.softmax(logits, dim=0) # π(a|s,θ) action_dist = torch.distributions.Categorical(action_probs) # 按分布采样 action = action_dist.sample() log_prob = action_dist.log_prob(action) # In π(a|s,θ) 注意传入的是索引,会自动做log(action_probs[action_index]) return action.item(), log_prob def solve(self, num_episodes=200): for _ in range(num_episodes): state_int = self.env.reset() state = self.decode_state(state_int) done = False while not done: action, log_prob = self.generate_action(state) # a_t,s_t,In π(a_t|s_t,θ) next_state_int, reward, done = self.env.step(state_int, action) # s_t+1,r_t+1 next_state = self.decode_state(next_state_int) if not done: next_action, _ = self.generate_action(next_state) # a_t+1 else: next_action, action_prob = None, None # Critic (value update) qvalue = self.qnet(state)[action] # q(s_t,a_t) if done: td_target = torch.tensor(reward, dtype=torch.float32) else: with torch.no_grad(): # semi gradient qvalue_next = self.qnet(next_state)[next_action] # q(s_t+1,a_t+1) td_target = torch.tensor(reward, dtype=torch.float32) + self.gamma * qvalue_next delta = td_target - qvalue # TD error self.value_optimizer.zero_grad() critic_loss = 0.5 * delta.pow(2) critic_loss.backward() self.value_optimizer.step() # Actor (policy update) qvalue = qvalue.detach() # 避免梯度污染 self.policy_optimizer.zero_grad() actor_loss = -log_prob * qvalue actor_loss.backward() self.policy_optimizer.step() state_int = next_state_int state = next_state def get_policy_by_policy_net(self): for s in range(self.state_space_size): if s in self.env.terminal: self.policy[s,4]=1 break s_t = self.decode_state(s) logits = self.pnet(s_t) action_probs = torch.softmax(logits, dim=0) a=torch.argmax(action_probs) self.policy[s,a]=1 return self.policy def get_policy_by_state_value_net(self): for s in range(self.state_space_size): if s in self.env.terminal: self.policy[s,4]=1 break a = np.argmax(self.q_value[s]) self.policy[s, a] = 1 return self.policy def get_qvalues(self): for s in range(self.state_space_size): s_t = self.decode_state(s) logits = self.qnet(s_t).detach().numpy() # q(s,a)表示在状态s执行动作a后,未来所有折扣回报的期望值,不要取softmax然后取最大 self.q_value[s, :] = logits return self.q_value if __name__ == '__main__': env = GridWorldEnv( size=5, forbidden=[(1, 2), (3, 3)], terminal=[(4, 4)], r_boundary=-1, r_other=-0.04, r_terminal=1, r_forbidden=-1, r_stay=-0.1 ) # 注意samples要大一点,否则每个state被访问到的概率很小 vi = QAC(env=env) vi.solve(num_episodes=200) print("\n state value: ") print(vi.get_qvalues()) print("\n get policy by policy net:") drow_policy(vi.get_policy_by_policy_net(), env) print("\n get policy by state value net:") drow_policy(vi.get_policy_by_state_value_net(), env)

运行结果:

1. 表现一致(终点状态不是 . 是因为没有特殊处理,其他代码保持不变。由于表现一致的情况很少,因此不再继续展示特殊处理后的输出)

2. 表现不一致

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/5 5:14:05

谷歌的ui设计规范主要有哪些

谷歌的UI设计规范核心是‌Material Design‌,其要点包括:设计原则‌材质与动效‌:灵感源于物理世界,通过阴影、层次和自然动效(如重力、摩擦力)增强交互真实感。 无障碍设计‌:遵循WCAG标准&…

作者头像 李华
网站建设 2026/1/29 15:15:08

操作系统应用(四十二)仙盟屏幕录像工具—东方仙盟炼气期

东方仙盟屏幕录像工具亮点 1. 手机发布免二次剪辑 电脑录屏能自动适配移动端竖版比例,录手机讲解内容时会智能放大操作区域。导出后直接就能发抖音、快手等平台,省去用剪映等工具调整尺寸的麻烦,做短视频教程效率翻倍。 2. 录制功能超全面 …

作者头像 李华
网站建设 2026/2/10 1:14:56

javadoc规范、idea生成javadoc等

文章目录idea生成javadocidea生成javadoc时报错 编码GBK的不可映射字符有哪些javadoc注解文档以前不太注意这些规范,认为这个没用,写代码随心所欲,实际这个想法是不对的。idea生成javadoc 可以给整个项目生成,也可以给某个文件生…

作者头像 李华
网站建设 2026/2/14 15:44:07

AtomicInteger实现安全减库存

文章目录安全库存类任意main方法里面调用使用说明并发情况下如何多节点执行并发问题一直是个小难点,自动有了AtomicInteger类,一切都变得简单了。安全库存类 这个类支持几种减库存的方法,挺好的。 代码: public class SafeInve…

作者头像 李华