Elasticsearch Source Code - Discovery and Cluster, Part 1

Goals of this post:

  • How Elasticsearch cluster nodes discover each other
  • How Elasticsearch elects the leader (master)

Source version: 8.13

  • The ES code base is too large to cover fully, so we focus on the core parts and string the flow together
  • We draw on the information in ES pull requests

How can you speed up reading the source? GitHub PRs.

A new cluster coordination layer

An exemplary PR that is well worth studying; kudos to the open-source community 👍

Discovery

Discovery is the process by which the cluster formation module finds other nodes with which to form a cluster. This process runs when you start an Elasticsearch node or when a node believes the master has failed, and it continues until the master is found or a new master is elected.

The process starts with a list of seed addresses from one or more seed hosts providers, combined with the addresses of any master-eligible nodes from the last known cluster. It runs in two phases: first, each node probes the seed addresses by connecting to each one, attempting to identify the node it is connected to and to verify that it is master-eligible. Second, if verification succeeds, the node shares the full list of master-eligible nodes it knows about with the remote node, and the remote node responds with the nodes it knows about in turn. The node then probes all the newly discovered nodes, requests their known peers, and so on.

If the node is not master-eligible, it continues this discovery process until it has discovered an elected master. If no elected master is discovered, the node retries after discovery.find_peers_interval, which defaults to 1 second.

If the node is master-eligible, it continues this discovery process until it has either discovered an elected master or discovered enough masterless master-eligible nodes to complete an election. If neither happens quickly enough, the node retries after discovery.find_peers_interval, which defaults to 1 second.

Once a master is elected, it normally remains the elected master until it is deliberately stopped. It may also stop acting as master if fault detection determines the cluster to be faulty. When a node is no longer the elected master, it restarts the discovery process.

Connections between nodes - Gossip-Like

  • Node startup

    • Node.start() =>
    • Coordinator.start() =>
    • coordinator.startInitialJoin() =>
    • becomeCandidate() =>
    • peerFinder.activate
  • Starting an election

    • StatefulPreVoteCollector.handlePreVoteResponse() =>
    • Coordinator.startElection() =>
    • Coordinator.broadcastStartJoinRequest() =>
    • joinHelper.sendStartJoinRequest
  • Peer connections: PeerFinder.startProbe()

    • Connections are established by probing via PeerFinder::startProbe, sending a PeersRequest that the remote side handles
  • Connection handler: HandshakingTransportAddressConnector::connectToRemoteMasterNode

The Discovery process

Fetch the configured discovery.seed_hosts list via the SeedProvider

Probe and connect to each other through the seed_hosts

In parallel, start the Coordinator to run the leader election
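
To make this gossip-like loop concrete, here is a minimal, self-contained sketch of the two-phase probing described above. It is hypothetical stand-in code, not the real PeerFinder API: probe, PeersResponse and the addresses are all made up for illustration.

//DiscoverySketch.java (hypothetical illustration of the transitive discovery loop)
import java.util.HashSet;
import java.util.Set;

public class DiscoverySketch {

    /** What a successfully probed peer reports back: its identity and its known peers. */
    record PeersResponse(String nodeId, boolean masterEligible, Set<String> knownPeerAddresses) {}

    /** Stand-in for the real handshake + PeersRequest exchange over the transport layer. */
    static PeersResponse probe(String address) {
        return new PeersResponse("node@" + address, true, Set.of());
    }

    public static void main(String[] args) throws InterruptedException {
        // Phase 0: start from the configured seed addresses (discovery.seed_hosts).
        Set<String> known = new HashSet<>(Set.of("10.0.0.1:9300", "10.0.0.2:9300"));
        Set<String> masterEligiblePeers = new HashSet<>();

        // The real loop runs until an elected master is found (or enough peers for an
        // election are known); we cap it at a few rounds so the sketch terminates.
        for (int round = 0; round < 3; round++) {
            Set<String> learned = new HashSet<>();
            for (String address : known) {
                PeersResponse response = probe(address);            // phase 1: connect + identify + verify
                if (response.masterEligible()) {
                    masterEligiblePeers.add(response.nodeId());
                    learned.addAll(response.knownPeerAddresses());  // phase 2: exchange known peers
                }
            }
            learned.removeAll(known);
            if (learned.isEmpty()) {
                Thread.sleep(1000); // discovery.find_peers_interval, default 1s
            }
            known.addAll(learned);  // transitively probe newly learned addresses next round
        }
        System.out.println("discovered master-eligible peers: " + masterEligiblePeers);
    }
}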

Cluster Coordination

Voting Configuration

Each Elasticsearch cluster has a voting configuration, which is the set of master-eligible nodes whose responses are counted when making decisions such as electing a new master or committing a new cluster state. Decisions are made only after a majority (more than half) of the nodes in the voting configuration respond.

Usually the voting configuration is the same as the set of all the master-eligible nodes currently in the cluster. However, there are situations in which they may differ:

  • The current voting configuration is not necessarily the same as the set of all available master-eligible nodes in the cluster. Changing the voting configuration itself requires a vote, so it takes some time to adjust the configuration as nodes join or leave. Moreover, in some situations the most resilient configuration may include unavailable nodes or exclude some available nodes. In these cases the voting configuration differs from the set of available master-eligible nodes in the cluster

After a node joins or leaves the cluster, Elasticsearch automatically adjusts the voting configuration accordingly to keep the cluster as resilient as possible. Be sure to wait for these adjustments to complete before removing further nodes
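
As an aside, to guarantee that a clean majority always exists, the automatically maintained voting configuration usually has an odd number of members; the real logic lives in Reconfigurator. Below is a simplified, hypothetical sketch of one such reconfiguration step, not the actual algorithm (which also honors voting exclusions, among other things):

//ReconfigurationSketch.java (simplified, hypothetical; not the real Reconfigurator)
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class ReconfigurationSketch {

    static Set<String> reconfigure(Set<String> liveMasterEligible, Set<String> currentConfig) {
        if (liveMasterEligible.isEmpty()) {
            return Set.of();
        }
        // Prefer nodes already in the current configuration so the change is minimal.
        List<String> ordered = new ArrayList<>(liveMasterEligible);
        ordered.sort(Comparator.comparingInt((String id) -> currentConfig.contains(id) ? 0 : 1)
            .thenComparing(Comparator.naturalOrder()));
        // Keep the size odd so that a clean majority always exists.
        int size = ordered.size() % 2 == 0 ? ordered.size() - 1 : ordered.size();
        return Set.copyOf(ordered.subList(0, size));
    }

    public static void main(String[] args) {
        // A fourth node D joined a three-node cluster: the voting configuration
        // stays at three members rather than growing to an even four.
        System.out.println(reconfigure(Set.of("A", "B", "C", "D"), Set.of("A", "B", "C")));
    }
}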

public class CoordinationMetadata implements Writeable, ToXContentFragment {
    ......
    private final long term;

    private final VotingConfiguration lastCommittedConfiguration;

    private final VotingConfiguration lastAcceptedConfiguration;

    private final Set<VotingConfigExclusion> votingConfigExclusions;

    /**
     * A collection of persistent node ids, denoting the voting configuration for cluster state changes.
     */
    public static class VotingConfiguration implements Writeable, ToXContentFragment {

        public static final VotingConfiguration EMPTY_CONFIG = new VotingConfiguration(Collections.emptySet());
        public static final VotingConfiguration MUST_JOIN_ELECTED_MASTER = new VotingConfiguration(
            Collections.singleton("_must_join_elected_master_")
        );

        private final Set<String> nodeIds;
    }
    ......
}

Quorum-based Decision making

Electing a master node and changing the cluster state are the two fundamental tasks that master-eligible nodes must work together to perform. It is important that these activities work robustly even if some nodes have failed. Elasticsearch achieves this robustness by considering each action to have succeeded once it receives responses from a quorum, a subset of the master-eligible nodes in the cluster. The advantage of requiring only a subset of the nodes to respond is that some nodes can fail without preventing the cluster from making progress. Quorums are carefully chosen so that the cluster cannot suffer a "split brain", the situation where the cluster splits into two pieces that each make decisions inconsistent with the other. For example, with a voting configuration of three master-eligible nodes the quorum is two, so two disjoint partitions can never both reach a quorum and elect rival masters.

Elasticsearch allows you to add and remove master-eligible nodes from a running cluster. In many cases you can do this simply by starting or stopping nodes as required. For more information, see "Add and remove nodes in your cluster".

As nodes are added or removed, Elasticsearch maintains an optimal level of fault tolerance by updating the cluster's voting configuration, which is the set of master-eligible nodes whose responses are counted when making decisions such as electing a new master or committing a new cluster state. A decision succeeds only after more than half of the nodes in the voting configuration respond. Usually the voting configuration is the same as the set of all the master-eligible nodes currently in the cluster; however, there are situations in which they may differ

  • ElectionStrategy is pluggable and can be customized via plugins
//ElectionStrategy.java
public boolean isElectionQuorum(
        DiscoveryNode localNode,
        long localCurrentTerm,
        long localAcceptedTerm,
        long localAcceptedVersion,
        VotingConfiguration lastCommittedConfiguration,
        VotingConfiguration lastAcceptedConfiguration,
        VoteCollection joinVotes
    ) {
        return joinVotes.isQuorum(lastCommittedConfiguration)
            && joinVotes.isQuorum(lastAcceptedConfiguration)
            && satisfiesAdditionalQuorumConstraints(
                localNode,
                localCurrentTerm,
                localAcceptedTerm,
                localAcceptedVersion,
                lastCommittedConfiguration,
                lastAcceptedConfiguration,
                joinVotes
            );
    }
//CoordinationState.java
public boolean isQuorum(VotingConfiguration configuration) {
    return configuration.hasQuorum(nodes.keySet());
}

//CoordinationMetadata.java
public boolean hasQuorum(Collection<String> votes) {
    int votedNodesCount = 0;
    for (String nodeId : nodeIds) {
        if (votes.contains(nodeId)) {
            votedNodesCount++;
        }
    }
    return votedNodesCount * 2 > nodeIds.size();
}
//VotingOnlyNodeElectionStrategy.java
 public boolean satisfiesAdditionalQuorumConstraints(
            DiscoveryNode localNode,
            long localCurrentTerm,
            long localAcceptedTerm,
            long localAcceptedVersion,
            VotingConfiguration lastCommittedConfiguration,
            VotingConfiguration lastAcceptedConfiguration,
            VoteCollection joinVotes
        ) {
            // if local node is voting only, have additional checks on election quorum definition
            if (isVotingOnlyNode(localNode)) {
                // if all votes are from voting only nodes, do not elect as master (no need to transfer state)
                if (joinVotes.nodes().stream().filter(DiscoveryNode::isMasterNode).allMatch(VotingOnlyNodePlugin::isVotingOnlyNode)) {
                    return false;
                }
                // if there's a vote from a full master node with same state (i.e. last accepted term and version match), then that node
                // should become master instead, so we should stand down. There are two exceptional cases, however:
                // 1) if we are in term 0. In that case, we allow electing the voting-only node to avoid poisonous situations where only
                // voting-only nodes are bootstrapped.
                // 2) if there is another full master node with an older state. In that case, we ensure that
                // satisfiesAdditionalQuorumConstraints cannot go from true to false when adding new joinVotes in the same election.
                // As voting-only nodes only broadcast the state to the full master nodes, eventually all of them will have caught up
                // and there should not be any remaining full master nodes with older state, effectively disabling election of
                // voting-only nodes.
                if (joinVotes.getJoins().stream().anyMatch(fullMasterWithSameState(localAcceptedTerm, localAcceptedVersion))
                    && localAcceptedTerm > 0
                    && joinVotes.getJoins().stream().noneMatch(fullMasterWithOlderState(localAcceptedTerm, localAcceptedVersion))) {
                    return false;
                }
            }
            return true;
        }
......
// check whether a node is voting-only
public static boolean isVotingOnlyNode(DiscoveryNode discoveryNode) {
    return discoveryNode.getRoles().contains(DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE);
}

// check whether a node is a full master (master-eligible and not voting-only)
public static boolean isFullMasterNode(DiscoveryNode discoveryNode) {
    return discoveryNode.isMasterNode() && discoveryNode.getRoles().contains(DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE) == false;
}
  • joinVotes.isQuorum

    • VotingConfiguration lastCommittedConfiguration: the voting configuration most recently committed by the cluster

    • VotingConfiguration lastAcceptedConfiguration: the configuration most recently accepted by the cluster but not yet committed

    • Together these track changes to the cluster's membership; a quorum must hold in both (see the demo after this list)

      • Suppose the cluster has three master-eligible nodes A, B and C. Initially, both lastCommittedConfiguration and lastAcceptedConfiguration are [A, B, C].

        • Membership change: node D joins the cluster, and the new configuration [A, B, C, D] is first recorded as the lastAcceptedConfiguration. (In practice ES keeps the voting configuration odd-sized, so a fourth member would usually not be added; [A, B, C, D] is used here purely to illustrate the two-step transition.)
        • Agreement reached: once a majority of nodes accept this configuration, the lastAcceptedConfiguration is committed and becomes the lastCommittedConfiguration.
        • New configuration in effect: lastCommittedConfiguration is now [A, B, C, D], and subsequent master elections are based on this configuration.
  • satisfiesAdditionalQuorumConstraints; the default strategy shipped with ES is VotingOnlyNodeElectionStrategy.java

    • Determines whether a node satisfies the additional election quorum constraints in an Elasticsearch cluster

    • Decides, according to specific rules, whether such nodes may play a part in the master election

      • Check whether the local node is voting-only

        • If all votes come from voting-only nodes, do not elect it as master

        • Otherwise, check the votes from full master nodes

          • If there is a vote from a full master node (MASTER_ROLE and not VOTING_ONLY) with the same state (matching localAcceptedTerm and localAcceptedVersion), and the current term is greater than 0, return false. In other words, if a better-suited full master node exists (one with the same state as the current node), the current voting-only node should stand down

          • However, if some full master node still has an older state, the condition above is ignored. This guarantees that, within the same election, adding new join votes can never flip the eligibility decision from true to false

            • Freshness comparison

              • lastAcceptedTerm: the election term, incremented by 1 each time an election is started
              • lastAcceptedVersion: the cluster state version, which increases with every cluster state update (e.g. nodes joining or leaving)
      • If the node is not voting-only, return true: the current node satisfies the additional quorum constraints and may proceed with the election
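
The following is a minimal, self-contained demo of the joint quorum rule above, with simple Sets standing in for VotingConfiguration and VoteCollection:

//JointQuorumDemo.java (standalone sketch; not ES code)
import java.util.Set;

public class JointQuorumDemo {

    /** Same majority rule as CoordinationMetadata.VotingConfiguration#hasQuorum above. */
    static boolean hasQuorum(Set<String> configNodeIds, Set<String> votes) {
        long voted = configNodeIds.stream().filter(votes::contains).count();
        return voted * 2 > configNodeIds.size();
    }

    /** isElectionQuorum requires a majority in BOTH configurations. */
    static boolean isElectionQuorum(Set<String> lastCommitted, Set<String> lastAccepted, Set<String> votes) {
        return hasQuorum(lastCommitted, votes) && hasQuorum(lastAccepted, votes);
    }

    public static void main(String[] args) {
        Set<String> committed = Set.of("A", "B", "C");      // lastCommittedConfiguration
        Set<String> accepted = Set.of("A", "B", "C", "D");  // lastAcceptedConfiguration (not yet committed)

        // {A, B} is a majority of [A, B, C] but not of [A, B, C, D]: no election quorum yet.
        System.out.println(isElectionQuorum(committed, accepted, Set.of("A", "B")));      // false
        // {A, B, C} is a majority of both configurations: quorum reached.
        System.out.println(isElectionQuorum(committed, accepted, Set.of("A", "B", "C"))); // true
    }
}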

Pre-voting phase: PreVotingRound

  • Starting an election directly could let several nodes kick off elections at the same time, causing conflicts. The pre-vote mechanism introduces a preliminary step that ensures only a node with enough support formally starts an election, reducing the probability of election conflicts
  • For a distributed search and analytics engine like Elasticsearch, keeping master elections stable and reliable is essential
private class PreVotingRound implements Releasable {
    private final Map<DiscoveryNode, PreVoteResponse> preVotesReceived = newConcurrentMap();
    private final AtomicBoolean electionStarted = new AtomicBoolean();
    private final PreVoteRequest preVoteRequest;
    private final ClusterState clusterState;
    private final AtomicBoolean isClosed = new AtomicBoolean();
    ......
}
  • preVotesReceived: a thread-safe Map holding the pre-vote responses received from each node
  • preVoteRequest: the pre-vote request object
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
        updateMaxTermSeen.accept(request.getCurrentTerm());

        Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
        assert state != null : "received pre-vote request before fully initialised";

        final DiscoveryNode leader = state.v1();
        final PreVoteResponse response = state.v2();

        final StatusInfo statusInfo = nodeHealthService.getHealth();
        if (statusInfo.getStatus() == UNHEALTHY) {
            String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
            logger.debug(message);
            throw new NodeHealthCheckFailureException(message);
        }

        if (leader == null) {
            return response;
        }

        if (leader.equals(request.getSourceNode())) {
            // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
            // that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no
            // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
            // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
            // to also detect its failure.
            return response;
        }

        throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
    }
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
            if (isClosed.get()) {
                logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
                return;
            }

            updateMaxTermSeen.accept(response.getCurrentTerm());

            if (response.getLastAcceptedTerm() > clusterState.term()
                || (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
                logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
                return;
            }

            preVotesReceived.put(sender, response);

            // create a fake VoteCollection based on the pre-votes and check if there is an election quorum
            final VoteCollection voteCollection = new VoteCollection();
            final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
            final PreVoteResponse localPreVoteResponse = getPreVoteResponse();

            preVotesReceived.forEach(
                (node, preVoteResponse) -> voteCollection.addJoinVote(
                    new Join(
                        node,
                        localNode,
                        preVoteResponse.getCurrentTerm(),
                        preVoteResponse.getLastAcceptedTerm(),
                        preVoteResponse.getLastAcceptedVersion()
                    )
                )
            );

            if (electionStrategy.isElectionQuorum(
                clusterState.nodes().getLocalNode(),
                localPreVoteResponse.getCurrentTerm(),
                localPreVoteResponse.getLastAcceptedTerm(),
                localPreVoteResponse.getLastAcceptedVersion(),
                clusterState.getLastCommittedConfiguration(),
                clusterState.getLastAcceptedConfiguration(),
                voteCollection
            ) == false) {
                logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
                return;
            }

            if (electionStarted.compareAndSet(false, true) == false) {
                logger.debug("{} added {} from {} but election has already started", this, response, sender);
                return;
            }

            logger.debug("{} added {} from {}, starting election", this, response, sender);
            startElection.run(); // entry point into the election
        }
  • The collected votes are stored in preVotesReceived and then converted into a voteCollection

  • electionStrategy.isElectionQuorum checks whether a quorum has been reached

  • After ClusterBootstrap, this runs periodically on the scheduler (scheduleNextElection) until a result is produced

    • delayMillis is the delay of each scheduling round; random.nextLong() is mixed in so that nodes kick off their votes at randomized times
    • If localNodeMayWinElection() returns false, the node does not run for election, which reduces election conflicts
//Coordinator.java
private void startElectionScheduler() {
        assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
        assert electionScheduler == null : electionScheduler;

        if (getLocalNode().isMasterNode() == false) {
            return;
        }

        final TimeValue gracePeriod = TimeValue.ZERO;
        electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
            @Override
            public void run() {
                synchronized (mutex) {
                    if (mode == Mode.CANDIDATE) {
                        final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                        final var nodeEligibility = localNodeMayWinElection(lastAcceptedState, electionStrategy);
                        if (nodeEligibility.mayWin() == false) {
                            assert nodeEligibility.reason().isEmpty() == false;
                            logger.info(
                                "skip prevoting as local node may not win election ({}): {}",
                                nodeEligibility.reason(),
                                lastAcceptedState.coordinationMetadata()
                            );
                            return;
                        }

                        final StatusInfo statusInfo = nodeHealthService.getHealth();
                        if (statusInfo.getStatus() == UNHEALTHY) {
                            logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
                            return;
                        }

                        if (prevotingRound != null) {
                            prevotingRound.close();
                        }
                        prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
                    }
                }
            }

            @Override
            public String toString() {
                return "scheduling of new prevoting round";
            }
        });
    }

  • The ElectionScheduler keeps running on a timer => scheduleNextElection

//ElectionSchedulerFactory.java
void scheduleNextElection(final TimeValue gracePeriod, final Runnable scheduledRunnable) {
            if (isClosed.get()) {
                logger.debug("{} not scheduling election", this);
                return;
            }

            final long thisAttempt = attempt.getAndIncrement();
            // to overflow here would take over a million years of failed election attempts, so we won't worry about that:
            final long maxDelayMillis = Math.min(maxTimeout.millis(), initialTimeout.millis() + thisAttempt * backoffTime.millis());
            final long delayMillis = toPositiveLongAtMost(random.nextLong(), maxDelayMillis) + gracePeriod.millis();
            final Runnable runnable = new AbstractRunnable() {
                ......
                @Override
                protected void doRun() {
                    if (isClosed.get()) {
                        logger.debug("{} not starting election", this);
                    } else {
                        logger.debug("{} starting election", this);
                        if (thisAttempt > 0 && thisAttempt % 10 == 0) {
                            logger.info("""
                                retrying master election after [{}] failed attempts; \
                                election attempts are currently scheduled up to [{}ms] apart""", thisAttempt, maxDelayMillis);
                        }
                        scheduleNextElection(duration, scheduledRunnable);
                        scheduledRunnable.run();
                    }
                }
    ......
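
The effect of this scheduling policy is easy to observe in isolation. Below is a standalone sketch, not ES code, that re-implements toPositiveLongAtMost and prints the randomized, linearly backed-off delays for the first few attempts; the hard-coded values mirror the defaults of the cluster.election.* settings:

//BackoffDemo.java (standalone sketch of the election scheduling delays)
import java.util.Random;

public class BackoffDemo {

    // Map a random long into [0, maxValue], similar in spirit to the real toPositiveLongAtMost.
    static long toPositiveLongAtMost(long value, long maxValue) {
        return Math.floorMod(value, maxValue + 1);
    }

    public static void main(String[] args) {
        final long initialTimeoutMillis = 100;  // cluster.election.initial_timeout (default 100ms)
        final long backoffTimeMillis = 100;     // cluster.election.back_off_time (default 100ms)
        final long maxTimeoutMillis = 10_000;   // cluster.election.max_timeout (default 10s)
        final Random random = new Random();

        for (long attempt = 0; attempt < 5; attempt++) {
            // The upper bound grows linearly with the number of failed attempts, capped at maxTimeout.
            long maxDelayMillis = Math.min(maxTimeoutMillis, initialTimeoutMillis + attempt * backoffTimeMillis);
            // The actual delay is uniform in [0, maxDelayMillis], so each node picks a different moment.
            long delayMillis = toPositiveLongAtMost(random.nextLong(), maxDelayMillis);
            System.out.printf("attempt %d: delay %d ms (max %d ms)%n", attempt, delayMillis, maxDelayMillis);
        }
    }
}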

Formal phase: Master elections

Elasticsearch uses an election process to agree on an elected master node, both at startup and if the existing master fails. Any master-eligible node can start an election, and normally the first election that takes place will succeed. Elections usually fail only when two nodes both happen to start their elections at about the same time, so elections are scheduled randomly on each node to reduce the chance of this happening. Nodes retry after a failed election, backing off on each failure, so that an election eventually succeeds with probability arbitrarily close to 100%.

  • 👆startElection.run() => Coordinator.startElection()👇

  • Once the pre-vote phase completes, the formal vote begins

    • As in the pre-vote phase, if localNodeMayWinElection() returns false the node does not run, reducing conflicts
    • broadcastStartJoinRequest
    • A StartJoinRequest asks each recipient to vote for the candidate; the recipient responds by sending a JoinRequest back to it
//Coordinator.java
private void startElection() {
        synchronized (mutex) {
            // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have
            // to check our mode again here.
            if (mode == Mode.CANDIDATE) {
                final var nodeEligibility = localNodeMayWinElection(getLastAcceptedState(), electionStrategy);
                if (nodeEligibility.mayWin() == false) {
                    assert nodeEligibility.reason().isEmpty() == false;
                    logger.trace(
                        "skip election as local node may not win it ({}): {}",
                        nodeEligibility.reason(),
                        getLastAcceptedState().coordinationMetadata()
                    );
                    return;
                }

                final var electionTerm = getTermForNewElection();
                logger.debug("starting election for {} in term {}", getLocalNode(), electionTerm);
                broadcastStartJoinRequest(getLocalNode(), electionTerm, getDiscoveredNodes());
            }
        }
    }
 /**
     * Broadcasts a request to all 'discoveredNodes' in the cluster to elect 'candidateMasterNode' as the new master.
     *
     * @param candidateMasterNode the node running for election
     * @param term the new proposed master term
     * @param discoveredNodes all the nodes to which to send the request
     */
    private void broadcastStartJoinRequest(DiscoveryNode candidateMasterNode, long term, List<DiscoveryNode> discoveredNodes) {
        electionStrategy.onNewElection(candidateMasterNode, term, new ActionListener<>() {
            @Override
            public void onResponse(StartJoinRequest startJoinRequest) {
                discoveredNodes.forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
            }

            @Override
            public void onFailure(Exception e) {
                logger.log(
                    e instanceof CoordinationStateRejectedException ? Level.DEBUG : Level.WARN,
                    Strings.format("election attempt for [%s] in term [%d] failed", candidateMasterNode, term),
                    e
                );
            }
        });
    }
  • JoinHelper::sendStartJoinRequest => which ultimately causes sendJoinRequest to be sent👇

  • Coordinator::processJoinRequest => handleJoin() processes the Join Request👇

  • CoordinationState::handleJoin performs the final handling of the Join Request

  • Quorum decision in CoordinationState::isElectionQuorum()👇; see Quorum-based Decision making above👆

  • If the node's state satisfies prevElectionWon == false && coordState.electionWon

    • becomeLeader()

      • master-eligible nodes are in candidate mode by default
      • after winning the election: candidate => leader (see the mode-transition sketch below)
  • When does a node become a Follower? Details in the next post; in brief:

    • Publish.start => sendPublishRequest
    • Coordinator.handlePublishRequest => becomeFollower
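
Before diving into the code, here is a compact sketch of the coordinator mode transitions just described. The Mode values (CANDIDATE, LEADER, FOLLOWER) exist in the real Coordinator; the transition methods here are simplified stand-ins for illustration only:

//ModeTransitionSketch.java (simplified sketch; not the real Coordinator logic)
public class ModeTransitionSketch {

    enum Mode { CANDIDATE, LEADER, FOLLOWER }

    private Mode mode = Mode.CANDIDATE; // master-eligible nodes start out as candidates

    // Called when a join quorum is first reached (prevElectionWon == false && electionWon()).
    void becomeLeader() {
        assert mode == Mode.CANDIDATE : "only a candidate can win an election";
        mode = Mode.LEADER;
    }

    // Called from handlePublishRequest when another node has won the election
    // and publishes a cluster state to us.
    void becomeFollower() {
        mode = Mode.FOLLOWER;
    }

    // Called at startup, or when the leader fails or loses its quorum:
    // the node re-activates discovery (peerFinder) and pre-voting.
    void becomeCandidate() {
        mode = Mode.CANDIDATE;
    }
}
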
public void sendJoinRequest(DiscoveryNode destination, long term, Optional<Join> optionalJoin) {
        assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
        final StatusInfo statusInfo = nodeHealthService.getHealth();
        if (statusInfo.getStatus() == UNHEALTHY) {
            logger.debug("dropping join request to [{}], unhealthy status: [{}]", destination, statusInfo.getInfo());
            return;
        }
        ......
        final var pendingJoinInfo = new PendingJoinInfo(transportService.getThreadPool().relativeTimeInMillis());
        if (pendingOutgoingJoins.putIfAbsent(dedupKey, pendingJoinInfo) == null) {

            // If this node is under excessive heap pressure then the state it receives for join validation will trip a circuit breaker and
            // fail the join attempt, resulting in retrying in a loop which makes the master just send a constant stream of cluster states
            // to this node. We try and keep the problem local to this node by checking that we can at least allocate one byte:
            final var breaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
            try {
                breaker.addEstimateBytesAndMaybeBreak(1L, "pre-flight join request");
            } catch (Exception e) {
                pendingJoinInfo.message = PENDING_JOIN_FAILED;
                pendingOutgoingJoins.remove(dedupKey);
                if (e instanceof ElasticsearchException elasticsearchException) {
                    final var attempt = new FailedJoinAttempt(destination, joinRequest, elasticsearchException);
                    attempt.logNow();
                    lastFailedJoinAttempt.set(attempt);
                    assert elasticsearchException instanceof CircuitBreakingException : e; // others shouldn't happen, handle them anyway
                } else {
                    logger.error("join failed during pre-flight circuit breaker check", e);
                    assert false : e; // shouldn't happen, handle it anyway
                }
                return;
            }
            breaker.addWithoutBreaking(-1L);

            logger.debug("attempting to join {} with {}", destination, joinRequest);
            pendingJoinInfo.message = PENDING_JOIN_CONNECTING;
            // Typically we're already connected to the destination at this point, the PeerFinder holds a reference to this connection to
            // keep it open, but we need to acquire our own reference to keep the connection alive through the joining process.
            transportService.connectToNode(destination, new ActionListener<>() {
                @Override
                public void onResponse(Releasable connectionReference) {
                    logger.trace("acquired connection for joining join {} with {}", destination, joinRequest);

                    // Register the connection in joinConnections so it can be released once we successfully apply the cluster state, at
                    // which point the NodeConnectionsService will have taken ownership of it.
                    registerConnection(destination, connectionReference);

                    // It's possible that our cluster applier is still applying an earlier cluster state (maybe stuck waiting on IO), in
                    // which case the master will accept our join and add us to the cluster but we won't be able to apply the joining state
                    // fast enough and will be kicked out of the cluster for lagging, which can happen repeatedly and be a little
                    // disruptive. To avoid this we send the join from the applier thread which ensures that it's not busy doing something
                    // else.
                    pendingJoinInfo.message = PENDING_JOIN_WAITING_APPLIER;
                    clusterApplier.onNewClusterState(
                        "joining " + destination.descriptionWithoutAttributes(),
                        () -> null,
                        new ActionListener<>() {
                            @Override
                            public void onResponse(Void unused) {
                                assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
                                pendingJoinInfo.message = PENDING_JOIN_WAITING_RESPONSE;
                                transportService.sendRequest(
                                    destination,
                                    JOIN_ACTION_NAME,
                                    joinRequest,
                                    TransportRequestOptions.of(null, TransportRequestOptions.Type.PING),
                                    new TransportResponseHandler.Empty() {
                                        @Override
                                        public Executor executor() {
                                            return TransportResponseHandler.TRANSPORT_WORKER;
                                        }

                                        @Override
                                        public void handleResponse() {
                                            pendingJoinInfo.message = PENDING_JOIN_WAITING_STATE; // only logged if state delayed
                                            pendingOutgoingJoins.remove(dedupKey);
                                            logger.debug("successfully joined {} with {}", destination, joinRequest);
                                            lastFailedJoinAttempt.set(null);
                                        }

                                        @Override
                                        public void handleException(TransportException exp) {
                                            cleanUpOnFailure(exp);
                                        }
                                    }
                                );
                            }

                            @Override
                            public void onFailure(Exception e) {
                                assert false : e; // no-op cluster state update cannot fail
                                cleanUpOnFailure(new TransportException(e));
                            }

                            private void cleanUpOnFailure(TransportException exp) {
                                pendingJoinInfo.message = PENDING_JOIN_FAILED;
                                pendingOutgoingJoins.remove(dedupKey);
                                final var attempt = new FailedJoinAttempt(destination, joinRequest, exp);
                                attempt.logNow();
                                lastFailedJoinAttempt.set(attempt);
                                unregisterAndReleaseConnection(destination, connectionReference);
                            }
                        }
                    );
                }

                @Override
                public void onFailure(Exception e) {
                    pendingJoinInfo.message = PENDING_JOIN_CONNECT_FAILED;
                    pendingOutgoingJoins.remove(dedupKey);
                    final var attempt = new FailedJoinAttempt(
                        destination,
                        joinRequest,
                        new ConnectTransportException(destination, "failed to acquire connection", e)
                    );
                    attempt.logNow();
                    lastFailedJoinAttempt.set(attempt);
                }
            });

        } 
        ......
    }
//Coordinator.java
/**
     * Processes the request to join the cluster. Received by the node running for election to master.
     */
    private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> joinListener) {
        assert Transports.assertNotTransportThread("blocking on coordinator mutex and maybe doing IO to increase term");
        final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
        try {
            synchronized (mutex) {
                updateMaxTermSeen(joinRequest.getTerm());

                final CoordinationState coordState = coordinationState.get();
                final boolean prevElectionWon = coordState.electionWon()
                    && optionalJoin.stream().allMatch(j -> j.term() <= getCurrentTerm());

                optionalJoin.ifPresent(this::handleJoin);
                joinAccumulator.handleJoinRequest(
                    joinRequest.getSourceNode(),
                    joinRequest.getCompatibilityVersions(),
                    joinRequest.getFeatures(),
                    joinListener
                );

                if (prevElectionWon == false && coordState.electionWon()) {
                    becomeLeader();
                }
            }
        } catch (Exception e) {
            joinListener.onFailure(e);
        }
    }
private void handleJoin(Join join) {
        synchronized (mutex) {
            ensureTermAtLeast(getLocalNode(), join.term()).ifPresent(this::handleJoin);

            if (coordinationState.get().electionWon()) {
                // If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
                final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);

                // If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
                // schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
                // race against the election-winning publication and log a big error message, which we can prevent by checking this here:
                final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
                if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
                    scheduleReconfigurationIfNeeded();
                }
            } else {
                coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
            }
        }
    }
//CoordinationState.java
public boolean handleJoin(Join join) {
        assert join.masterCandidateMatches(localNode) : "handling join " + join + " for the wrong node " + localNode;

        if (join.term() != getCurrentTerm()) {
            logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])", getCurrentTerm(), join.term());
            throw new CoordinationStateRejectedException(
                "incoming term " + join.term() + " does not match current term " + getCurrentTerm()
            );
        }

        if (startedJoinSinceLastReboot == false) {
            logger.debug("handleJoin: ignored join as term was not incremented yet after reboot");
            throw new CoordinationStateRejectedException("ignored join as term has not been incremented yet after reboot");
        }

        final long lastAcceptedTerm = getLastAcceptedTerm();
        if (join.lastAcceptedTerm() > lastAcceptedTerm) {
            logger.debug(
                "handleJoin: ignored join as joiner has a better last accepted term (expected: <=[{}], actual: [{}])",
                lastAcceptedTerm,
                join.lastAcceptedTerm()
            );
            throw new CoordinationStateRejectedException(
                "incoming last accepted term "
                    + join.lastAcceptedTerm()
                    + " of join higher than current last accepted term "
                    + lastAcceptedTerm
            );
        }

        if (join.lastAcceptedTerm() == lastAcceptedTerm && join.lastAcceptedVersion() > getLastAcceptedVersion()) {
            logger.debug(
                "handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}]) in term {}",
                getLastAcceptedVersion(),
                join.lastAcceptedVersion(),
                lastAcceptedTerm
            );
            throw new CoordinationStateRejectedException(
                "incoming last accepted version "
                    + join.lastAcceptedVersion()
                    + " of join higher than current last accepted version "
                    + getLastAcceptedVersion()
                    + " in term "
                    + lastAcceptedTerm
            );
        }

        if (getLastAcceptedConfiguration().isEmpty()) {
            // We do not check for an election won on setting the initial configuration, so it would be possible to end up in a state where
            // we have enough join votes to have won the election immediately on setting the initial configuration. It'd be quite
            // complicated to restore all the appropriate invariants when setting the initial configuration (it's not just electionWon)
            // so instead we just reject join votes received prior to receiving the initial configuration.
            logger.debug("handleJoin: rejecting join since this node has not received its initial configuration yet");
            throw new CoordinationStateRejectedException("rejecting join since this node has not received its initial configuration yet");
        }

        boolean added = joinVotes.addJoinVote(join);
        boolean prevElectionWon = electionWon;
        electionWon = isElectionQuorum(joinVotes);
        assert prevElectionWon == false || electionWon : // we cannot go from won to not won
            "locaNode= " + localNode + ", join=" + join + ", joinVotes=" + joinVotes;
        logger.debug(
            "handleJoin: added join {} from [{}] for election, electionWon={} lastAcceptedTerm={} lastAcceptedVersion={}",
            join,
            join.votingNode(),
            electionWon,
            lastAcceptedTerm,
            getLastAcceptedVersion()
        );

        if (electionWon && prevElectionWon == false) {
            logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
            lastPublishedVersion = getLastAcceptedVersion();
        }
        return added;
    }
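
Note that handleJoin above and handlePreVoteResponse earlier apply the same freshness rule: the pair (lastAcceptedTerm, lastAcceptedVersion) is compared lexicographically, and a node never accepts a join or pre-vote from a peer whose accepted state is fresher than its own. A minimal sketch of that comparison, with hypothetical names:

//FreshnessSketch.java (illustrative only; the real checks are inlined in the methods above)
public class FreshnessSketch {

    record AcceptedState(long lastAcceptedTerm, long lastAcceptedVersion) {}

    /** True when 'other' has accepted a strictly fresher cluster state than 'local'. */
    static boolean isFresher(AcceptedState other, AcceptedState local) {
        return other.lastAcceptedTerm() > local.lastAcceptedTerm()
            || (other.lastAcceptedTerm() == local.lastAcceptedTerm()
                && other.lastAcceptedVersion() > local.lastAcceptedVersion());
    }

    public static void main(String[] args) {
        AcceptedState local = new AcceptedState(5, 42);
        System.out.println(isFresher(new AcceptedState(6, 1), local));  // true: higher term wins
        System.out.println(isFresher(new AcceptedState(5, 43), local)); // true: same term, higher version
        System.out.println(isFresher(new AcceptedState(5, 42), local)); // false: identical state
    }
}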

References

Discovery and cluster formation

Source code 8.13

GitHub PR