Подтвердить что ты не робот

Есть ли способ использовать параллельную библиотеку задач (TPL) с SQLDataReader?

Мне нравится простота методов расширения Parallel.For и Parallel.ForEach в TPL. Мне было интересно, есть ли способ воспользоваться чем-то подобным или даже с немного более продвинутыми задачами.

Ниже приведено типичное использование для SqlDataReader, и мне было интересно, возможно ли это, и если да, то как заменить цикл while на что-то в TPL. Поскольку читатель не может предоставить фиксированное количество итераций, метод "Расширение" невозможен, что позволяет справиться с задачами, которые я собирал. Я надеялся, что кто-то, возможно, уже справился с этим, и разработал некоторые из них, и не надейся на ADO.net.

using (SqlConnection conn = new SqlConnection("myConnString"))
using (SqlCommand comm = new SqlCommand("myQuery", conn))
{
    conn.Open();

    SqlDataReader reader = comm.ExecuteReader();

    if (reader.HasRows)
    {
        while (reader.Read())
        {
            // Do something with Reader
        }
    }
}
4b9b3361

Ответ 1

Ты почти там. Оберните код, который вы отправили в функции с этой подписью:

IEnumerable<IDataRecord> MyQuery()

а затем замените код // Do something with Reader следующим образом:

yield return reader;

Теперь у вас есть что-то, что работает в одном потоке. К сожалению, когда вы читаете результаты запроса, он возвращает ссылку на один и тот же объект каждый раз, и объект просто мутирует себя для каждой итерации. Это означает, что если вы попытаетесь запустить его параллельно, вы получите некоторые действительно нечетные результаты, поскольку параллельные чтения мутируют объект, используемый в разных потоках. Вам нужен код, чтобы взять копию записи для отправки в ваш параллельный цикл.

На этом этапе, тем не менее, мне нравится пропустить дополнительную копию записи и перейти прямо к строго типизированному классу. Более того, мне нравится использовать общий метод для этого:

IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
    using (var cn = new SqlConnection("My connection string"))
    using (var cmd = new SqlCommand(sql, cn))
    {
        addParameters(cmd.Parameters);

        cn.Open();
        using (var rdr = cmd.ExecuteReader())
        {
            while (rdr.Read())
            {
                yield return factory(rdr);
            }
        }
    }
}

Предполагая, что ваши методы factory создают копию, как ожидалось, этот код должен быть безопасным для использования в цикле Parallel.ForEach. Вызов метода будет выглядеть примерно так (предполагая класс Employee со статическим методом factory с именем "Создать" ):

var UnderPaid = GetData<Employee>(Employee.Create, 
       "SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary", 
       p => {
           p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
       });
Parallel.ForEach(UnderPaid, e => e.GiveRaise());

Важное обновление:
Я не настолько уверен в этом коде, как когда-то был. Отдельный поток может все еще мутировать читателя, пока другой поток находится в процессе его копирования. Я мог бы установить блокировку, но я также обеспокоен тем, что другой поток может вызвать обновление читателя после того, как оригинал сам вызвал Read(), но прежде чем он начнет делать копию. Поэтому критический раздел здесь состоит из всего цикла while... и в этот момент вы снова возвращаетесь к однопоточному. Я ожидаю, что есть способ изменить этот код для работы, как ожидалось, для многопоточных сценариев, но для этого потребуется больше изучения.

Ответ 2

У вас будет трудность замены этого цикла while. SqlDataReader не  класс потоковой безопасности, поэтому вы не можете использовать его непосредственно из нескольких потоков.

При этом вы можете обрабатывать данные, которые вы читаете, используя TPL. Здесь есть несколько вариантов. Самым простым может быть создание собственной реализации IEnumerable<T>, которая работает с читателем, и возвращает класс или структуру, содержащие ваши данные. Затем вы можете использовать PLINQ или оператор Parallel.ForEach для параллельной обработки ваших данных:

public IEnumerable<MyDataClass> ReadData()
{
    using (SqlConnection conn = new SqlConnection("myConnString"))
    using (SqlCommand comm = new SqlCommand("myQuery", conn))
    {
        conn.Open();

        SqlDataReader reader = comm.ExecuteReader();

        if (reader.HasRows)
        {
            while (reader.Read())
            {
                yield return new MyDataClass(... data from reader ...);
            }
        }
    }
}

Как только у вас есть этот метод, вы можете обработать его напрямую, используя PLINQ или TPL:

Parallel.ForEach(this.ReadData(), data =>
{
    // Use the data here...
});

Или:

this.ReadData().AsParallel().ForAll(data => 
{
    // Use the data here...
});