Подтвердить что ты не робот

Когда один рабочий поток терпит неудачу, как отменить оставшихся работников?

У меня есть программа, которая порождает несколько потоков, каждая из которых выполняет долговременную задачу. Затем основной поток ожидает, что все рабочие потоки будут объединены, будут собирать результаты и выходить.

Если ошибка возникает у одного из рабочих, я хочу, чтобы оставшиеся работники прекратили изящество, так что основной поток может выйти вскоре после этого.

Мой вопрос в том, как это сделать, когда реализация долгосрочной задачи обеспечивается библиотекой, код которой я не могу изменить.

Вот простой эскиз системы без обработки ошибок:

void threadFunc()
{
    // Do long-running stuff
}

void mainFunc()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 3; ++i) {
        threads.push_back(std::thread(&threadFunc));
    }

    for (auto &t : threads) {
        t.join();
    }
}

Если длительная функция выполняет цикл, и у меня есть доступ к коду, тогда выполнение можно прервать, просто проверив общий флаг "keep on running" в верхней части каждой итерации.

std::mutex mutex;
bool error;

void threadFunc()
{
    try {
        for (...) {
            {
                std::unique_lock<std::mutex> lock(mutex);
                if (error) {
                    break;
                }
            }
        }
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

Теперь рассмотрим случай, когда долговременная операция предоставляется библиотекой:

std::mutex mutex;
bool error;

class Task
{
public:
    // Blocks until completion, error, or stop() is called
    void run();

    void stop();
};

void threadFunc(Task &task)
{
    try {
        task.run();
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

В этом случае основной поток должен обработать ошибку и вызвать stop() on все еще выполняемые задачи. Таким образом, он не может просто ждать, пока каждый рабочий join() как в исходной реализации.

Подход, который я использовал до сих пор, заключается в том, чтобы разделить следующую структуру между основной поток и каждый рабочий:

struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;
}

Когда рабочий завершается успешно, он уменьшает счетчик running. Если исключение поймано, рабочий устанавливает флаг error. В обоих случаях это затем вызывает condVar.notify_one().

Затем основной поток ожидает переменную условия, просыпаясь, если error установлен или running достигает нуля. При просыпании основной поток вызывает stop() для всех задач, если error установлен.

Этот подход работает, но я считаю, что должно быть более чистое решение, использующее некоторые примитивов более высокого уровня в стандартной библиотеке concurrency. Можно кто предлагает улучшенную реализацию?

Вот полный код для моего текущего решения:

// main.cpp

#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

#include "utils.h"

// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
    Task(int tidx, bool fail)
    :   tidx(tidx)
    ,   fail(fail)
    ,   m_run(true)
    {

    }

    void run()
    {
        static const int NUM_ITERATIONS = 10;

        for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                if (!m_run) {
                    out() << "thread " << tidx << " aborting";
                    break;
                }
            }

            out() << "thread " << tidx << " iter " << iter;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            if (fail) {
                throw std::exception();
            }
        }
    }

    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_run = false;
    }

    const int tidx;
    const bool fail;

private:
    std::mutex m_mutex;
    bool m_run;
};

// Data shared between all threads
struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;

    SharedData(int count)
    :   error(false)
    ,   running(count)
    {

    }
};

void threadFunc(Task &task, SharedData &shared)
{
    try {
        out() << "thread " << task.tidx << " starting";

        task.run(); // Blocks until task completes or is aborted by main thread

        out() << "thread " << task.tidx << " ended";
    } catch (std::exception &) {
        out() << "thread " << task.tidx << " failed";

        std::unique_lock<std::mutex> lock(shared.mutex);
        shared.error = true;
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);
        --shared.running;
    }

    shared.condVar.notify_one();
}

int main(int argc, char **argv)
{
    static const int NUM_THREADS = 3;

    std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
    std::vector<std::thread> threads(NUM_THREADS);

    SharedData shared(NUM_THREADS);

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        const bool fail = (tidx == 1);
        tasks[tidx] = std::make_unique<Task>(tidx, fail);
        threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);

        // Wake up when either all tasks have completed, or any one has failed
        shared.condVar.wait(lock, [&shared](){
            return shared.error || !shared.running;
        });

        if (shared.error) {
            out() << "error occurred - terminating remaining tasks";
            for (auto &t : tasks) {
                t->stop();
            }
        }
    }

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        out() << "waiting for thread " << tidx << " to join";
        threads[tidx].join();
        out() << "thread " << tidx << " joined";
    }

    out() << "program complete";

    return 0;
}

Некоторые функции полезности определены здесь:

// utils.h

