Подтвердить что ты не робот

Почему этот код async/wait TAP медленнее, чем версия TPL?

Мне пришлось написать консольное приложение, которое называлось веб-службой Microsoft Dynamics CRM, для выполнения действия над более чем восемью тысячами CRM-объектов. Подробности вызова веб-службы не имеют значения и не показаны здесь, но мне нужен многопоточный клиент, чтобы я мог выполнять вызовы параллельно. Я хотел бы иметь возможность контролировать количество потоков, используемых в настройках конфигурации, а также для приложения, чтобы отменить всю операцию, если количество ошибок службы достигло порога, определенного в конфигурации.

Я написал его с помощью Task Parallel Library Task.Run и ContinueWith, отслеживая количество вызовов (потоков), количество ошибок, которые мы получили, и был ли пользователь отменен с клавиатуры. Все отлично работало, и у меня была обширная регистрация, чтобы убедиться, что потоки заканчиваются чисто и все в порядке. Я мог видеть, что программа использовала максимальное количество потоков параллельно и, если наш максимальный предел был достигнут, до тех пор, пока запущенная задача не завершится, прежде чем запускать другую.

Во время моего обзора кода мой коллега предположил, что было бы лучше сделать это с помощью async/wait вместо задач и продолжений, поэтому я создал ветвь и переписал ее таким образом. Результаты были интересными - версия async/await была почти в два раза медленнее и никогда не достигала максимального количества разрешенных параллельных операций/потоков. В TPL всегда было до 10 потоков параллельно, тогда как версия async/await никогда не превышала 5.

Мой вопрос: допустил ли я ошибку в том, как я написал код async/await (или код TPL даже)? Если я не закодировал это неправильно, можете ли вы объяснить, почему async/await менее эффективен, и значит ли это, что лучше использовать TPL для многопоточного кода.

Обратите внимание, что код, который я тестировал, на самом деле не вызывал CRM - класс CrmClient просто нить-спал в течение продолжительности, указанной в конфигурации (пять секунд), а затем генерирует исключение. Это означало отсутствие внешних переменных, которые могли бы повлиять на производительность.

Для целей этого вопроса я создал урезанную программу, которая объединяет обе версии; который вызывается, определяется настройкой конфигурации. Каждый из них начинается с загрузочного бегуна, который настраивает среду, создает класс очереди, а затем использует объект TaskCompletion для ожидания завершения. A CancellationTokenSource используется для сообщения об аннулировании с пользователя. Список идентификаторов для процесса считывается из встроенного файла и помещается в ConcurrentQueue. Они оба начинают вызов StartCrmRequest столько раз, сколько max-threads; впоследствии, каждый раз, когда результат обрабатывается, метод ProcessResult снова вызывает StartCrmRequest, сохраняя ход, пока все наши идентификаторы не будут обработаны.

Вы можете клонировать/загружать полную программу отсюда: https://bitbucket.org/kentrob/pmgfixso/

Вот соответствующая конфигурация:

<appSettings>
    <add key="TellUserAfterNCalls" value="5"/>
    <add key="CrmErrorsBeforeQuitting" value="20"/>
    <add key="MaxThreads" value="10"/>
    <add key="CallIntervalMsecs" value="5000"/>
    <add key="UseAsyncAwait" value="True" />
</appSettings>

Начиная с версии TPL, вот бегун бутстрапа, который запускает диспетчер очереди:

public static class TplRunner
{
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
    {
        Console.CancelKeyPress += (s, args) =>
        {
            CancelCrmClient();
            args.Cancel = true;
        };

        var start = DateTime.Now;
        Program.TellUser("Start: " + start);

        var taskCompletionSource = new TplQueue(parameters)
            .Start(CancellationTokenSource.Token, idList);

        while (!taskCompletionSource.Task.IsCompleted)
        {
            if (Console.KeyAvailable)
            {
                if (Console.ReadKey().Key != ConsoleKey.Q) continue;
                Console.WriteLine("When all threads are complete, press any key to continue.");
                CancelCrmClient();
            }
        }

        var end = DateTime.Now;
        Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
    }

