Подтвердить что ты не робот

Как создать эффективный многопоточный планировщик задач на С++?

Я хотел бы создать очень эффективную систему планировщика задач на С++.

Основная идея такова:

class Task {
    public:
        virtual void run() = 0;
};

class Scheduler {
    public:
        void add(Task &task, double delayToRun);
};

За Scheduler должен существовать пул потоков фиксированного размера, который запускает задачи (я не хочу создавать поток для каждой задачи). delayToRun означает, что task не выполняется сразу, но delayToRun секунд спустя (измерение с точки, в которую оно было добавлено в Scheduler).

(delayToRun означает значение "по крайней мере", конечно. Если система загружена или если мы спросим невозможное из Планировщика, она не сможет обрабатывать наш запрос. лучшее, что может)

И вот моя проблема. Как эффективно реализовать delayToRun функциональность? Я пытаюсь решить эту проблему с использованием мьютексов и переменных условий.

Я вижу два пути:

С помощью менеджера потока

Планировщик содержит две очереди: allTasksQueue и tasksReadyToRunQueue. Задача добавляется в allTasksQueue в Scheduler::add. Существует менеджерский поток, который ждет наименьшее количество времени, поэтому он может поставить задачу от allTasksQueue до tasksReadyToRunQueue. Рабочие потоки ждут задачи, доступной в tasksReadyToRunQueue.

Если Scheduler::add добавляет задачу перед allTasksQueue (задача, которая имеет значение delayToRun, поэтому она должна идти до текущей задачи скорейшего запуска), тогда задача менеджера должна быть проснулся, чтобы он мог обновить время ожидания.

Этот метод можно считать неэффективным, так как ему нужны две очереди, и для выполнения задачи необходимо два файла condvar.signals(один для allTasksQueuetasksReadyToRunQueue, а один для сигнализации рабочего потока для фактического запуска задача)

Без потока менеджера

В планировщике есть одна очередь. Задача добавляется в эту очередь в Scheduler::add. Рабочий поток проверяет очередь. Если он пуст, он ждет без ограничения по времени. Если он не пуст, он ждет самую быструю задачу.

  • Если есть только одна переменная условия, для которой ожидаются рабочие потоки: этот метод можно считать неэффективным, потому что если задача добавлена ​​перед очередью (передняя часть означает, что есть N рабочих потоков, то индекс задачи < N), затем все рабочие потоки должны быть разбужены, чтобы обновить время, которое они ожидают.

  • Если для каждого потока есть отдельная переменная условия, тогда мы можем контролировать, какой поток просыпаться, поэтому в этом случае нам не нужно разбудить все потоки (нам нужно только просыпать поток который имеет наибольшее время ожидания, поэтому нам нужно управлять этим значением). Я сейчас думаю об этом, но разработка точных деталей сложна. Существуют ли какие-либо рекомендации/мысли/документы по этому методу?


Есть ли лучшее решение для этой проблемы? Я пытаюсь использовать стандартные возможности С++, но я готов использовать платформу (моя основная платформа - это linux), инструменты (например, pthreads) или даже специфические для Linux инструменты (например, futexes), если они обеспечивают лучшее решение.

4b9b3361

Ответ 1

Вы можете избежать наличия отдельного потока "менеджера" и необходимости пробуждать большое количество задач при изменении задачи следующего запуска, используя конструкцию, в которой один поток пула ожидает "рядом с запускать" (если есть) на одну переменную условия, а оставшиеся потоки пулов бесконечно ожидают вторую переменную условия.

Нитки пула будут выполнять псевдокод вдоль этих строк:

pthread_mutex_lock(&queue_lock);

while (running)
{
    if (head task is ready to run)
    {
        dequeue head task;
        if (task_thread == 1)
            pthread_cond_signal(&task_cv);
        else
            pthread_cond_signal(&queue_cv);

        pthread_mutex_unlock(&queue_lock);
        run dequeued task;
        pthread_mutex_lock(&queue_lock);
    }
    else if (!queue_empty && task_thread == 0)
    {
        task_thread = 1;
        pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
        task_thread = 0;
    }
    else
    {
        pthread_cond_wait(&queue_cv, &queue_lock);
    }
}

pthread_mutex_unlock(&queue_lock);

Если вы измените следующую задачу для запуска, вы выполните:

if (task_thread == 1)
    pthread_cond_signal(&task_cv);
else
    pthread_cond_signal(&queue_cv);

с сохраненным queue_lock.

В соответствии с этой схемой все пробуждения находятся только в одном потоке, есть только одна очередь приоритетов задач, и нет необходимости в потоке менеджера.

Ответ 2

Ваша спецификация слишком сильная:

