Подтвердить что ты не робот

У Async.StartChild есть утечка памяти?

Когда я запускаю следующий тест (построенный с помощью F # 2.0), я получаю OutOfMemoryException. Для достижения исключения в моей системе требуется около 5 минут (i7-920 6gb ram, если он работает как процесс x86), но в любом случае мы можем видеть, как память растет в диспетчере задач.

module start_child_test
    open System
    open System.Diagnostics
    open System.Threading
    open System.Threading.Tasks

    let cnt = ref 0
    let sw = Stopwatch.StartNew()
    Async.RunSynchronously(async{
        while true do
            let! x = Async.StartChild(async{
                if (Interlocked.Increment(cnt) % 100000) = 0 then
                    if sw.ElapsedMilliseconds > 0L then
                        printfn "ops per sec = %d" (100000L*1000L / sw.ElapsedMilliseconds)
                    else
                        printfn "ops per sec = INF"
                    sw.Restart()
                    GC.Collect()
            })
            do! x
    })

    printfn "done...."

Я не вижу ничего плохого в этом коде и не вижу причин для роста памяти. Я сделал альтернативную реализацию, чтобы убедиться, что мои аргументы верны:

module start_child_fix
    open System
    open System.Collections
    open System.Collections.Generic
    open System.Threading
    open System.Threading.Tasks


    type IAsyncCallbacks<'T> = interface
        abstract member OnSuccess: result:'T -> unit
        abstract member OnError: error:Exception -> unit
        abstract member OnCancel: error:OperationCanceledException -> unit
    end

    type internal AsyncResult<'T> = 
        | Succeeded of 'T
        | Failed of Exception
        | Canceled of OperationCanceledException

    type internal AsyncGate<'T> = 
        | Completed of AsyncResult<'T>
        | Subscribed of IAsyncCallbacks<'T>
        | Started
        | Notified

    type Async with
        static member StartChildEx (comp:Async<'TRes>) = async{
            let! ct = Async.CancellationToken

            let gate = ref AsyncGate.Started
            let CompleteWith(result:AsyncResult<'T>, callbacks:IAsyncCallbacks<'T>) =
                if Interlocked.Exchange(gate, Notified) <> Notified then
                    match result with
                        | Succeeded v -> callbacks.OnSuccess(v)
                        | Failed e -> callbacks.OnError(e)
                        | Canceled e -> callbacks.OnCancel(e)

            let ProcessResults (result:AsyncResult<'TRes>) =
                let t = Interlocked.CompareExchange<AsyncGate<'TRes>>(gate, AsyncGate.Completed(result), AsyncGate.Started)
                match t with
                | Subscribed callbacks -> 
                    CompleteWith(result, callbacks)
                | _ -> ()
            let Subscribe (success, error, cancel) = 
                let callbacks = {
                    new IAsyncCallbacks<'TRes> with
                        member this.OnSuccess v = success v
                        member this.OnError e = error e
                        member this.OnCancel e = cancel e
                }
                let t = Interlocked.CompareExchange<AsyncGate<'TRes>>(gate, AsyncGate.Subscribed(callbacks), AsyncGate.Started)
                match t with
                | AsyncGate.Completed result -> 
                    CompleteWith(result, callbacks)
                | _ -> ()

            Async.StartWithContinuations(
                computation = comp,
                continuation = (fun v -> ProcessResults(AsyncResult.Succeeded(v))),
                exceptionContinuation = (fun e -> ProcessResults(AsyncResult.Failed(e))),
                cancellationContinuation = (fun e -> ProcessResults(AsyncResult.Canceled(e))),
                cancellationToken = ct
            )
            return Async.FromContinuations( fun (success, error, cancel) ->
                Subscribe(success, error, cancel)
            )
        }

Для этого теста он работает хорошо, без значительного потребления памяти. К сожалению, я не очень много разбираюсь в F # и сомневаюсь, если я пропущу некоторые вещи. В случае, если это ошибка, как я могу сообщить об этом команде F #?

4b9b3361

Ответ 1

Я думаю, что вы правы - похоже, утечка памяти в реализации StartChild.

Я сделал немного профилирования (после фантастического учебника от Dave Thomas) и версия с открытым исходным кодом F #, и я думаю, что я даже знаю, как это исправить. Если вы посмотрите на реализацию StartChild, он регистрирует обработчик с текущим токеном отмены рабочего процесса:

let _reg = ct.Register(
    (fun _ -> 
        match !ctsRef with
        |   null -> ()
        |   otherwise -> otherwise.Cancel()), null)

Объектами, которые остаются в куче, являются экземпляры этой зарегистрированной функции. Они могут быть незарегистрированы, вызывая _reg.Dispose(), но это никогда не происходит в исходном коде F #. Я попытался добавить _reg.Dispose() к функциям, вызываемым при завершении async:

(fun res -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Ok res, reuseThread=true))   
(fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Error err,reuseThread=true))   
(fun err -> _reg.Dispose(); ctsRef := null; resultCell.RegisterResult (Canceled err,reuseThread=true))

... и, основываясь на моих экспериментах, это устраняет проблему. Итак, если вы хотите обходной путь, вы можете скопировать весь необходимый код из control.fs и добавить это как исправление.

Я отправлю отчет об ошибке команде F # со ссылкой на ваш вопрос. Если вы найдете что-то еще, вы можете связаться с ними, отправив отчеты об ошибках на fsbugs в microsoft dot com.