    private static void CancelCrmClient()
    {
        CancellationTokenSource.Cancel();
        Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
    }
}

Вот сам менеджер очереди TPL:

public class TplQueue
{
    private readonly RuntimeParameters parameters;
    private readonly object locker = new object();
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
    private readonly CrmClient crmClient;
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
    private int threadCount;
    private int crmErrorCount;
    private int processedCount;
    private CancellationToken cancelToken;

    public TplQueue(RuntimeParameters parameters)
    {
        this.parameters = parameters;
        crmClient = new CrmClient();
    }

    public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
    {
        cancelToken = cancellationToken;

        foreach (var id in ids)
        {
            idQueue.Enqueue(id);
        }

        threadCount = 0;

        // Prime our thread pump with max threads.
        for (var i = 0; i < parameters.MaxThreads; i++)
        {
            Task.Run((Action) StartCrmRequest, cancellationToken);
        }

        return taskCompletionSource;
    }

    private void StartCrmRequest()
    {
        if (taskCompletionSource.Task.IsCompleted)
        {
            return;
        }

        if (cancelToken.IsCancellationRequested)
        {
            Program.TellUser("Crm client cancelling...");
            ClearQueue();
            return;
        }

        var count = GetThreadCount();

        if (count >= parameters.MaxThreads)
        {
            return;
        }

        string id;
        if (!idQueue.TryDequeue(out id)) return;

        IncrementThreadCount();
        crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);

        processedCount += 1;
        if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
        {
            ShowProgress(processedCount);
        }
    }

    private void ProcessResult(Task<CrmResultMessage> response)
    {
        if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
        {
            Program.TellUser(
                "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
                crmErrorCount);
            ClearQueue();
        }

        var count = DecrementThreadCount();

        if (idQueue.Count == 0 && count == 0)
        {
            taskCompletionSource.SetResult(true);
        }
        else
        {
            StartCrmRequest();
        }
    }

    private int GetThreadCount()
    {
        lock (locker)
        {
            return threadCount;
        }
    }

    private void IncrementThreadCount()
    {
        lock (locker)
        {
            threadCount = threadCount + 1;
        }
    }

    private int DecrementThreadCount()
    {
        lock (locker)
        {
            threadCount = threadCount - 1;
            return threadCount;
        }
    }

    private void ClearQueue()
    {
        idQueue = new ConcurrentQueue<string>();
    }

    private static void ShowProgress(int processedCount)
    {
        Program.TellUser("{0} activities processed.", processedCount);
    }
}

Обратите внимание, что я знаю, что несколько счетчиков не являются потокобезопасными, но они не критичны; переменная threadCount является единственной критической.

Вот пример метода фиктивного клиента CRM:

public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
{
    // Here we would normally call a CRM web service.
    return Task.Run(() =>
    {
        try
        {
            if (callIntervalMsecs > 0)
            {
                Thread.Sleep(callIntervalMsecs);
            }
            throw new ApplicationException("Crm web service not available at the moment.");
        }
        catch
        {
            return new CrmResultMessage(activityId, CrmResult.Failed);
        }
    });
}

И вот те же классы async/await (для краткости с удалением общих методов):

public static class AsyncRunner
{
    private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

    public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
    {
        var start = DateTime.Now;
        Program.TellUser("Start: " + start);

        var taskCompletionSource = new AsyncQueue(parameters)
            .StartAsync(CancellationTokenSource.Token, idList).Result;

        while (!taskCompletionSource.Task.IsCompleted)
        {
            ...
        }

        var end = DateTime.Now;
        Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
    }
}

Менеджер очередей async/wait:

public class AsyncQueue
{
    private readonly RuntimeParameters parameters;
    private readonly object locker = new object();
    private readonly CrmClient crmClient;
    private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
    private CancellationToken cancelToken;
    private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
    private int threadCount;
    private int crmErrorCount;
    private int processedCount;

