Подтвердить что ты не робот

Многопоточное приложение С# с вызовами базы данных SQL Server

У меня есть база данных SQL Server с 500 000 записей в таблице main. Существуют также три таблицы, которые называются child1, child2 и child3. Множество-много отношений между child1, child2, child3 и main реализованы через три таблицы отношений: main_child1_relationship, main_child2_relationship и main_child3_relationship. Мне нужно прочитать записи в main, обновить main, а также вставить в таблицы отношений новые строки, а также вставить новые записи в дочерние таблицы. Записи в дочерних таблицах имеют ограничения уникальности, поэтому псевдокод для фактического вычисления (CalculateDetails) будет выглядеть примерно так:

for each record in main
{
   find its child1 like qualities
   for each one of its child1 qualities
   {
      find the record in child1 that matches that quality
      if found
      {
          add a record to main_child1_relationship to connect the two records
      }
      else
      {
          create a new record in child1 for the quality mentioned
          add a record to main_child1_relationship to connect the two records
      }
   }
   ...repeat the above for child2
   ...repeat the above for child3 
}

Это прекрасно работает как однопоточное приложение. Но это слишком медленно. Обработка на С# довольно тяжелая и занимает слишком много времени. Я хочу превратить это в многопоточное приложение.

Каков наилучший способ сделать это? Мы используем Linq для Sql.

До сих пор мой подход заключался в создании нового объекта DataContext для каждой партии записей из main и использования ThreadPool.QueueUserWorkItem для его обработки. Однако эти партии наступают друг на друга, потому что один поток добавляет запись, а затем следующий поток пытается добавить один и тот же... Я получаю всевозможные интересные блокировки SQL Server.

Вот код:

    int skip = 0;
    List<int> thisBatch;
    Queue<List<int>> allBatches = new Queue<List<int>>();
    do
    {
        thisBatch = allIds
                .Skip(skip)
                .Take(numberOfRecordsToPullFromDBAtATime).ToList();
        allBatches.Enqueue(thisBatch);
        skip += numberOfRecordsToPullFromDBAtATime;

    } while (thisBatch.Count() > 0);

    while (allBatches.Count() > 0)
    {
        RRDataContext rrdc = new RRDataContext();

        var currentBatch = allBatches.Dequeue();
        lock (locker)  
        {
            runningTasks++;
        }
        System.Threading.ThreadPool.QueueUserWorkItem(x =>
                    ProcessBatch(currentBatch, rrdc));

        lock (locker) 
        {
            while (runningTasks > MAX_NUMBER_OF_THREADS)
            {
                 Monitor.Wait(locker);
                 UpdateGUI();
            }
        }
    }

И вот ProcessBatch:

    private static void ProcessBatch( 
        List<int> currentBatch, RRDataContext rrdc)
    {
        var topRecords = GetTopRecords(rrdc, currentBatch);
        CalculateDetails(rrdc, topRecords);
        rrdc.Dispose();

        lock (locker)
        {
            runningTasks--;
            Monitor.Pulse(locker);
        };
    }

И

    private static List<Record> GetTopRecords(RecipeRelationshipsDataContext rrdc, 
                                              List<int> thisBatch)
    {
        List<Record> topRecords;

        topRecords = rrdc.Records
                    .Where(x => thisBatch.Contains(x.Id))
                    .OrderBy(x => x.OrderByMe).ToList();
        return topRecords;
    }

CalculateDetails лучше всего объясняется псевдокодом наверху.

Я думаю, что должен быть лучший способ сделать это. Пожалуйста помоги. Большое спасибо!

4b9b3361

Ответ 1

Вот мой вопрос:

  • При использовании нескольких потоков для вставки/обновления/запроса данных в SQL Server или в любую базу данных взаимоблокировки являются фактом жизни. Вы должны предположить, что они произойдут и обработают их соответствующим образом.

  • Это не так сказать, мы не должны пытаться ограничить возникновение тупиков. Тем не менее, легко прочитать основные причины взаимоблокировок и предпринять шаги для их предотвращения, но SQL Server всегда вас удивит: -)

