Подтвердить что ты не робот

Почему эта Observable.Generate перегрузка вызывает утечку памяти? [Использование Timespan <15ms]

Следующий код Rx.NET будет использовать примерно 500 МБ памяти примерно через 10 секунд на моей машине.

var stream =
    Observable.Range(0, 10000)
              .SelectMany(i => Observable.Generate(
                  0, 
                  j => true, 
                  j => j + 1, 
                  j => new { N = j },
                  j => TimeSpan.FromMilliseconds(1)));

stream.Subscribe();

Если я использую перегрузку Observable.Generate без параметра Func<int, TimeSpan>, мои платы использования памяти на 35 МБ.

var stream =
    Observable.Range(0, 10000)
              .SelectMany(i => Observable.Generate(
                  0,
                  j => true,
                  j => j + 1,
                  j => new { N = j }));
                  // j => TimeSpan.FromMilliseconds(1))); ** Removed! **

stream.Subscribe();

Кажется, что это проблема только при использовании методов расширения SelectMany() или Merge().

4b9b3361

Ответ 1

Это проблема, по которой используется планировщик по умолчанию.

В версии TimeSpan планировщик - DefaultScheduler. Без TimeSpan это CurrentThreadScheduler.

Таким образом, для основанного на времени генерации это очень быстро пытается запланировать все операции и в основном создает массивную очередь событий, ожидающих выполнения. Таким образом, он использует нагрузку памяти.

С не-временным созданием его с использованием текущего потока, так что он будет генерировать и потреблять каждое генерируемое значение последовательно и, следовательно, использовать очень мало памяти.

О, и это не утечка памяти. Это обычная операция, если вы пытаетесь планировать бесконечное число значений быстрее, чем они могут быть использованы.


Я декомпилировал код, чтобы определить, какие планировщики были использованы.

Здесь декомпилируется не-время:

public static IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
{
    if (condition == null)
        throw new ArgumentNullException("condition");
    if (iterate == null)
        throw new ArgumentNullException("iterate");
    if (resultSelector == null)
        throw new ArgumentNullException("resultSelector");
    return Observable.s_impl.Generate<TState, TResult>(initialState, condition, iterate, resultSelector);
}

public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
{
    return (IObservable<TResult>)new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
}

internal static IScheduler Iteration
{
    get
    {
        return (IScheduler)CurrentThreadScheduler.Instance;
    }
}

Вышеуказанные методы относятся к Observable, QueryLanguage и SchedulerDefaults соответственно.