Подтвердить что ты не робот

Есть ли реализация IEnumerable, которая только выполняет итерацию по этому источнику (например, LINQ) один раз

Предоставлено items - результат выражения q LINQ:

var items = from item in ItemsSource.RetrieveItems()
            where ...

Предположим, что для генерации каждого элемента требуется некоторое время, отличное от negligeble.

Возможны два режима работы:

  • Использование foreach позволит начать работу с элементами в начале коллекции намного раньше, чем в конце концов станет доступным. Однако, если мы захотим позже обработать одну и ту же коллекцию, нам придется скопировать ее сохранение:

    var storedItems = new List<Item>();
    foreach(var item in items){
        Process(item);
        storedItems .Add(item);
    }
    
    // Later
    foreach(var item in storedItems){
        ProcessMore(item);
    }
    

    Потому что, если бы мы только что сделали foreach(... in items), тогда temsSource.RetrieveItems() снова будет вызван.

  • Мы могли бы использовать .ToList() right upfront, но это заставило бы нас ждать, пока последний элемент будет извлечен, прежде чем мы сможем начать обработку первого.

Вопрос: существует ли реализация IEnumerable, которая будет выполнять первый раз, как обычный результат запроса LINQ, но будет материализоваться в процессе, так что второй foreach будет перебирать сохраненные значения?

4b9b3361

Ответ 1

Интересная задача, поэтому я должен предоставить свое собственное решение. Настолько забавно, что мое решение теперь находится в версии 3. Версия 2 была упрощением, которое я сделал на основе отзывов от Servy. Затем я понял, что мое решение имеет огромный недостаток. Если первое перечисление кэшированного перечислимого не завершилось, кэширование не будет выполнено. Многие расширения LINQ, такие как First и Take, будут перечислять достаточно перечисляемых, чтобы выполнить задание, и мне пришлось обновить до версии 3, чтобы сделать эту работу с кешированием.

Вопрос о последующих перечислениях перечисляемого, который не предполагает одновременного доступа. Тем не менее, я решил сделать мой поток решений безопасным. Это добавляет некоторую сложность и немного накладных расходов, но должно позволить использовать решение во всех сценариях.

public static class EnumerableExtensions {

  public static IEnumerable<T> Cached<T>(this IEnumerable<T> source) {
    if (source == null)
      throw new ArgumentNullException("source");
    return new CachedEnumerable<T>(source);
  }

}

class CachedEnumerable<T> : IEnumerable<T> {

  readonly Object gate = new Object();

  readonly IEnumerable<T> source;

  readonly List<T> cache = new List<T>();

  IEnumerator<T> enumerator;

  bool isCacheComplete;

  public CachedEnumerable(IEnumerable<T> source) {
    this.source = source;
  }

  public IEnumerator<T> GetEnumerator() {
    lock (this.gate) {
      if (this.isCacheComplete)
        return this.cache.GetEnumerator();
      if (this.enumerator == null)
        this.enumerator = source.GetEnumerator();
    }
    return GetCacheBuildingEnumerator();
  }

  public IEnumerator<T> GetCacheBuildingEnumerator() {
    var index = 0;
    T item;
    while (TryGetItem(index, out item)) {
      yield return item;
      index += 1;
    }
  }

  bool TryGetItem(Int32 index, out T item) {
    lock (this.gate) {
      if (!IsItemInCache(index)) {
        // The iteration may have completed while waiting for the lock.
        if (this.isCacheComplete) {
          item = default(T);
          return false;
        }
        if (!this.enumerator.MoveNext()) {
          item = default(T);
          this.isCacheComplete = true;
          this.enumerator.Dispose();
          return false;
        }
        this.cache.Add(this.enumerator.Current);
      }
      item = this.cache[index];
      return true;
    }
  }

  bool IsItemInCache(Int32 index) {
    return index < this.cache.Count;
  }

  IEnumerator IEnumerable.GetEnumerator() {
    return GetEnumerator();
  }

}

Расширение используется как это (sequence является IEnumerable<T>):

var cachedSequence = sequence.Cached();

// Pulling 2 items from the sequence.
foreach (var item in cachedSequence.Take(2))
  // ...

// Pulling 2 items from the cache and the rest from the source.
foreach (var item in cachedSequence)
  // ...

// Pulling all items from the cache.
foreach (var item in cachedSequence)
  // ...