Некоторая причина блокировок:

Вот как я могу решить вашу проблему:

  • Я бы не откатил свое собственное решение для потоковой обработки, я бы использовал библиотеку TaskParallel. Мой основной метод будет выглядеть примерно так:

    using (var dc = new TestDataContext())
    {
        // Get all the ids of interest.
        // I assume you mark successfully updated rows in some way
        // in the update transaction.
        List<int> ids = dc.TestItems.Where(...).Select(item => item.Id).ToList();
    
        var problematicIds = new List<ErrorType>();
    
        // Either allow the TaskParallel library to select what it considers
        // as the optimum degree of parallelism by omitting the 
        // ParallelOptions parameter, or specify what you want.
        Parallel.ForEach(ids, new ParallelOptions {MaxDegreeOfParallelism = 8},
                            id => CalculateDetails(id, problematicIds));
    }
    
  • Выполнить метод CalculateDetails с повторениями для сбоев в блокировке.

    private static void CalculateDetails(int id, List<ErrorType> problematicIds)
    {
        try
        {
            // Handle deadlocks
            DeadlockRetryHelper.Execute(() => CalculateDetails(id));
        }
        catch (Exception e)
        {
            // Too many deadlock retries (or other exception). 
            // Record so we can diagnose problem or retry later
            problematicIds.Add(new ErrorType(id, e));
        }
    }
    
  • Основной метод CalculateDetails

    private static void CalculateDetails(int id)
    {
        // Creating a new DeviceContext is not expensive.
        // No need to create outside of this method.
        using (var dc = new TestDataContext())
        {
            // TODO: adjust IsolationLevel to minimize deadlocks
            // If you don't need to change the isolation level 
            // then you can remove the TransactionScope altogether
            using (var scope = new TransactionScope(
                TransactionScopeOption.Required,
                new TransactionOptions {IsolationLevel = IsolationLevel.Serializable}))
            {
                TestItem item = dc.TestItems.Single(i => i.Id == id);
    
                // work done here
    
                dc.SubmitChanges();
                scope.Complete();
            }
        }
    }
    
  • И, конечно, моя реализация вспомогательного помощника по тупиковой ошибке

    public static class DeadlockRetryHelper
    {
        private const int MaxRetries = 4;
        private const int SqlDeadlock = 1205;
    
        public static void Execute(Action action, int maxRetries = MaxRetries)
        {
            if (HasAmbientTransaction())
            {
                // Deadlock blows out containing transaction
                // so no point retrying if already in tx.
                action();
            }
    
            int retries = 0;
    
            while (retries < maxRetries)
            {
                try
                {
                    action();
                    return;
                }
                catch (Exception e)
                {
                    if (IsSqlDeadlock(e))
                    {
                        retries++;
                        // Delay subsequent retries - not sure if this helps or not
                        Thread.Sleep(100 * retries);
                    }
                    else
                    {
                        throw;
                    }
                }
            }
    
            action();
        }
    
        private static bool HasAmbientTransaction()
        {
            return Transaction.Current != null;
        }
    
        private static bool IsSqlDeadlock(Exception exception)
        {
            if (exception == null)
            {
                return false;
            }
    
            var sqlException = exception as SqlException;
    
            if (sqlException != null && sqlException.Number == SqlDeadlock)
            {
                return true;
            }
    
            if (exception.InnerException != null)
            {
                return IsSqlDeadlock(exception.InnerException);
            }
    
            return false;
        }
    }
    
  • Еще одна возможность заключается в использовании стратегии разбиения на разделы

Если ваши таблицы, естественно, могут быть разделены на несколько различных наборов данных, то вы можете либо использовать таблицы и индексы разделенных SQL Server, либо вы вручную разделить существующие таблицы на несколько наборов таблиц. Я бы рекомендовал использовать секционирование SQL Server, так как второй вариант был бы грязным. Также встроенное разделение доступно только на SQL Enterprise Edition.

