Подтвердить что ты не робот

Обходной путь по ограничению потоков в Grand Central Dispatch?

С Grand Central Dispatch можно легко выполнять трудоемкую задачу по не основному потоку, избегать блокировки основного ада и поддерживать интерфейс отзывчивый. Просто используя dispatch_async и выполните задачу в глобальной параллельной очереди.

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
    // code
});

Однако, что-то звучит слишком хорошо, чтобы быть правдой, как этот, обычно имеют недостаток. После того, как мы много используем это в нашем проекте приложений iOS, недавно мы обнаружили, что на нем существует ограничение на 64 потока. Как только мы достигнем предела, приложение замерзнет/повиснет. При приостановке приложения с помощью Xcode мы видим, что основной поток поддерживается semaphore_wait_trap.

Googling в Интернете подтверждает, что другие тоже сталкиваются с этой проблемой, но до сих пор не найдено ни одного решения.

Достигнут лимит жестких дисков Dispatch: 64 (слишком много потоков отправки заблокирован в синхронных операциях)

qaru.site/info/194100/..., подтверждает, что эта проблема возникает при использовании dispatch_sync и dispatch_barrier_async тоже.

Вопрос:
Поскольку Grand Central Dispatch имеет ограничение на 64 потока, существует ли какое-либо обходное решение для этого?

Спасибо заранее!

4b9b3361

Ответ 1

Хорошо, если вы связаны и решительны, вы можете освободиться от кандалов GCD, и выйдите вперед и взломайте прямо против предела производительности для каждого процесса с помощью pthreads, но в нижней строке это: если вы 're limit ограничение ширины очереди в GCD, вы можете рассмотреть возможность переоценки вашего подхода concurrency.

В крайних случаях существует два способа преодолеть предел:

  • У вас может быть заблокировано 64 потока на некоторых примитивах ОС с помощью блокировки syscall. (Привязка ввода/вывода)
  • Вы можете законно иметь 64 запускаемые задачи, все готовые к качению в одно и то же время. (Привязка процессора)

Если вы находитесь в ситуации №1, рекомендуется использовать неблокирующий ввод-вывод. Фактически, GCD имеет целую кучу вызовов, представленных в 10.7/Lion IIRC, которые облегчают асинхронное планирование ввода-вывода и улучшают повторное использование потоков. Если вы используете механизм ввода-вывода GCD, то эти потоки не будут привязаны к ожиданию ввода-вывода, GCD будет просто ставить в очередь ваши блоки (или функции), когда данные станут доступными в вашем файловом дескрипторе (или в файле mach). См. Документацию для dispatch_io_create и друзей.

В случае, если это поможет, вот небольшой пример (представленный без гарантии) эхо-сервера TCP, реализованный с использованием механизма ввода-вывода GCD:

in_port_t port = 10000;
void DieWithError(char *errorMessage);

