序
本文主要研究一下flink的Execution Plan Visualization
实例
代码
@Test public void testExecutionPlan(){ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> dataStream = env.fromElements(WORDS) .flatMap(new WordCountTest.Tokenizer()) .keyBy(0) .sum(1); dataStream.print(); System.out.println(env.getExecutionPlan()); }
json
{ "nodes": [ { "id": 1, "type": "Source: Collection Source", "pact": "Data Source", "contents": "Source: Collection Source", "parallelism": 1 }, { "id": 2, "type": "Flat Map", "pact": "Operator", "contents": "Flat Map", "parallelism": 4, "predecessors": [ { "id": 1, "ship_strategy": "REBALANCE", "side": "second" } ] }, { "id": 4, "type": "Keyed Aggregation", "pact": "Operator", "contents": "Keyed Aggregation", "parallelism": 4, "predecessors": [ { "id": 2, "ship_strategy": "HASH", "side": "second" } ] }, { "id": 5, "type": "Sink: Print to Std. Out", "pact": "Data Sink", "contents": "Sink: Print to Std. Out", "parallelism": 4, "predecessors": [ { "id": 4, "ship_strategy": "FORWARD", "side": "second" } ] } ]}
可视化
打开将上面的json,输入到文本框,点击Draw进行可视化如下:
StreamExecutionEnvironment.getExecutionPlan
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@Publicpublic abstract class StreamExecutionEnvironment { //...... /** * Creates the plan with which the system will execute the program, and * returns it as a String using a JSON representation of the execution data * flow graph. Note that this needs to be called, before the plan is * executed. * * @return The execution plan of the program, as a JSON String. */ public String getExecutionPlan() { return getStreamGraph().getStreamingPlanAsJSON(); } /** * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. * * @return The streamgraph representing the transformations */ @Internal public StreamGraph getStreamGraph() { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } return StreamGraphGenerator.generate(this, transformations); } //......}
- StreamExecutionEnvironment的getExecutionPlan方法调用了getStreamGraph方法;getStreamGraph方法使用StreamGraphGenerator.generate生成了StreamGraph;之后就是调用StreamGraph.getStreamingPlanAsJSON来获取json格式的execution plan
StreamGraph.getStreamingPlanAsJSON
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java
@Internalpublic class StreamGraph extends StreamingPlan { private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME; private final StreamExecutionEnvironment environment; private final ExecutionConfig executionConfig; private final CheckpointConfig checkpointConfig; private boolean chaining; private MapstreamNodes; private Set sources; private Set sinks; private Map >> virtualSelectNodes; private Map > virtualSideOutputNodes; private Map >> virtualPartitionNodes; protected Map vertexIDtoBrokerID; protected Map vertexIDtoLoopTimeout; private StateBackend stateBackend; private Set > iterationSourceSinkPairs; //...... public String getStreamingPlanAsJSON() { try { return new JSONGenerator(this).getJSON(); } catch (Exception e) { throw new RuntimeException("JSON plan creation failed", e); } } //......}
- StreamGraph的getStreamingPlanAsJSON方法使用JSONGenerator来序列化自己,返回json格式的execution plan
小结
- flink提供了的在线地址,用于进行execution plan的可视化,它接收json形式的execution plan
- StreamExecutionEnvironment的getExecutionPlan方法调用了getStreamGraph方法;getStreamGraph方法使用StreamGraphGenerator.generate生成了StreamGraph
- StreamGraph的getStreamingPlanAsJSON方法使用JSONGenerator来序列化自己,返回json格式的execution plan