전체적인 DQN 코드 구조를 먼저 살펴보고, 그 이후 세부적인 동작을 하나씩 차근차근 이해해보도록 하자.
처음 이 코드를 접했을 때는 여러 함수와 클래스, 흐름이 복잡하게
얽혀 있어 다소 낯설고 어렵게 느껴질 수 있다. 하지만 DQN 구조는
대부분의 강화학습 실습에서 반복적으로 사용되며, 실제로도 여러 프로젝트에 쉽게 재활용되는 구조이기 때문에
한 번만 잘 익혀두면 이후 다른 강화학습 알고리즘을 학습할 때도 큰 도움이 된다.
# -*- coding: utf-8 -*-
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
import gym
import numpy as np
import random as rand
import os
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
class Agent(object):
def __init__(self):
self.env = gym.make('CartPole-v1')
self.state_size = self.env.observation_space.shape[0]
self.action_size = self.env.action_space.n
self.node_num = 12
self.learning_rate = 0.001
self.epochs_cnt = 5
self.model = self.build_model()
self.discount_rate = 0.97
self.penalty = -100
self.episode_num = 500
self.replay_memory_limit = 2048
self.replay_size = 32
self.replay_memory = []
self.epsilon = 0.99
self.epsilon_decay = 0.2
self.epsilon_min = 0.05
self.moving_avg_size = 20
self.reward_list = []
self.count_list = []
self.moving_avg_list = []
def build_model(self):
input_states = Input(shape=(self.state_size,), name='input_states')
x = Dense(self.node_num, activation='tanh')(input_states)
out_actions = Dense(self.action_size, activation='linear', name='output')(x)
model = tf.keras.models.Model(inputs=input_states, outputs=out_actions)
model.compile(optimizer=Adam(learning_rate=self.learning_rate),
loss='mean_squared_error')
model.summary()
return model
def train(self):
for episode in range(self.episode_num):
state, _ = self.env.reset()
_, step_count, reward_tot = self.take_action_and_append_memory(episode, state)
self.reward_list.append(reward_tot - self.penalty)
self.count_list.append(step_count)
self.moving_avg_list.append(np.mean(self.reward_list[-self.moving_avg_size:]))
self.train_mini_batch()
if episode % 10 == 0:
moving_avg = np.mean(self.reward_list[-self.moving_avg_size:])
reward_avg = np.mean(self.reward_list)
print(f"episode:{episode}, moving_avg:{moving_avg:.2f}, rewards_avg:{reward_avg:.2f}")
self.save_model()
def take_action_and_append_memory(self, episode, state):
reward_tot = 0
step_count = 0
done = False
epsilon = self.get_episilon(episode)
while not done:
step_count += 1
state_input = np.reshape(state, [1, self.state_size]).astype(np.float32)
Q = self.model(state_input).numpy()
action = self.greed_search(epsilon, Q)
next_state, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
if done and step_count < 499:
reward = self.penalty
self.replay_memory.append([state, action, reward, next_state, done])
if len(self.replay_memory) > self.replay_memory_limit:
self.replay_memory.pop(0)
reward_tot += reward
state = next_state
return Q, step_count, reward_tot
def train_mini_batch(self):
if len(self.replay_memory) < self.replay_size:
return
batch = rand.sample(self.replay_memory, self.replay_size)
array_state = np.zeros((self.replay_size, self.state_size), dtype=np.float32)
array_next_state = np.zeros((self.replay_size, self.state_size), dtype=np.float32)
actions, rewards, dones = [], [], []
for idx, (state, action, reward, next_state, done) in enumerate(batch):
array_state[idx] = state
array_next_state[idx] = next_state
actions.append(action)
rewards.append(reward)
dones.append(done)
q_values = self.model(array_state).numpy()
q_next_values = self.model(array_next_state).numpy()
for idx in range(self.replay_size):
if dones[idx]:
q_values[idx, actions[idx]] = rewards[idx]
else:
q_values[idx, actions[idx]] = rewards[idx] + self.discount_rate * np.max(q_next_values[idx])
self.train_on_batch(array_state, q_values)
def train_on_batch(self, states, targets):
with tf.GradientTape() as tape:
predictions = self.model(states, training=True)
loss = tf.reduce_mean(tf.square(targets - predictions))
gradients = tape.gradient(loss, self.model.trainable_variables)
self.model.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
def get_episilon(self, episode):
result = self.epsilon * (1 - episode / (self.episode_num * self.epsilon_decay))
return max(result, self.epsilon_min)
def greed_search(self, epsilon, Q):
if epsilon > np.random.rand():
return self.env.action_space.sample()
else:
return np.argmax(Q)
def save_model(self):
os.makedirs("./model", exist_ok=True)
self.model.save("./model/dqn.keras")
print("*****end learning")
if __name__ == "__main__":
agent = Agent()
agent.train()
Cartpole_DQN
예제는 앞에서 설치한 주피터 노트북을 사용해서 테스트하는 것이 좋다. 다른
파이썬 프로그램을 사용하면 코드를 전체적으로 모두 실행해야 하지만, 주피터 노트북은 코드를 셀 단위로
나누어 부분적으로 실행할 수 있다. 데이터 분석 분야에서 주피터 노트북이 가장 많이 활용되는 이유이기도
하다.
Tags:
강화학습