DQN 알고리즘 전체 코드 리뷰

 

전체적인 DQN 코드 구조를 먼저 살펴보고, 그 이후 세부적인 동작을 하나씩 차근차근 이해해보도록 하자.

처음 이 코드를 접했을 때는 여러 함수와 클래스, 흐름이 복잡하게 얽혀 있어 다소 낯설고 어렵게 느껴질 수 있다. 하지만 DQN 구조는 대부분의 강화학습 실습에서 반복적으로 사용되며, 실제로도 여러 프로젝트에 쉽게 재활용되는 구조이기 때문에 한 번만 잘 익혀두면 이후 다른 강화학습 알고리즘을 학습할 때도 큰 도움이 된다.

# -*- coding: utf-8 -*-
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
import gym
import numpy as np
import random as rand
import os
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
 
class Agent(object):
    def __init__(self):
        self.env = gym.make('CartPole-v1')
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
 
        self.node_num = 12
        self.learning_rate = 0.001
        self.epochs_cnt = 5
        self.model = self.build_model()        
        
        self.discount_rate = 0.97
        self.penalty = -100
        
        self.episode_num = 500        
        
        self.replay_memory_limit = 2048
        self.replay_size = 32
        self.replay_memory = []
 
        self.epsilon = 0.99
        self.epsilon_decay = 0.2
        self.epsilon_min = 0.05
        
        self.moving_avg_size = 20
        self.reward_list = []
        self.count_list = []
        self.moving_avg_list = []        
 
    def build_model(self):
        input_states = Input(shape=(self.state_size,), name='input_states')
        x = Dense(self.node_num, activation='tanh')(input_states)
        out_actions = Dense(self.action_size, activation='linear', name='output')(x)
 
        model = tf.keras.models.Model(inputs=input_states, outputs=out_actions)
        model.compile(optimizer=Adam(learning_rate=self.learning_rate),
                      loss='mean_squared_error')
        model.summary()
        return model
 
    def train(self):
        for episode in range(self.episode_num):
            state, _ = self.env.reset()
            
            _, step_count, reward_tot = self.take_action_and_append_memory(episode, state)
            
            self.reward_list.append(reward_tot - self.penalty)
            self.count_list.append(step_count)
            self.moving_avg_list.append(np.mean(self.reward_list[-self.moving_avg_size:]))                
            
            self.train_mini_batch()
                
            if episode % 10 == 0:
                moving_avg = np.mean(self.reward_list[-self.moving_avg_size:])
                reward_avg = np.mean(self.reward_list)
                print(f"episode:{episode}, moving_avg:{moving_avg:.2f}, rewards_avg:{reward_avg:.2f}")
        
        self.save_model()
 
    def take_action_and_append_memory(self, episode, state):
        reward_tot = 0
        step_count = 0
        done = False
        epsilon = self.get_episilon(episode)
 
        while not done:
            step_count += 1
            state_input = np.reshape(state, [1, self.state_size]).astype(np.float32)
            Q = self.model(state_input).numpy()
            action = self.greed_search(epsilon, Q)
 
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
 
            if done and step_count < 499:
                reward = self.penalty
 
            self.replay_memory.append([state, action, reward, next_state, done])
            if len(self.replay_memory) > self.replay_memory_limit:
                self.replay_memory.pop(0)
 
            reward_tot += reward
            state = next_state
 
        return Q, step_count, reward_tot
 
    def train_mini_batch(self):
        if len(self.replay_memory) < self.replay_size:
            return
 
        batch = rand.sample(self.replay_memory, self.replay_size)
 
        array_state = np.zeros((self.replay_size, self.state_size), dtype=np.float32)
        array_next_state = np.zeros((self.replay_size, self.state_size), dtype=np.float32)
 
        actions, rewards, dones = [], [], []
 
        for idx, (state, action, reward, next_state, done) in enumerate(batch):
            array_state[idx] = state
            array_next_state[idx] = next_state
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
 
        q_values = self.model(array_state).numpy()
        q_next_values = self.model(array_next_state).numpy()
 
        for idx in range(self.replay_size):
            if dones[idx]:
                q_values[idx, actions[idx]] = rewards[idx]
            else:
                q_values[idx, actions[idx]] = rewards[idx] + self.discount_rate * np.max(q_next_values[idx])
 
        self.train_on_batch(array_state, q_values)
 
    def train_on_batch(self, states, targets):
        with tf.GradientTape() as tape:
            predictions = self.model(states, training=True)
            loss = tf.reduce_mean(tf.square(targets - predictions))
        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.model.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))        
 
    def get_episilon(self, episode):
        
        result = self.epsilon * (1 - episode / (self.episode_num * self.epsilon_decay))
        
        return max(result, self.epsilon_min)
 
    def greed_search(self, epsilon, Q):
        
        if epsilon > np.random.rand():
            return self.env.action_space.sample()
        
        else:
            return np.argmax(Q)
 
    def save_model(self):
        os.makedirs("./model", exist_ok=True)
        self.model.save("./model/dqn.keras")
        print("*****end learning")
 
if __name__ == "__main__":
    agent = Agent()
    agent.train()

Cartpole_DQN 

예제는 앞에서 설치한 주피터 노트북을 사용해서 테스트하는 것이 좋다. 다른 파이썬 프로그램을 사용하면 코드를 전체적으로 모두 실행해야 하지만, 주피터 노트북은 코드를 셀 단위로 나누어 부분적으로 실행할 수 있다. 데이터 분석 분야에서 주피터 노트북이 가장 많이 활용되는 이유이기도 하다.


댓글 쓰기

Please Select Embedded Mode To Show The Comment System.*

다음 이전