delayToRun означает, что задача не выполняется немедленно, но delayToRun секунды спустя

Вы забыли добавить "по крайней мере":

  • Теперь задача не выполняется, но не менее delayToRun секунд спустя

Дело в том, что если все десять тысяч задач запланированы с помощью 0.1 delayToRun, они, конечно же, практически не смогут работать одновременно.

С такой коррекцией вы просто поддерживаете некоторую очередь (или повестку дня) (запланированное время начала, закрытие для запуска), вы сохраняете эту сортировку очереди и начинаете N (некоторое фиксированное число) потоков, которые атомарно вытащите первый элемент повестки дня и запустите его.

тогда все рабочие потоки необходимо разбудить, чтобы обновить время, которое они ждут.

Нет, некоторые рабочие потоки будут разбужены.

Читайте о переменных состояния и широковещательной передаче.

Вы также можете использовать таймеры POSIX, см. timer_create (2) или конкретный таймер fd для Linux, см. timerfd_create (2)

Вероятно, вы избежали бы блокирования системных вызовов в ваших потоках и имели бы некоторый центральный поток, управляющий ими, используя какой-либо цикл событий (см. poll (2)...); в противном случае, если у вас есть сто задач с запуском sleep(100), и одна задача, запланированная на полсекунды, не будет выполняться до ста секунд.

Вы можете прочитать о программировании продолжения пути (это -CPS- очень важно). Прочитайте статью статью о продолжении прохождения C Юлиуша Хробочека.

Посмотрите также на Qt threads.

Вы также можете рассмотреть кодирование в Go (с его Goroutines).

Ответ 3

Это примерная реализация для интерфейса, который вы предоставили, который ближе всего подходит к описанию С менеджером потока.

Он использует один поток (timer_thread) для управления очередью (allTasksQueue), которая сортируется на основе фактического времени, когда задача должна быть запущена (std::chrono::time_point).
"Queue" - это std::priority_queue (который сохраняет свои элементы time_point отсортированы).

timer_thread обычно приостанавливается до запуска следующей задачи или при добавлении новой задачи.
Когда задача должна быть запущена, она помещается в tasksReadyToRunQueue, один из рабочих потоков сигнализируется, просыпается, удаляет его из очереди и начинает обработку задачи.

Обратите внимание, что пул потоков имеет верхний предел времени компиляции для количества потоков (40). Если вы планируете больше задач, чем может быть отправлено рабочим, новая задача будет заблокирована до тех пор, пока потоки не будут доступны снова.

Вы сказали, что этот подход неэффективен, но в целом он кажется мне достаточно эффективным. Все это управляется событиями, и вы не тратите время на процессорные циклы из-за ненужного вращения. Конечно, это просто пример, возможны оптимизации (примечание: std::multimap было заменено на std::priority_queue).

Реализация совместима с С++ 11

#include <iostream>
#include <chrono>
#include <queue>
#include <unistd.h>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <memory>

class Task {
public:
    virtual void run() = 0;
    virtual ~Task() { }
};

class Scheduler {
public:
    Scheduler();
    ~Scheduler();

    void add(Task &task, double delayToRun);

private:
    using timepoint = std::chrono::time_point<std::chrono::steady_clock>;

    struct key {
        timepoint tp;
        Task *taskp;
    };

    struct TScomp {
        bool operator()(const key &a, const key &b) const
        {
            return a.tp > b.tp;
        }
    };

    const int ThreadPoolSize = 40;

    std::vector<std::thread> ThreadPool;
    std::vector<Task *> tasksReadyToRunQueue;

    std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;

    std::thread TimerThr;
    std::mutex TimerMtx, WorkerMtx;
    std::condition_variable TimerCV, WorkerCV;

    bool WorkerIsRunning = true;
    bool TimerIsRunning = true;

    void worker_thread();
    void timer_thread();
};

Scheduler::Scheduler()
{
    for (int i = 0; i <ThreadPoolSize; ++i)
        ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));

    TimerThr = std::thread(&Scheduler::timer_thread, this);
}

Scheduler::~Scheduler()
{
    {
        std::lock_guard<std::mutex> lck{TimerMtx};
        TimerIsRunning = false;
        TimerCV.notify_one();
    }
    TimerThr.join();

    {
        std::lock_guard<std::mutex> lck{WorkerMtx};
        WorkerIsRunning = false;
        WorkerCV.notify_all();
    }
    for (auto &t : ThreadPool)
        t.join();
}

