中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

通過CartPole游戲詳解PPO優化的方法

發布時間:2023-04-18 17:50:00 來源:億速云 閱讀:145 作者:iii 欄目:開發技術

本篇內容主要講解“通過CartPole游戲詳解PPO優化的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“通過CartPole游戲詳解PPO優化的方法”吧!

CartPole 介紹

在一個光滑的軌道上有個推車,桿子垂直微置在推車上,隨時有倒的風險。系統每次對推車施加向左或者向右的力,但我們的目標是讓桿子保持直立。桿子保持直立的每個時間單位都會獲得 +1 的獎勵。但是當桿子與垂直方向成 15 度以上的位置,或者推車偏離中心點超過 2.4 個單位后,這一輪局游戲結束。因此我們可以獲得的最高回報等于 200 。我們這里就是要通過使用 PPO 算法來訓練一個強化學習模型 actor-critic ,通過對比模型訓練前后的游戲運行 gif 圖,可以看出來我們訓練好的模型能長時間保持桿子處于垂直狀態。

庫準備

python==3.10.9
tensorflow-gpu==2.10.0
imageio==2.26.1
keras==2.10,0
gym==0.20.0
pyglet==1.5.20
scipy==1.10.1

超參數設置

這段代碼主要是導入所需的庫,并設置了一些超參數。

    import numpy as np
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    import gym
    import scipy.signal
    import time
    from tqdm import tqdm
    steps_per_epoch = 5000  # 每個 epoch 中訓練的步數
    epochs = 20  # 用于訓練的 epoch 數
    gamma = 0.90  # 折扣因子,用于計算回報
    clip_ratio = 0.2  # PPO 算法中用于限制策略更新的比率
    policy_learning_rate = 3e-4  # 策略網絡的學習率
    value_function_learning_rate = 3e-4  # 值函數網絡的學習率
    train_policy_iterations = 80  # 策略網絡的訓練迭代次數
    train_value_iterations = 80  # 值函數網絡的訓練迭代次數
    lam = 0.97  # PPO 算法中的 λ 參數
    target_kl = 0.01  # PPO 算法中的目標 KL 散度
    hidden_sizes = (64, 64) # 神經網絡的隱藏層維度 
    render = False    # 是否開啟畫面渲染,False 表示不開啟

模型定義

(1)這里定義了一個函數 discounted_cumulative_sums,接受兩個參數 xdiscount,該函數的作用是計算給定獎勵序列 x 的折扣累計和,折扣因子 discount 是一個介于 0 和 1 之間的值,表示對未來獎勵的折扣程度。 在強化學習中,折扣累計和是一個常用的概念,表示對未來獎勵的折扣累加。

def discounted_cumulative_sums(x, discount):
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]

(2)這里定義了一個Buffer類,用于存儲訓練數據。類中有如下主要的函數:

  • init: 初始化函數,用于設置成員變量的初始值

  • store: 將觀測值、行為、獎勵、價值和對數概率存儲到對應的緩沖區中

  • finish_trajectory: 結束一條軌跡,用于計算優勢和回報,并更新 trajectory_start_index 的值

get: 獲取所有緩沖區的值,用在訓練模型過程中。在返回緩沖區的值之前,將優勢緩沖區的值進行標準化處理,使其均值為 0 ,方差為 1

