java – Flink Streaming:如何根据数据将一个数据流输出到不同的输出?

前端之家收集整理的这篇文章主要介绍了java – Flink Streaming:如何根据数据将一个数据流输出到不同的输出?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
在Apache Flink中,我有一串元组.让我们假设一个非常简单的Tuple1< String>.元组可以在其值字段中具有任意值(例如,“P1”,“P2”等).一组可能的值是有限的,但我不知道预先设定的全部(所以可能有一个’P362′).我想将该元组写入某个输出位置,具体取决于元组中的值.例如我想拥有以下文件结构:

> / output / P1
> / output / P2

在文档中,我只发现写入我事先知道的位置的可能性(例如stream.writeCsv(“/ output / athere”)),但是没有办法让数据的内容决定数据实际结束的位置.

我阅读了关于文档中的输出分割,但是这似乎并没有提供一种方法来将输出重定向到不同的目的地,我想要拥有它(或者我不明白这将如何工作).

这可以用Flink API来完成吗?如果没有,是否有可能有第三方图书馆可以做到这一点,或者我必须自己建立一个这样的事情?

更新

按照Matthias的建议,我想出了一个筛选接收函数,它确定输出路径,然后在将序列化之后将元组写入相应的文件.我把它放在这里供参考,也许它对别人有用:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> {

    private final OutputSelector<IT> outputSelector;
    private final MapFunction<IT,String> serializationFunction;
    private final String basePath;
    Map<String,TextOutputFormat<String>> formats = new HashMap<>();

    /**
     * @param outputSelector        the selector which determines into which output(s) a record is written.
     * @param serializationFunction a function which serializes the record to a string.
     * @param basePath              the base path for writing the records. It will be appended with the output selector.
     */
    public SiftingSinkFunction(OutputSelector<IT> outputSelector,MapFunction<IT,String> serializationFunction,String basePath) {
        this.outputSelector = outputSelector;
        this.serializationFunction = serializationFunction;
        this.basePath = basePath;
    }


    @Override
    public void invoke(IT value) throws Exception {
        // find out where to write.
        Iterable<String> selection = outputSelector.select(value);
        for (String s : selection) {
            // ensure we have a format for this.
            TextOutputFormat<String> destination = ensureDestinationExists(s);
            // then serialize and write.
            destination.writeRecord(serializationFunction.map(value));
        }
    }

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException {
        // if we know the destination,we just return the format.
        if (formats.containsKey(selection)) {
            return formats.get(selection);
        }

        // create a new output format and initialize it from the context.
        TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath,selection));
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        format.configure(context.getTaskStubParameters());
        format.open(context.getIndexOfThisSubtask(),context.getNumberOfParallelSubtasks());

        // put it into our map.
        formats.put(selection,format);
        return format;
    }

    @Override
    public void close() throws IOException {
        Exception lastException = null;
        try {
            for (TextOutputFormat<String> format : formats.values()) {
                try {
                    format.close();
                } catch (Exception e) {
                    lastException = e;
                    format.tryCleanupOnError();
                }
            }
        } finally {
            formats.clear();
        }

        if (lastException != null) {
            throw new IOException("Close Failed.",lastException);
        }
    }
}

解决方法

你可以实现一个自定义的接收器.从两者之一继承:

> org.apache.flink.streaming.api.functions.sink.SinkFunction
> org.apache.flink.streaming.api.functions.sink.RichSinkFunction

在你的程序中使用:

stream.addSink(SinkFunction<T> sinkFunction);

而不是stream.writeCsv(“/ output / athere”).

原文链接:https://www.f2er.com/java/123567.html

猜你在找的Java相关文章