Classes involved in Flink JobGraph generation
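The annotated sources below are Flink's StreamGraphHasherV2 and StreamingJobGraphGenerator. As the hasher's javadoc explains, a node's JobVertexID is either derived from a user-specified uid or generated structurally, so pinning uids is what keeps operator state addressable across re-submissions. A minimal sketch of that API usage (the class name, uid strings and toy pipeline are placeholders, not part of the gist):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .uid("demo-source")   // stored as the transformation uid, picked up by generateUserSpecifiedHash(...)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .uid("demo-map")      // without a uid, the hash is derived from the graph structure instead
                .print();
        env.execute("uid-demo");
    }
}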
public class StreamGraphHasherV2 implements StreamGraphHasher {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV2.class);
/**
* Returns a map with a hash for each {@link StreamNode} of the {@link
* StreamGraph}. The hash is used as the {@link JobVertexID} in order to
* identify nodes across job submissions if they didn't change.
*
* <p>The complete {@link StreamGraph} is traversed. The hash is either
* computed from the transformation's user-specified id (see
* {@link StreamTransformation#getUid()}) or generated in a deterministic way.
*
* <p>The generated hash is deterministic with respect to: (a node's hash is built from the following parts)
* <ul>
* <li>node-local properties (like parallelism, UDF, node ID); in the actual computation only the node's id is taken into account, so a changed parallelism or user function is still treated as the same task,
* <li>chained output nodes (the output operators that can be chained onto this node), and
* <li>input nodes hashes (the hashes of this node's input nodes)
* </ul>
*
* @return A map from {@link StreamNode#id} to hash as 16-byte array.
*/
@Override
public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
// The hash function used to generate the hash
final HashFunction hashFunction = Hashing.murmur3_128(0);
final Map<Integer, byte[]> hashes = new HashMap<>();
Set<Integer> visited = new HashSet<>();
Queue<StreamNode> remaining = new ArrayDeque<>();
// We need to make the source order deterministic. The source IDs are
// not returned in the same order, which means that submitting the same
// program twice might result in different traversal, which breaks the
// deterministic hash assignment.
// The source IDs come back in arbitrary order, while the hash generation depends on the current size of the hashes map, so sort them first to guarantee the same order on every submission.
List<Integer> sources = new ArrayList<>();
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
sources.add(sourceNodeId);
}
Collections.sort(sources);
//
// Breadth-first traversal: visit all the sources first.
// Traverse the graph in a breadth-first manner. Keep in mind that
// the graph is not a tree and multiple paths to nodes can exist.
//
// Start with source nodes
for (Integer sourceNodeId : sources) {
remaining.add(streamGraph.getStreamNode(sourceNodeId));
visited.add(sourceNodeId);
}
StreamNode currentNode;
while ((currentNode = remaining.poll()) != null) {
// Generate the hash code. Because multiple paths exist to each
// node, we might not have all required inputs available to
// generate the hash code.
if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
// Add the child nodes
for (StreamEdge outEdge : currentNode.getOutEdges()) {
StreamNode child = outEdge.getTargetVertex();
if (!visited.contains(child.getId())) {
remaining.add(child);
visited.add(child.getId());
}
}
} else {
// We will revisit this later.
// A node marked as visited may still fail hash generation, e.g. because one of its input nodes has no hash yet.
// Once all of its inputs have their hashes, it is put back into 'remaining' so its own hash can be computed.
visited.remove(currentNode.getId());
}
}
return hashes;
}
/**
* Generates a hash for the node and returns whether the operation was
* successful.
*
* @param node The node to generate the hash for
* @param hashFunction The hash function to use
* @param hashes The current state of generated hashes
* @return <code>true</code> if the node hash has been generated.
* <code>false</code>, otherwise. If the operation is not successful, the
* hash needs to be generated at a later point when all input is available.
* @throws IllegalStateException If node has user-specified hash and is
* intermediate node of a chain
*/
private boolean generateNodeHash(
StreamNode node,
HashFunction hashFunction,
Map<Integer, byte[]> hashes,
boolean isChainingEnabled) {
// Check for user-specified ID
// This is the id the user assigned to the node via uid().
String userSpecifiedHash = node.getTransformationUID();
if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
for (StreamEdge inEdge : node.getInEdges()) {
// If the input node has not been visited yet, the current
// node will be visited again at a later point when all input
// nodes have been visited and their hashes set.
// Only after all input nodes have their hashes generated and stored can this node's hash be computed.
if (!hashes.containsKey(inEdge.getSourceId())) {
return false;
}
}
Hasher hasher = hashFunction.newHasher();
byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
if (hashes.put(node.getId(), hash) != null) {
// Sanity check
throw new IllegalStateException("Unexpected state. Tried to add node hash " +
"twice. This is probably a bug in the JobGraph generator.");
}
return true;
} else {
Hasher hasher = hashFunction.newHasher();
byte[] hash = generateUserSpecifiedHash(node, hasher);
for (byte[] previousHash : hashes.values()) {
if (Arrays.equals(previousHash, hash)) {
throw new IllegalArgumentException("Hash collision on user-specified ID. " +
"Most likely cause is a non-unique ID. Please check that all IDs " +
"specified via `uid(String)` are unique.");
}
}
if (hashes.put(node.getId(), hash) != null) {
// Sanity check
throw new IllegalStateException("Unexpected state. Tried to add node hash " +
"twice. This is probably a bug in the JobGraph generator.");
}
return true;
}
}
/**
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));
// With a user-specified uid the hash is derived directly from it, which simplifies the procedure considerably.
return hasher.hash().asBytes();
}
/**
* Generates a deterministic hash from node-local properties and input and
* output edges.
*/
private byte[] generateDeterministicHash(
StreamNode node,
Hasher hasher,
Map<Integer, byte[]> hashes,
boolean isChainingEnabled) {
// Include stream node to hash. We use the current size of the computed
// hashes as the ID. We cannot use the node's ID, because it is
// assigned from a static counter. This will result in two identical
// programs having different hashes.
generateNodeLocalHash(node, hasher, hashes.size());
// Include chained nodes to hash
for (StreamEdge outEdge : node.getOutEdges()) {
// Check whether the two nodes can be chained together.
if (isChainable(outEdge, isChainingEnabled)) {
StreamNode chainedNode = outEdge.getTargetVertex();
// Use the hash size again, because the nodes are chained to
// this node. This does not add a hash for the chained nodes.
// The size of 'hashes' has not changed yet, because the current node's hash has not been added. The chained nodes are fused into one task, so they share the same local id.
generateNodeLocalHash(chainedNode, hasher, hashes.size());
}
}
byte[] hash = hasher.hash().asBytes();
// Make sure that all input nodes have their hash set before entering
// this loop (calling this method).
for (StreamEdge inEdge : node.getInEdges()) {
byte[] otherHash = hashes.get(inEdge.getSourceId());
// Sanity check
if (otherHash == null) {
throw new IllegalStateException("Missing hash for input node "
+ inEdge.getSourceVertex() + ". Cannot generate hash for "
+ node + ".");
}
// This is where each node's hash also absorbs the hashes of its input nodes.
for (int j = 0; j < hash.length; j++) {
hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
}
}
if (LOG.isDebugEnabled()) {
String udfClassName = "";
if (node.getOperator() instanceof AbstractUdfStreamOperator) {
udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
.getUserFunction().getClass().getName();
}
LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " +
"'" + node.toString() + "' {id: " + node.getId() + ", " +
"parallelism: " + node.getParallelism() + ", " +
"user function: " + udfClassName + "}");
}
return hash;
}
/**
* Applies the {@link Hasher} to the {@link StreamNode} (only node local
* attributes are taken into account). The hasher encapsulates the current
* state of the hash.
*
* <p>The specified ID is local to this node. We cannot use the
* {@link StreamNode#id}, because it is incremented in a static counter.
* Therefore, the IDs for identical jobs will otherwise be different.
*/
private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
// This resolves conflicts for otherwise identical source nodes. BUT
// the generated hash codes depend on the ordering of the nodes in the
// stream graph.
// The 'node' parameter is unused here; in the V1 hasher the hash depended on node properties, but V2 no longer needs them, so the parameter could arguably be removed.
hasher.putInt(id);
}
private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
// Chainability is really a property of the edge: look at its upstream and downstream vertices.
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& isChainingEnabled;
}
}
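The two ingredients of the structural hash, a node-local part that only encodes the traversal position and the `* 37 ^` mixing with each input hash, can be reproduced outside Flink. A small self-contained sketch (assumes Guava on the classpath; the "source"/"map" node names are illustrative) of why a downstream node's JobVertexID shifts whenever an upstream node changes:

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class HashCombinationSketch {
    public static void main(String[] args) {
        HashFunction hashFunction = Hashing.murmur3_128(0);

        // "Source" node: the node-local part hashes only its position in the traversal (0).
        byte[] sourceHash = hashFunction.newHasher().putInt(0).hash().asBytes();

        // "Map" node: local part uses position 1, then mixes in the input hash,
        // mirroring the loop at the end of generateDeterministicHash(...).
        byte[] mapHash = hashFunction.newHasher().putInt(1).hash().asBytes();
        for (int j = 0; j < mapHash.length; j++) {
            mapHash[j] = (byte) (mapHash[j] * 37 ^ sourceHash[j]);
        }

        // Any change to sourceHash (e.g. giving the source a uid) propagates into mapHash,
        // which is why downstream operator ids move when an upstream operator changes.
        StringBuilder hex = new StringBuilder();
        for (byte b : mapHash) {
            hex.append(String.format("%02x", b));
        }
        System.out.println(hex);
    }
}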
/**
* The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}.
*/
@Internal
public class StreamingJobGraphGenerator {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
/**
* Restart delay used for the FixedDelayRestartStrategy in case checkpointing was enabled but
* no restart strategy has been specified.
*/
private static final long DEFAULT_RESTART_DELAY = 10000L;
// ------------------------------------------------------------------------
public static JobGraph createJobGraph(StreamGraph streamGraph) {
return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
}
// ------------------------------------------------------------------------
private final StreamGraph streamGraph;
private final Map<Integer, JobVertex> jobVertices;
private final JobGraph jobGraph;
private final Collection<Integer> builtVertices;
private final List<StreamEdge> physicalEdgesInOrder;
private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
private final Map<Integer, StreamConfig> vertexConfigs;
private final Map<Integer, String> chainedNames;
private final Map<Integer, ResourceSpec> chainedMinResources;
private final Map<Integer, ResourceSpec> chainedPreferredResources;
private final StreamGraphHasher defaultStreamGraphHasher;
private final List<StreamGraphHasher> legacyStreamGraphHashers;
private StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
this.jobVertices = new HashMap<>();
this.builtVertices = new HashSet<>();
this.chainedConfigs = new HashMap<>();
this.vertexConfigs = new HashMap<>();
this.chainedNames = new HashMap<>();
this.chainedMinResources = new HashMap<>();
this.chainedPreferredResources = new HashMap<>();
this.physicalEdgesInOrder = new ArrayList<>();
jobGraph = new JobGraph(streamGraph.getJobName());
}
private JobGraph createJobGraph() {
// make sure that all vertices start immediately
jobGraph.setScheduleMode(ScheduleMode.EAGER);
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
setSlotSharing();
configureCheckpointing();
// add registered cache file into job configuration
for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
}
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
private void setPhysicalEdges() {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
for (StreamEdge edge : physicalEdgesInOrder) {
int target = edge.getTargetId();
List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);
// create if not set
if (inEdges == null) {
inEdges = new ArrayList<>();
physicalInEdgesInOrder.put(target, inEdges);
}
inEdges.add(edge);
}
for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
int vertex = inEdges.getKey();
List<StreamEdge> edgeList = inEdges.getValue();
// Set the list of incoming physical edges on each vertex's StreamConfig.
vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
}
}
/**
* Sets up task chains from the source {@link StreamNode} instances.
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
// Chains are likewise built forward, starting from the sources.
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
// builtVertices holds the ids of StreamNodes for which a JobVertex has already been created.
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
// Recursively chain every edge that can be chained onto this one; transitiveOutEdges collects all outgoing edges of the chain, whether they are chained or not.
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
// Depth-first walk from the source down to the final edges. Unlike the loop above, the start node now changes to nonChainable.getTargetId(): a new chain begins at the next node and chainability is evaluated again from there.
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
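// Illustrative example (not from the original source): for a pipeline
//   source -> map -> filter -> keyBy -> window -> sink
// the forward edges source->map->filter are chainable, so this recursion keeps
// startNodeId at the source while walking them; the keyed edge filter->window is
// not chainable, so a new chain starts at the window node with chainIndex 0, and
// that edge is recorded in the source chain's transitiveOutEdges.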
// Every operator, chainable or not, gets recorded in chainedOperatorHashes, which maps a chain's start node id to the hashes of the operators in that chain after chaining has been applied.
// Operators that were chained in never appear as a key, since they are not a start node; because of the recursion, the hashes of the operators closest to the sinks are added first.
List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
if (operatorHashes == null) {
operatorHashes = new ArrayList<>();
chainedOperatorHashes.put(startNodeId, operatorHashes);
}
operatorHashes.add(new Tuple2<>(hashes.get(currentNodeId), legacyHashes.get(1).get(currentNodeId)));
// For a chain, startNodeId stays at the id of the head node the chain started from.
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// The resource calculation mainly aggregates the resources of the chained operators.
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
// Create the JobVertex (only when this node is the start of its chain).
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
/*
* When currentNodeId equals startNodeId there are two possibilities:
* 1. this node is the head of a chain and the rest of the chain has already been traversed, or
* 2. this node stands alone, in which case transitiveOutEdges simply contains its non-chainable outgoing edges.
*/
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);
if (chainedConfs == null) {
chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
}
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
// Store the configuration of every node in the chain, keyed by the chain's start node.
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
// Note that an ordinary node that is not chained with anything is both the chain start and the chain end; both flags are set to true.
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// Return the outgoing edges involved in this chain.
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<>();
for (StreamEdge chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
}
return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
} else if (chainedOutputs.size() == 1) {
return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
} else {
// With nothing chained, just return the operator's own name.
return operatorName;
}
}
private ResourceSpec createChainedMinResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
ResourceSpec minResources = streamGraph.getStreamNode(vertexID).getMinResources();
// Each node carries its own resource spec; the chain's resources are the merge of all of them.
for (StreamEdge chainable : chainedOutputs) {
minResources = minResources.merge(chainedMinResources.get(chainable.getTargetId()));
}
return minResources;
}
private ResourceSpec createChainedPreferredResources(Integer vertexID, List<StreamEdge> chainedOutputs) {
ResourceSpec preferredResources = streamGraph.getStreamNode(vertexID).getPreferredResources();
for (StreamEdge chainable : chainedOutputs) {
preferredResources = preferredResources.merge(chainedPreferredResources.get(chainable.getTargetId()));
}
return preferredResources;
}
private StreamConfig createJobVertex(
Integer streamNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
JobVertex jobVertex;
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
byte[] hash = hashes.get(streamNodeId);
if (hash == null) {
throw new IllegalStateException("Cannot find node hash. " +
"Did you generate them before calling this method?");
}
JobVertexID jobVertexId = new JobVertexID(hash);
List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
hash = legacyHash.get(streamNodeId);
if (null != hash) {
legacyJobVertexIds.add(new JobVertexID(hash));
}
}
List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
if (chainedOperators != null) {
for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
}
}
if (streamNode.getInputFormat() != null) {
jobVertex = new InputFormatVertex(
chainedNames.get(streamNodeId),
jobVertexId,
legacyJobVertexIds,
chainedOperatorVertexIds,
userDefinedChainedOperatorVertexIds);
TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
} else {
// Operators that are not part of any chain are also present in chainedNames.
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId,
legacyJobVertexIds,
chainedOperatorVertexIds,
userDefinedChainedOperatorVertexIds);
}
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
int parallelism = streamNode.getParallelism();
if (parallelism > 0) {
jobVertex.setParallelism(parallelism);
} else {
parallelism = jobVertex.getParallelism();
}
jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
if (LOG.isDebugEnabled()) {
LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
}
jobVertices.put(streamNodeId, jobVertex);
builtVertices.add(streamNodeId);
jobGraph.addVertex(jobVertex);
return new StreamConfig(jobVertex.getConfiguration());
}
@SuppressWarnings("unchecked")
private void setVertexConfig(Integer vertexID, StreamConfig config,
List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
StreamNode vertex = streamGraph.getStreamNode(vertexID);
config.setVertexID(vertexID);
config.setBufferTimeout(vertex.getBufferTimeout());
config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
config.setTypeSerializerOut(vertex.getTypeSerializerOut());
// iterate edges, find sideOutput edges create and save serializers for each outputTag type
for (StreamEdge edge : chainableOutputs) {
if (edge.getOutputTag() != null) {
config.setTypeSerializerSideOut(
edge.getOutputTag(),
edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
);
}
}
for (StreamEdge edge : nonChainableOutputs) {
if (edge.getOutputTag() != null) {
config.setTypeSerializerSideOut(
edge.getOutputTag(),
edge.getOutputTag().getTypeInfo().createSerializer(streamGraph.getExecutionConfig())
);
}
}
config.setStreamOperator(vertex.getOperator());
config.setOutputSelectors(vertex.getOutputSelectors());
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic());
final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
config.setStateBackend(streamGraph.getStateBackend());
config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
if (checkpointCfg.isCheckpointingEnabled()) {
config.setCheckpointMode(checkpointCfg.getCheckpointingMode());
}
else {
// the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints),
// so we use that one if checkpointing is not enabled
config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
}
config.setStatePartitioner(0, vertex.getStatePartitioner1());
config.setStatePartitioner(1, vertex.getStatePartitioner2());
config.setStateKeySerializer(vertex.getStateKeySerializer());
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
if (vertexClass.equals(StreamIterationHead.class)
|| vertexClass.equals(StreamIterationTail.class)) {
config.setIterationId(streamGraph.getBrokerID(vertexID));
config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
}
List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
allOutputs.addAll(nonChainableOutputs);
vertexConfigs.put(vertexID, config);
}
private void connect(Integer headOfChain, StreamEdge edge) {
physicalEdgesInOrder.add(edge);
Integer downStreamvertexID = edge.getTargetId();
JobVertex headVertex = jobVertices.get(headOfChain);
JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);
StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
// The partitioner determines how upstream and downstream are connected; in effect this creates the corresponding JobEdge.
StreamPartitioner<?> partitioner = edge.getPartitioner();
JobEdge jobEdge;
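// ForwardPartitioner and RescalePartitioner both produce a POINTWISE JobEdge (each upstream
// subtask connects to one or a few downstream subtasks); any other partitioner, e.g. the
// key-group partitioner behind keyBy, rebalance or broadcast, produces an ALL_TO_ALL edge.
// In every case the result partition type is PIPELINED_BOUNDED.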
if (partitioner instanceof ForwardPartitioner) {
// This also creates the IntermediateDataSet; see JobVertex for the details.
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED_BOUNDED);
} else if (partitioner instanceof RescalePartitioner){
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED_BOUNDED);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED_BOUNDED);
}
// set strategy name so that web interface can show it.
jobEdge.setShipStrategyName(partitioner.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
headOfChain, downStreamvertexID);
}
}
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
private void setSlotSharing() {
Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
if (group == null) {
group = new SlotSharingGroup();
slotSharingGroups.put(slotSharingGroup, group);
}
entry.getValue().setSlotSharingGroup(group);
}
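// Unless the user sets one via slotSharingGroup(...), every StreamNode is in the slot
// sharing group named "default", so all JobVertices of the job may share slots.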
for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
CoLocationGroup ccg = new CoLocationGroup();
JobVertex source = jobVertices.get(pair.f0.getId());
JobVertex sink = jobVertices.get(pair.f1.getId());
ccg.addVertex(source);
ccg.addVertex(sink);
source.updateCoLocationGroup(ccg);
sink.updateCoLocationGroup(ccg);
}
}
private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();
long interval = cfg.getCheckpointInterval();
if (interval > 0) {
// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
// if the user enabled checkpointing, the default number of exec retries is infinite.
streamGraph.getExecutionConfig().setRestartStrategy(
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
}
} else {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
}
// --- configure the participating vertices ---
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
// The source vertices are the ones that receive the "trigger checkpoint" signal.
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
// All vertices need to acknowledge the checkpoint.
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
}
commitVertices.add(vertex.getID());
ackVertices.add(vertex.getID());
}
// --- configure options ---
ExternalizedCheckpointSettings externalizedCheckpointSettings;
if (cfg.isExternalizedCheckpointsEnabled()) {
CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
// Sanity check
if (cleanup == null) {
throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
}
externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
} else {
externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
}
CheckpointingMode mode = cfg.getCheckpointingMode();
boolean isExactlyOnce;
if (mode == CheckpointingMode.EXACTLY_ONCE) {
isExactlyOnce = true;
} else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
isExactlyOnce = false;
} else {
throw new IllegalStateException("Unexpected checkpointing mode. " +
"Did not expect there to be another checkpointing mode besides " +
"exactly-once or at-least-once.");
}
// --- configure the master-side checkpoint hooks ---
final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
for (StreamNode node : streamGraph.getStreamNodes()) {
StreamOperator<?> op = node.getOperator();
if (op instanceof AbstractUdfStreamOperator) {
Function f = ((AbstractUdfStreamOperator<?, ?>) op).getUserFunction();
// For user functions that implement this interface, record their hooks; the hooks are triggered on the master side when a checkpoint is taken.
if (f instanceof WithMasterCheckpointHook) {
hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
}
}
}
// because the hooks can have user-defined code, they need to be stored as
// eagerly serialized values
final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
if (hooks.isEmpty()) {
serializedHooks = null;
} else {
try {
MasterTriggerRestoreHook.Factory[] asArray =
hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
serializedHooks = new SerializedValue<>(asArray);
}
catch (IOException e) {
throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
}
}
// because the state backend can have user-defined code, it needs to be stored as
// eagerly serialized value
final SerializedValue<StateBackend> serializedStateBackend;
if (streamGraph.getStateBackend() == null) {
serializedStateBackend = null;
} else {
try {
serializedStateBackend =
new SerializedValue<StateBackend>(streamGraph.getStateBackend());
}
catch (IOException e) {
throw new FlinkRuntimeException("State backend is not serializable", e);
}
}
// --- done, put it all together ---
JobCheckpointingSettings settings = new JobCheckpointingSettings(
triggerVertices, ackVertices, commitVertices, interval,
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
externalizedCheckpointSettings,
serializedStateBackend,
serializedHooks,
isExactlyOnce);
jobGraph.setSnapshotSettings(settings);
}
}
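Finally, a short sketch of how the generator above is reached in practice: the DataStream program is first turned into a StreamGraph and then handed to StreamingJobGraphGenerator.createJobGraph (the toy pipeline is a placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b", "c").print();

StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

System.out.println("JobVertices: " + jobGraph.getNumberOfVertices());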