广告位联系
返回顶部
分享到

Flink 侧流输出源码示例介绍

服务器其他 来源:互联网 作者:佚名 发布时间:2022-09-17 20:43:23 人浏览
摘要

Flink 的 side output 为我们提供了侧流(分流)输出的功能,根据条件可以把一条流分为多个不同的流,之后做不同的处理逻辑,下面就来看下侧流输出相关的源码。 先来看下面的一个

Flink 的 side output 为我们提供了侧流(分流)输出的功能,根据条件可以把一条流分为多个不同的流,之后做不同的处理逻辑,下面就来看下侧流输出相关的源码。

先来看下面的一个 Demo,一个流被分成了 3 个流,一个主流,两个侧流输出。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

SingleOutputStreamOperator<JasonLeePOJO> process =

        kafka_source1.process(

                new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() {

                    @Override

                    public void processElement(

                            JasonLeePOJO value,

                            ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx,

                            Collector<JasonLeePOJO> out)

                            throws Exception {

                        // 这个是主流输出

                        if (value.getName().equals("flink")) {

                            out.collect(value);

                        // 下面两个是测流输出

                        } else if (value.getName().equals("spark")) {

                            ctx.output(test, value);

                        // 测流

                        } else if (value.getName().equals("hadoop")) {

                            ctx.output(test1, value);

                        }

                    }

                });

为了更加清楚的查看每一个算子,我禁用了 operator chain,任务的 DAG 图如下所示:

这样就比较清晰了,很明显从 process 算子开始,1 个数据流分为了 3 个数据流,当然,在默认情况下没有禁止

operator chain 所有的算子都是 chain 在一起的。

源码解析

我们先来看第一个主流输出也就是 out.collect(value) 的源码,这里的 out 实际上是 TimestampedCollector 对象。

TimestampedCollector#collect

1

2

3

4

@Override

public void collect(T record) {

    output.collect(reuse.replace(record));

}

在 collect 方法中持有一个 output 对象,用来输出数据,在这里实际上是一个 CountingOutput 它是一个包装了 Output 的对象,主要用于更新发送数据的 metric,并输出数据。

CountingOutput#collect

1

2

3

4

5

@Override

public void collect(StreamRecord<OUT> record) {

    numRecordsOut.inc();

    output.collect(record);

}

在 CountingOutput 中也持有一个 output 对象,但是这里的 output 是 BroadcastingOutputCollector 对象,从名字就可以看出它是往下游广播数据的,这里就有一个疑问?把数据广播到下游,那岂不是下游的每个数据流都有这条数据吗?这样的话是怎么实现分流的呢?带着这个疑问,我们来看 BroadcastingOutputCollector 的 collect 方法是怎么实现的。

BroadcastingOutputCollector#collect

1

2

3

4

5

6

7

@Override

public void collect(StreamRecord<T> record) {

    // 这里的 outputs 数组有三个 output 分别对应上面的三个输出流

    for (Output<StreamRecord<T>> output : outputs) {

        output.collect(record);

    }

}

在 BroadcastingOutputCollector 对象里也持有一个 output 对象,其实他们都实现了 Output 接口,用来往下游发送数据,这里的 outputs 是一个 Output 数组,代表了下游的所有 Output,因为上面有三个输出流,所以数组里面就包含了 3 个 Output 对象。

循环的调用 output 的 collect 方法往下游发送数据,因为我打断了 operator chain,所以 process 算子和下游的 Print 算子不在同一个 operatorChain 内,那么上下游算子之间数据传输用的就是 RecordWriterOutput,否则用的是 CopyingChainingOutput 或者 ChainingOutput,具体使用的是哪个 Output 这里就不多介绍了,后面有时间的话会单独介绍。

RecordWriterOutput#collect

1

2

3

4

5

6

7

8

9

10

@Override

public void collect(StreamRecord<OUT> record) {

    // 主流是没有 outputTag 的,只有测流有 outputTag

    if (this.outputTag != null) {

        // we are not responsible for emitting to the main output.

        return;

    }

 

    pushToRecordWriter(record);

}

接着来看 RecordWriterOutput 的 collect 方法,在 collect 方法里面会先判断 outputTag 是否为空,如果不为空不做任何处理,直接返回,否则就把数据推送到下游算子,只有侧流输出才需要定义 outputTag,主流(正常流)是没有 outputTag 的,所以这里会走 pushToRecordWriter 方法把数据写入到下游,也就是说虽然会以广播的形式把数据广播到所有下游,但其实另外两个侧流是直接返回的,只有主流才会把数据推送到下游,这也就解释了上面的疑问。

然后再来看第二个侧流输出 ctx.output(test, value) 的源码,这里的 ctx 实际上是 ProcessOperator#ContextImpl 对象。

ProcessOperator#ContextImpl#output

1

2

3

4

5

6

7

@Override

public <X> void output(OutputTag<X> outputTag, X value) {

    if (outputTag == null) {

        throw new IllegalArgumentException("OutputTag must not be null.");

    }

    output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));

}

如果 outputTag 是空,直接抛出异常,因为这个是侧流,所以必须要定义 OutputTag。这里的 output 实际上是父类 AbstractStreamOperator 所持有的变量,如果 outputTag 不为空,就调用 output 的 collect 方法把数据发送到下游,这里的 output 和上面的一样是 CountingOutput 但是 collect 方法是另外一个重载的方法。

CountingOutput#collect

1

2

3

4

5

@Override

public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {

    numRecordsOut.inc();

    output.collect(outputTag, record);

}

可以发现,这个 collect 方法比上面那个多了一个 OutputTag 参数,也就是使用侧流输出的时候定义的 OutputTag 对象,然后调用 output 的 collect 方法发送数据,这个也和上面的一样,同样是 BroadcastingOutputCollector 对象的另外一个重载方法,多了一个 OutputTag 参数。