class Buffer:
    def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95):
        self.observation_buffer = np.zeros( (size, observation_dimensions), dtype=np.float32 )
        self.action_buffer = np.zeros(size, dtype=np.int32)
        self.advantage_buffer = np.zeros(size, dtype=np.float32)
        self.reward_buffer = np.zeros(size, dtype=np.float32)
        self.return_buffer = np.zeros(size, dtype=np.float32)
        self.value_buffer = np.zeros(size, dtype=np.float32)
        self.logprobability_buffer = np.zeros(size, dtype=np.float32)
        self.gamma, self.lam = gamma, lam
        self.pointer, self.trajectory_start_index = 0, 0
    def store(self, observation, action, reward, value, logprobability):
        self.observation_buffer[self.pointer] = observation
        self.action_buffer[self.pointer] = action
        self.reward_buffer[self.pointer] = reward
        self.value_buffer[self.pointer] = value
        self.logprobability_buffer[self.pointer] = logprobability
        self.pointer += 1
    def finish_trajectory(self, last_value=0):
        path_slice = slice(self.trajectory_start_index, self.pointer)
        rewards = np.append(self.reward_buffer[path_slice], last_value)
        values = np.append(self.value_buffer[path_slice], last_value)
        deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1]
        self.advantage_buffer[path_slice] = discounted_cumulative_sums( deltas, self.gamma * self.lam )
        self.return_buffer[path_slice] = discounted_cumulative_sums(  rewards, self.gamma )[:-1]
        self.trajectory_start_index = self.pointer
    def get(self):
        self.pointer, self.trajectory_start_index = 0, 0
        advantage_mean, advantage_std = (  np.mean(self.advantage_buffer),  np.std(self.advantage_buffer), )
        self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std
        return ( self.observation_buffer, self.action_buffer, self.advantage_buffer, self.return_buffer, self.logprobability_buffer, )

(3)這里定義了一個多層感知機(Multi-Layer Perceptron,MLP)的網絡結構,有如下參數:

  • x:輸入的張量

  • sizes:一個包含每一層的神經元個數的列表

  • activation:激活函數,用于中間層的神經元

  • output_activation:輸出層的激活函數

該函數通過循環生成相應個數的全連接層,并將 x 作為輸入傳入。其中,units 指定每一層的神經元個數,activation 指定該層使用的激活函數,返回最后一層的結果。

def mlp(x, sizes, activation=tf.tanh, output_activation=None):
    for size in sizes[:-1]:
        x = layers.Dense(units=size, activation=activation)(x)
    return layers.Dense(units=sizes[-1], activation=output_activation)(x)

(4)這里定義了一個函數 logprobabilities,用于計算給定動作 a 的對數概率。函數接受兩個參數,logitsa,其中 logits 表示模型輸出的未歸一化的概率分布,a 表示當前采取的動作。函數首先對 logits 進行 softmax 歸一化,然后對歸一化后的概率分布取對數,得到所有動作的對數概率。接著,函數使用 tf.one_hot 函數生成一個 one-hot 編碼的動作向量,并與所有動作的對數概率向量相乘,最后對結果進行求和得到給定動作的對數概率。

def logprobabilities(logits, a):
    logprobabilities_all = tf.nn.log_softmax(logits)
    logprobability = tf.reduce_sum( tf.one_hot(a, num_actions) * logprobabilities_all, axis=1 )
    return logprobability

(5)這里定義了一個函數 sample_action。該函數接受一個 observation(觀測值)參數,并在 actor 網絡上運行該觀測值以獲得動作 logits(邏輯值)。然后使用邏輯值(logits)來隨機采樣出一個動作,并將結果作為函數的輸出。

@tf.function
def sample_action(observation):
    logits = actor(observation)
    action = tf.squeeze(tf.random.categorical(logits, 1), axis=1)
    return logits, action

(6)這里定義了一個用于訓練策略的函數train_policy。該函數使用帶權重裁剪的 PPO 算法,用于更新 actor 的權重。

  • observation_buffer:輸入的觀測緩沖區

  • action_buffer:輸入的動作緩沖區

  • logprobability_buffer:輸入的對數概率緩沖區

  • advantage_buffer:輸入的優勢值緩沖區

在該函數內部,使用tf.GradientTape記錄執行的操作,用于計算梯度并更新策略網絡。計算的策略損失是策略梯度和剪裁比率的交集和。使用優化器policy_optimizer來更新actor的權重。最后,計算并返回 kl 散度的平均值,該值用于監控訓練的過程。

