Подтвердить что ты не робот

Как добиться незакрепленного, но блокирующего поведения?

Я использую одиночную очередь для одиночной очереди без блокировки для интенсивного сетевого приложения. У меня есть куча рабочих потоков, получающих работу в их собственных отдельных очередях, которые они затем деактивируют и обрабатывают.

Удаление блокировок из этих очередей значительно улучшило производительность при высокой нагрузке, , но они больше не блокируются, когда очереди пусты, что, в свою очередь, приводит к резкому увеличению использования ЦП.

Как я могу эффективно блокировать поток до тех пор, пока он не сможет успешно удалить что-либо или будет убит/прерван?

4b9b3361

Ответ 1

Если вы работаете в Linux, изучите Futex. Он обеспечивает производительность неблокирующей реализации, используя атомарные операции, а не вызовы ядра, такие как мьютекс, но если вам нужно установить процесс в режим ожидания из-за того, что какое-то условие не является истинным (т.е. Блокирование), оно будет затем сделайте соответствующие вызовы ядра, чтобы заставить процесс спать и разбудить его в будущем событии. Это в основном как очень быстрый семафор.

Ответ 2

В Linux futex можно использовать для блокировки потока. Но имейте в виду, что Futexes Tricky!

UPDATE: переменные состояния гораздо безопаснее использовать, чем futexes, и более переносимы. Однако переменная условия используется в сочетании с мьютексом, поэтому, строго говоря, результат больше не будет блокироваться. Однако, если ваша основная цель - это производительность (а не гарантия глобального прогресса), а заблокированная часть (то есть условие для проверки после пробуждения потока) мала, может случиться так, что вы получите удовлетворительные результаты без необходимости входить в тонкости интегрирования futexes в алгоритм.

Ответ 3

Если вы работаете в Windows, вы не сможете использовать futexes, но в Windows Vista аналогичный механизм называется Keyed Events. К сожалению, это не является частью опубликованного API (это собственный API-интерфейс NTDLL), но вы можете использовать его, пока вы принимаете предостережение, которое оно может изменить в будущих версиях Windows (и вам не нужно запускать ядра до Vista). Обязательно прочитайте статью, указанную выше. Здесь непроверенный эскиз того, как он может работать:

/* Interlocked SList queue using keyed event signaling */

struct queue {
    SLIST_HEADER slist;
    // Note: Multiple queues can (and should) share a keyed event handle
    HANDLE keyed_event;
    // Initial value: 0
    // Prior to blocking, the queue_pop function increments this to 1, then
    // rechecks the queue. If it finds an item, it attempts to compxchg back to
    // 0; if this fails, then it racing with a push, and has to block
    LONG block_flag;
};

void init_queue(queue *qPtr) {
    NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
    InitializeSListHead(&qPtr->slist);
    qPtr->blocking = 0;
}

void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
    InterlockedPushEntrySList(&qPtr->slist, entry);

    // Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
    // have committed to a keyed-event handshake
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
    if (oldv) {
        NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    }
}

SLIST_ENTRY *queue_pop(queue *qPtr) {
    SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry)
        return entry; // fast path

    // Transition block flag 0 -> 1. We must recheck the queue after this point
    // in case we race with queue_push; however since ReleaseKeyedEvent
    // blocks until it is matched up with a wait, we must perform the wait if
    // queue_push sees us
    LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);

    assert(oldv == 0);

    entry = InterlockedPopEntrySList(&qPtr->slist);
    if (entry) {
        // Try to abort
        oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
        if (oldv == 1)
            return entry; // nobody saw us, we can just exit with the value
    }

    // Either we don't have an entry, or we are forced to wait because
    // queue_push saw our block flag. So do the wait
    NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
    // block_flag has been reset by queue_push

    if (!entry)
        entry = InterlockedPopEntrySList(&qPtr->slist);
    assert(entry);

    return entry;
}

Вы также можете использовать аналогичный протокол, используя Slim Read Write блокировки и Переменные состояния, с бесконтактным быстрым путем. Это обертки по ключевым событиям, поэтому они могут повлечь дополнительные накладные расходы, чем напрямую связанные с ключами.

Ответ 4

Вы пробовали условное ожидание? Когда очередь становится пустой, просто начните ждать нового задания. Поток, помещающий задания в очередь, должен подать сигнал. Таким образом, вы используете только блокировки, когда очередь пуста.

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

Ответ 5

Вы можете заставить поток спать с помощью функции sigwait(). Вы можете разбудить поток с помощью pthread_kill. Это намного быстрее, чем переменные условия.

Ответ 6

Вы можете добавить спящие во время ожидания. Просто выберите наибольшее ожидание, которое вы готовы иметь, затем сделайте что-то вроде этого (псевдокод, потому что я не помню синтаксис pthread):

WAIT_TIME = 100; // Set this to whatever you're happy with
while(loop_condition) {
   thing = get_from_queue()
   if(thing == null) {
       sleep(WAIT_TIME);
   } else {
       handle(thing);
   }
}

Даже что-то короткое, как сон 100 мс, должно значительно снизить использование ЦП. Я не уверен, в какой момент переключение контекста сделает его хуже, чем ожидание.