Если для вас возможно разбиение на разделы, вы можете выбрать схему разделов, которая сломала вам данные, давая сказать 8 различных наборов. Теперь вы можете использовать свой оригинальный однопоточный код, но каждый из 8 потоков предназначен для отдельного раздела. Теперь не будет (или хотя бы минимального числа) тупиков.

Надеюсь, это имеет смысл.

Ответ 2

Обзор

Корень вашей проблемы заключается в том, что L2S DataContext, как ObjectContext объекта Entity Framework, не является потокобезопасным. Как объясняется в этот обмен форумами MSDN, поддержка асинхронных операций в решениях .NET ORM все еще рассматривается как .NET 4.0; вам придется сворачивать свое собственное решение, которое, как вы обнаружили, не всегда легко сделать, когда ваша инфраструктура предполагает однопоточность.

Я воспользуюсь этой возможностью, чтобы отметить, что L2S построен поверх ADO.NET, который сам полностью поддерживает асинхронную операцию - лично я бы предпочел напрямую заниматься этим нижним уровнем и написать сам SQL, убедитесь, что я полностью понял, что происходит в сети.

Решение SQL Server?

Сказав это, я должен спросить - должно ли это быть решением С#? Если вы можете составить свое решение из набора операторов вставки/обновления, вы можете просто отправить через SQL напрямую, и ваши проблемы с потоками и производительности исчезнут. * Мне кажется, что ваши проблемы связаны не с фактическими преобразованиями данных, которые должны быть но сосредоточиться на том, чтобы сделать их исполнителями из .NET. Если .NET удаляется из уравнения, ваша задача становится проще. В конце концов, лучшим решением часто является тот, который вы написали наименьший объем кода, не так ли?;)

Даже если ваша логика обновления/вставки не может быть выражена строго реляционным образом, SQL Server имеет встроенный механизм для итерации записей и выполнения логики - хотя они справедливо клевете для многих случаев использования, курсоры могут быть действительно подходящими для вашей задачи.

Если это задача, которая должна повторяться многократно, вы можете извлечь выгоду из ее кодирования как хранимой процедуры.

*, конечно, долговременный SQL приносит свои проблемы, такие как блокировка эскалации и использование индекса, с которыми вам придется бороться.

Решение С#

Конечно, может быть, что делать это в SQL не может быть и речи - может быть, ваши решения на основе кода зависят от данных, которые поступают из других источников, например, или, может быть, ваш проект имеет строгое соглашение "без SQL", Вы упоминаете некоторые типичные ошибки многопоточности, но, не видя своего кода, я не могу быть им действительно полезен.

Выполнение этого с С#, очевидно, жизнеспособно, но вам нужно иметь дело с тем, что фиксированный объем задержки будет существовать для каждого вашего звонка. Вы можете смягчить последствия задержек сети, используя объединенные соединения, включив несколько активных наборов результатов и используя асинхронные методы Begin/End для выполнения ваших запросов. Даже со всеми этими устройствами вам все равно придется принять, что есть стоимость доставки данных с SQL Server в ваше приложение.

Один из лучших способов, чтобы ваш код не перешагивал себя, заключается в том, чтобы избежать максимально возможного обмена изменяемыми данными между потоками. Это означало бы не совместное использование одного и того же DataContext для нескольких потоков. Следующим лучшим подходом является блокировка критических разделов кода, которые касаются разделяемых данных - lock блокирует весь доступ к DataContext, начиная с первого чтения и заканчивая окончательной записью. Такой подход может полностью исключить преимущества многопоточности; вы можете сделать вашу блокировку более мелкозернистой, но будьте осторожны, что это путь боли.

Намного лучше держать ваши операции отдельно друг от друга. Если вы можете разделить свою логику на "основные" записи, этот идеал, т.е. До тех пор, пока не будут отношения между различными дочерними таблицами, и пока одна запись в "главном" не имеет последствий для другой, вы можете разделить свои операции на несколько потоков следующим образом:

private IList<int> GetMainIds()
{
    using (var context = new MyDataContext())
        return context.Main.Select(m => m.Id).ToList();
}

