Подтвердить что ты не робот

Кафка: создание пользовательского сериализатора

Я пытаюсь построить POC с Kafka 0.8.1. Я использую свой собственный класс java как сообщение Kafka, у которого есть куча типов данных String. Я не могу использовать класс сериализатора по умолчанию или класс сериализатора String, который поставляется с библиотекой Kafka. Думаю, мне нужно написать собственный сериализатор и передать его свойствам производителя. Если вам известно написать пример пользовательского сериализатора в Kafka (в java), разделите его. Цените много, спасибо большое.

4b9b3361

Ответ 1

Для записи пользовательского сериализатора необходимы следующие элементы:

  • Внедрить Encoder с объектом, указанным для общего
    • Требуется создание конструктора VerifiableProperties
  • Переопределить toBytes(...) метод, гарантирующий возврат байтового массива
  • Вставьте класс сериализатора в ProducerConfig

Объявление пользовательского сериализатора для производителя

Как вы отметили в своем вопросе, Kafka поставляет средства для объявления определенного сериализатора для производителя. Класс serializer задается в экземпляре ProducerConfig, и этот экземпляр используется для построения желаемого класса Producer.

Если вы выполните Пример производителя Kafka, вы построите ProducerConfig через объект Properties. При создании файла свойств обязательно укажите:

props.put("serializer.class", "path.to.your.CustomSerializer");

С помощью пути к классу, который вы хотите использовать Kafka для сериализации сообщений, перед добавлением их в журнал.

Создание настраиваемого сериализатора, который Kafka понимает

Написание настраиваемого сериализатора, который может правильно интерпретировать Kafka, требует реализации класса Encoder[T] scala, который предоставляет Kafka. Внедрение черт в java странно, но следующий метод работал для сериализации JSON в моем проекте:

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

В вашем вопросе звучит так, будто вы используете один объект (позвоните ему CustomMessage) для всех сообщений, добавленных в ваш журнал. В этом случае ваш сериализатор может выглядеть примерно так:

package com.project.serializer;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}

Который оставил бы конфигурацию вашего свойства следующим образом:

props.put("serializer.class", "path.to.your.CustomSerializer");

Ответ 2

Вам необходимо реализовать как кодирование, так и декодер

public class JsonEncoder implements Encoder<Object> {
        private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* This constructor must be present for successful compile. */
        }

        @Override
        public byte[] toBytes(Object object) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                return objectMapper.writeValueAsString(object).getBytes();
            } catch (JsonProcessingException e) {
                LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            return "".getBytes();
        }
    }

Код декодера

public class JsonDecoder  implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(bytes, Map.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
        }
        return null;
    }
}

Запись pom

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>

Установите кодировщик по умолчанию в свойстве Kafka

properties.put("serializer.class","kafka.serializer.DefaultEncoder");

Код записи и считывателя выглядит следующим образом

byte[] bytes = encoder.toBytes(map);
        KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);

JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());