Я пытаюсь построить POC с Kafka 0.8.1. Я использую свой собственный класс java как сообщение Kafka, у которого есть куча типов данных String. Я не могу использовать класс сериализатора по умолчанию или класс сериализатора String, который поставляется с библиотекой Kafka. Думаю, мне нужно написать собственный сериализатор и передать его свойствам производителя. Если вам известно написать пример пользовательского сериализатора в Kafka (в java), разделите его. Цените много, спасибо большое.
Кафка: создание пользовательского сериализатора
Ответ 1
Для записи пользовательского сериализатора необходимы следующие элементы:
- Внедрить
Encoder
с объектом, указанным для общего- Требуется создание конструктора
VerifiableProperties
- Требуется создание конструктора
- Переопределить
toBytes(...)
метод, гарантирующий возврат байтового массива - Вставьте класс сериализатора в
ProducerConfig
Объявление пользовательского сериализатора для производителя
Как вы отметили в своем вопросе, Kafka поставляет средства для объявления определенного сериализатора для производителя. Класс serializer задается в экземпляре ProducerConfig
, и этот экземпляр используется для построения желаемого класса Producer
.
Если вы выполните Пример производителя Kafka, вы построите ProducerConfig
через объект Properties
. При создании файла свойств обязательно укажите:
props.put("serializer.class", "path.to.your.CustomSerializer");
С помощью пути к классу, который вы хотите использовать Kafka для сериализации сообщений, перед добавлением их в журнал.
Создание настраиваемого сериализатора, который Kafka понимает
Написание настраиваемого сериализатора, который может правильно интерпретировать Kafka, требует реализации класса Encoder[T]
scala, который предоставляет Kafka. Внедрение черт в java странно, но следующий метод работал для сериализации JSON в моем проекте:
public class JsonEncoder implements Encoder<Object> {
private static final Logger logger = Logger.getLogger(JsonEncoder.class);
// instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
private static final ObjectMapper objectMapper = new ObjectMapper();
public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Object object) {
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}
В вашем вопросе звучит так, будто вы используете один объект (позвоните ему CustomMessage
) для всех сообщений, добавленных в ваш журнал. В этом случае ваш сериализатор может выглядеть примерно так:
package com.project.serializer;
public class CustomMessageEncoder implements Encoder<CustomMessage> {
public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(CustomMessage customMessage) {
return customMessage.toBytes();
}
}
Который оставил бы конфигурацию вашего свойства следующим образом:
props.put("serializer.class", "path.to.your.CustomSerializer");
Ответ 2
Вам необходимо реализовать как кодирование, так и декодер
public class JsonEncoder implements Encoder<Object> {
private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Object object) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}
Код декодера
public class JsonDecoder implements Decoder<Object> {
private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
public JsonDecoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public Object fromBytes(byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.readValue(bytes, Map.class);
} catch (IOException e) {
LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
}
return null;
}
}
Запись pom
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.1.3</version>
</dependency>
Установите кодировщик по умолчанию в свойстве Kafka
properties.put("serializer.class","kafka.serializer.DefaultEncoder");
Код записи и считывателя выглядит следующим образом
byte[] bytes = encoder.toBytes(map);
KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);
JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());