Подтвердить что ты не робот

API Kafka Streams: KStream для KTable

У меня есть тема Kafka, где я отправляю события местоположения (key = user_id, value = user_location). Я могу читать и обрабатывать его как KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

Это хорошо работает, но я бы хотел иметь KTable с последней известной позицией каждого пользователя. Как я могу это сделать?

Я умею писать и читать из промежуточной темы:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");

Есть ли простой способ получить a KTable от a KStream? Это мое первое приложение, использующее потоки Kafka, поэтому я, вероятно, не вижу ничего очевидного.

4b9b3361

Ответ 1

Обновление:

В Kafka 2.5 будет добавлен новый метод KStream#toTable(), который обеспечит удобный способ преобразования KStream в KTable. Подробнее см.: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL

Оригинальный ответ:

На данный момент нет прямого пути сделать это. Ваш подход абсолютно действителен, как обсуждалось в Confluent FAQs: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

Это самый простой подход в отношении кода. Однако у него есть недостатки: (а) вам нужно управлять дополнительной темой и (б) это приводит к дополнительному сетевому трафику, потому что данные записываются и перечитываются с Kafka.

Есть одна альтернатива, использующая "фиктивное сокращение":

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");

Этот подход несколько более сложен в отношении кода по сравнению с вариантом 1, но имеет то преимущество, что (а) не требуется ручное управление темами и (б) повторное чтение данных из Kafka не требуется.

В целом, вам нужно решить, какой подход вам больше нравится:

В варианте 2 Kafka Streams создаст внутреннюю тему журнала изменений для резервного копирования KTable для обеспечения отказоустойчивости. Таким образом, оба подхода требуют некоторого дополнительного хранения в Kafka и приводят к дополнительному сетевому трафику. В целом, это компромисс между немного более сложным кодом в варианте 2 и ручным управлением темой в варианте 1.