@tf.function
def train_policy( observation_buffer, action_buffer, logprobability_buffer, advantage_buffer):
    with tf.GradientTape() as tape:   
        ratio = tf.exp( logprobabilities(actor(observation_buffer), action_buffer) - logprobability_buffer )
        min_advantage = tf.where(  advantage_buffer > 0, (1 + clip_ratio) * advantage_buffer, (1 - clip_ratio) * advantage_buffer, )
        policy_loss = -tf.reduce_mean( tf.minimum(ratio * advantage_buffer, min_advantage) )
    policy_grads = tape.gradient(policy_loss, actor.trainable_variables)
    policy_optimizer.apply_gradients(zip(policy_grads, actor.trainable_variables))
    kl = tf.reduce_mean( logprobability_buffer - logprobabilities(actor(observation_buffer), action_buffer) )
    kl = tf.reduce_sum(kl)
    return kl

(7)這里實現了價值函數(critic)的訓練過程,函數接受兩個參數:一個是 observation_buffer,表示當前存儲的狀態觀察序列;另一個是 return_buffer,表示狀態序列對應的回報序列。在函數內部,首先使用 critic 模型來預測當前狀態序列對應的狀態值(V), 然后計算當前狀態序列的平均回報與 V 之間的均方誤差,并對其進行求和取平均得到損失函數 value_loss。接下來計算梯度來更新可訓練的變量值。

@tf.function
def train_value_function(observation_buffer, return_buffer):
    with tf.GradientTape() as tape:  
        value_loss = tf.reduce_mean((return_buffer - critic(observation_buffer)) ** 2)
    value_grads = tape.gradient(value_loss, critic.trainable_variables)
    value_optimizer.apply_gradients(zip(value_grads, critic.trainable_variables))

游戲初始化

這里用于構建強化學習中的 Actor-Critic 網絡模型。首先,使用 gy m庫中的 CartPole-v0 環境創建一個環境實例 env 。然后,定義了兩個變量,分別表示觀測空間的維度 observation_dimensions 和動作空間的大小 num_actions,這些信息都可以從 env 中獲取。接著,定義了一個 Buffer 類的實例,用于存儲每個時間步的觀測、動作、獎勵、下一個觀測和 done 信號,以便后面的訓練使用。

然后,使用 Keras 庫定義了一個神經網絡模型 Actor ,用于近似模仿策略函數,該模型輸入是當前的觀測,輸出是每個動作的概率分布的對數。

另外,還定義了一個神經網絡模型 Critic ,用于近似模仿值函數,該模型輸入是當前的觀測,輸出是一個值,表示這個觀測的價值。最后,定義了兩個優化器,policy_optimizer 用于更新 Actor 網絡的參數,value_optimizer 用于更新 Critic 網絡的參數。

env = gym.make("CartPole-v0")
observation_dimensions = env.observation_space.shape[0]
num_actions = env.action_space.n
buffer = Buffer(observation_dimensions, steps_per_epoch)
observation_input = keras.Input(shape=(observation_dimensions,), dtype=tf.float32)
logits = mlp(observation_input, list(hidden_sizes) + [num_actions], tf.tanh, None)
actor = keras.Model(inputs=observation_input, outputs=logits)
value = tf.squeeze( mlp(observation_input, list(hidden_sizes) + [1], tf.tanh, None), axis=1 )
critic = keras.Model(inputs=observation_input, outputs=value)
policy_optimizer = keras.optimizers.Adam(learning_rate=policy_learning_rate)
value_optimizer = keras.optimizers.Adam(learning_rate=value_function_learning_rate)

保存未訓練時的運動情況

在未訓練模型之前,將模型控制游戲的情況保存是 gif ,可以看出來技術很糟糕,很快就結束了游戲。

import imageio
start = env.reset() 
frames = []
for t in range(steps_per_epoch):
    frames.append(env.render(mode='rgb_array'))
    start = start.reshape(1, -1)
    logits, action = sample_action(start)
    start, reward, done, _ = env.step(action[0].numpy())
    if done:
        break
