Подтвердить что ты не робот

Tensorflow: загрузка пользовательских данных + асинхронное вычисление

Это то, как я полагаю, пропущен из примеров TF.

Задача:

  • выборки для каждого класса приведены в отдельном каталоге, и, следовательно, метки непрямо (т.е. по dir)
  • развязанные нагрузки и вычисления в TF

Каждый отдельный бит можно найти, но я думаю, что все они вместе в одном месте помогут сэкономить много времени для начинающих TF (например, я).

Давайте рассмотрим 1., в моем случае это два набора изображений:

# all filenames for .jpg in dir 
#  - list of fnames
#  - list of labels 
def path_fnames(f_path, label, ext = ['.jpg', '.jpeg']):
    f_n = [f_path+'/'+f for f in sorted(os.listdir(f_path)) if os.path.splitext(f)[1].lower() in ext]
    f_l = [label] * len(f_n)
    return f_n, f_l
#     
def dense_to_one_hot(labels_dense, num_classes=10, dtype=np.float32):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels     = labels_dense.shape[0]
    index_offset   = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes),dtype=dtype)
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

data_dir = '/mnt/dataset/'
dir_1   = '/class_1'
dir_2   = '/class_2'

# --- get filenames for data ---
dpath = [data_dir+dir_1, data_dir+dir_2]

f_n1, f_l1 = path_fnames(dpath[0], 0)
f_n2, f_l2 = path_fnames(dpath[1], 1)

# --- create one-hot labels ---
ohl    = dense_to_one_hot(np.asarray(f_l1+f_l2), num_classes=2, dtype = np.float32)
fnames = f_n1+f_n2;               # one-hot labels created in this sequence

Теперь у нас есть все имена файлов и однострочные метки, предварительно загруженные.

Переместится на 2.

Он основан на Как предварительно очистить данные с помощью пользовательской функции python в тензорном потоке. Короче говоря:

  • пользовательский графический ридер (замените его)
  • queue fnl_q с [меткой имени файла], которая используется читателем для подачи
  • queue proc_q с [меткой образца], которая используется для обработки обработки some_op
  • которые выполняют read_op, чтобы получить [sample label] и enqueue_op, чтобы поместить пару в proc_q. Тема управляется tf.Coordinator
  • some_op, которые сначала получают данные из proc_q by dequeue_many() и остальные вычисления (также могут быть помещены в отдельный поток).

Примечания:

  • feature_read_op и label_read_op - два отдельных op.
  • Я использую sleep() для замедления и управления op - только для целей тестирования.
  • Я разделил части "подачи" и "вычисления" - в реальном случае просто запускайте их параллельно
print 'TF version:', tf.__version__
# --- params ----
im_s       = [30, 30, 1]   # target image size
BATCH_SIZE = 16

# image reader 
# - fnl_queue: queue with [fn l] pairs 
# Notes 
# - to resize:  image_tensor = tf.image.resize_image_with_crop_or_pad(image_tensor, HEIGHT, WIDTH)
# - how about image preprocessing?
def img_reader_jpg(fnl_queue, ch = 3, keep = False):
    fn, label = fnl_queue.dequeue()

    if keep:
        fnl_queue.enqueue([fn, label])

    img_bytes = tf.read_file(fn)
    img_u8    = tf.image.decode_jpeg(img_bytes, channels=ch) 
    img_f32   = tf.cast(img_u8, tf.float32)/256.0  
    #img_4     = tf.expand_dims(img_f32,0)
    return img_f32, label

#  load [feature, label] and enqueue to processing queue
# - sess:             tf session 
# - sess:             tf Coordinator
# - [fr_op, lr_op ]:  feature_read_op label_read_op
# - enqueue_op:       [f l] pairs enqueue op
def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    i = 0
    while not coord.should_stop():
        # for testing purpose
        time.sleep(0.1)                     
        #print 'load_and_enqueue i=',i
        #i = i +1

        feature, label = sess.run([feature_read_op, label_read_op ])

        feed_dict = {feature_input: feature,
                     label_input  : label}

        sess.run(enqueue_op, feed_dict=feed_dict)


# --- TF part ---

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

#fnl_q    = tf.FIFOQueue(len(fnames), [tf.string, tf.float32])
fnl_q    = tf.RandomShuffleQueue(len(fnames), 0, [tf.string, tf.float32])
do_enq = fnl_q.enqueue_many([fv, lv])

# reading_op: feature_read_op label_read_op 
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = im_s[2])

# samples queue
f_s = im_s
l_s = 2
feature_input = tf.placeholder(tf.float32, shape=f_s, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=l_s, name='label_input')