    public AsyncQueue(RuntimeParameters parameters)
    {
        this.parameters = parameters;
        crmClient = new CrmClient();
    }

    public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken,
        IEnumerable<string> ids)
    {
        cancelToken = cancellationToken;

        foreach (var id in ids)
        {
            idQueue.Enqueue(id);
        }
        threadCount = 0;

        // Prime our thread pump with max threads.
        for (var i = 0; i < parameters.MaxThreads; i++)
        {
            await StartCrmRequest();
        }

        return taskCompletionSource;
    }

    private async Task StartCrmRequest()
    {
        if (taskCompletionSource.Task.IsCompleted)
        {
            return;
        }

        if (cancelToken.IsCancellationRequested)
        {
            ...
            return;
        }

        var count = GetThreadCount();

        if (count >= parameters.MaxThreads)
        {
            return;
        }

        string id;
        if (!idQueue.TryDequeue(out id)) return;

        IncrementThreadCount();
        var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
        ProcessResult(crmMessage);

        processedCount += 1;
        if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
        {
            ShowProgress(processedCount);
        }
    }

    private async void ProcessResult(CrmResultMessage response)
    {
        if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
        {
            Program.TellUser(
                "Quitting because CRM error count is equal to {0}. Already queued web service calls will have to run to completion.",
                crmErrorCount);
            ClearQueue();
        }

        var count = DecrementThreadCount();

        if (idQueue.Count == 0 && count == 0)
        {
            taskCompletionSource.SetResult(true);
        }
        else
        {
            await StartCrmRequest();
        }
    }
}

Итак, при установке MaxThreads на 10 и CrmErrorsBeforeQuitting на 20, версия TPL на моей машине завершается через 19 секунд, а версия async/await занимает 35 секунд. Учитывая, что у меня более 8000 звонков, чтобы сделать это, это значительная разница. Любые идеи?

4b9b3361

Ответ 1

Я думаю, что вижу здесь проблему или, по крайней мере, ее часть. Посмотрите внимательно на два бита кода ниже; они не эквивалентны.

// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
    Task.Run((Action) StartCrmRequest, cancellationToken);
}

и

// Prime our thread pump with max threads.
for (var i = 0; i < parameters.MaxThreads; i++)
{
    await StartCrmRequest();
}

В исходном коде (я принимаю его как заданный, чтобы он был функционально звуковым), существует один вызов ContinueWith. Именно столько операторов await я ожидаю увидеть в тривиальной перезаписи, если он предназначен для сохранения исходного поведения.

Не является жестким и быстрым правилом и применим только в простых случаях, но, тем не менее, хорошо держать глаза.

Ответ 2

Я думаю, вы слишком усложнили свое решение и оказались не в том месте, где захотите, в реализации.

Прежде всего, соединения с любым хостом HTTP ограничены диспетчером точек обслуживания. ограничение по умолчанию для клиентских сред равно 2, но вы можете увеличить его самостоятельно.

Независимо от того, сколько потоков вы создаете, не будет более активных запросов, чем ожидалось.

Затем, как заметил кто-то, await логически блокирует поток выполнения.

И, наконец, вы потратили свое время на создание AsyncQueue, когда вы должны были использовать потоки данных TPL.

Ответ 3

Когда реализовано с помощью async/await, я ожидаю, что алгоритм привязки ввода-вывода будет работать в одном потоке. В отличие от @KirillShlenskiy, я считаю, что бит, ответственный за "возвращение" в контекст вызывающего абонента, не несет ответственности за замедление. Я думаю, что вы переполняете пул потоков, пытаясь использовать его для операций с привязкой к I/O. Он предназначен в основном для операций с привязкой к вычислению.

Посмотрите на ForEachAsync. Я чувствую, что вы ищете (обсуждение Стивена Туба, вы найдете видео Wischik тоже значимым):

http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx

(Для уменьшения объема памяти используйте степень concurrency)

http://vimeo.com/43808831 http://vimeo.com/43808833