with imageio.get_writer('未訓練前的樣子.gif', mode='I') as writer:
    for frame in frames:
        writer.append_data(frame)

模型訓練

這里主要是訓練模型,執行 eopch 輪,每一輪中循環 steps_per_epoch 步,每一步就是根據當前的觀測結果 observation 來抽樣得到下一步動作,然后將得到的各種觀測結果、動作、獎勵、value 值、對數概率值保存在 buffer 對象中,待這一輪執行游戲運行完畢,收集了一輪的數據之后,就開始訓練策略和值函數,并打印本輪的訓練結果,不斷重復這個過程,

observation, episode_return, episode_length = env.reset(), 0, 0
for epoch in tqdm(range(epochs)):
    sum_return = 0
    sum_length = 0
    num_episodes = 0
    for t in range(steps_per_epoch):
        if render:
            env.render()
        observation = observation.reshape(1, -1)
        logits, action = sample_action(observation)
        observation_new, reward, done, _ = env.step(action[0].numpy())
        episode_return += reward
        episode_length += 1
        value_t = critic(observation)
        logprobability_t = logprobabilities(logits, action)
        buffer.store(observation, action, reward, value_t, logprobability_t)
        observation = observation_new
        terminal = done
        if terminal or (t == steps_per_epoch - 1):
            last_value = 0 if done else critic(observation.reshape(1, -1))
            buffer.finish_trajectory(last_value)
            sum_return += episode_return
            sum_length += episode_length
            num_episodes += 1
            observation, episode_return, episode_length = env.reset(), 0, 0
    ( observation_buffer, action_buffer, advantage_buffer,  return_buffer, logprobability_buffer, ) = buffer.get()
    for _ in range(train_policy_iterations):
        kl = train_policy( observation_buffer, action_buffer, logprobability_buffer, advantage_buffer )
        if kl > 1.5 * target_kl:
            break
    for _ in range(train_value_iterations):
        train_value_function(observation_buffer, return_buffer)
    print( f"完成第 {epoch + 1} 輪訓練, 平均獎勵: {sum_length / num_episodes}" )

打印:完成第 1 輪訓練, 平均獎勵: 30.864197530864196
完成第 2 輪訓練, 平均獎勵: 40.32258064516129
...
完成第 9 輪訓練, 平均獎勵: 185.1851851851852
完成第 11 輪訓練, 平均獎勵: 172.41379310344828
...
完成第 14 輪訓練, 平均獎勵: 172.41379310344828
...
完成第 18 輪訓練, 平均獎勵: 185.1851851851852
...
完成第 20 輪訓練, 平均獎勵: 200.0

保存訓練后的運動情況

在訓練模型之后,將模型控制游戲的情況保存是 gif ,可以看出來技術很嫻熟,可以在很長的時間內使得棒子始終保持近似垂直的狀態。

import imageio
start = env.reset()
frames = []
for t in range(steps_per_epoch):
    frames.append(env.render(mode='rgb_array'))
    start = start.reshape(1, -1)
    logits, action = sample_action(start)
    start, reward, done, _ = env.step(action[0].numpy())
    if done:
        break
with imageio.get_writer('訓練后的樣子.gif', mode='I') as writer:
    for frame in frames:
        writer.append_data(frame)

到此,相信大家對“通過CartPole游戲詳解PPO優化的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

卢湾区| 屯门区| 九江县| 台中市| 巴林右旗| 花垣县| 会昌县| 普兰店市| 固原市| 梁山县| 微山县| 乌拉特中旗| 灵石县| 四会市| 佳木斯市| 天等县| 运城市| 平和县| 麻江县| 肃宁县| 昆山市| 偃师市| 德州市| 英德市| 资源县| 民勤县| 喀什市| 宝鸡市| 克什克腾旗| 友谊县| 定兴县| 泸西县| 通山县| 临猗县| 称多县| 高安市| 出国| 江安县| 喀喇沁旗| 威信县| 哈尔滨市|