Подтвердить что ты не робот

Observable.Generate с селектором TimeSpan появляется как утечка памяти [При использовании TimeSpan> 15ms]

Я изучаю использование Observable.Generate для создания последовательности результатов, отобранных с интервалом, используя примеры из веб-сайта msdn в качестве отправной точки.

Следующий код БЕЗ селектора TimeSpan не обнаруживает утечку памяти:

IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString());
obs.Subscribe(x => Console.WriteLine(x));

Однако следующий код с селектором TimeSpan обнаруживает утечку памяти:

TimeSpan timeSpan = TimeSpan.FromSeconds(1);
IObservable<string> obs = Observable.Generate(initialState: 1,
                                              condition: x => x < 1000,
                                              iterate: x => x + 1,
                                              resultSelector: x => x.ToString(),
                                              timeSelector: x => timeSpan);
obs.Subscribe(x => Console.WriteLine(x));

Например, это игрушечное приложение быстро отобразит утечку памяти с помощью Memory Profiler, который поставляется с сообществом VS 2015:

using System;
using System.Reactive.Linq;

namespace Sample
{
    public class Program
    {
        static void Main()
        {
            IObservable<string> obs = Observable.Generate(1, x => x < 1000*1000, x => x + 1, x => x.ToString(), x => TimeSpan.FromMilliseconds(500));
            obs.Subscribe(x => { /*Do nothing but simply run the observable*/ });
            Console.ReadLine();
        }
    }
}

Утечка памяти - это растущая коллекция:

System.Reactive.Disposables StableCompositeDisposable.Binary
System.Reactive.Disposables SingleAssignmentDisposable

Я использую этот API неправильно? Должен ли я ожидать, что память вырастет или это ошибка с Reactive?

4b9b3361

Ответ 1

Это выглядит как ошибка для меня - или, по крайней мере, беспорядочное/нежелательное поведение в рекурсивной реализации расписания DefaultScheduler (это не очень рекурсивно, я говорю о перегрузке, которая проходит в самом планировщике к запланированному действию поэтому вы можете запланировать продолжение).

Ресурсы, которые вы видите, создаются вызовом метода DefaultScheduler.Schedule(строка 71 здесь: https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Core/Reactive/Concurrency/DefaultScheduler.cs).

Есть несколько причин, по которым другие попытки обнаружить это не удалось. Во-первых, одноразовые объекты в конечном итоге удаляются - но только тогда, когда Generate OnCompletes или OnErrors, после чего System.Reactive.AnonymousSafeObserver<T>, возвращаемый генератором, когда вы подписываетесь на него, он очищается.

Во-вторых, если вы используете короткий TimeSpan (помните, что минимальное разрешение .NET Timer равно 15ms), тогда Rx оптимизирует использование таймера и вызывает QueueUserWorkItem без использования таймера, когда-либо создаются.

Если вы вникнете в Generate implementation (https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs), вы увидите, что он передает IDisposable, возвращенный начальным вызовом Расписание передачи его обратно наблюдателю, который висит на нем до ошибки/завершения. Это предотвращает сбор всей результирующей цепочки вызовов - и означает, что если вам нужно отменить или когда произойдет очистка, то только после каждого запланированного действия одноразовые будут удалены.

Вы можете увидеть тот же эффект в приведенном ниже коде, который использует DefaultScheduler напрямую - ссылка на cancel в последней строке достаточно, чтобы вызвать утечку. Обязательно используйте сборку релизов, иначе компилятор сохранит отмену, пока метод не закончится независимо.

// ensure you are using a release build of this code
ManualResetEvent mre = new ManualResetEvent();
IDisposable cancel;
int maxCount = 20;

TimeSpan timeSpan = TimeSpan.FromSeconds(1);

Func<IScheduler, int, IDisposable> recurse = null;
recurse = (self, state) =>
{
    Console.WriteLine(state);

    if (state == maxCount)
    {
        mre.Set();
        return Disposable.Empty;
    }

    return self.Schedule(state + 1, timeSpan, recurse);
};

cancel = Scheduler.Default.Schedule(1, timeSpan, recurse);

mre.WaitOne();

// uncomment the following line, and you'll get the same leak
// leave it commented, and cancel reference is GC'd early and there no leak
// if(cancel == null) Console.WriteLine("Hang on to cancel");

Я использовал Jetbrains dotMemory API для получения дампов памяти, чтобы сделать выводы здесь - я лишил код выше этих вызовов API, но здесь есть полный смысл, если у вас есть этот продукт, и вы сможете увидеть влияние раскола окончательной строки довольно четко: https://gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c В качестве альтернативы вы можете использовать API-интерфейс профилирования MS, который я не выгружал в мой мозг, работающий на данный момент!