// Returns a block you can call later to shut down the server -- caller owns block.
dispatch_block_t CreateCleanupBlockForLaunchedServer()
{
    // Create the socket
    int servSock = -1;
    if ((servSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        DieWithError("socket() failed");
    }

    // Bind the socket - if the port we want is in use, increment until we find one that isn't
    struct sockaddr_in echoServAddr;
    memset(&echoServAddr, 0, sizeof(echoServAddr));
    echoServAddr.sin_family = AF_INET;
    echoServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    do {
        printf("server attempting to bind to port %d\n", (int)port);
        echoServAddr.sin_port = htons(port);
    } while (bind(servSock, (struct sockaddr *) &echoServAddr, sizeof(echoServAddr)) < 0 && ++port);

    // Make the socket non-blocking
    if (fcntl(servSock, F_SETFL, O_NONBLOCK) < 0) {
        shutdown(servSock, SHUT_RDWR);
        close(servSock);
        DieWithError("fcntl() failed");
    }

    // Set up the dispatch source that will alert us to new incoming connections
    dispatch_queue_t q = dispatch_queue_create("server_queue", DISPATCH_QUEUE_CONCURRENT);
    dispatch_source_t acceptSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, servSock, 0, q);
    dispatch_source_set_event_handler(acceptSource, ^{
        const unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
        for (unsigned long i = 0; i < numPendingConnections; i++) {
            int clntSock = -1;
            struct sockaddr_in echoClntAddr;
            unsigned int clntLen = sizeof(echoClntAddr);

            // Wait for a client to connect
            if ((clntSock = accept(servSock, (struct sockaddr *) &echoClntAddr, &clntLen)) >= 0)
            {
                printf("server sock: %d accepted\n", clntSock);

                dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, clntSock, q, ^(int error) {
                    if (error) {
                        fprintf(stderr, "Error: %s", strerror(error));
                    }
                    printf("server sock: %d closing\n", clntSock);
                    close(clntSock);
                });

                // Configure the channel...
                dispatch_io_set_low_water(channel, 1);
                dispatch_io_set_high_water(channel, SIZE_MAX);

                // Setup read handler
                dispatch_io_read(channel, 0, SIZE_MAX, q, ^(bool done, dispatch_data_t data, int error) {
                    BOOL close = NO;
                    if (error) {
                        fprintf(stderr, "Error: %s", strerror(error));
                        close = YES;
                    }

                    const size_t rxd = data ? dispatch_data_get_size(data) : 0;
                    if (rxd) {
                        // echo...
                        printf("server sock: %d received: %ld bytes\n", clntSock, (long)rxd);
                        // write it back out; echo!
                        dispatch_io_write(channel, 0, data, q, ^(bool done, dispatch_data_t data, int error) {});
                    }
                    else {
                        close = YES;
                    }

                    if (close) {
                        dispatch_io_close(channel, DISPATCH_IO_STOP);
                        dispatch_release(channel);
                    }
                });
            }
            else {
                printf("accept() failed;\n");
            }
        }
    });

    // Resume the source so we're ready to accept once we listen()
    dispatch_resume(acceptSource);

    // Listen() on the socket
    if (listen(servSock, SOMAXCONN) < 0) {
        shutdown(servSock, SHUT_RDWR);
        close(servSock);
        DieWithError("listen() failed");
    }

    // Make cleanup block for the server queue
    dispatch_block_t cleanupBlock = ^{
        dispatch_async(q, ^{
            shutdown(servSock, SHUT_RDWR);
            close(servSock);
            dispatch_release(acceptSource);
            dispatch_release(q);
        });
    };

    return Block_copy(cleanupBlock);
}

В любом случае... вернемся к теме:

Если вы находитесь в ситуации № 2, вы должны спросить себя: "Я действительно что-то набираю с помощью этого подхода?" Скажем, у вас самый опытный MacPro - 12 ядер, 24 гиперповерхностных/виртуальных ядра. С 64 потоками у вас есть ок. 3: 1 для виртуального ядра. Контекстные коммутаторы и пропуски кэша не являются бесплатными. Помните, мы предположили, что вы не привязаны к этому сценарию для ввода/вывода, так что все, что вы делаете, имея больше задач, чем ядра, тратит время процессора с помощью переключателей контекста и тэшей кэша.

В действительности, если ваше приложение висит, потому что вы достигли предела ширины очереди, наиболее вероятным сценарием является то, что вы голодали на свою очередь. Вероятно, вы создали зависимость, которая сводится к тупиковой ситуации. Случай, который я видел чаще всего, - это когда несколько, блокированные потоки пытаются dispatch_sync в той же очереди, когда нет нити. Это всегда терпит неудачу.

Здесь почему: Ширина очереди - это деталь реализации. Ограничение ширины нити 64 для GCD недокументировано, потому что хорошо спроектированная архитектура concurrency не должна зависеть от ширины очереди. Вы всегда должны проектировать свою архитектуру concurrency таким образом, чтобы очередь с двумя потоками в конечном итоге закончила задание с тем же результатом (если медленнее) в виде очереди с 1000 потоками. Если вы этого не сделаете, всегда будет шанс, что ваша очередь будет голодать. Разделение вашей рабочей нагрузки на параллелизуемые единицы должно открывать себе возможность оптимизации, а не требование для базового функционирования. Одним из способов обеспечения соблюдения этой дисциплины во время разработки является попытка работать с последовательной очередью в местах, где вы используете параллельные очереди, но ожидаете неблокированного поведения. Выполнение таких проверок поможет вам уловить некоторые (но не все) эти ошибки раньше.

Кроме того, с точностью до вашего исходного вопроса: IIUC, ограничение потока 64 составляет 64 потока на совпадение очередей верхнего уровня, поэтому, если вы действительно чувствуете необходимость, вы можете использовать все три параллельные очереди верхнего уровня (по умолчанию, Высокий и низкий приоритет) для достижения более 64 потоков. Пожалуйста, не делайте этого, хотя. Исправьте свой дизайн таким образом, чтобы он вместо этого не голодал. Ты будешь счастливее. И вообще, как я намекал выше, если вы голодаете в очереди с 64 нитями, вы, вероятно, в конце концов просто заполните все три очередей верхнего уровня и/или столкнетесь с лимитом каждого процесса и будете тоже голодать.