Подтвердить что ты не робот

Flink Streaming: Как вывести один поток данных на разные выходы в зависимости от данных?

В Apache Flink у меня есть поток кортежей. Возьмем действительно простой Tuple1<String>. Кортеж может иметь произвольное значение в поле значений (например, "P1", "P2" и т.д.). Набор возможных значений конечен, но я не знаю полного набора заранее (так что может быть "P362" ). Я хочу написать этот кортеж в определенное место вывода в зависимости от значения внутри кортежа. Так, например, Я хотел бы иметь следующую структуру файлов:

  • /output/P1
  • /output/P2

В документации я только нашел возможность писать в местах, которые я знаю заранее (например, stream.writeCsv("/output/somewhere")), но никоим образом не позволять содержимому данных решать, где данные фактически заканчиваются.

Я прочитал о расщеплении вывода в документации, но это, похоже, не дает возможности перенаправить вывод в разные адресаты так, как я хотел бы иметь его (или просто не понимаю, как это будет работать).

Можно ли это сделать с помощью Flink API, если да, то каким образом? Если нет, может быть, может быть, сторонняя библиотека, которая может это сделать, или мне пришлось бы строить такую ​​вещь самостоятельно?

Обновление

Следуя предложению Маттиаса, я придумал функцию sifting sink, которая определяет выходной путь, а затем записывает кортеж в соответствующий файл после его сериализации. Я поставил его здесь для справки, возможно, это полезно для кого-то еще:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT, String> serializationFunction;
    private final String basePath;
    Map<String, TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended with the output selector.
     */
    public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) {
        this.outputSelector = outputSelector;
        this.serializationFunction = serializationFunction;
        this.basePath = basePath;
    }


    @Override
    public void invoke(IT value) throws Exception {
        // find out where to write.
        Iterable<String> selection = outputSelector.select(value);
        for (String s : selection) {
            // ensure we have a format for this.
            TextOutputFormat<String> destination = ensureDestinationExists(s);
            // then serialize and write.
            destination.writeRecord(serializationFunction.map(value));
        }
    }

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException {
        // if we know the destination, we just return the format.
        if (formats.containsKey(selection)) {
            return formats.get(selection);
        }

        // create a new output format and initialize it from the context.
        TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection));
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        format.configure(context.getTaskStubParameters());
        format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());

        // put it into our map.
        formats.put(selection, format);
        return format;
    }

    @Override
    public void close() throws IOException {
        Exception lastException = null;
        try {
            for (TextOutputFormat<String> format : formats.values()) {
                try {
                    format.close();
                } catch (Exception e) {
                    lastException = e;
                    format.tryCleanupOnError();
                }
            }
        } finally {
            formats.clear();
        }

        if (lastException != null) {
            throw new IOException("Close failed.", lastException);
        }
    }
}
4b9b3361

Ответ 1

Вы можете реализовать пользовательскую раковину. Наследовать от одного из них:

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

В вашей программе используйте:

stream.addSink(SinkFunction<T> sinkFunction);

вместо stream.writeCsv("/output/somewhere").