Есть небольшая утечка, если перечисляется только часть перечислимого типа (например, cachedSequence.Take(2).ToList(). Перечислитель, который используется ToList, будет удален, но исходный перечислитель источника не будет удален. Это связано с тем, что первые 2 элемента кэшируется, а исходный счетчик сохраняется в памяти, если должны быть сделаны запросы на последующие элементы. В этом случае исходный счетчик очищается только тогда, когда исправляется сбор мусора (который будет в то же время, что и, возможно, большой кеш).

Ответ 2

Взгляните на библиотеку Reactive Extentsions - есть расширение MemoizeAll(), которое будет кэшировать элементы в вашем IEnumerable, как только они доступ к ним и их сохранение для будущих обращений.

См. этот блог-сообщение от Bart De Smet для хорошего чтения в MemoizeAll и других методах Rx.

Изменить. Фактически это найдено в отдельном пакете интерактивных расширений - доступно из NuGet или Загрузка Microsoft.

Ответ 3

public static IEnumerable<T> SingleEnumeration<T>(this IEnumerable<T> source)
{
    return new SingleEnumerator<T>(source);
}

private class SingleEnumerator<T> : IEnumerable<T>
{
    private CacheEntry<T> cacheEntry;
    public SingleEnumerator(IEnumerable<T> sequence)
    {
        cacheEntry = new CacheEntry<T>(sequence.GetEnumerator());
    }

    public IEnumerator<T> GetEnumerator()
    {
        if (cacheEntry.FullyPopulated)
        {
            return cacheEntry.CachedValues.GetEnumerator();
        }
        else
        {
            return iterateSequence<T>(cacheEntry).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
{
    using (var iterator = entry.CachedValues.GetEnumerator())
    {
        int i = 0;
        while (entry.ensureItemAt(i) && iterator.MoveNext())
        {
            yield return iterator.Current;
            i++;
        }
    }
}

private class CacheEntry<T>
{
    public bool FullyPopulated { get; private set; }
    public ConcurrentQueue<T> CachedValues { get; private set; }

    private static object key = new object();
    private IEnumerator<T> sequence;

    public CacheEntry(IEnumerator<T> sequence)
    {
        this.sequence = sequence;
        CachedValues = new ConcurrentQueue<T>();
    }

    /// <summary>
    /// Ensure that the cache has an item a the provided index.  If not, take an item from the 
    /// input sequence and move to the cache.
    /// 
    /// The method is thread safe.
    /// </summary>
    /// <returns>True if the cache already had enough items or 
    /// an item was moved to the cache, 
    /// false if there were no more items in the sequence.</returns>
    public bool ensureItemAt(int index)
    {
        //if the cache already has the items we don't need to lock to know we 
        //can get it
        if (index < CachedValues.Count)
            return true;
        //if we're done there no race conditions hwere either
        if (FullyPopulated)
            return false;

        lock (key)
        {
            //re-check the early-exit conditions in case they changed while we were
            //waiting on the lock.

            //we already have the cached item
            if (index < CachedValues.Count)
                return true;
            //we don't have the cached item and there are no uncached items
            if (FullyPopulated)
                return false;

            //we actually need to get the next item from the sequence.
            if (sequence.MoveNext())
            {
                CachedValues.Enqueue(sequence.Current);
                return true;
            }
            else
            {
                FullyPopulated = true;
                return false;
            }
        }
    }
}

Итак, это было отредактировано (по существу) для поддержки многопоточного доступа. Несколько потоков могут запрашивать элементы, а по элементам по элементам они будут кэшироваться. Ему не нужно ждать, пока вся последовательность будет повторяться, чтобы он возвращал кешированные значения. Ниже приведен пример программы, демонстрирующей это:

private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
{
    for (int i = 0; i < maxValue; i++)
    {
        Thread.Sleep(1000);
        Console.WriteLine("actually generating value: {0}", i);
        yield return i;
    }
}

public static void Main(string[] args)
{
    IEnumerable<int> sequence = interestingIntGenertionMethod(10)
        .SingleEnumeration();

    int numThreads = 3;
    for (int i = 0; i < numThreads; i++)
    {
        int taskID = i;
        Task.Factory.StartNew(() =>
        {
            foreach (int value in sequence)
            {
                Console.WriteLine("Task: {0} Value:{1}",
                    taskID, value);
            }
        });
    }

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey(true);
}

Вам действительно нужно увидеть, как он работает, чтобы понять власть здесь. Как только один поток заставляет создавать следующие фактические значения, все оставшиеся потоки могут немедленно распечатать это сгенерированное значение, но все они будут ждать, если для печати этого потока не будут сохранены никакие значения. (Очевидно, что планирование потока/потокового пула может привести к тому, что одна задача займет больше времени, чтобы напечатать ее значение, чем необходимо.)