private void FixUpSingleRecord(int mainRecordId)
{
    using (var localContext = new MyDataContext())
    {
        var main = localContext.Main.FirstOrDefault(m => m.Id == mainRecordId);

        if (main == null)
            return;

        foreach (var childOneQuality in main.ChildOneQualities)
        {
            // If child one is not found, create it
            // Create the relationship if needed
        }

        // Repeat for ChildTwo and ChildThree

        localContext.SaveChanges();
    }
}

public void FixUpMain()
{
    var ids = GetMainIds();
    foreach (var id in ids)
    {
        var localId = id; // Avoid closing over an iteration member
        ThreadPool.QueueUserWorkItem(delegate { FixUpSingleRecord(id) });
    }
}

Очевидно, это такой же пример игрушек, как и псевдокод в вашем вопросе, но, надеюсь, он заставит вас задуматься о том, как охватить ваши задачи таким образом, чтобы между ними не было (или минимального) общего состояния. Это, я думаю, станет ключом к правильному решению С#.

РЕДАКТИРОВАТЬ Ответы на обновления и комментарии

Если вы видите проблемы согласованности данных, я бы посоветовал применять семантику транзакций - вы можете сделать это, используя System.Transactions.TransactionScope(добавьте ссылку на System.Transactions). В качестве альтернативы вы можете сделать это на уровне ADO.NET, обратившись к внутреннему соединению и называя BeginTransaction на нем (или независимо от метода DataConnection).

Вы также указываете тупики. То, что вы сражаетесь с тупиками SQL Server, указывает, что фактические SQL-запросы наступают друг на друга. Не зная, что на самом деле отправляется по проводам, трудно сказать подробно, что происходит и как это исправить. Достаточно сказать, что SQL-взаимоблокировки являются результатом SQL-запросов, а не обязательно из конструкций Threading С# - вам нужно проверить, что именно происходит по проводу. Моя кишка говорит мне, что если каждая "основная" запись действительно независима от других, тогда не должно быть необходимости в блокировке строк и таблиц, и что Linq to SQL, вероятно, является виновником здесь.

Вы можете получить дамп исходного SQL, испускаемого L2S в вашем коде, установив свойство DataContext.Log на что-то, например. Console.Out. Хотя я никогда не использовал его лично, я понимаю, что LINQPad предлагает средства L2S, и вы также можете получить там SQL.

SQL Server Management Studio выберет остальную часть пути - с помощью Activity Monitor вы можете наблюдать за эскалацией блокировки в реальном времени. Используя Query Analyzer, вы можете получить представление о том, как SQL Server будет выполнять ваши запросы. С ними вы сможете получить хорошее представление о том, что ваш код делает на стороне сервера, и, в свою очередь, как его исправить.

Ответ 3

Я бы рекомендовал переместить всю обработку XML на SQL-сервер. Мало того, что все ваши тупики исчезнут, но вы увидите такое повышение производительности, что вы никогда не захотите вернуться.

Это будет лучше всего объяснено примером. В этом примере я предполагаю, что XML-блок уже входит в вашу основную таблицу (я называю это шкафом). Я предполагаю следующую схему:

CREATE TABLE closet (id int PRIMARY KEY, xmldoc ntext) 
CREATE TABLE shoe(id int PRIMARY KEY IDENTITY, color nvarchar(20))
CREATE TABLE closet_shoe_relationship (
    closet_id int REFERENCES closet(id),
    shoe_id int REFERENCES shoe(id)
)

И я ожидаю, что ваши данные (только основная таблица) сначала будут выглядеть так:

INSERT INTO closet(id, xmldoc) VALUES (1, '<ROOT><shoe><color>blue</color></shoe></ROOT>')
INSERT INTO closet(id, xmldoc) VALUES (2, '<ROOT><shoe><color>red</color></shoe></ROOT>')

Тогда вся ваша задача проста:

INSERT INTO shoe(color) SELECT DISTINCT CAST(CAST(xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) AS color from closet
INSERT INTO closet_shoe_relationship(closet_id, shoe_id) SELECT closet.id, shoe.id FROM shoe JOIN closet ON CAST(CAST(closet.xmldoc AS xml).query('//shoe/color/text()') AS nvarchar) = shoe.color

Но учитывая, что вы сделаете много подобной обработки, вы можете сделать свою жизнь проще, объявив свой основной blob как тип XML и еще больше упростив это:

INSERT INTO shoe(color)
    SELECT DISTINCT CAST(xmldoc.query('//shoe/color/text()') AS nvarchar)
    FROM closet
INSERT INTO closet_shoe_relationship(closet_id, shoe_id)
    SELECT closet.id, shoe.id
    FROM shoe JOIN closet
        ON CAST(xmldoc.query('//shoe/color/text()') AS nvarchar) = shoe.color

Возможны дополнительные оптимизации производительности, такие как предварительные вычисления, которые многократно вызывали результаты Xpath во временной или постоянной таблице или преобразовывали начальную совокупность основной таблицы в BULK INSERT, но я не ожидаю, что вам действительно понадобятся те, которые преуспеют.

Ответ 4

Тупики сервера sql являются нормальными и ожидаются в этом типе сценария. Рекомендация MS заключается в том, что они должны обрабатываться на стороне приложения чем сторона db.

Однако, если вам нужно убедиться, что хранимая процедура вызывается только один раз, вы можете использовать блокировку mutex sql с помощью sp_getapplock. Вот пример того, как реализовать этот

BEGIN TRAN
DECLARE @mutex_result int;
EXEC @mutex_result = sp_getapplock @Resource = 'CheckSetFileTransferLock',
 @LockMode = 'Exclusive';

IF ( @mutex_result < 0)
BEGIN
    ROLLBACK TRAN

END

-- do some stuff

EXEC @mutex_result = sp_releaseapplock @Resource = 'CheckSetFileTransferLock'
COMMIT TRAN  

Ответ 5

Эта проблема может быть решена с помощью функции LimitedConcurrencyLevelTaskScheduler

public class InOutMessagesController
{
    private static LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
    private TaskFactory taskFactory = new TaskFactory(scheduler);
    private TaskFactory<MyTask<Object[]>> taskFactoryWithResult = new TaskFactory<MyTask<Object[]>>(scheduler);
    private ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
    private ConcurrentBag<MyTask<Object[]>> tasksWithResult = new ConcurrentBag<MyTask<Object[]>>();
    private ConcurrentBag<int> endedTaskIds = new ConcurrentBag<int>();
    private ConcurrentBag<int> endedTaskWithResultIds = new ConcurrentBag<int>();
    private Task TaskForgetEndedTasks;
    private static object taskForgetLocker = new object();


    #region Conveyor
    private async void AddTaskVoidToQueue(Task task)
    {
        try
        {
            tasks.Add(task);

            await taskFactory.StartNew(() => task.Start());

            if (TaskForgetEndedTasks == null)
            {
                ForgetTasks();
            }
        }
        catch (Exception ex)
        {
            NLogger.Error(ex);
        }
    }

    private async Task<Object[]> AddTaskWithResultToQueue(MyTask<Object[]> task)
    {
        ForgetTasks();

        tasksWithResult.Add(task);

        return await taskFactoryWithResult.StartNew(() => { task.Start(); return task; }).Result;
    }

    private Object[] GetEqualTaskWithResult(string methodName)
    {
        var equalTask = tasksWithResult.FirstOrDefault(x => x.MethodName == methodName);

        if (equalTask == null)
        {
            return null;
        }
        else
        {
            return equalTask.Result;
        }
    }

    private void ForgetTasks()
    {
        Task.WaitAll(tasks.Where(x => x.Status == TaskStatus.Running || x.Status == TaskStatus.Created || x.Status == TaskStatus.WaitingToRun).ToArray());

        lock (taskForgetLocker)
        {
            if (TaskForgetEndedTasks == null)
            {
                TaskForgetEndedTasks = new Task(ForgetEndedTasks);

                TaskForgetEndedTasks.Start();
            }

            TaskForgetEndedTasks.Wait();

            TaskForgetEndedTasks = null;
        }
    }

    private void ForgetEndedTasks()
    {
        try
        {
            var completedTasks = tasks.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled);
            var completedTasksWithResult = tasksWithResult.Where(x => x.IsCompleted || x.IsFaulted || x.IsCanceled);

            if (completedTasks.Count() > 0)
            {
                foreach (var ts in completedTasks)
                {
                    if (ts.Exception != null)
                    {
                        NLogger.Error(ts.Exception);

                        if (ts.Exception.InnerException != null)
                        {
                            NLogger.Error(ts.Exception.InnerException);
                        }
                    }

                    endedTaskIds.Add(ts.Id);
                }

                if (endedTaskIds.Count != 0)
                {
                    foreach (var t in endedTaskIds)
                    {
                        Task ct = completedTasks.FirstOrDefault(x => x.Id == t);

                        tasks.TryTake(out ct);
                    }
                }

                endedTaskIds = new ConcurrentBag<int>();
            }

            if (completedTasksWithResult.Count() > 0)
            {
                foreach (var ts in completedTasksWithResult)
                {
                    if (ts.Exception != null)
                    {
                        NLogger.Error(ts.Exception);

                        if (ts.Exception.InnerException != null)
                        {
                            NLogger.Error(ts.Exception.InnerException);
                        }
                    }

                    endedTaskWithResultIds.Add(ts.Id);
                }

                foreach (var t in endedTaskWithResultIds)
                {
                    var ct = tasksWithResult.FirstOrDefault(x => x.Id == t);

                    tasksWithResult.TryTake(out ct);
                }

                endedTaskWithResultIds = new ConcurrentBag<int>();
            }
        }
        catch(Exception ex)
        {
            NLogger.Error(ex);
        }
    }
    #endregion Conveyor

    internal void UpdateProduct(List<ProductData> products)
    {
            var updateProductDataTask = new Task(() => ADOWorker.UpdateProductData(products));

            AddTaskVoidToQueue(updateProductDataTask);
    }

    internal async Task<IEnumerable<ProductData>> GetProduct()
    {
        string methodName = "GetProductData";

        Product_Data[] result = GetEqualTaskWithResult(methodName) as Product_Data[];

        if (result == null)
        {
            var task = new MyTask<Object[]>(ADOWorker.GetProductData, methodName);

            result = await AddTaskWithResultToQueue(task) as Product_Data[];
        }

        return result;
    }
}