BroadcastingOutputCollector#collect

1

2

3

4

5

6

@Override

public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {

    for (Output<StreamRecord<T>> output : outputs) {

        output.collect(outputTag, record);

    }

}

这里的逻辑和上面是一样的,同样的循环调用 collect 方法发送数据。

RecordWriterOutput#collect

1

2

3

4

5

6

7

@Override

public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {

    // 先要判断两个 OutputTag 是否一样

    if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {

        pushToRecordWriter(record);

    }

}

在这个 collect 方法中会先判断传入的 OutputTag 对象和成员变量 this.outputTag 是不是相等,如果是的话,就发送数据,否则不做任何处理,所以这里每次只会选择一个下游侧流输出数据,这样就实现了所谓的分流。

OutputTag#isResponsibleFor

1

2

3

4

public static boolean isResponsibleFor(

        @Nullable OutputTag<?> owner, @Nonnull OutputTag<?> other) {

    return other.equals(owner);

}

可以看到在 isResponsibleFor 方法内是直接调用 OutputTag 的 equals 方法判断两个对象是否相等的。

第三个侧流 test1 ctx.output(test1, value) 和第二个侧流 test 是完全一样的情况,这里就不在看代码了。

上面是完成了分流操作,那怎么获取到分流后结果呢(数据流)?我们可以通过 getSideOutput 方法获取。

1

2

DataStream<JasonLeePOJO> sideOutput = process.getSideOutput(test);

DataStream<JasonLeePOJO> sideOutput1 = process.getSideOutput(test1);

getSideOutput 源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {

    sideOutputTag = clean(requireNonNull(sideOutputTag));

 

    // make a defensive copy

    sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

 

    TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);

    if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {

        throw new UnsupportedOperationException(

                "A side output with a matching id was "

                        + "already requested with a different type. This is not allowed, side output "

                        + "ids need to be unique.");

    }

 

    requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

 

    SideOutputTransformation<X> sideOutputTransformation =

            new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);

    return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);

}

getSideOutput 方法里先是构建了一个 SideOutputTransformation 对象,然后又构建了 DataStream 对象,这样我们就可以基于分流后的 DataStream 做不同的处理逻辑了,从而实现了把一个 DataStream 分流成多个 DataStream 功能。

总结

通过对侧流输出的源码进行解析,在分流的时候,数据是通过广播的方式发送到下游算子的,对于主流的数据来说,只有 OutputTag 为空的才会处理,侧流因为 OutputTag 不为空,所以直接返回,不做任何处理,那对于侧流的数据来说,是通过判断两个 OutputTag 是否相等,所以每次只会把数据发送到下游对应的那一个侧流上去,这样即可实现分流逻辑。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。

您可能感兴趣的文章 :

原文链接 : https://juejin.cn/post/7143077414153748488
相关文章
  • 网站https访问是443端口还是433端口

    网站https访问是443端口还是433端口
    https默认端口号是443 https是以安全为目标的http通道,简单讲是http的安全,即http下加入SSL层,https的安全基础是SSL,因此加密权的详细内容就
  • 关于HTTPS端口443的技术介绍(什么是443端口)
    443端口是用来保证客户和服务器之间的通信安全。 本文将重点介绍HTTPS 443端口,它是如何工作的,它保护什么,以及为什么我们需要它。
  • ElasticSearch事件查询语言EQL操作
    EQL的全名是Event Query Language (EQL)。事件查询语言(EQL)是一种用于基于事件的时间序列数据(例如日志,指标和跟踪)的查询语言。在Elast
  • aarch64服务器部署mysql的流程介绍

    aarch64服务器部署mysql的流程介绍
    aarch64服务器-部署mysql aarch64服务器-部署nacos 1、创建工作目录 1 mkdir -p /apps/mysql/{mydir,datadir,conf,source} 2、编写docker-compose.yaml 1 2 3 4 5 6 7 8 9 1
  • 解决Navicat连接服务器不成功的问题(Access denied

    解决Navicat连接服务器不成功的问题(Access denied
    出现的原因一般是服务器的root用户没有开启访问权限,一般来说值允许本地的访问。 解决方法: 一:第一种方法 1、首先打开xshell连接服务
  • Elasticsearch6.2服务器升配后的bug(避坑指南)

    Elasticsearch6.2服务器升配后的bug(避坑指南)
    本篇文章记录最近一次生产服务器硬件升级之后引起集群不稳定的现象,希望可以帮到有其它人避免采坑。 一、问题描述 升级后出现的异常
  • 使用Ubuntu搭建DNS服务器

    使用Ubuntu搭建DNS服务器
    一、重点说明/etc/bind/named.conf.options配置文件 在进行bind9服务器配置时,/etc/bind/named.conf.options是十分关键的配置文件,它决定着DNS服务器是否
  • Flink 侧流输出源码示例介绍

    Flink 侧流输出源码示例介绍
    Flink 的 side output 为我们提供了侧流(分流)输出的功能,根据条件可以把一条流分为多个不同的流,之后做不同的处理逻辑,下面就来看下
  • Fluentd搭建日志收集服务介绍
    公司需要搭建一个日志收集服务器,用于将公司的项目日志汇总到一台服务器上面,方便查看和减轻各项目服务器压力。但是由于目前资源
  • 游戏服务器中的Netty应用以及源码剖析

    游戏服务器中的Netty应用以及源码剖析
    一、Reactor模式和Netty线程模型 最近因为工作需要,学习了一段时间Netty的源码,并做了一个简单的分享,研究还不是特别深入,继续努力。
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计