Подтвердить что ты не робот

Использует std:: async много раз для небольших задач, дружественных к производительности?

Чтобы предоставить некоторую справочную информацию, я обрабатываю сохраненный файл, и после использования регулярного выражения для разбиения файла на его объекты-объекты мне необходимо обработать данные объекта на основе того, какой тип объекта он есть.

Моя нынешняя мысль состоит в том, чтобы использовать parallelism, чтобы получить немного прироста производительности, так как загрузка каждого объекта не зависит друг от друга. Поэтому я собирался определить функцию LoadObject, принимающую std::string для каждого типа объекта, который я буду обрабатывать, а затем вызывая std::async следующим образом:

void LoadFromFile( const std::string& szFileName )
{
     static const std::regex regexObject( "=== ([^=]+) ===\\n((?:.|\\n)*)\\n=== END \\1 ===", std::regex_constants::ECMAScript | std::regex_constants::optimize );

     std::ifstream inFile( szFileName );
     inFile.exceptions( std::ifstream::failbit | std::ifstream::badbit );

     std::string szFileData( (std::istreambuf_iterator<char>(inFile)), (std::istreambuf_iterator<char>()) );

     inFile.close();

     std::vector<std::future<void>> vecFutures;

     for( std::sregex_iterator itObject( szFileData.cbegin(), szFileData.cend(), regexObject ), end; itObject != end; ++itObject )
     {
          // Determine what type of object we're loading:
          if( (*itObject)[1] == "Type1" )
          {
               vecFutures.emplace_back( std::async( LoadType1, (*itObject)[2].str() ) );
          }
          else if( (*itObject)[1] == "Type2" )
          {
               vecFutures.emplace_back( std::async( LoadType2, (*itObject)[2].str() ) );
          }
          else
          {
               throw std::runtime_error( "Unexpected type encountered whilst reading data file." );
          }
     }

     // Make sure all our tasks completed:
     for( auto& future : vecFutures )
     {
           future.get();
     }
}

Обратите внимание, что в приложении будет более двух типов (это был только короткий пример) и потенциально тысячи объектов в файле, который нужно прочитать.

Я знаю, что создание слишком большого количества потоков часто является плохим для производительности, когда оно превосходит максимальное аппаратное обеспечение concurrency из-за контекстных переключателей, но если моя память служит мне правильно, среда выполнения С++ должна отслеживать количество потоков создал и запланировал std::async соответствующим образом (я полагаю, что в случае Microsoft их библиотека ConcRT несет ответственность за это?), поэтому приведенный выше код может по-прежнему приводить к улучшению производительности?

Спасибо заранее!

4b9b3361

Ответ 1

среда выполнения С++ должна отслеживать количество созданных потоков и правильно настроить std:: async

Нет. Если асинхронные задачи фактически выполняются асинхронно (а не отложено), то все, что требуется, это то, что они запускаются, как если бы они были в новом потоке. Это совершенно верно для нового потока, который будет создан и запущен для каждой задачи, без какой-либо ограниченной аппаратной возможности для parallelism.

Есть примечание:

[Примечание. Если эта политика указана вместе с другими политиками, например, при использовании значения политики запуска:: async | запуск:: отложено,   реализации должны отложить вызов или выбрать политику   когда не более concurrency можно эффективно использовать. -end note]

Однако это ненормативно, и в любом случае это указывает на то, что когда-либо более concurrency может быть использовано, задачи могут откладываться и, следовательно, выполняться, когда кто-то ждет результата, а не остается асинхронным и запущенным сразу после завершения одной из предыдущих асинхронных задач, что было бы желательно для максимального parallelism.

То есть, если у нас есть 10 длительных задач, и реализация может выполнять только 4 параллельно, то первые 4 будут асинхронными, а затем последние 6 могут быть отложены. Ожидание будущих фьючерсов будет выполнять отложенные задачи по одному потоку в последовательности, исключая параллельное выполнение для этих задач.

В примечании также говорится, что вместо отложенного вызова выбор политики может быть отложен. То есть функция все еще может выполняться асинхронно, но это решение может быть отложено, скажем, до тех пор, пока одна из ранних задач не завершится, освободив ядро ​​для новой задачи. Но опять же, это не требуется, нота является ненормативной, и насколько я знаю, реализация Microsoft является единственной, которая ведет себя таким образом. Когда я посмотрел на другую реализацию libС++, она просто полностью игнорирует эту заметку, так что использование политик std::launch::async или std::launch::any приводит к асинхронному выполнению нового потока.

(Я считаю, что в случае Microsoft их библиотека ConcRT несет ответственность за это?)

Реализация Microsoft действительно ведет себя так, как вы описываете, но это не требуется, и переносная программа не может полагаться на это поведение.

Один из способов портативного ограничения того, сколько потоков фактически выполняется, - использовать что-то вроде семафора:

#include <future>
#include <mutex>
#include <cstdio>

// a semaphore class
//
// All threads can wait on this object. When a waiting thread
// is woken up, it does its work and then notifies another waiting thread.
// In this way only n threads will be be doing work at any time.
// 
class Semaphore {
private:
    std::mutex m;
    std::condition_variable cv;
    unsigned int count;

public:
    Semaphore(int n) : count(n) {}
    void notify() {
        std::unique_lock<std::mutex> l(m);
        ++count;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this]{ return count!=0; });
        --count;
    }
};

// an RAII class to handle waiting and notifying the next thread
// Work is done between when the object is created and destroyed
class Semaphore_waiter_notifier {
    Semaphore &s;
public:
    Semaphore_waiter_notifier(Semaphore &s) : s{s} { s.wait(); }
    ~Semaphore_waiter_notifier() { s.notify(); }
};

// some inefficient work for our threads to do
int fib(int n) {
    if (n<2) return n;
    return fib(n-1) + fib(n-2);
}

// for_each algorithm for iterating over a container but also
// making an integer index available.
//
// f is called like f(index, element)
template<typename Container, typename F>
F for_each(Container &c, F f) {
    Container::size_type i = 0;
    for (auto &e : c)
        f(i++, e);
    return f;
}

// global semaphore so that lambdas don't have to capture it
Semaphore thread_limiter(4);

int main() {
    std::vector<int> input(100);
    for_each(input, [](int i, int &e) { e = (i%10) + 35; });

    std::vector<std::future<int>> output;
    for_each(input, [&output](int i, int e) {
        output.push_back(std::async(std::launch::async, [] (int task, int n) -> int {
            Semaphore_waiter_notifier w(thread_limiter);
            std::printf("Starting task %d\n", task);
            int res = fib(n);
            std::printf("\t\t\t\t\t\tTask %d finished\n", task);
            return res;
        }, i, e));
    });

    for_each(output, [](int i, std::future<int> &e) {
        std::printf("\t\t\tWaiting on task %d\n", i);
        int res = e.get();
        std::printf("\t\t\t\t\t\t\t\t\tTask %d result: %d\n", i, res);
    });
}