news 2026/4/15 10:23:21

Flink源码阅读:如何生成ExecutionGraph

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink源码阅读:如何生成ExecutionGraph

今天我们一起来了解 Flink 最后一种执行图,ExecutionGraph 的执行过程。

基本概念

在阅读源码之前,我们先来了解一下 ExecutionGraph 中的一些基本概念。

  • ExecutionJobVertex:ExecutionJobVertex 是 ExecutionGraph 中的节点,对应的是 JobGraph 中的 JobVertex。

  • ExecutionVertex:每个 ExecutionJobVertex 都包含了一组 ExecutionVertex,ExecutionVertex 的数量就是节点对应的并行度。

  • IntermediateResult:IntermediateResult 表示节点的输出结果,与之对应的是 JobGraph 中的 IntermediateDataSet。

  • IntermediateResultPartition:IntermediateResultPartition 是每个 ExecutionVertex 的输出。

  • EdgeManager:EdgeManager 主要负责存储 ExecutionGraph 中所有之间的连接,包括其并行度。

  • Execution:Execution 可以认为是一次实际的运行尝试。每次执行时,Flink 都会将ExecutionVertex 封装成一个 Execution,并通过一个 ExecutionAttemptID 来做唯一标识。

ExecutionGraph 生成过程

了解了这些基本概念之后,我们一起来看一下 ExecutionGraph 的具体生成过程。生成 ExecutionGraph 的代码入口是 DefaultExecutionGraphBuilder.build 方法。

首先是获取一些基本信息,包括 jobInformation、jobStatusChangedListeners 等。

接下来就是创建一个 DefaultExecutionGraph 和生成执行计划。

// create a new execution graph, if none exists so farfinalDefaultExecutionGraphexecutionGraph=newDefaultExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,executionHistorySizeLimit,classLoader,blobWriter,partitionGroupReleaseStrategyFactory,shuffleMaster,partitionTracker,executionDeploymentListener,executionStateUpdateListener,initializationTimestamp,vertexAttemptNumberStore,vertexParallelismStore,isDynamicGraph,executionJobVertexFactory,jobGraph.getJobStatusHooks(),markPartitionFinishedStrategy,taskDeploymentDescriptorFactory,jobStatusChangedListeners,executionPlanSchedulingContext);try{executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));}catch(Throwablet){log.warn("Cannot create plan for job",t);// give the graph an empty planexecutionGraph.setPlan(newJobPlanInfo.Plan("","","",newArrayList<>()));}

下面就是两个比较核心的方法 getVerticesSortedTopologicallyFromSources 和 attachJobGraph。

// topologically sort the job vertices and attach the graph to the existing oneList<JobVertex>sortedTopology=jobGraph.getVerticesSortedTopologicallyFromSources();executionGraph.attachJobGraph(sortedTopology,jobManagerJobMetricGroup);

这两个方法是先将 JobVertex 进行排序,然后构建 ExecutionGraph 的拓扑图。

