Подтвердить что ты не робот

RavenDB Stream для неограниченных результатов - устойчивость соединения

Мы используем функцию Stream в RavenDB для загрузки, преобразования и переноса данных между двумя базами данных следующим образом:

var query = originSession.Query<T>(IndexForQuery);

using (var stream = originSession.Advanced.Stream(query))
{
    while (stream.MoveNext())
    {
        var streamedDocument = stream.Current.Document;

        OpenSessionAndMigrateSingleDocument(streamedDocument);
    }
}

Проблема в том, что одна из коллекций имеет миллионы строк, и мы продолжаем получать IOException в следующем формате:

Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
   at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
   at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
   at System.IO.StreamReader.Read(Char[], Int32, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
   at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
   at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
   at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
   at MigrateToNewSchema.Program.Main(System.String[])

Это происходит довольно долго в потоковом режиме, и, конечно, проблемы переходного соединения будут возникать в течение такого периода (для завершения требуется несколько часов).

Однако при повторном запуске, когда мы используем Query, нам нужно начинать с нуля. Таким образом, в конечном счете, если в течение всего Stream произошел сбой соединения, тогда мы должны попробовать его снова и снова, пока он не будет работать от конца до конца.

Я знаю, что вы можете использовать ETag с потоком для эффективного перезапуска в определенный момент, однако нет никакой перегрузки, чтобы сделать это с помощью Query, который нам нужно отфильтровать перенесенные результаты и указать правильную коллекцию.

Итак, в RavenDB существует ли способ улучшить внутреннюю устойчивость соединения (свойство строки подключения, внутренние настройки и т.д.) или эффективно "восстановить" поток при ошибке?

4b9b3361

Ответ 1

В соответствии с предложением @StriplingWarrior я воссоздал решение, используя Подписки данных.

Используя этот подход, я смог перебрать все 2 миллиона строк (хотя, по общему признанию, с гораздо меньшей обработкой на элемент); 2 пункта здесь, которые помогли бы, когда мы пытались реализовать ту же логику с помощью Streams:

  • Пакеты только удаляются из подписной "очереди" после подтверждения (как и большинство стандартных очередей)
    • Подписанный IObserver<T> должен завершиться успешно для этого подтверждения, которое должно быть установлено.
    • Эта информация обрабатывается сервером, а не клиентом, поэтому позволяет перезагрузить клиент, не затрагивая последнюю успешную позицию, обработанную в подписке.
    • Подробнее см. здесь
  • Поскольку @StriplingWarrior указан потому, что вы можете создавать подписки с фильтрами вплоть до уровня свойств, можно было бы воспроизвести с меньшим набором результатов в случае исключения в самой подписке.
    • Первая точка действительно заменяет это; но это позволяет нам получить дополнительную гибкость, не проявляемую в Stream API

Среда тестирования - это база данных RavenDB 3.0 (локальная машина, работающая как служба Windows) с настройками по умолчанию для коллекции из 2 миллионов записей.

Код для создания фиктивных записей:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    using (var bulkInsert = store.BulkInsert())
    {
        for (var i = 0; i != recordsToCreate; i++)
        {
            var person = new Person
            {
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            };

            bulkInsert.Store(person);
        }
    }
}

Подписывание этой коллекции тогда является случаем:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());

    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
    {
        BatchOptions = new SubscriptionBatchOptions()
        {
            // Max number of docs that can be sent in a single batch
            MaxDocCount = 16 * 1024,  
            // Max total batch size in bytes
            MaxSize = 4 * 1024 * 1024,
            // Max time the subscription needs to confirm that the batch
            // has been successfully processed
            AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
        },
        IgnoreSubscribersErrors = false,
        ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
    });

    personSubscription.Subscribe(new PersonObserver());

    while (true)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    }
}

Обратите внимание на PersonObserver; это просто базовая реализация IObserver:

public class PersonObserver : IObserver<Person>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }

    public void OnNext(Person person)
    {
        Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'");
    }
}