У моего коллеги и у меня есть спор. Мы пишем приложение .NET, которое обрабатывает огромные объемы данных. Он получает элементы данных, группирует их подмножества в блоки по определенному критерию и обрабатывает эти блоки.
Скажем, у нас есть элементы данных типа Foo
, которые приходят с некоторым источником (из сети, например) один за другим. Мы хотим собрать подмножества связанных объектов типа Foo
, построить объект типа Bar
из каждого такого подмножества и объектов процесса типа Bar
.
Один из нас предложил следующий дизайн. Его основная тема заключается в экспонировании объектов IObservable<T>
непосредственно из интерфейсов наших компонентов.
// ********* Interfaces **********
interface IFooSource
{
// this is the event-stream of objects of type Foo
IObservable<Foo> FooArrivals { get; }
}
interface IBarSource
{
// this is the event-stream of objects of type Bar
IObservable<Bar> BarArrivals { get; }
}
/ ********* Implementations *********
class FooSource : IFooSource
{
// Here we put logic that receives Foo objects from the network and publishes them to the FooArrivals event stream.
}
class FooSubsetsToBarConverter : IBarSource
{
IFooSource fooSource;
IObservable<Bar> BarArrivals
{
get
{
// Do some fancy Rx operators on fooSource.FooArrivals, like Buffer, Window, Join and others and return IObservable<Bar>
}
}
}
// this class will subscribe to the bar source and do processing
class BarsProcessor
{
BarsProcessor(IBarSource barSource);
void Subscribe();
}
// ******************* Main ************************
class Program
{
public static void Main(string[] args)
{
var fooSource = FooSourceFactory.Create();
var barsProcessor = BarsProcessorFactory.Create(fooSource) // this will create FooSubsetToBarConverter and BarsProcessor
barsProcessor.Subscribe();
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
Другой предложил другой дизайн, что его основная тема использует наши собственные интерфейсы издателя/подписчика и использует Rx внутри реализаций только тогда, когда это необходимо.
//********** interfaces *********
interface IPublisher<T>
{
void Subscribe(ISubscriber<T> subscriber);
}
interface ISubscriber<T>
{
Action<T> Callback { get; }
}
//********** implementations *********
class FooSource : IPublisher<Foo>
{
public void Subscribe(ISubscriber<Foo> subscriber) { /* ... */ }
// here we put logic that receives Foo objects from some source (the network?) publishes them to the registered subscribers
}
class FooSubsetsToBarConverter : ISubscriber<Foo>, IPublisher<Bar>
{
void Callback(Foo foo)
{
// here we put logic that aggregates Foo objects and publishes Bars when we have received a subset of Foos that match our criteria
// maybe we use Rx here internally.
}
public void Subscribe(ISubscriber<Bar> subscriber) { /* ... */ }
}
class BarsProcessor : ISubscriber<Bar>
{
void Callback(Bar bar)
{
// here we put code that processes Bar objects
}
}
//********** program *********
class Program
{
public static void Main(string[] args)
{
var fooSource = fooSourceFactory.Create();
var barsProcessor = barsProcessorFactory.Create(fooSource) // this will create BarsProcessor and perform all the necessary subscriptions
fooSource.Run(); // this enters a loop of listening for Foo objects from the network and notifying about their arrival.
}
}
Какой, по вашему мнению, лучше? Экспозиция IObservable<T>
и создание наших компонентов для создания новых потоков событий из Rx-операторов или определения наших собственных интерфейсов издателя/подписчика и при необходимости использования Rx внутри?
Вот некоторые вещи, которые следует учитывать в проектах:
-
В первом дизайне потребитель наших интерфейсов обладает всей мощью Rx на кончиках пальцев и может выполнять любые Rx-операторы. Один из нас утверждает, что это преимущество, а другое утверждает, что это недостаток.
-
Вторая конструкция позволяет нам использовать любую архитектуру издателя/подписчика под капотом. Первый проект связывает нас с Rx.
-
Если мы хотим использовать мощность Rx, это требует большей работы во втором проекте, потому что нам нужно перевести пользовательскую реализацию издателя/подписчика на Rx и обратно. Это требует написания кода клея для каждого класса, который хочет обработать некоторые события.