Подтвердить что ты не робот

Лучший способ в .NET управлять очередью задач на отдельном (одном) потоке

Я знаю, что за эти годы асинхронное программирование сильно изменилось. Я несколько смущен, что позволил себе получить этот ржавый всего лишь 34 года, но я рассчитываю на StackOverflow, чтобы довести меня до скорости.

То, что я пытаюсь сделать, это управлять очередью "работы" в отдельном потоке, но таким образом, что обрабатывается только один элемент за раз. Я хочу опубликовать работу над этим потоком, и ему не нужно передавать что-либо обратно вызывающему. Конечно, я мог бы просто развернуть новый объект Thread и пропустить его через общий объект Queue, используя спящие, прерывания, дескрипторы ожидания и т.д. Но я знаю, что с тех пор все стало лучше. Мы имеем BlockingCollection, Task, async/await, не говоря уже о пакетах NuGet, которые, вероятно, абстрагируются от этого.

Я знаю, что вопросы "Какие лучшие...", как правило, неодобрились, поэтому я буду перефразировать его, сказав "Что такое рекомендуемый..." способ сделать что-то подобное с помощью встроенных механизмов .NET предпочтительно. Но если сторонний пакет NuGet упрощает вещание, это тоже хорошо.

Я рассмотрел экземпляр TaskScheduler с фиксированным максимумом concurrency из 1, но, похоже, на данный момент существует, вероятно, гораздо менее неуклюжий способ сделать это.

Фон

В частности, то, что я пытаюсь сделать в этом случае, - это очередь задачи геолокации IP во время веб-запроса. Один и тот же IP-адрес может несколько раз оказаться в очереди на геолокацию, но задача будет знать, как его обнаружить и ускорить, если он уже был разрешен. Но обработчик запроса просто собирается отправить эти вызовы () => LocateAddress(context.Request.UserHostAddress) в очередь и позволить методу LocateAddress обрабатывать двойное обнаружение работы. API-интерфейс геолокации, который я использую, не любит бомбардировать запросы, поэтому я хочу ограничить его одной одновременной задачей одновременно. Тем не менее, было бы неплохо, если бы подход позволил легко масштабировать до более параллельных задач с простым изменением параметров.

4b9b3361

Ответ 1

Чтобы создать асинхронную одиночную степень очереди parallelism, вы можете просто создать SemaphoreSlim, инициализированный одним, а затем применить метод enqueing await при приобретении этого семафора до начала запрошенной работы.

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Конечно, чтобы иметь фиксированную степень parallelism, кроме одной, просто инициализировать семафор другому числу.

Ответ 2

Ваш лучший вариант, как я вижу, это использовать TPL Dataflow ActionBlock:

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);

TPL Dataflow - надежная, потокобезопасная, async - уже и очень настраиваемая структура, основанная на актерах (доступна как nuget)

Вот простой пример для более сложного случая. Предположим, вы хотите:

  • Включить concurrency (ограничен доступными ядрами).
  • Ограничьте размер очереди (чтобы у вас не хватило памяти).
  • Имейте как LocateAddress, так и вставку очереди async.
  • Отмените все через час.

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);

Ответ 3

Используйте BlockingCollection<Action>, чтобы создать шаблон производителя/потребителя с одним потребителем (только одна вещь, работающая одновременно, как вы хотите) и один или несколько производителей.

Сначала определите общую очередь:

BlockingCollection<Action> queue = new BlockingCollection<Action>();

В вашем потребителе Thread или Task вы берете из него:

//This will block until there an item available
Action itemToRun = queue.Take()

Затем из любого числа производителей на другие потоки просто добавьте в очередь:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));

Ответ 4

На самом деле вам не нужно запускать задачи в одном потоке, вам нужно, чтобы они запускались последовательно (один за другим) и FIFO. У TPL нет класса для этого, но вот моя реализация с тестами. https://github.com/Gentlee/SerialQueue

Также есть реализация @Servy, тесты показывают, что он в два раза медленнее, чем мой, и он не гарантирует FIFO.