Подтвердить что ты не робот

Async с огромными потоками данных

Мы используем IEnumerables для возврата огромных наборов данных из базы данных:

public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}

Теперь мы хотим использовать методы async, чтобы сделать то же самое. Однако для async нет IEnumerables, поэтому мы должны собирать данные в список до загрузки всего набора данных:

public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}

Это потребует огромного количества ресурсов на сервере, потому что все данные должны быть в списке перед возвратом. Какая самая лучшая и простая в использовании альтернатива async для IEnumerables для работы с большими потоками данных? Я бы хотел избежать хранения всех данных в памяти во время обработки.

4b9b3361

Ответ 1

Самый простой вариант - TPL Dataflow. Все, что вам нужно сделать, это настроить ActionBlock, который обрабатывает обработку (параллельно, если хотите) и "отправляет" элементы в нее одним асинхронным способом.
Я также предложил бы установить BoundedCapacity, который будет дросселировать считывающее устройство считывателя из базы данных, когда обработка не сможет обрабатывать скорость.

var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
       await block.SendAsync(item);
    }
}

Вы также можете использовать Reactive Extensions, но это более сложная и надежная структура, чем вам, возможно, потребуется.

Ответ 2

Это будет потреблять огромное количество ресурсов на сервере, потому что все данные должны быть в списке перед возвратом. Что лучше и легко использовать асинхронную альтернативу для IEnumerables для работы с большими данными потоки? Я хотел бы избежать хранения всех данных в памяти, пока обработка.

Если вы не хотите сразу отправлять все данные клиенту, вы можете использовать Reactive Extensions (Rx) (на клиенте ) и SignalR (как на клиенте, так и на сервере), чтобы справиться с этим.

SignalR позволит отправлять данные клиенту асинхронно. Rx позволит применить LINQ к асинхронной последовательности элементов данных по мере их поступления на клиент. Это, однако, изменило бы всю модель кода вашего клиент-серверного приложения.

Пример (сообщение в блоге Самуэля Джека):

Связанный вопрос (если не дубликат):

Ответ 3

В большинстве случаев, когда речь идет о методах асинхронного/ожидающего, мне легче включить проблему и использовать функции (Func<...>) или действия (Action<...>) вместо ad-hoc-кода, особенно с помощью IEnumerable и yield.

Другими словами, когда я думаю "async", я пытаюсь забыть старое понятие функции "возвращаемое значение", которое в противном случае настолько очевидно и что мы так хорошо знакомы.

Например, если вы измените исходный код синхронизации на это (processor - это код, который в конечном итоге сделает то, что вы делаете с одним элементом данных):

public void Read(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            processor(item);
        }
    }
}

Затем асинхронную версию довольно просто написать:

public async Task ReadAsync(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // note you can use connection.OpenAsync()
        // and command.ExecuteReaderAsync() here
        while(await reader.ReadAsync())
        {
            // ...
            processor(item);
        }
    }
}

Если вы можете изменить свой код таким образом, вам не нужны никакие расширения или дополнительная библиотека или IAsyncEnumerable.

Ответ 4

Как упоминалось в других плакатах, это может быть реализовано с помощью Rx. С Rx функция вернет IObservable<Data>, на которую можно подписаться, и она подталкивает данные к абоненту по мере его появления. IObservable также поддерживает LINQ и добавляет некоторые собственные методы расширения.

Обновление

Я добавил несколько общих вспомогательных методов, чтобы использовать повторное использование читателя, а также поддержку отмены.

public static class ObservableEx
    {
        public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc)
        {
            return CreateFromSqlCommand(connectionString, command, readDataFunc, CancellationToken.None);
        }

        public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc, CancellationToken cancellationToken)
        {
            return Observable.Create<T>(
                async o =>
                {
                    SqlDataReader reader = null;

                    try
                    {                        
                        using (var conn = new SqlConnection(connectionString))
                        using (var cmd = new SqlCommand(command, conn))
                        {
                            await conn.OpenAsync(cancellationToken);
                            reader = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection, cancellationToken);

                            while (await reader.ReadAsync(cancellationToken))
                            {
                                var data = await readDataFunc(reader);
                                o.OnNext(data);                                
                            }

                            o.OnCompleted();
                        }
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }

                    return reader;
                });
        }
    }

Реализация ReadData теперь значительно упрощена.

     private static IObservable<Data> ReadData()
    {
        return ObservableEx.CreateFromSqlCommand(connectionString, "select * from Data", async r =>
        {
            return await Task.FromResult(new Data()); // sample code to read from reader.
        });
    }

Использование

Вы можете подписаться на Observable, предоставив ему IObserver, но также есть перегрузки, которые берут lambdas. По мере того как данные становятся доступными, вызываемый вызов OnNext вызывает вызов. Если есть исключение, вызываемый обратный вызов OnError вызывается. Наконец, если больше нет данных, вызываемый вызов обращается к OnCompleted.

Если вы хотите отменить наблюдаемые, просто удалите подписку.

void Main()
{
   // This is an asyncrhonous call, it returns straight away
    var subscription = ReadData()
        .Skip(5)                        // Skip first 5 entries, supports LINQ               
        .Delay(TimeSpan.FromSeconds(1)) // Rx operator to delay sequence 1 second
        .Subscribe(x =>
    {
        // Callback when a new Data is read
        // do something with x of type Data
    },
    e =>
    {
        // Optional callback for when an error occurs
    },
    () =>
    {
        //Optional callback for when the sequenc is complete
    }
    );

    // Dispose subscription when finished
    subscription.Dispose();

    Console.ReadKey();
}

Ответ 5

Я думаю, что Rx, безусловно, способ пойти в этом сценарии, учитывая, что наблюдаемая последовательность является формальной двойственной к перечислимой.

Как уже упоминалось в предыдущем ответе, вы можете перезаписать свою последовательность как наблюдаемую с нуля, но есть также несколько способов сохранить свои блоки итераторов, но затем просто открутите их асинхронно.

1) Просто преобразуйте перечислимое значение в наблюдаемое так:

using System.Reactive.Linq;
using System.Reactive.Concurrency;

var enumerable = Enumerable.Range(10);
var observable = enumerable.ToObservable();
var subscription = observable.Subscribe(x => Console.WriteLine(x));

Это приведет к тому, что ваш счетчик будет вести себя как наблюдаемый, нажимая его уведомления в любые последующие наблюдатели. В этом случае, когда вызывается функция "Подписаться", она будет блокироваться до тех пор, пока все данные не будут обработаны. Если вы хотите, чтобы он был полностью асинхронным, вы можете установить его в другой поток, используя:

var observable = enumerable.ToObservable().SubscribeOn(NewThreadScheduler.Default);

Теперь разматывание перечислимого будет выполнено в новом потоке, и метод подписки будет немедленно возвращен.

2) Размотайте перечислимое использование с помощью другого асинхронного источника событий:

var enumerable = Enumerable.Range(10);
var observable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                           .Zip(enumerable, (t, x) => x);
var subscription = observable.Subscribe(x => Console.WriteLine(x));

В этом случае я настраиваю таймер на огонь каждую секунду, и всякий раз, когда он срабатывает, он перемещает итератор вперед. Теперь таймер можно легко заменить любым источником событий, чтобы точно контролировать, когда итератор движется вперед.

Я получаю удовольствие от синтаксиса и семантики блоков итераторов (например, что происходит с блоками try/finally и dispose), поэтому иногда я использую эти проекты даже при разработке асинхронных операций.