#proc_q     = tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
proc_q     = tf.FIFOQueue(len(fnames), [tf.float32, tf.float32], shapes=[f_s, l_s])
enqueue_op = proc_q.enqueue([feature_input, label_input])

# test: 
# - some op
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
some_op   = [img_batch, lab_batch]

# service ops
init_op   = tf.initialize_all_variables()

# let run stuff
with tf.Session() as sess:

    sess.run(init_op)
    sess.run(do_enq)

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    # --- test thread stuff ---
    #  - fill proc_q
    coord = tf.train.Coordinator()
    t = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op))
    t.start()

    time.sleep(2.1)

    coord.request_stop()
    coord.join([t])

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    #  - process a bit 
    ss = sess.run(some_op)
    print 'ss[0].shape', ss[0].shape 
    print ' ss[1]:\n', ss[1]

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval() 

print 'ok'

Типичный выход:

TF version: 0.6.0

fnl_q.size: 1225
proc_q.size: 0

fnl_q.size: 1204
proc_q.size: 21

ss[0].shape (16, 30, 30, 1)
 ss[1]:
[[ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]]

fnl_q.size: 1204
proc_q.size: 5

ok  

Все как ожидалось

  • создана партия пар [образец метки]
  • пары перетасовываются

Осталось только применить TF, поскольку он предназначен для использования путем замены some_op:)

И вопрос: одна проблема - проблема, если я использую tf.FIFOQueue для имен файлов и tf.RandomShuffleQueue для образцов - перетасовка не происходит. Однако по-другому (как и код выше) он отлично перемешивается.

Любая проблема с перетасовкой для
tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])?


ADD: Версия с двумя потоками:

  • один для повторного заполнения/обновления/изменения очереди имен файлов
  • второй для заполнения выборок в очереди обработки.

Также добавлен правильный способ остановки потоков.

def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    try:
        while not coord.should_stop():
            feature, label = sess.run([feature_read_op, label_read_op ])
            feed_dict = {feature_input: feature,
                         label_input  : label}
            sess.run(enqueue_op, feed_dict=feed_dict)
    except Exception as e:
        return


# periodically check the state of fnl queue and if needed refill it
#  - enqueue_op: 'refill' file-name_label queue 
def enqueue_fnl(sess, coord, fnl_q, enqueue_op):
    try:
        while not coord.should_stop():
            time.sleep(0.5)
            s = sess.run(fnl_q.size())
            if  s < (9*BATCH_SIZE) :
                sess.run(enqueue_op)
    except Exception as e:
        return


#  -- ops for feed part --

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

# read op
fnl_q      = tf.RandomShuffleQueue(len(fnames)*2, 0, [tf.string, tf.float32], name = 'fnl_q')  # add some margin for re-fill to fit
do_fnl_enq = fnl_q.enqueue_many([fv, lv])
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = IMG_SIZE[2])

# samples queue
feature_input = tf.placeholder(tf.float32, shape=IMG_SIZE, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=LAB_SIZE, name='label_input')
proc_q        = tf.FIFOQueue(len(fnames)*3, [tf.float32, tf.float32], shapes=[IMG_SIZE, LAB_SIZE], name = 'fe_la_q') 
enqueue_op    = proc_q.enqueue([feature_input, label_input])

# -- ops for trainind end eval
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)

... here is your model

loss       = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_ph))
optimizer  = tf.train.AdamOptimizer(1e-4).minimize(loss)

with tf.Session() as sess:

    coord = tf.train.Coordinator()
    t_le  = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op) , name = 'load_and_enqueue')
    t_re  = threading.Thread(target=enqueue_fnl, args = (sess, coord, fnl_q, do_fnl_enq), name = 'enqueue_fnl')  # re-enq thread i.e. refiling filename queue 
    t_le.start()
    t_re.start()

    try:
    # training
    for step in xrange(823):
        # some proc
        img_v, lab_v = sess.run([img_batch, lab_batch])
        feed_dict = { img_ph   : img_v,
              lab_ph   : lab_v,
              keep_prob: 0.7}
        _, loss_v = sess.run([optimizer, loss], feed_dict = feed_dict)

    except Exception as e:
    print 'Training: Exception:', e


    # stop threads 
    coord.request_stop()                                     # ask to stop
    sess.run(fnl_q.close(cancel_pending_enqueues=True))      # tell proc_q don't wait for enque anymore
    sess.run(proc_q.close(cancel_pending_enqueues=True))     # tell proc_q don't wait for enque anymore
    coord.join([t_le, t_re], stop_grace_period_secs=8)       
4b9b3361