Flink使用SideOutPut替换Split实现分流

2019-09-30 07:29:02来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

Flink使用SideOutPut替换Split实现分流

以前的数据分析项目(版本1.4.2),对从Kafka读取的原始数据流,调用split接口实现分流.
新项目决定使用Flink 1.7.2,使用split接口进行分流的时候,发现接口被标记为depracted(后续可能会被移除).
搜索相关文档,发现新版本Flink中推荐使用带外数据进行分流.

预先建立OutputTag实例(LogEntity是从kafka读取的日志实例类).

private static final OutputTag<LogEntity> APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag<LogEntity> ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));

kafka读取的原始数据,通过process接口,打上相应标记.

    private static SingleOutputStreamOperator<LogEntity> sideOutStream(DataStream<LogEntity> rawLogStream) {
        return rawLogStream
                .process(new ProcessFunction<LogEntity, LogEntity>() {
                    @Override
                    public void processElement(LogEntity entity, Context ctx, Collector<LogEntity> out) throws Exception {
                        // 根据日志等级,给对象打上不同的标记
                        if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
                            ctx.output(ANALYZE_METRIC_TAG, entity);
                        } else {
                            ctx.output(APP_LOG_TAG, entity);
                        }
                    }
                })
                .name("RawLogEntitySplitStream");
    }

    // 调用函数,对原始数据流中的对象进行标记
    SingleOutputStreamOperator<LogEntity> sideOutLogStream = sideOutStream(rawLogStream);
    // 根据标记,获取不同的数据流,以便后续进行进一步分析
    DataStream<LogEntity> appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
    DataStream<LogEntity> rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);

通过以上步骤,就实现了数据流的切分.

PS:
如果您觉得我的文章对您有帮助,可以扫码领取下红包或扫码支持(随意多少,一分钱都是爱),谢谢!

支付宝红包 支付宝 微信

原文链接:https://www.cnblogs.com/jason1990/p/11610130.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:Redis 相关功能和实用命令(五)

下一篇:手动刷新客户端配置内容(Spring Cloud Config)