#include <iostream>
#include <mutex>
#include <thread>

#ifndef UTILS_H
#define UTILS_H

#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {

template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
            Args&& ...args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace std
#endif // __cplusplus <= 201103L

// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
    ThreadSafeStdOut()
    :   m_lock(m_mutex)
    {

    }

    ~ThreadSafeStdOut()
    {
        std::cout << std::endl;
    }

    template <typename T>
    ThreadSafeStdOut &operator<<(const T &obj)
    {
        std::cout << obj;
        return *this;
    }

private:
    static std::mutex m_mutex;
    std::unique_lock<std::mutex> m_lock;
};

std::mutex ThreadSafeStdOut::m_mutex;

// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
    return ThreadSafeStdOut();
}

#endif // UTILS_H
4b9b3361

Ответ 1

Я думал о вашей ситуации когда-то, и это может помочь вам. Вероятно, вы могли бы попробовать сделать несколько разных методов для достижения своей цели. Есть 2-3 варианта, которые могут быть использованы или комбинация всех трех. Я, как минимум, покажу первый вариант, который я все еще изучаю и пытаюсь освоить концепции специализированных шаблонов, а также использовать Lambdas.

  • Использование класса менеджера
  • Использование инкапсуляции специализации шаблонов
  • Использование Lambdas.

Псевдокод класса менеджера выглядит примерно так:

class ThreadManager {
private:
    std::unique_ptr<MainThread> mainThread_;
    std::list<std::shared_ptr<WorkerThread> lWorkers_;  // List to hold finished workers
    std::queue<std::shared_ptr<WorkerThread> qWorkers_; // Queue to hold inactive and waiting threads.
    std::map<unsigned, std::shared_ptr<WorkerThread> mThreadIds_; // Map to associate a WorkerThread with an ID value.
    std::map<unsigned, bool> mFinishedThreads_; // A map to keep track of finished and unfinished threads.

    bool threadError_; // Not needed if using exception handling
public:
    explicit ThreadManager( const MainThread& main_thread );

    void shutdownThread( const unsigned& threadId );
    void shutdownAllThreads();

    void addWorker( const WorkerThread& worker_thread );          
    bool isThreadDone( const unsigned& threadId );

    void spawnMainThread() const; // Method to start main thread work.

    void spawnWorkerThread( unsigned threadId, bool& error );

    bool getThreadError( unsigned& threadID ); // Returns True If Thread Encountered An Error and passes the ID of that thread, 

};

Только для демонстрационных целей я использовал значение bool, чтобы определить, не удалось ли поток для упрощения структуры, и, конечно же, это можно заменить на понравившиеся вам, если вы предпочитаете использовать исключения или недопустимые значения без знака и т.д.

Теперь для использования этого класса будет что-то вроде этого: Также обратите внимание, что класс этого типа будет считаться лучшим, если он был объектом типа Singleton, так как вы не хотели бы больше, чем 1 ManagerClass, так как вы работаете с общими указателями.

SomeClass::SomeClass( ... ) {
    // This class could contain a private static smart pointer of this Manager Class
    // Initialize the smart pointer giving it new memory for the Manager Class and by passing it a pointer of the Main Thread object

   threadManager_ = new ThreadManager( main_thread ); // Wouldn't actually use raw pointers here unless if you had a need to, but just shown for simplicity       
}

SomeClass::addThreads( ... ) {
    for ( unsigned u = 1, u <= threadCount; u++ ) {
         threadManager_->addWorker( some_worker_thread );
    }
}

SomeClass::someFunctionThatSpawnsThreads( ... ) {
    threadManager_->spawnMainThread();

    bool error = false;       
    for ( unsigned u = 1; u <= threadCount; u++ ) {
        threadManager_->spawnWorkerThread( u, error );

        if ( error ) { // This Thread Failed To Start, Shutdown All Threads
            threadManager->shutdownAllThreads();
        }
    }

    // If all threads spawn successfully we can do a while loop here to listen if one fails.
    unsigned threadId;
    while ( threadManager_->getThreadError( threadId ) ) {
         // If the function passed to this while loop returns true and we end up here, it will pass the id value of the failed thread.
         // We can now go through a for loop and stop all active threads.
         for ( unsigned u = threadID + 1; u <= threadCount; u++ ) {
             threadManager_->shutdownThread( u );
         }

         // We have successfully shutdown all threads
         break;
    }
}