public class ADOWorker
{
    public Object[] GetProductData()
    {
        entities = new DataContext();

        return entities.Product_Data.ToArray();
    }

    public void UpdateProductData(List<Product_Data> products)
    {
            entities = new DataContext();

            foreach (Product_Data pr_data in products)
            {
                entities.sp_Product_Data_Upd(pr_data);
            }            
    }
}

Ответ 6

Это может быть очевидным, но цикл через каждый кортеж и выполнение вашей работы в вашем контейнере сервлета включает в себя много накладных расходов на запись.

Если возможно, переместите часть или всю эту обработку на SQL-сервер, переписав логику как одну или несколько хранимых процедур.

Ответ 7

Если

  • У вас не так много времени, чтобы потратить на эту проблему, и вам нужно ее исправить прямо сейчас.
  • Вы уверены, что ваш код выполнен таким образом, что другой поток НЕ будет изменять одну и ту же запись.
  • Вы не боитесь.

Затем... вы можете просто добавить "БЕЗ БЛОКИРОВКИ" к вашим запросам, чтобы MSSQL не применял блокировки.

Для использования с осторожностью:)

Но так или иначе, вы не сказали нам, где время потеряно (в моно-поточной версии). Потому что, если это в коде, я советую вам написать все в БД напрямую, чтобы избежать непрерывного обмена данными. Если это в db, я советую проверить индекс (слишком много?), I/o, cpu и т.д.