Подтвердить что ты не робот

Проблемы классификации изображений в режиме реального времени Python с Neural Networks

Я пытаюсь использовать caffe и python для классификации изображений в режиме реального времени. Я использую OpenCV для потоковой передачи с веб-камеры в одном процессе и в отдельном процессе, используя caffe для выполнения классификации изображений на фреймах, вытащенных из веб-камеры. Затем я передаю результат классификации обратно в основной поток, чтобы заголовок потока веб-камеры.

Проблема в том, что, хотя у меня есть графический процессор NVIDIA и я выполняю предсказания caffe на графическом процессоре, основной поток отключается. Обычно без каких-либо прогнозов поток веб-камеры работает со скоростью 30 кадров в секунду; однако с прогнозами мой поток веб-камеры достигает в лучшем случае 15 кадров в секунду.

Я проверил, что кофе действительно использует GPU при выполнении предсказаний и что моя память GPU или GPU не максимизируется. Я также подтвердил, что мои ядра процессора не получают максимальную отдачу в любой момент во время программы. Мне интересно, не делаю ли я что-то не так, или если нет способа, чтобы эти 2 процесса действительно были отделены друг от друга. Любые советы приветствуются. Вот мой код для справки

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        #Load caffe net -- code omitted 
        while True:
            image = self.task_queue.get()
            #crop image -- code omitted
            text = net.predict(image)
            self.result_queue.put(text)

        return

import cv2
import caffe
import multiprocessing
import Queue 

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False
frame_copy[:] = frame
task_empty = True
while rval:
    if task_empty:
       tasks.put(frame_copy)
       task_empty = False
    if not results.empty():
       text = results.get()
       #Add text to frame
       cv2.putText(frame,text)
       task_empty = True

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()
    frame_copy[:] = frame
    #Getting keyboard input 
    key = cv2.waitKey(1)
    #exit on ESC
    if key == 27:
        break

Я уверен, что предсказание caffe замедляет все, потому что, когда я комментирую предсказание и передаю фиктивный текст взад и вперед между процессами, я снова получаю 30 кадров в секунду.

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        #Load caffe net -- code omitted
        while True:
            image = self.task_queue.get()
            #crop image -- code omitted
            #text = net.predict(image)
            text = "dummy text"
            self.result_queue.put(text)

        return

import cv2
import caffe
import multiprocessing
import Queue 

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False
frame_copy[:] = frame
task_empty = True
while rval:
    if task_empty:
       tasks.put(frame_copy)
       task_empty = False
    if not results.empty():
       text = results.get()
       #Add text to frame
       cv2.putText(frame,text)
       task_empty = True

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()
    frame_copy[:] = frame
    #Getting keyboard input 
    key = cv2.waitKey(1)
    #exit on ESC
    if key == 27:
        break
4b9b3361

Ответ 1

Некоторые пояснения и некоторые рекомендации:

  • Я запустил свой код ниже на ноутбуке с Intel Core i5-6300HQ @2.3GHz cpu, 8 GB RAM и NVIDIA GeForce GTX 960M gpu (память 2 ГБ), и результат был:

    Выполнял ли я код с запуском caffe или без него (комментируя или не net_output = this->net_->Forward(net_input) и некоторые необходимые вещи в void Consumer::entry()), я мог бы всегда получать около 30 кадров в секунду в основном потоке.

    Аналогичный результат был получен на ПК с Intel Core i5-4440 cpu, 8 GB RAM, NVIDIA GeForce GT 630 gpu (1 ГБ памяти).

  • Я запустил код @user3543300 в вопросе на том же ноутбуке, результатом был:

    Будь ли кофе работает (на gpu) или нет, я мог бы также получить около 30 кадров в секунду.

  • Согласно @user3543300, с двумя версиями кода, упомянутыми выше, @user3543300 может получить только около 15 кадров в секунду при запуске caffe (на Nvidia GeForce 940MX GPU and Intel® Core™ i7-6500U CPU @ 2.50GHz × 4 ноутбук). Также произойдет замедление частоты кадров веб-камеры, когда caffe работает на gpu в качестве независимой программы.

Итак, я все еще думаю, что проблема может быть скорее всего связана с ограничениями ввода-вывода оборудования, такими как пропускная способность DMA (этот поток о DMA может подсказка.) или пропускная способность ОЗУ. Надежда @user3543300 может проверить это или выяснить истинную проблему, о которой я не знал.

Если проблема в том, о чем я думаю выше, тогда разумная мысль заключалась бы в сокращении объема ввода-вывода памяти, введенного сетью CNN. Фактически, для решения подобной проблемы на встраиваемых системах с ограниченными аппаратными ресурсами было проведено некоторое исследование по этой теме, например. Qautification Структурно разреженные глубокие нейронные сети, SqueezeNet, Deep-Compression. Так что, надеюсь, это также поможет улучшить частоту кадров веб-камеры в вопросе, применяя такие навыки.


Исходный ответ:

Попробуйте это С++-решение. Он использует потоки для служебных данных ввода-вывода в вашей задаче, я протестировал ее с помощью bvlc_alexnet.caffemodel, deploy.prototxt, чтобы сделать классификацию изображений и не видел очевидного замедления основного потока (поток веб-камеры), когда работает caffe (на GPU):

#include <stdio.h>
#include <iostream>
#include <string>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "caffe/caffe.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/data_transformer.hpp"
#include "opencv2/opencv.hpp"

using namespace cv;

//Queue pair for sharing image/results between webcam and caffe threads
template<typename T>
class QueuePair {
  public:
    explicit QueuePair(int size);
    ~QueuePair();

    caffe::BlockingQueue<T*> free_;
    caffe::BlockingQueue<T*> full_;

  DISABLE_COPY_AND_ASSIGN(QueuePair);
};
template<typename T>
QueuePair<T>::QueuePair(int size) {
  // Initialize the free queue
  for (int i = 0; i < size; ++i) {
    free_.push(new T);
  }
}
template<typename T>
QueuePair<T>::~QueuePair(){
  T *data;
  while (free_.try_pop(&data)){
    delete data;
  }
  while (full_.try_pop(&data)){
    delete data;
  }
}
template class QueuePair<Mat>;
template class QueuePair<std::string>;

//Do image classification(caffe predict) using a subthread
class Consumer{
  public:
    Consumer(boost::shared_ptr<QueuePair<Mat>> task
           , boost::shared_ptr<QueuePair<std::string>> result);
    ~Consumer();
    void Run();
    void Stop();
    void entry(boost::shared_ptr<QueuePair<Mat>> task
             , boost::shared_ptr<QueuePair<std::string>> result);

  private:
    bool must_stop();

    boost::shared_ptr<QueuePair<Mat> > task_q_;
    boost::shared_ptr<QueuePair<std::string> > result_q_;

    //caffe::Blob<float> *net_input_blob_;
    boost::shared_ptr<caffe::DataTransformer<float> > data_transformer_;
    boost::shared_ptr<caffe::Net<float> > net_;
    std::vector<std::string> synset_words_;
    boost::shared_ptr<boost::thread> thread_;
};
Consumer::Consumer(boost::shared_ptr<QueuePair<Mat>> task
                 , boost::shared_ptr<QueuePair<std::string>> result) :
 task_q_(task), result_q_(result), thread_(){

  //for data preprocess
  caffe::TransformationParameter trans_para;
  //set mean
  trans_para.set_mean_file("/path/to/imagenet_mean.binaryproto");
  //set crop size, here is cropping 227x227 from 256x256
  trans_para.set_crop_size(227);
  //instantiate a DataTransformer using trans_para for image preprocess
  data_transformer_.reset(new caffe::DataTransformer<float>(trans_para
                        , caffe::TEST));

  //initialize a caffe net
  net_.reset(new caffe::Net<float>(std::string("/path/to/deploy.prototxt")
           , caffe::TEST));
  //net parameter
  net_->CopyTrainedLayersFrom(std::string("/path/to/bvlc_alexnet.caffemodel"));

  std::fstream synset_word("path/to/caffe/data/ilsvrc12/synset_words.txt");
  std::string line;
  if (!synset_word.good()){
    std::cerr << "synset words open failed!" << std::endl;
  }
  while (std::getline(synset_word, line)){
    synset_words_.push_back(line.substr(line.find_first_of(' '), line.length()));
  }
  //a container for net input, holds data converted from cv::Mat
  //net_input_blob_ = new caffe::Blob<float>(1, 3, 227, 227);
}
Consumer::~Consumer(){
  Stop();
  //delete net_input_blob_;
}
void Consumer::entry(boost::shared_ptr<QueuePair<Mat>> task
    , boost::shared_ptr<QueuePair<std::string>> result){

  caffe::Caffe::set_mode(caffe::Caffe::GPU);
  caffe::Caffe::SetDevice(0);

  cv::Mat *frame;
  cv::Mat resized_image(256, 256, CV_8UC3);
  cv::Size re_size(resized_image.cols, resized_image.rows);

  //for caffe input and output
  const std::vector<caffe::Blob<float> *> net_input = this->net_->input_blobs();
  std::vector<caffe::Blob<float> *> net_output;

  //net_input.push_back(net_input_blob_);
  std::string *res;

  int pre_num = 1;
  while (!must_stop()){
    std::stringstream result_strm;
    frame = task->full_.pop();
    cv::resize(*frame, resized_image, re_size, 0, 0, CV_INTER_LINEAR);
    this->data_transformer_->Transform(resized_image, *net_input[0]);
    net_output = this->net_->Forward();
    task->free_.push(frame);

    res = result->free_.pop();
    //Process results here
    for (int i = 0; i < pre_num; ++i){
      result_strm << synset_words_[net_output[0]->cpu_data()[i]] << " " 
                  << net_output[0]->cpu_data()[i + pre_num] << "\n";
    }
    *res = result_strm.str();
    result->full_.push(res);
  }
}

