Talking about Flink's Execution Plan Visualization
Published: 2019-06-25


This article takes a look at Flink's Execution Plan Visualization.

Example

Code

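import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

// WORDS and WordCountTest.Tokenizer come from the author's test class (not shown);
// Tokenizer is assumed to split lines into (word, 1) tuples.
@Test
public void testExecutionPlan() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(WORDS)
            .flatMap(new WordCountTest.Tokenizer())
            .keyBy(0)
            .sum(1);
    dataStream.print();
    // Only builds the plan and prints it as JSON; env.execute() is never called
    System.out.println(env.getExecutionPlan());
}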

json

{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Collection Source",
      "pact": "Data Source",
      "contents": "Source: Collection Source",
      "parallelism": 1
    },
    {
      "id": 2,
      "type": "Flat Map",
      "pact": "Operator",
      "contents": "Flat Map",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ]
    },
    {
      "id": 4,
      "type": "Keyed Aggregation",
      "pact": "Operator",
      "contents": "Keyed Aggregation",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 2,
          "ship_strategy": "HASH",
          "side": "second"
        }
      ]
    },
    {
      "id": 5,
      "type": "Sink: Print to Std. Out",
      "pact": "Data Sink",
      "contents": "Sink: Print to Std. Out",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 4,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ]
    }
  ]
}
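In this JSON, each entry in `nodes` is one operator. Each entry in its `predecessors` array is an incoming edge tagged with a `ship_strategy`.

The Java below is a minimal sketch of that structure. It transcribes the four nodes above by hand into hypothetical `PlanNode`/`Edge` classes. These are not Flink types, and no JSON parsing is involved. It then resolves predecessor ids to list the edges the visualizer draws.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one "predecessors" entry: an incoming edge.
final class Edge {
    final int fromId;
    final String shipStrategy;

    Edge(int fromId, String shipStrategy) {
        this.fromId = fromId;
        this.shipStrategy = shipStrategy;
    }
}

// Hypothetical model of one entry in the "nodes" array (not a Flink class).
final class PlanNode {
    final int id;
    final String type;
    final int parallelism;
    final List<Edge> predecessors;

    PlanNode(int id, String type, int parallelism, List<Edge> predecessors) {
        this.id = id;
        this.type = type;
        this.parallelism = parallelism;
        this.predecessors = predecessors;
    }
}

final class PlanEdges {
    // The four nodes from the JSON above, transcribed by hand.
    static List<PlanNode> examplePlan() {
        List<PlanNode> nodes = new ArrayList<>();
        nodes.add(new PlanNode(1, "Source: Collection Source", 1, List.of()));
        nodes.add(new PlanNode(2, "Flat Map", 4, List.of(new Edge(1, "REBALANCE"))));
        nodes.add(new PlanNode(4, "Keyed Aggregation", 4, List.of(new Edge(2, "HASH"))));
        nodes.add(new PlanNode(5, "Sink: Print to Std. Out", 4, List.of(new Edge(4, "FORWARD"))));
        return nodes;
    }

    // Turns every predecessor entry into a "from -> to [strategy]" line.
    static List<String> describeEdges(List<PlanNode> nodes) {
        // Index nodes by id so predecessor ids can be resolved to names.
        Map<Integer, PlanNode> byId = new HashMap<>();
        for (PlanNode node : nodes) {
            byId.put(node.id, node);
        }
        List<String> lines = new ArrayList<>();
        for (PlanNode node : nodes) {
            for (Edge edge : node.predecessors) {
                PlanNode from = byId.get(edge.fromId);
                lines.add(label(from) + " -> " + label(node) + " [" + edge.shipStrategy + "]");
            }
        }
        return lines;
    }

    // Renders a node as "id:type(p=parallelism)".
    static String label(PlanNode node) {
        return node.id + ":" + node.type + "(p=" + node.parallelism + ")";
    }

    public static void main(String[] args) {
        describeEdges(examplePlan()).forEach(System.out::println);
        // prints:
        // 1:Source: Collection Source(p=1) -> 2:Flat Map(p=4) [REBALANCE]
        // 2:Flat Map(p=4) -> 4:Keyed Aggregation(p=4) [HASH]
        // 4:Keyed Aggregation(p=4) -> 5:Sink: Print to Std. Out(p=4) [FORWARD]
    }
}
```

In the printed edges, the REBALANCE edge is the one where parallelism goes from 1 to 4. The HASH edge is the one introduced by keyBy(0).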

Visualization

Open Flink's online plan visualizer, paste the JSON above into the text box, and click Draw. The rendered graph looks like this:

[Figure: the execution plan graph rendered by the visualizer]

StreamExecutionEnvironment.getExecutionPlan

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

@Public
public abstract class StreamExecutionEnvironment {

    //......

    /**
     * Creates the plan with which the system will execute the program, and
     * returns it as a String using a JSON representation of the execution data
     * flow graph. Note that this needs to be called, before the plan is
     * executed.
     *
     * @return The execution plan of the program, as a JSON String.
     */
    public String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
     *
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }

    //......
}
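  • StreamExecutionEnvironment.getExecutionPlan calls getStreamGraph. That method throws an IllegalStateException if no operators have been defined; otherwise it builds the StreamGraph with StreamGraphGenerator.generate. getExecutionPlan then calls StreamGraph.getStreamingPlanAsJSON on that graph to obtain the execution plan as JSON.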
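The delegation chain can be mimicked in a few lines. This is a toy sketch, not Flink code. `MiniEnvironment` and `MiniStreamGraph` are hypothetical stand-ins that keep only two behaviors of the source quoted above: the empty-topology guard in getStreamGraph, and the build-then-serialize delegation in getExecutionPlan. The JSON it emits is deliberately minimal and does no string escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamGraph: it only remembers operator names.
final class MiniStreamGraph {
    private final List<String> operators;

    MiniStreamGraph(List<String> operators) {
        this.operators = operators;
    }

    // Plays the role of StreamGraph.getStreamingPlanAsJSON (no escaping; plain names assumed).
    String getStreamingPlanAsJSON() {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < operators.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(i + 1)
              .append(",\"type\":\"").append(operators.get(i)).append("\"}");
        }
        return sb.append("]}").toString();
    }
}

// Hypothetical stand-in for StreamExecutionEnvironment.
final class MiniEnvironment {
    // Operators are recorded in a list, like the environment's transformations.
    private final List<String> transformations = new ArrayList<>();

    MiniEnvironment addOperator(String name) {
        transformations.add(name);
        return this;
    }

    // Same delegation as getExecutionPlan(): build the graph, then serialize it.
    String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }

    // Same guard as getStreamGraph(): an empty topology is rejected.
    MiniStreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        // Stands in for StreamGraphGenerator.generate(this, transformations).
        return new MiniStreamGraph(new ArrayList<>(transformations));
    }
}
```

Here is how it behaves:

- `new MiniEnvironment().addOperator("Source").addOperator("Flat Map").getExecutionPlan()` returns `{"nodes":[{"id":1,"type":"Source"},{"id":2,"type":"Flat Map"}]}`.
- Calling `getExecutionPlan()` on an empty `MiniEnvironment` throws an IllegalStateException with the same message as the quoted getStreamGraph.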

StreamGraph.getStreamingPlanAsJSON

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java

@Internal
public class StreamGraph extends StreamingPlan {

    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;

    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;

    private boolean chaining;

    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private StateBackend stateBackend;
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

    //......

    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        } catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    //......
}
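  • StreamGraph.getStreamingPlanAsJSON serializes the graph with new JSONGenerator(this).getJSON() and returns the execution plan as JSON. Any exception raised along the way is rethrown as a RuntimeException ("JSON plan creation failed").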
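To illustrate what the serialization produces, here is a hand-rolled sketch that emits the same keys seen in the JSON output earlier: `id`, `type`, `pact`, `contents`, `parallelism`, `predecessors`, `ship_strategy` and `side`. `NodeInfo`, `Predecessor`, `MiniJsonGenerator` and `MiniGraph` are hypothetical classes. Flink's real JSONGenerator derives its output from the StreamGraph itself and handles more cases than this sketch does.

The sketch makes two assumptions, both taken only from the sample output of this job:

- `contents` reuses the `type` string.
- `side` is always `"second"`.

`MiniGraph` copies the try/catch wrapping of getStreamingPlanAsJSON.

```java
import java.util.List;

// Hypothetical incoming-edge description.
final class Predecessor {
    final int id;
    final String shipStrategy;

    Predecessor(int id, String shipStrategy) {
        this.id = id;
        this.shipStrategy = shipStrategy;
    }
}

// Hypothetical node description; field names mirror the JSON keys.
final class NodeInfo {
    final int id;
    final String type;
    final String pact;
    final int parallelism;
    final List<Predecessor> predecessors;

    NodeInfo(int id, String type, String pact, int parallelism, List<Predecessor> predecessors) {
        this.id = id;
        this.type = type;
        this.pact = pact;
        this.parallelism = parallelism;
        this.predecessors = predecessors;
    }
}

final class MiniJsonGenerator {
    private final List<NodeInfo> nodes;

    MiniJsonGenerator(List<NodeInfo> nodes) {
        this.nodes = nodes;
    }

    // Emits {"nodes":[...]} using the same keys as the sample output.
    String getJSON() {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < nodes.size(); i++) {
            NodeInfo n = nodes.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(n.id)
              .append(",\"type\":").append(quote(n.type))
              .append(",\"pact\":").append(quote(n.pact))
              .append(",\"contents\":").append(quote(n.type)) // assumption: same as type
              .append(",\"parallelism\":").append(n.parallelism);
            // Sources have no inputs, so they get no "predecessors" key, as in the sample.
            if (!n.predecessors.isEmpty()) {
                sb.append(",\"predecessors\":[");
                for (int j = 0; j < n.predecessors.size(); j++) {
                    Predecessor p = n.predecessors.get(j);
                    if (j > 0) {
                        sb.append(',');
                    }
                    sb.append("{\"id\":").append(p.id)
                      .append(",\"ship_strategy\":").append(quote(p.shipStrategy))
                      .append(",\"side\":\"second\"}"); // assumption: hard-coded
                }
                sb.append(']');
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }

    // Minimal escaping: backslashes and double quotes only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}

// Mirrors the shape of getStreamingPlanAsJSON: delegate, and wrap any failure.
final class MiniGraph {
    private final List<NodeInfo> nodes;

    MiniGraph(List<NodeInfo> nodes) {
        this.nodes = nodes;
    }

    String getStreamingPlanAsJSON() {
        try {
            return new MiniJsonGenerator(nodes).getJSON();
        } catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }
}
```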

Summary

  • Flink provides an online plan visualizer for execution plans; it takes the execution plan in JSON form as input.
  • StreamExecutionEnvironment.getExecutionPlan calls getStreamGraph, which uses StreamGraphGenerator.generate to build the StreamGraph.
  • StreamGraph.getStreamingPlanAsJSON serializes the graph with a JSONGenerator and returns the execution plan as JSON.


Repost URL: http://dnyto.baihongyu.com/
