Подтвердить что ты не робот

Есть ли асинхронная версия PLINQ?

Я хочу выполнить запрос по потоку данных при обработке элементов параллельно с определенной степенью parallelism. Обычно я использую PLINQ для этого, но мои рабочие элементы не связаны с ЦП, а связаны с IO. Я хочу использовать async IO. PLINQ не поддерживает работу async.

Какой самый умный способ запуска запроса в стиле PLINQ, но с асинхронными рабочими элементами?


Вот более подробная иллюстрация проблемы:

Моя цель - обработать потенциально бесконечный поток "элементов" способом, который логически описан следующим запросом:

var items = new int[10]; //simulate data

var results =
 from x in items.AsParallel().WithDegreeOfParallelism(100)
 where Predicate(x)
 select ComputeSomeValue(x);

foreach (var result in results)
 PerformSomeAction(result);

Этот запрос - всего лишь эскиз реального запроса. Теперь я хочу, чтобы каждая из функций-заполнителей была асинхронной (возвращала Task и внутренне была основана на async IO).

Обратите внимание, что в памяти может быть гораздо больше элементов, чем может быть сохранено. Я также должен контролировать степень parallelism, чтобы максимизировать базовое сетевое и дисковое оборудование.

Этот вопрос не касается многоядерных процессоров. Он полностью применим к машинам с одним ядром ЦП, поскольку IO все еще может выиграть от parallelism. Подумайте о медленных вызовах веб-сервиса и т.п.

4b9b3361

Ответ 1

Это похоже на работу для реактивной среды Microsoft.

Я начал с этого кода в качестве моих начальных переменных:

var items = Enumerable.Range(0, 10).ToArray();

Func<int, bool> Predicate = x => x % 2 == 0;

Func<int, int> ComputeSomeValue = x =>
{
    Thread.Sleep(10000);
    return x * 3;
};

Теперь я использовал обычный запрос LINQ в качестве базовой линии:

var results =
    from x in items
    where Predicate(x)
    select ComputeSomeValue(x);

Это заняло 50 секунд, чтобы вычислить следующие результаты:

enumerable

Затем я переключился на наблюдаемый (реактивный каркас) запрос:

var results =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select y;

Это заняло 10 секунд, чтобы получить:

observable

Он явно вычисляет параллельно.

Однако результаты не соответствуют порядку. Поэтому я изменил запрос на это:

var query =
    from x in items.ToObservable()
    where Predicate(x)
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

var results =
    query
        .ToEnumerable()
        .OrderBy(z => z.x)
        .Select(z => z.y);

Это продолжалось 10 секунд, но результаты вернулись в правильном порядке.

Теперь единственная проблема здесь - WithDegreeOfParallelism. Там есть купе вещей, чтобы попробовать здесь.

Сначала я изменил код, чтобы произвести 10 000 значений с временем вычисления 10 мс. Мой стандартный запрос LINQ продолжался 50 секунд. Но реактивный запрос занял 6,3 секунды. Если бы он мог выполнять все вычисления в одно и то же время, он должен был бы принимать гораздо меньше. Это показывает, что он максимизирует асинхронный конвейер.

Во-вторых, реактивная среда использует планировщики для всего планирования работы. Вы можете попробовать множество планировщиков, которые поставляются с реактивной каркасной платформой, чтобы найти альтернативу, если встроенный не сделает то, что вы хотите. Или вы даже можете написать свой собственный планировщик, чтобы делать все, что вам нравится.


Здесь версия запроса, которая также вычисляет предикат.

var results =
    from x in items.ToObservable()
    from p in Observable.Start(() => Predicate(x))
    where p
    from y in Observable.Start(() => ComputeSomeValue(x))
    select new { x, y };

Ответ 2

Как указано здесь, PLINQ предназначен для параллельного выполнения запросов LINQ на многоядерном/многопроцессорном > систем. Это не слишком много касается прохладных систем с большим количеством дисковых блоков и супер-сетевых возможностей. AFAIK, он предназначен для запуска исполняемого кода на большее количество ядер, а не для одновременной отправки нескольких запросов ввода-вывода в операционную систему.

Возможно, ваш Predicate (x) связан с CPU, поэтому вы можете выполнить эту операцию фильтрации с помощью PLINQ. Но вы не можете применять операции ввода-вывода (ComputeSomeValue, PerformSomeAction) таким же образом.

Что вы можете сделать, так это определить цепочку операций (по два в вашем случае) для каждого элемента (см. задачи продолжения) и отправку эта цепочка (последовательно (?)).

Кроме того, вы упоминали о "бесконечном потоке предметов". Это может немного звучать как проблема производителя-потребителя, если эти элементы также генерируются в/в.

Возможно, ваша проблема не в том, что многоядерные дружественные... Это может быть просто требование ввода-вывода, что все...