EDIT (1/3/16): соответствующая проблема github
Я использую Tensorflow (интерфейс Python) для реализации q-обучающего агента с аппроксимацией функции, обученной с использованием стохастического градиентного спуска. На каждой итерации эксперимента вызывается ступенчатая функция в агенте, которая обновляет параметры аппроксиматора на основе новой награды и активации, а затем выбирает новое действие для выполнения.
Вот проблема (с учебным жаргоном подкрепления):
- Агент вычисляет предсказания значения состояния для выбора действия.
- Затем дает управление другой программой, которая имитирует шаг в среде.
- Теперь функция шага агента вызывается для следующей итерации. Я хочу использовать класс оптимизатора Tensorflow для вычисления градиентов для меня. Однако для этого требуются как предсказания значения состояния, которые я вычислял на последнем шаге, так и их график. Так:
- Если я запустил оптимизатор на весь график, тогда он должен перекомпилировать предсказания значения состояния.
- Но, если я сохраняю предсказание (для выбранного действия) в качестве переменной, затем загружаю его в оптимизатор в качестве заполнителя, у него больше нет графика, необходимого для вычисления градиентов.
- Я не могу просто запустить все это в одном и том же sess.run(), потому что мне нужно отказаться от контроля и вернуть выбранное действие, чтобы получить следующее наблюдение и вознаграждение (использовать в целевом для функция потери).
Итак, есть ли способ, которым я могу (без обучения по изучению жаргона):
- Вычислить часть моего графика, возвращая значение1.
- Возвращаемое значение1 в вызывающую программу для вычисления значения2
- В следующей итерации используйте значение2 как часть функции потери для градиентного спуска БЕЗ пересчета части графика, вычисляющего значение1.
Конечно, я рассмотрел очевидные решения:
-
Просто скопируйте градиенты: это было бы легко для простых простых аппроксиматоров, которые я использую сейчас, но было бы действительно неудобно, если бы я экспериментировал с различными фильтрами и функциями активации в большой сверточной сети. Я бы очень хотел использовать класс Optimizer, если это возможно.
-
Вызвать симуляцию среды изнутри агента: Эта система делает это, но это сделает мой более сложным и удалит много модульности и структуры. Поэтому я не хочу этого делать.
Я несколько раз читал API и технический документ, но, похоже, не мог найти решение. Я пытался каким-то образом накормить цель в графе, чтобы вычислить градиенты, но не смог создать способ создания этого графика автоматически.
Если окажется, что в TensorFlow это невозможно, вы считаете, что было бы очень сложно реализовать это как новый оператор? (Я не использовал С++ через пару лет, поэтому источник TensorFlow выглядит немного пугающим.) Или мне лучше переключиться на нечто вроде факела, который имеет императивное дифференцирование Autograd вместо символической дифференциации?
Спасибо, что нашли время, чтобы помочь мне в этом. Я пытался сделать это как можно более сжатым.
EDIT: после дальнейшего поиска я встретил этот ранее заданный вопрос. Это немного отличается от моего (они стараются не обновлять сеть LSTM дважды на каждой итерации в Torch) и пока не имеют ответов.
Вот какой код, если это помогает:
'''
-Q-Learning agent for a grid-world environment.
-Receives input as raw rbg pixel representation of screen.
-Uses an artificial neural network function approximator with one hidden layer
2015 Jonathon Byrd
'''
import random
import sys
#import copy
from rlglue.agent.Agent import Agent
from rlglue.agent import AgentLoader as AgentLoader
from rlglue.types import Action
from rlglue.types import Observation
import tensorflow as tf
import numpy as np
world_size = (3,3)
total_spaces = world_size[0] * world_size[1]
class simple_agent(Agent):
#Contants
discount_factor = tf.constant(0.5, name="discount_factor")
learning_rate = tf.constant(0.01, name="learning_rate")
exploration_rate = tf.Variable(0.2, name="exploration_rate") # used to be a constant :P
hidden_layer_size = 12
#Network Parameters - weights and biases
W = [tf.Variable(tf.truncated_normal([total_spaces * 3, hidden_layer_size], stddev=0.1), name="layer_1_weights"),
tf.Variable(tf.truncated_normal([hidden_layer_size,4], stddev=0.1), name="layer_2_weights")]
b = [tf.Variable(tf.zeros([hidden_layer_size]), name="layer_1_biases"), tf.Variable(tf.zeros([4]), name="layer_2_biases")]
#Input placeholders - observation and reward
screen = tf.placeholder(tf.float32, shape=[1, total_spaces * 3], name="observation") #input pixel rgb values
reward = tf.placeholder(tf.float32, shape=[], name="reward")
#last step data
last_obs = np.array([1, 2, 3], ndmin=4)
last_act = -1
#Last step placeholders
last_screen = tf.placeholder(tf.float32, shape=[1, total_spaces * 3], name="previous_observation")
last_move = tf.placeholder(tf.int32, shape = [], name="previous_action")
next_prediction = tf.placeholder(tf.float32, shape = [], name="next_prediction")
step_count = 0
def __init__(self):
#Initialize computational graphs
self.q_preds = self.Q(self.screen)
self.last_q_preds = self.Q(self.last_screen)
self.action = self.choose_action(self.q_preds)
self.next_pred = self.max_q(self.q_preds)
self.last_pred = self.act_to_pred(self.last_move, self.last_q_preds) # inefficient recomputation
self.loss = self.error(self.last_pred, self.reward, self.next_prediction)
self.train = self.learn(self.loss)
#Summaries and Statistics
tf.scalar_summary(['loss'], self.loss)
tf.scalar_summary('reward', self.reward)
#w_hist = tf.histogram_summary("weights", self.W[0])
self.summary_op = tf.merge_all_summaries()
self.sess = tf.Session()
self.summary_writer = tf.train.SummaryWriter('tensorlogs', graph_def=self.sess.graph_def)
def agent_init(self,taskSpec):
print("agent_init called")
self.sess.run(tf.initialize_all_variables())
def agent_start(self,observation):
#print("agent_start called, observation = {0}".format(observation.intArray))
o = np.divide(np.reshape(np.asarray(observation.intArray), (1,total_spaces * 3)), 255)
return self.control(o)
def agent_step(self,reward, observation):
#print("agent_step called, observation = {0}".format(observation.intArray))
print("step, reward: {0}".format(reward))
o = np.divide(np.reshape(np.asarray(observation.intArray), (1,total_spaces * 3)), 255)
next_prediction = self.sess.run([self.next_pred], feed_dict={self.screen:o})[0]
if self.step_count % 10 == 0:
summary_str = self.sess.run([self.summary_op, self.train],
feed_dict={self.reward:reward, self.last_screen:self.last_obs,
self.last_move:self.last_act, self.next_prediction:next_prediction})[0]
self.summary_writer.add_summary(summary_str, global_step=self.step_count)
else:
self.sess.run([self.train],
feed_dict={self.screen:o, self.reward:reward, self.last_screen:self.last_obs,
self.last_move:self.last_act, self.next_prediction:next_prediction})
return self.control(o)
def control(self, observation):
results = self.sess.run([self.action], feed_dict={self.screen:observation})
action = results[0]
self.last_act = action
self.last_obs = observation
if (action==0): # convert action integer to direction character
action = 'u'
elif (action==1):
action = 'l'
elif (action==2):
action = 'r'
elif (action==3):
action = 'd'
returnAction=Action()
returnAction.charArray=[action]
#print("return action returned {0}".format(action))
self.step_count += 1
return returnAction
def Q(self, obs): #calculates state-action value prediction with feed-forward neural net
with tf.name_scope('network_inference') as scope:
h1 = tf.nn.relu(tf.matmul(obs, self.W[0]) + self.b[0])
q_preds = tf.matmul(h1, self.W[1]) + self.b[1] #linear activation
return tf.reshape(q_preds, shape=[4])
def choose_action(self, q_preds): #chooses action epsilon-greedily
with tf.name_scope('action_choice') as scope:
exploration_roll = tf.random_uniform([])
#greedy_action = tf.argmax(q_preds, 0) # gets the action with the highest predicted Q-value
#random_action = tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)
#exploration rate updates
#if self.step_count % 10000 == 0:
#self.exploration_rate.assign(tf.div(self.exploration_rate, 2))
return tf.select(tf.greater_equal(exploration_roll, self.exploration_rate),
tf.argmax(q_preds, 0), #greedy_action
tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)) #random_action
'''
Why does this return NoneType?:
flag = tf.select(tf.greater_equal(exploration_roll, self.exploration_rate), 'g', 'r')
if flag == 'g': #greedy
return tf.argmax(q_preds, 0) # gets the action with the highest predicted Q-value
elif flag == 'r': #random
return tf.cast(tf.floor(tf.random_uniform([], maxval=4.0)), tf.int64)
'''
def error(self, last_pred, r, next_pred):
with tf.name_scope('loss_function') as scope:
y = tf.add(r, tf.mul(self.discount_factor, next_pred)) #target
return tf.square(tf.sub(y, last_pred)) #squared difference error
def learn(self, loss): #Update parameters using stochastic gradient descent
#TODO: Either figure out how to avoid computing the q-prediction twice or just hardcode the gradients.
with tf.name_scope('train') as scope:
return tf.train.GradientDescentOptimizer(self.learning_rate).minimize(loss, var_list=[self.W[0], self.W[1], self.b[0], self.b[1]])
def max_q(self, q_preds):
with tf.name_scope('greedy_estimate') as scope:
return tf.reduce_max(q_preds) #best predicted action from current state
def act_to_pred(self, a, preds): #get the value prediction for action a
with tf.name_scope('get_prediction') as scope:
return tf.slice(preds, tf.reshape(a, shape=[1]), [1])
def agent_end(self,reward):
pass
def agent_cleanup(self):
self.sess.close()
pass
def agent_message(self,inMessage):
if inMessage=="what is your name?":
return "my name is simple_agent";
else:
return "I don't know how to respond to your message";
if __name__=="__main__":
AgentLoader.loadAgent(simple_agent())