Подтвердить что ты не робот

Parallel.Foreach + доходность возврата?

Я хочу обработать что-то с помощью параллельного цикла следующим образом:

public void FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
    });

}

Хорошо, все нормально. Но как сделать, если я хочу, чтобы метод FillLogs возвращал IEnumerable?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        yield return cpt // KO, don't work
    });

}

ИЗМЕНИТЬ

Кажется, это не возможно... но я использую что-то вроде этого:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.AsParallel().Select(cpt => cpt);
}

Но где я помещаю команду cpt.Logs = cpt.GetRawLogs().ToList();

4b9b3361

Ответ 1

Краткая версия - нет, это невозможно через блок итератора; более длинная версия, вероятно, связана с синхронизированной очередью /dequeue между потоком итератора вызывающего абонента (выполняющим dequeue) и параллельными рабочими (выполнение очереди); но как побочная заметка - журналы обычно связаны с IO, а параллелизирующие вещи, которые связаны с IO, часто не работают очень хорошо.

Если вызывающему абоненту потребуется некоторое время, чтобы потреблять каждый, тогда может быть какая-то заслуга в подходе, который обрабатывает только один журнал за раз, но может сделать это , а, вызывающий абонент потребляет предыдущий журнал; т.е. начинает a Task для следующего элемента перед yield и ждет завершения после yield... но это опять же довольно сложно, В качестве упрощенного примера:

static void Main()
{
    foreach(string s in Get())
    {
        Console.WriteLine(s);
    }
}

static IEnumerable<string> Get() {
    var source = new[] {1, 2, 3, 4, 5};
    Task<string> outstandingItem = null;
    Func<object, string> transform = x => ProcessItem((int) x);
    foreach(var item in source)
    {
        var tmp = outstandingItem;

        // note: passed in as "state", not captured, so not a foreach/capture bug
        outstandingItem = new Task<string>(transform, item);
        outstandingItem.Start();

        if (tmp != null) yield return tmp.Result;
    }
    if (outstandingItem != null) yield return outstandingItem.Result;
}
static string ProcessItem(int i)
{
    return i.ToString();
}

Ответ 2

Я не хочу быть оскорбительным, но, возможно, есть недостаток понимания. Parallel.ForEach означает, что TPL будет запускать foreach в соответствии с доступным оборудованием в нескольких потоках. Но это означает, что ii возможно сделать эту работу параллельно! yield return дает вам возможность получить некоторые значения из списка (или что-то еще) и вернуть их один за другим по мере необходимости. Это предотвращает необходимость сначала найти все элементы, соответствующие условию, а затем перебрать их. Это действительно преимущество производительности, но не может быть сделано параллельно.

Ответ 3

Как насчет

            Queue<string> qu = new Queue<string>();
            bool finished = false;
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(get_list(), (item) =>
                {
                    string itemToReturn = heavyWorkOnItem(item);         
                    lock (qu)
                       qu.Enqueue(itemToReturn );                        
                });
                finished = true;
            });

            while (!finished)
            {
                lock (qu)
                    while (qu.Count > 0)
                        yield return qu.Dequeue();
                //maybe a thread sleep here?
            }

Edit: Я думаю, что это лучше:

        public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
        {
            ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
            bool finished = false;
            AutoResetEvent re = new AutoResetEvent(false);
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(source, (item) =>
                {
                    qu.Enqueue(func(item));
                    re.Set();
                });
                finished = true;
                re.Set();
            });

            while (!finished)
            {
                re.WaitOne();
                while (qu.Count > 0)
                {
                    TOutput res;
                    if (qu.TryDequeue(out res))
                        yield return res;
                }
            }
        }   

Edit2: Я согласен с коротким ответом Нет. Этот код бесполезен; вы не можете сломать цикл доходности.