void Scheduler::add(Task &task, double delayToRun)
{
    auto now = std::chrono::steady_clock::now();
    long delay_ms = delayToRun * 1000;

    std::chrono::milliseconds duration (delay_ms);

    timepoint tp = now + duration;

    if (now >= tp)
    {
        /*
         * This is a short-cut
         * When time is due, the task is directly dispatched to the workers
         */
        std::lock_guard<std::mutex> lck{WorkerMtx};
        tasksReadyToRunQueue.push_back(&task);
        WorkerCV.notify_one();

    } else
    {
        std::lock_guard<std::mutex> lck{TimerMtx};

        allTasksQueue.push({tp, &task});

        TimerCV.notify_one();
    }
}

void Scheduler::worker_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lck{WorkerMtx};

        WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
                                           !WorkerIsRunning; } );

        if (!WorkerIsRunning)
            break;

        Task *p = tasksReadyToRunQueue.back();
        tasksReadyToRunQueue.pop_back();

        lck.unlock();

        p->run();

        delete p; // delete Task
    }
}

void Scheduler::timer_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lck{TimerMtx};

        if (!TimerIsRunning)
            break;

        auto duration = std::chrono::nanoseconds(1000000000);

        if (allTasksQueue.size() != 0)
        {
            auto now = std::chrono::steady_clock::now();

            auto head = allTasksQueue.top();
            Task *p = head.taskp;

            duration = head.tp - now;
            if (now >= head.tp)
            {
                /*
                 * A Task is due, pass to worker threads
                 */
                std::unique_lock<std::mutex> ulck{WorkerMtx};
                tasksReadyToRunQueue.push_back(p);
                WorkerCV.notify_one();
                ulck.unlock();

                allTasksQueue.pop();
            }
        }

        TimerCV.wait_for(lck, duration);
    }
}
/*
 * End sample implementation
 */



class DemoTask : public Task {
    int n;
public:
    DemoTask(int n=0) : n{n} { }
    void run() override
    {
        std::cout << "Start task " << n << std::endl;;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << " Stop task " << n << std::endl;;
    }
};

int main()
{
    Scheduler sched;

    Task *t0 = new DemoTask{0};
    Task *t1 = new DemoTask{1};
    Task *t2 = new DemoTask{2};
    Task *t3 = new DemoTask{3};
    Task *t4 = new DemoTask{4};
    Task *t5 = new DemoTask{5};

    sched.add(*t0, 7.313);
    sched.add(*t1, 2.213);
    sched.add(*t2, 0.713);
    sched.add(*t3, 1.243);
    sched.add(*t4, 0.913);
    sched.add(*t5, 3.313);

    std::this_thread::sleep_for(std::chrono::seconds(10));
}

Ответ 4

Это означает, что вы хотите постоянно выполнять все задачи с использованием некоторого порядка.

Вы можете создать некоторый тип сортировки с помощью стека задержки (или даже связанного списка) задач. Когда новая задача подходит, вы должны вставить ее в позицию в зависимости от времени задержки (просто эффективно вычислить эту позицию и эффективно вставить новую задачу).

Запустите все задачи, начиная с главы стека задач (или списка).

Ответ 5

Код ядра для С++ 11:

#include <thread>
#include <queue>
#include <chrono>
#include <mutex>
#include <atomic>
using namespace std::chrono;
using namespace std;
class Task {
public:
    virtual void run() = 0;
};
template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>>
class SchedulerItem {
public:
    T task;
    time_point<steady_clock> startTime;
    int delay;
    SchedulerItem(T t, time_point<steady_clock> s, int d) : task(t), startTime(s), delay(d){}
};
template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>>
class Scheduler {
public:
    queue<SchedulerItem<T>> pool;
    mutex mtx;
    atomic<bool> running;
    Scheduler() : running(false){}
    void add(T task, double delayMsToRun) {
        lock_guard<mutex> lock(mtx);
        pool.push(SchedulerItem<T>(task, high_resolution_clock::now(), delayMsToRun));
        if (running == false) runNext();
    }
    void runNext(void) {
        running = true;
        auto th = [this]() {
            mtx.lock();
            auto item = pool.front();
            pool.pop();
            mtx.unlock();
            auto remaining = (item.startTime + milliseconds(item.delay)) - high_resolution_clock::now();
            if(remaining.count() > 0) this_thread::sleep_for(remaining);
            item.task.run();
            if(pool.size() > 0) 
                runNext();
            else
                running = false;
        };
        thread t(th);
        t.detach();
    }
};

Тестовый код:

class MyTask : Task {
public:
    virtual void run() override {
        printf("mytask \n");
    };
};
int main()
{
    Scheduler<MyTask> s;

    s.add(MyTask(), 0);
    s.add(MyTask(), 2000);
    s.add(MyTask(), 2500);
    s.add(MyTask(), 6000);
    std::this_thread::sleep_for(std::chrono::seconds(10));

}