Мне нравится дизайн класса manager, так как я использовал их в других проектах, и они пригождаются довольно часто, особенно при работе с базой кода, которая содержит много и несколько ресурсов, таких как рабочий движок игры, который имеет много активов, таких как как спрайты, текстуры, аудиофайлы, карты, игровые элементы и т.д. Использование класса менеджера помогает отслеживать и поддерживать все активы. Эта же концепция может быть применена к "Управление" активными, неактивными, ожидающими потоками и знает, как интуитивно обрабатывать и отключать все потоки должным образом. Я бы рекомендовал использовать ExceptionHandler, если ваша база кода и библиотеки поддерживают исключения, а также обработку исключений потоковой безопасности вместо передачи и использования ошибок для ошибок. Кроме того, наличие класса Logger хорош там, где оно может записываться в файл журнала и в консольное окно, чтобы дать явное сообщение о том, какая функция была выбрана, и что вызвало исключение, где сообщение журнала может выглядеть следующим образом:

Exception Thrown: someFunctionNamedThis in ThisFile on Line# (x)
    threadID 021342 failed to execute.

Таким образом вы можете посмотреть файл журнала и быстро узнать, какой поток вызывает исключение, вместо использования переданных переменных bool.

Ответ 2

The implementation of the long-running task is provided by a library whose code I cannot modify.

Это означает, что у вас нет возможности синхронизировать работу, выполняемую рабочими потоками

If an error occurs in one of the workers,

Предположим, что вы действительно можете обнаружить ошибки работника; некоторые из них могут быть легко обнаружены, если информация о другой библиотеке не может быть использована.

  • петли кода библиотеки.
  • код библиотеки преждевременно выходит с неперехваченным исключением.

I want the remaining workers to stop **gracefully**

Это просто невозможно

Лучшее, что вы можете сделать, это написать диспетчер потоков, проверяющий статус рабочего потока, и если обнаружено условие ошибки, оно просто (беззастенчиво) "убивает" все рабочие потоки и завершает работу.

Вы также должны рассмотреть возможность обнаружения циклического рабочего потока (по таймауту) и предложить пользователю возможность убить или продолжить ожидание завершения процесса.

Ответ 3

Ваша проблема в том, что долго работающая функция не является вашим кодом, и вы говорите, что не можете ее изменить. Следовательно, вы не можете заставить его обращать внимание на какой-либо внешний примитив синхронизации (переменные условия, семафоры, мьютексы, трубы и т.д.), Если разработчик библиотеки не сделал это для вас.

Поэтому ваш единственный вариант - сделать что-то, что позволяет бороться с любым кодом независимо от того, что он делает. Это то, что делают сигналы. Для этого вам придется использовать pthread_kill() или что бы то ни было, эквивалент в эти дни.

Образец будет состоять в том, что

  • Нить, которая обнаруживает ошибку, должна сообщить об этой ошибке в основной поток каким-либо образом.
  • Затем основной поток необходимо вызвать pthread_kill() для всех остальных оставшихся потоков. Не путайте имя - pthread_kill() - это просто способ передачи произвольного сигнала в поток. Обратите внимание, что такие сигналы, как STOP, CONTINUE и TERMINATE, являются процессами, даже если они связаны с pthread_kill(), а не с потоком, поэтому не используйте их.
  • В каждом из этих потоков вам понадобится обработчик сигнала. При доставке сигнала в поток путь выполнения в этом потоке будет переходить к обработчику независимо от того, что выполняла работающая функция.
  • Теперь вы снова в (ограниченном) управлении и можете (возможно, ну, может быть) сделать некоторую ограниченную очистку и завершить поток.
  • Тем временем основной поток будет вызывать pthread_join() для всех потоков, которые он сигнализирует, и они возвратятся.

Мои мысли:

  • Это очень уродливый способ сделать это (и сигналы /pthreads, как известно, трудно получить правильно, и я не эксперт), но я не вижу, какой у вас другой выбор.
  • Это будет длинный путь от поиска "изящного" в исходном коде, хотя опыт конечного пользователя будет в порядке.
  • Вы отмените выполнение частичной части выполнения этой функции библиотеки, поэтому, если какая-либо очистка обычно будет выполняться (например, освобождение выделенной памяти), которая не будет выполнена, и у вас будет утечка памяти. Бег под чем-то вроде valgrind - это способ разработки, если это происходит.
  • Единственный способ получить библиотечную функцию для очистки (если она понадобится) будет для вашего обработчика сигнала, чтобы вернуть управление функции и позволить ему выполнить до завершения, просто то, что вы не хотите делать.
  • И, конечно, это не будет работать на Windows (нет pthreads, по крайней мере, не стоит говорить, хотя может быть эквивалентный механизм).

Действительно, лучший способ - переустановить (если возможно) эту библиотечную функцию.