void Consumer::Run(){
  if (!thread_){
    try{
      thread_.reset(new boost::thread(&Consumer::entry, this, task_q_, result_q_));
    }
    catch (std::exception& e) {
      std::cerr << "Thread exception: " << e.what() << std::endl;
    }
  }
  else
    std::cout << "Consumer thread may have been running!" << std::endl;
};
void Consumer::Stop(){
  if (thread_ && thread_->joinable()){
    thread_->interrupt();
    try {
      thread_->join();
    }
    catch (boost::thread_interrupted&) {
    }
    catch (std::exception& e) {
      std::cerr << "Thread exception: " << e.what() << std::endl;
    }
  }
}
bool Consumer::must_stop(){
  return thread_ && thread_->interruption_requested();
}


int main(void)
{
  int max_queue_size = 1000;
  boost::shared_ptr<QueuePair<Mat>> tasks(new QueuePair<Mat>(max_queue_size));
  boost::shared_ptr<QueuePair<std::string>> results(new QueuePair<std::string>(max_queue_size));

  char str[100], info_str[100] = " results: ";
  VideoCapture vc(0);
  if (!vc.isOpened())
    return -1;

  Consumer consumer(tasks, results);
  consumer.Run();

  Mat frame, *frame_copy;
  namedWindow("preview");
  double t, fps;

  while (true){
    t = (double)getTickCount();
    vc.read(frame);

    if (waitKey(1) >= 0){
      consuer.Stop();
      break;
    }

    if (tasks->free_.try_peek(&frame_copy)){
      frame_copy = tasks->free_.pop();
      *frame_copy = frame.clone();
      tasks->full_.push(frame_copy);
    }
    std::string *res;
    std::string frame_info("");
    if (results->full_.try_peek(&res)){
      res = results->full_.pop();
      frame_info = frame_info + info_str;
      frame_info = frame_info + *res;
      results->free_.push(res);
    }    

    t = ((double)getTickCount() - t) / getTickFrequency();
    fps = 1.0 / t;

    sprintf(str, " fps: %.2f", fps);
    frame_info = frame_info + str;

    putText(frame, frame_info, Point(5, 20)
         , FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
    imshow("preview", frame);
  }
}

И в src/caffe/util/blocking_queue.cpp сделайте небольшое изменение ниже и перестройте caffe:

...//Other stuff
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;
//add these 2 lines below
template class BlockingQueue<cv::Mat*>;
template class BlockingQueue<std::string*>;

Ответ 2

Похоже, что оболочка caffe python блокирует Global Interpreter Lock (GIL). Таким образом, вызов любой команды python caffe блокирует ВСЕ потоки python.

Обходной путь (на свой страх и риск) должен был бы отключить GIL для определенных функций caffe. Например, если вы хотите иметь возможность запускать forward без блокировки, вы можете редактировать $CAFFE_ROOT/python/caffe/_caffe.cpp. Добавьте эту функцию:

void Net_Forward(Net<Dtype>& net, int start, int end) {
  Py_BEGIN_ALLOW_THREADS;   // <-- disable GIL
  net.ForwardFromTo(start, end);
  Py_END_ALLOW_THREADS;     // <-- restore GIL
}

И замените .def("_forward", &Net<Dtype>::ForwardFromTo) с помощью:

.def("_forward", &Net_Forward)

Не забывайте make pycaffe после изменения.

Подробнее см. этот.

Ответ 3

В вашем коде может случиться, что он работает в режиме gpu для первого вызова, а в последующих вызовах он вычисляет классификацию в режиме cpu как режим по умолчанию. В более ранней версии caffe set gpu mode for once было достаточно, теперь более новой версии он должен каждый раз устанавливать режим. Вы можете попробовать следующие изменения:

def run(self):

        #Load caffe net -- code omitted 
        while True:
            caffe.set_mode_gpu()
            caffe.set_device(0)
            image = self.task_queue.get()
            #crop image -- code omitted
            text = net.predict(image)
            self.result_queue.put(text)

        return

Также обратите внимание на тайминги gpu во время работы потребительского потока. Вы можете использовать следующую команду для nvidia:

nvidia-smi

Выше команда покажет вам использование gpu во время выполнения.

Если он не решает другое решение, сделайте код извлечения кадра opencv под потоком. Поскольку это связано с доступом ввода/вывода и устройства, вы можете получить выгоду от его запуска в отдельном потоке из потока/основного потока GUI. Этот поток будет толкать кадры в очереди и прогнозировать текущий потребительский поток. В этом случае тщательно обрабатывайте очередь с критическим блоком.

Ответ 4

Попробуйте использовать многопоточный подход вместо многопроцессорной обработки. Процессы нереста медленнее, чем нереста в потоки. Когда они работают, нет большой разницы. В вашем случае я думаю, что подход к потоку будет полезен, поскольку задействовано так много кадров.