Подтвердить что ты не робот

Как выполнить предварительную выборку данных с помощью пользовательской функции python в тензорном потоке

Я пытаюсь выполнить предварительную выборку данных обучения, чтобы скрыть задержку ввода-вывода. Я хотел бы написать собственный код Python, который загружает данные с диска и препроцессы данных (например, путем добавления контекстного окна). Другими словами, один поток выполняет предварительную обработку данных, а другой - обучение. Возможно ли это в TensorFlow?

Обновление: у меня есть рабочий пример, основанный на примере @mrry.

import numpy as np
import tensorflow as tf
import threading

BATCH_SIZE = 5
TRAINING_ITERS = 4100

feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])

sess = tf.Session()

def load_and_enqueue(sess, enqueue_op, coord):
  with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file:
    while not coord.should_stop():
      feature_array = np.fromfile(feature_file, np.float32, 128)
      if feature_array.shape[0] == 0:
        print('reach end of file, reset using seek(0,0)')
        feature_file.seek(0,0)
        label_file.seek(0,0)
        continue
      label_value = np.fromfile(label_file, np.float32, 128)

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()

for i in range(TRAINING_ITERS):
  sum = sess.run(c)
  print('train_iter='+str(i))
  print(sum)

coord.request_stop()
coord.join([t])
4b9b3361

Ответ 1

Это распространенный прецедент, и большинство реализаций используют очереди TensorFlow, чтобы отделить код предварительной обработки от кода обучения. Существует учебник о том, как использовать очереди, но основные шаги заключаются в следующем:

  • Определите очередь, q, которая будет буферизовать предварительно обработанные данные. TensorFlow поддерживает простой tf.FIFOQueue, который создает элементы в том порядке, в котором они были установлены, и более продвинутый tf.RandomShuffleQueue, который создает элементы в произвольном порядке. Элемент очереди представляет собой набор из одного или нескольких тензоров (которые могут иметь разные типы и формы). Все очереди поддерживают операции с одним элементом (enqueue, dequeue) и batch (enqueue_many, dequeue_many), но для использования пакетных операций вы должны указывать формы каждого тензора в элементе очереди при построении очереди.

  • Создайте подграф, который ставит в очередь предварительно обработанные элементы. Один из способов сделать это - определить некоторые tf.placeholder() ops для тензоров, соответствующих одному примеру ввода, затем передать их в q.enqueue(). (Если ваша предварительная обработка производит партию одновременно, вы должны использовать q.enqueue_many().) Вы также можете включить операции TensorFlow на этом подграфе.

  • Создайте подграф, который выполняет обучение. Это будет выглядеть как обычный график TensorFlow, но получит его вклад, вызвав q.dequeue_many(BATCH_SIZE).

  • Запустите сеанс.

  • Создайте один или несколько потоков, которые выполняют вашу логику предварительной обработки, а затем выполните опцию enqueue op, подавая предварительно обработанные данные. Вы можете найти tf.train.Coordinator и tf.train.QueueRunner классы, полезные для этого.

  • Запустите свой учебный график (оптимизатор и т.д.) как обычно.

РЕДАКТИРОВАТЬ: Здесь простая функция load_and_enqueue() и фрагмент кода, чтобы вы начали:

# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])

# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])

# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...

sess = tf.Session()

def load_and_enqueue():
  with open(...) as feature_file, open(...) as label_file:
    while True:
      feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
      if not feature_array:
        return
      label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0]

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()

for _ in range(TRAINING_EPOCHS):
  sess.run(train_op)

Ответ 2

Другими словами, один поток выполняет предварительную обработку данных, а другой - обучение. Возможно ли это в TensorFlow?

Да, это так. mrry работает, но проще.

Получение данных

tf.py_func обертывает функцию python и использует ее как оператор TensorFlow. Таким образом, мы можем каждый раз загружать данные в sess.run(). Проблема с этим подходом заключается в том, что данные загружаются во время sess.run() через основной поток.

Минимальный пример:

def get_numpy_tensor():
  return np.array([[1,2],[3,4]], dtype=np.float32)
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32)

Более сложный пример:

def get_numpy_tensors():
  # Load data from the disk into numpy arrays.
  input = np.array([[1,2],[3,4]], dtype=np.float32)
  target = np.int32(1)
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target

sess = tf.InteractiveSession()
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target])
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2

Предварительная выборка данных в другом потоке

Чтобы оцифровать наши данные в другом потоке (чтобы sess.run() не нужно было ждать данных), мы можем использовать tf.train.batch() на наших операторах из tf.py_func().

Минимальный пример:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape])
# Run `tf.train.start_queue_runners()` once session is created.

Мы можем опустить аргумент shapes, если tensorflow_tensor имеет свою форму:

tensor_shape = get_numpy_tensor().shape
tensorflow_tensor.set_shape(tensor_shape)
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32)
# Run `tf.train.start_queue_runners()` once session is created.

Более сложный пример:

input_shape, target_shape = (2, 2), ()
def get_numpy_tensors():
  input = np.random.rand(*input_shape).astype(np.float32)
  target = np.random.randint(10, dtype=np.int32)
  print('f', end='')
  return input, target
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32])
batch_size = 2
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2)
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`.

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets

sess = tf.InteractiveSession()
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it.
for _ in range(10):
  numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets])
  assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape)
  print('r', end='')

# Prints `fffffrrffrfrffrffrffrffrffrffrf`.

В случае, если get_numpy_tensor() возвращает партию тензоров, то tf.train.batch(..., enqueue_many=True) поможет.