getVerticesSortedTopologicallyFromSources
publicList<JobVertex>getVerticesSortedTopologicallyFromSources()throwsInvalidProgramException{// early out on empty listsif(this.taskVertices.isEmpty()){returnCollections.emptyList();}List<JobVertex>sorted=newArrayList<JobVertex>(this.taskVertices.size());Set<JobVertex>remaining=newLinkedHashSet<JobVertex>(this.taskVertices.values());// start by finding the vertices with no input edges// and the ones with disconnected inputs (that refer to some standalone data set){Iterator<JobVertex>iter=remaining.iterator();while(iter.hasNext()){JobVertexvertex=iter.next();if(vertex.isInputVertex()){sorted.add(vertex);iter.remove();}}}intstartNodePos=0;// traverse from the nodes that were added until we found all elementswhile(!remaining.isEmpty()){// first check if we have more candidates to start traversing from. if not, then the// graph is cyclic, which is not permittedif(startNodePos>=sorted.size()){thrownewInvalidProgramException("The job graph is cyclic.");}JobVertexcurrent=sorted.get(startNodePos++);addNodesThatHaveNoNewPredecessors(current,sorted,remaining);}returnsorted;}

这段代码是将所有的节点进行排序,先将所有的 Source 节点筛选出来,然后再将剩余节点假如列表。这样就能构建出最终的拓扑图。

attachJobGraph
@OverridepublicvoidattachJobGraph(List<JobVertex>verticesToAttach,JobManagerJobMetricGroupjobManagerJobMetricGroup)throwsJobException{assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} "+"vertices and {} intermediate results.",verticesToAttach.size(),tasks.size(),intermediateResults.size());attachJobVertices(verticesToAttach,jobManagerJobMetricGroup);if(!isDynamic){initializeJobVertices(verticesToAttach);}// the topology assigning should happen before notifying new vertices to failoverStrategyexecutionTopology=DefaultExecutionTopology.fromExecutionGraph(this);partitionGroupReleaseStrategy=partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());}

attachJobGraph 方法主要包含两步逻辑,第一步是调用 attachJobVertices 方法创建 ExecutionJobVertex 实例,第二步是调用 fromExecutionGraph 创建一些其他的核心对象。

attachJobVertices

attachJobVertices 方法中就是遍历所有的 JobVertex,然后利用 JobVertex 生成 ExecutionJobVertex。

/** Attach job vertices without initializing them. */privatevoidattachJobVertices(List<JobVertex>topologicallySorted,JobManagerJobMetricGroupjobManagerJobMetricGroup)throwsJobException{for(JobVertexjobVertex:topologicallySorted){if(jobVertex.isInputVertex()&&!jobVertex.isStoppable()){this.isStoppable=false;}VertexParallelismInformationparallelismInfo=parallelismStore.getParallelismInfo(jobVertex.getID());// create the execution job vertex and attach it to the graphExecutionJobVertexejv=executionJobVertexFactory.createExecutionJobVertex(this,jobVertex,parallelismInfo,coordinatorStore,jobManagerJobMetricGroup);ExecutionJobVertexpreviousTask=this.tasks.putIfAbsent(jobVertex.getID(),ejv);if(previousTask!=null){thrownewJobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(),ejv,previousTask));}this.verticesInCreationOrder.add(ejv);this.numJobVerticesTotal++;}}

initializeJobVertices

在 DefaultExecutionGraph.initializeJobVertices 中是遍历了刚刚排好序的 JobVertex,获取了 ExecutionJobVertex 之后调用了 ExecutionGraph.initializeJobVertex 方法。

我们直接来看 ExecutionGraph.initializeJobVertex 的逻辑。

defaultvoidinitializeJobVertex(ExecutionJobVertexejv,longcreateTimestamp)throwsJobException{initializeJobVertex(ejv,createTimestamp,VertexInputInfoComputationUtils.computeVertexInputInfos(ejv,getAllIntermediateResults()::get));}

这里先是调用了 VertexInputInfoComputationUtils.computeVertexInputInfos 方法,生成了 Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos。它表示的是每个 ExecutionVertex 消费上游 IntermediateResultPartition 的范围。

这里有两种模式,分别是 POINTWISE (点对点)和 ALL_TO_ALL(全对全)

在 POINTWISE 模式中,会按照尽量均匀分布的方式处理。

  • 例如上游并发度是4,下游并发度是2时,那么前两个 IntermediateResultPartition 就会被第一个 ExecutionVertex 消费,后两个 IntermediateResultPartition 就会被第二个 ExecutionVertex 消费。

  • 如果上游并发度是2,下游是3时,那么下游前两个 IntermediateResultPartition 会被第一个 ExecutionVertex 消费,第三个 IntermediateResultPartition 则会被第二个 ExecutionVertex 消费。

publicstaticJobVertexInputInfocomputeVertexInputInfoForPointwise(intsourceCount,inttargetCount,Function<Integer,Integer>numOfSubpartitionsRetriever,booleanisDynamicGraph){finalList<ExecutionVertexInputInfo>executionVertexInputInfos=newArrayList<>();if(sourceCount>=targetCount){for(intindex=0;index<targetCount;index++){intstart=index*sourceCount/targetCount;intend=(index+1)*sourceCount/targetCount;IndexRangepartitionRange=newIndexRange(start,end-1);IndexRangesubpartitionRange=computeConsumedSubpartitionRange(index,1,()->numOfSubpartitionsRetriever.apply(start),isDynamicGraph,false,false);executionVertexInputInfos.add(newExecutionVertexInputInfo(index,partitionRange,subpartitionRange));}}else{for(intpartitionNum=0;partitionNum<sourceCount;partitionNum++){intstart=(partitionNum*targetCount+sourceCount-1)/sourceCount;intend=((partitionNum+1)*targetCount+sourceCount-1)/sourceCount;intnumConsumers=end-start;IndexRangepartitionRange=newIndexRange(partitionNum,partitionNum);// Variable used in lambda expression should be final or effectively finalfinalintfinalPartitionNum=partitionNum;for(inti=start;i<end;i++){IndexRangesubpartitionRange=computeConsumedSubpartitionRange(i,numConsumers,()->numOfSubpartitionsRetriever.apply(finalPartitionNum),isDynamicGraph,false,false);executionVertexInputInfos.add(newExecutionVertexInputInfo(i,partitionRange,subpartitionRange));}}}returnnewJobVertexInputInfo(executionVertexInputInfos);}

在 ALL_TO_ALL 模式中,每个下游都会消费所有上游的数据。

publicstaticJobVertexInputInfocomputeVertexInputInfoForAllToAll(intsourceCount,inttargetCount,Function<Integer,Integer>numOfSubpartitionsRetriever,booleanisDynamicGraph,booleanisBroadcast,booleanisSingleSubpartitionContainsAllData){finalList<ExecutionVertexInputInfo>executionVertexInputInfos=newArrayList<>();IndexRangepartitionRange=newIndexRange(0,sourceCount-1);for(inti=0;i<targetCount;++i){IndexRangesubpartitionRange=computeConsumedSubpartitionRange(i,targetCount,()->numOfSubpartitionsRetriever.apply(0),isDynamicGraph,isBroadcast,isSingleSubpartitionContainsAllData);executionVertexInputInfos.add(newExecutionVertexInputInfo(i,partitionRange,subpartitionRange));}returnnewJobVertexInputInfo(executionVertexInputInfos);}

生成好了 jobVertexInputInfos 之后,我们再回到 DefaultExecutionGraph.initializeJobVertex 方法中。

@OverridepublicvoidinitializeJobVertex(ExecutionJobVertexejv,longcreateTimestamp,Map<IntermediateDataSetID,JobVertexInputInfo>jobVertexInputInfos)throwsJobException{checkNotNull(ejv);checkNotNull(jobVertexInputInfos);jobVertexInputInfos.forEach((resultId,info)->this.vertexInputInfoStore.put(ejv.getJobVertexId(),resultId,info));ejv.initialize(executionHistorySizeLimit,rpcTimeout,createTimestamp,this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),executionPlanSchedulingContext);ejv.connectToPredecessors(this.intermediateResults);for(IntermediateResultres:ejv.getProducedDataSets()){IntermediateResultpreviousDataSet=this.intermediateResults.putIfAbsent(res.getId(),res);if(previousDataSet!=null){thrownewJobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(),res,previousDataSet));}}registerExecutionVerticesAndResultPartitionsFor(ejv);// enrich network memory.SlotSharingGroupslotSharingGroup=ejv.getSlotSharingGroup();if(areJobVerticesAllInitialized(slotSharingGroup)){SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(slotSharingGroup,this::getJobVertex,shuffleMaster);}}

首先来看 ExecutionJobVertex.initialize 方法。这个方法主要是生成 IntermediateResult 和 ExecutionVertex。

protectedvoidinitialize(intexecutionHistorySizeLimit,Durationtimeout,longcreateTimestamp,SubtaskAttemptNumberStoreinitialAttemptCounts,ExecutionPlanSchedulingContextexecutionPlanSchedulingContext)throwsJobException{checkState(parallelismInfo.getParallelism()>0);checkState(!isInitialized());this.taskVertices=newExecutionVertex[parallelismInfo.getParallelism()];this.inputs=newArrayList<>(jobVertex.getInputs().size());// create the intermediate resultsthis.producedDataSets=newIntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];for(inti=0;i<jobVertex.getProducedDataSets().size();i++){finalIntermediateDataSetresult=jobVertex.getProducedDataSets().get(i);this.producedDataSets[i]=newIntermediateResult(result,this,this.parallelismInfo.getParallelism(),result.getResultType(),executionPlanSchedulingContext);}// create all task verticesfor(inti=0;i<this.parallelismInfo.getParallelism();i++){ExecutionVertexvertex=createExecutionVertex(this,i,producedDataSets,timeout,createTimestamp,executionHistorySizeLimit,initialAttemptCounts.getAttemptCount(i));this.taskVertices[i]=vertex;}// sanity check for the double referencing between intermediate result partitions and// execution verticesfor(IntermediateResultir:this.producedDataSets){if(ir.getNumberOfAssignedPartitions()!=this.parallelismInfo.getParallelism()){thrownewRuntimeException("The intermediate result's partitions were not correctly assigned.");}}// set up the input splits, if the vertex has anytry{@SuppressWarnings("unchecked")InputSplitSource<InputSplit>splitSource=(InputSplitSource<InputSplit>)jobVertex.getInputSplitSource();if(splitSource!=null){ThreadcurrentThread=Thread.currentThread();ClassLoaderoldContextClassLoader=currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try{inputSplits=splitSource.createInputSplits(this.parallelismInfo.getParallelism());if(inputSplits!=null){splitAssigner=splitSource.getInputSplitAssigner(inputSplits);}}finally{currentThread.setContextClassLoader(oldContextClassLoader);}}else{inputSplits=null;}}catch(Throwablet){thrownewJobException("Creating the input splits caused an error: "+t.getMessage(),t);}}

在创建 ExecutionVertex 时,会创建 IntermediateResultPartition 和 Execution,创建 Execution 时,会设置 attemptNumber,这个值默认是0,如果 ExecutionVertex 是重新调度的,那么 attemptNumber 会自增加1。

ExecutionJobVertex.connectToPredecessors 方法主要是生成 ExecutionVertex 与 IntermediateResultPartition 的关联关系。这里设置关联关系也分成了点对点和全对全两种模式处理,点对点模式需要计算 ExecutionVertex 对应的 IntermediateResultPartition index 的范围。两种模式最终都调用了 connectInternal 方法。

/** Connect all execution vertices to all partitions. */privatestaticvoidconnectInternal(List<ExecutionVertex>taskVertices,List<IntermediateResultPartition>partitions,ResultPartitionTyperesultPartitionType,EdgeManageredgeManager){checkState(!taskVertices.isEmpty());checkState(!partitions.isEmpty());ConsumedPartitionGroupconsumedPartitionGroup=createAndRegisterConsumedPartitionGroupToEdgeManager(taskVertices.size(),partitions,resultPartitionType,edgeManager);for(ExecutionVertexev:taskVertices){ev.addConsumedPartitionGroup(consumedPartitionGroup);}List<ExecutionVertexID>consumerVertices=taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());ConsumerVertexGroupconsumerVertexGroup=ConsumerVertexGroup.fromMultipleVertices(consumerVertices,resultPartitionType);for(IntermediateResultPartitionpartition:partitions){partition.addConsumers(consumerVertexGroup);}consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);}

这个方法中 ev.addConsumedPartitionGroup(consumedPartitionGroup); 负责将 ExecutionVertex 到 IntermediateResultPartition 的关联关系保存在 EdgeManager.vertexConsumedPartitions 中。

而 partition.addConsumers(consumerVertexGroup); 则负责将 IntermediateResultPartition 到 ExecutionVertex 的关系保存在 EdgeManager.partitionConsumers 中。

总结

通过本文,我们了解了 Flink 是如何将 JobGraph 转换成 ExecutionGraph 的。其中涉及到的一些核心概念名称比较类似,建议认真学习和理解透彻之后再研究其生成方法和对应关系,也可以借助前文中 ExecutionGraph 示意图辅助学习。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/13 18:26:42

从Git安装到运行FLUX.1-dev:新手避坑指南

从Git安装到运行FLUX.1-dev&#xff1a;新手避坑指南 在AI生成图像的热潮中&#xff0c;越来越多开发者尝试部署像 FLUX.1-dev 这样的前沿多模态模型。然而&#xff0c;当你兴致勃勃地克隆完仓库、装好依赖&#xff0c;却卡在“CUDA out of memory”或“Missing model weights…

作者头像 李华
网站建设 2026/4/12 17:36:22

跨平台歌词下载神器:ZonyLrcToolsX 完全使用指南

跨平台歌词下载神器&#xff1a;ZonyLrcToolsX 完全使用指南 【免费下载链接】ZonyLrcToolsX ZonyLrcToolsX 是一个能够方便地下载歌词的小软件。 项目地址: https://gitcode.com/gh_mirrors/zo/ZonyLrcToolsX 还在为音乐播放器缺少歌词而烦恼吗&#xff1f;ZonyLrcTool…

作者头像 李华
网站建设 2026/4/8 7:54:13

如何用响应式编程实现5倍性能提升的异步处理系统

如何用响应式编程实现5倍性能提升的异步处理系统 【免费下载链接】reactor-core Non-Blocking Reactive Foundation for the JVM 项目地址: https://gitcode.com/gh_mirrors/re/reactor-core 在现代应用开发中&#xff0c;异步编程已成为提升系统性能的关键技术。本文将…

作者头像 李华
网站建设 2026/3/30 20:58:56

什么是InfiniBand(IB)网络

转自微信号&#xff1a;Ai long cloud一、什么是InfiniBand网络InfiniBand&#xff1a;即“无限带宽”技术&#xff0c;缩写为IB&#xff0c;是一种网络通信标准&#xff0c;是RDMA技术的一种协议&#xff0c;它采用高速差分信号技术和多通道并行传输机制&#xff0c;主要目标是…

作者头像 李华
网站建设 2026/4/14 5:00:03

PyTorch安装Qwen-Image全流程教程(附GPU算力优化建议)

PyTorch部署Qwen-Image全流程与GPU算力优化实战 在AIGC浪潮席卷创意产业的今天&#xff0c;高质量图像生成已不再局限于研究实验室&#xff0c;而是逐步成为企业内容生产链路中的关键环节。从广告设计到游戏原画&#xff0c;从社交媒体运营到出版物插图&#xff0c;对“精准可控…

作者头像 李华
网站建设 2026/4/9 18:18:31

Markdown超链接关联Qwen3-VL-30B相关技术文档

Qwen3-VL-30B&#xff1a;如何让AI真正“看懂”世界&#xff1f; 在智能客服上传一张产品故障图&#xff0c;系统不仅能识别出损坏部件&#xff0c;还能结合说明书判断是否在保修范围内&#xff1b;医生将CT影像与病历文本同时输入&#xff0c;AI自动比对历史记录并提示潜在误诊…

作者头像 李华