Elasticsearch Source Code - Discovery and Cluster, Part 2

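Goals of this article:

  • How an Elasticsearch cluster starts up and handles nodes joining and leaving
  • How each node in the cluster learns about the cluster state

Source version: 8.13

  • The ES code base is too large to cover in full, so we focus on the core parts and string the flow together
  • We draw on the information in the ES PRs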

How can you speed up reading the source? Use the PRs on GitHub.

A new cluster coordination layer

An exemplary PR that is well worth studying. Kudos to the open source community 👍

Cluster Coordination

Bootstrapping a cluster

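Starting an Elasticsearch cluster for the very first time requires the initial set of master-eligible nodes to be explicitly defined on one or more of the master-eligible nodes in the cluster. This is known as cluster bootstrapping. It is only required the first time a cluster starts up. Freshly started nodes that join a running cluster obtain this information from the cluster's elected master.

The initial set of master-eligible nodes is defined in the cluster.initial_master_nodes setting.

Entry point: ClusterBootstrapService.java

As described in the previous article, this is triggered when the Node starts. Only master-eligible nodes run the bootstrap.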

public void startInitialJoin() {
        synchronized (mutex) {
            becomeCandidate("startInitialJoin");
        }
        clusterBootstrapService.scheduleUnconfiguredBootstrap();
    }
......
void scheduleUnconfiguredBootstrap() {
        if (unconfiguredBootstrapTimeout == null) {
            return;
        }

        if (transportService.getLocalNode().isMasterNode() == false) {
            return;
        }

        logger.info(
            "no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] "
                + "unless existing master is discovered",
            unconfiguredBootstrapTimeout
        );

        transportService.getThreadPool()
            .scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, transportService.getThreadPool().generic(), new Runnable() {
                @Override
                public void run() {
                    final Set<DiscoveryNode> discoveredNodes = getDiscoveredNodes();
                    logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes);
                    startBootstrap(discoveredNodes, emptyList());
                }

                @Override
                public String toString() {
                    return "unconfigured-discovery delayed bootstrap";
                }
            });
    }

private void startBootstrap(Set<DiscoveryNode> discoveryNodes, List<String> unsatisfiedRequirements) {
        assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
        assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;
        if (bootstrappingPermitted.compareAndSet(true, false)) {
            doBootstrap(
                new VotingConfiguration(
                    Stream.concat(
                        discoveryNodes.stream().map(DiscoveryNode::getId),
                        unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s)
                    ).collect(Collectors.toSet())
                )
            );
        }
    }

private void doBootstrap(VotingConfiguration votingConfiguration) {
        assert transportService.getLocalNode().isMasterNode();

        try {
            votingConfigurationConsumer.accept(votingConfiguration);
        } catch (Exception e) {
            logger.warn(() -> "exception when bootstrapping with " + votingConfiguration + ", rescheduling", e);
            transportService.getThreadPool()
                .scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), transportService.getThreadPool().generic(), new Runnable() {
                    @Override
                    public void run() {
                        doBootstrap(votingConfiguration);
                    }

                    @Override
                    public String toString() {
                        return "retry of failed bootstrapping with " + votingConfiguration;
                    }
                });
        }
    }
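  • votingConfigurationConsumer invokes the method below (setInitialConfiguration), which initializes the voting configuration:

    • Only a master-eligible node may set the initial configuration
    • The votingConfiguration must contain the local node
    • peerFinder.getFoundPeers() collects all discovered peers
    • The known nodes (local node plus peers) must satisfy votingConfiguration.hasQuorum
    • Build the CoordinationMetadata
    • Initialize coordinationState
    • Once localNodeMayWinElection holds, call startElectionScheduler() (see the previous article for that logic)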
public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
        synchronized (mutex) {
            final ClusterState currentState = getStateForMasterService();

            if (isInitialConfigurationSet()) {
                logger.debug("initial configuration already set, ignoring {}", votingConfiguration);
                return false;
            }

            if (getLocalNode().isMasterNode() == false) {
                logger.debug("skip setting initial configuration as local node is not a master-eligible node");
                throw new CoordinationStateRejectedException(
                    "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"
                );
            }

            if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
                logger.debug("skip setting initial configuration as local node is not part of initial configuration");
                throw new CoordinationStateRejectedException("local node is not part of initial configuration");
            }

            final List<DiscoveryNode> knownNodes = new ArrayList<>();
            knownNodes.add(getLocalNode());
            peerFinder.getFoundPeers().forEach(knownNodes::add);

            if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).toList()) == false) {
                logger.debug(
                    "skip setting initial configuration as not enough nodes discovered to form a quorum in the "
                        + "initial configuration [knownNodes={}, {}]",
                    knownNodes,
                    votingConfiguration
                );
                throw new CoordinationStateRejectedException(
                    "not enough nodes discovered to form a quorum in the initial configuration "
                        + "[knownNodes="
                        + knownNodes
                        + ", "
                        + votingConfiguration
                        + "]"
                );
            }

            logger.info("setting initial configuration to {}", votingConfiguration);
            final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
                .lastAcceptedConfiguration(votingConfiguration)
                .lastCommittedConfiguration(votingConfiguration)
                .build();

            Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
            // automatically generate a UID for the metadata if we need to
            metadataBuilder.generateClusterUuidIfNeeded();
            metadataBuilder.coordinationMetadata(coordinationMetadata);

            coordinationState.get().setInitialState(ClusterState.builder(currentState).metadata(metadataBuilder).build());
            var nodeEligibility = localNodeMayWinElection(getLastAcceptedState(), electionStrategy);
            assert nodeEligibility.mayWin()
                : "initial state does not allow local node to win election, reason: "
                    + nodeEligibility.reason()
                    + " , metadata: "
                    + getLastAcceptedState().coordinationMetadata();
            preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
            startElectionScheduler();
            return true;
        }
    }
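
The quorum check used above can be illustrated with a small standalone model. This is a minimal sketch, assuming a quorum means a strict majority of the voting node IDs. QuorumSketch is a hypothetical class written for this article, not the real VotingConfiguration from the Elasticsearch code base.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a voting configuration (not the ES class).
final class QuorumSketch {
    private final Set<String> votingNodeIds;

    QuorumSketch(Set<String> votingNodeIds) {
        this.votingNodeIds = Set.copyOf(votingNodeIds);
    }

    // Assumption: a quorum is a strict majority of the voting node IDs.
    // Votes from nodes outside the configuration are ignored.
    boolean hasQuorum(Collection<String> votes) {
        long counted = votingNodeIds.stream().filter(votes::contains).count();
        return counted * 2 > votingNodeIds.size();
    }

    public static void main(String[] args) {
        QuorumSketch config = new QuorumSketch(Set.of("node-a", "node-b", "node-c"));
        System.out.println(config.hasQuorum(List.of("node-a")));           // one of three
        System.out.println(config.hasQuorum(List.of("node-a", "node-b"))); // two of three
        System.out.println(config.hasQuorum(List.of("node-a", "node-x"))); // node-x does not count
    }
}
```

With three voting nodes, a single known node is not enough to bootstrap, which corresponds to the "not enough nodes discovered to form a quorum" rejection in setInitialConfiguration.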

Leader-Side Join Handling

  • LeaderJoinAccumulator handles the JoinRequest
  • It enqueues a task: joinTaskQueue.submitTask("node-join", task, null);
//Coordinator.java
/**
     * Processes the request to join the cluster. Received by the node running for election to master.
     */
    private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> joinListener) {
        assert Transports.assertNotTransportThread("blocking on coordinator mutex and maybe doing IO to increase term");
        final Optional<Join> optionalJoin = joinRequest.getOptionalJoin();
        try {
            ......
                joinAccumulator.handleJoinRequest(
                    joinRequest.getSourceNode(),
                    joinRequest.getCompatibilityVersions(),
                    joinRequest.getFeatures(),
                    joinListener
                );

                if (prevElectionWon == false && coordState.electionWon()) {
                    becomeLeader();
                }
            }
        } catch (Exception e) {
            joinListener.onFailure(e);
        }
}

//JoinHelper.java

interface JoinAccumulator {
        void handleJoinRequest(
            DiscoveryNode sender,
            CompatibilityVersions compatibilityVersions,
            Set<String> features,
            ActionListener<Void> joinListener
        );

        default void close(Mode newMode) {}
}

class LeaderJoinAccumulator implements JoinAccumulator {
        @Override
        public void handleJoinRequest(
            DiscoveryNode sender,
            CompatibilityVersions compatibilityVersions,
            Set<String> features,
            ActionListener<Void> joinListener
        ) {
            final JoinTask task = JoinTask.singleNode(
                sender,
                compatibilityVersions,
                features,
                joinReasonService.getJoinReason(sender, Mode.LEADER),
                joinListener,
                currentTermSupplier.getAsLong()
            );
            joinTaskQueue.submitTask("node-join", task, null);
        }

        @Override
        public String toString() {
            return "LeaderJoinAccumulator";
        }
    }
  • Who processes the task? NodeJoinExecutor.java
  • It processes the batch of JoinTask instances:
public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContext) throws Exception {
        // The current state that MasterService uses might have been updated by a (different) master in a higher term already. If so, stop
        // processing the current cluster state update, there's no point in continuing to compute it as it will later be rejected by
        // Coordinator#publish anyhow.
        assert batchExecutionContext.taskContexts().isEmpty() == false : "Expected to have non empty join tasks list";

        var term = batchExecutionContext.taskContexts().stream().mapToLong(t -> t.getTask().term()).max().getAsLong();

        .......

        final boolean isBecomingMaster = joinTaskContexts.stream().anyMatch(t -> t.getTask().isBecomingMaster());
        final DiscoveryNodes currentNodes = initialState.nodes();
        boolean nodesChanged = false;
        ClusterState.Builder newState;

        if (currentNodes.getMasterNode() == null && isBecomingMaster) {
            assert initialState.term() < term : "there should be at most one become master task per election (= by term)";
            // use these joins to try and become the master.
            // Note that we don't have to do any validation of the amount of joining nodes - the commit
            // during the cluster state publishing guarantees that we have enough
            try (var ignored = batchExecutionContext.dropHeadersContext()) {
                // suppress deprecation warnings e.g. from reroute()
                newState = becomeMasterAndTrimConflictingNodes(initialState, joinTaskContexts, term);
            }
            nodesChanged = true;
        } else if (currentNodes.isLocalNodeElectedMaster()) {
            assert initialState.term() == term : "term should be stable for the same master";
            newState = ClusterState.builder(initialState);
        } else {
            .......
        }

        ......
        // processing any joins
        Map<String, String> joinedNodeIdsByNodeName = new HashMap<>();
        for (final var joinTaskContext : joinTaskContexts) {
            final var joinTask = joinTaskContext.getTask();
            final List<Runnable> onTaskSuccess = new ArrayList<>(joinTask.nodeCount());
            for (final JoinTask.NodeJoinTask nodeJoinTask : joinTask.nodeJoinTasks()) {
                ......
                onTaskSuccess.add(() -> {
                    final var reason = nodeJoinTask.reason();
                    if (reason.guidanceDocs() == null) {
                        logger.info(
                            "node-join: [{}] with reason [{}]",
                            nodeJoinTask.node().descriptionWithoutAttributes(),
                            reason.message()
                        );
                    } else {
                        logger.warn(
                            "node-join: [{}] with reason [{}]; for troubleshooting guidance, see {}",
                            nodeJoinTask.node().descriptionWithoutAttributes(),
                            reason.message(),
                            reason.guidanceDocs()
                        );
                    }
                    nodeJoinTask.listener().onResponse(null);
                });
            }
            joinTaskContext.success(() -> {
                for (Runnable joinCompleter : onTaskSuccess) {
                    joinCompleter.run();
                }
            });
        }

        if (nodesChanged) {
            rerouteService.reroute(
                "post-join reroute",
                Priority.HIGH,
                ActionListener.wrap(r -> logger.trace("post-join reroute completed"), e -> logger.debug("post-join reroute failed", e))
            );

            if (joinedNodeIdsByNodeName.isEmpty() == false) {
               ......
            }

            ......
            return updatedState;
        } else {
            // we must return a new cluster state instance to force publishing. This is important
            // for the joining node to finalize its join and set us as a master
            return newState.build();
        }
    }

How Followers and Candidates handle a JoinRequest

static class FollowerJoinAccumulator implements JoinAccumulator {
        @Override
        public void handleJoinRequest(
            DiscoveryNode sender,
            CompatibilityVersions compatibilityVersions,
            Set<String> features,
            ActionListener<Void> joinListener
        ) {
            joinListener.onFailure(new CoordinationStateRejectedException("join target is a follower"));
        }

        @Override
        public String toString() {
            return "FollowerJoinAccumulator";
        }
    }
 class CandidateJoinAccumulator implements JoinAccumulator {

        private record JoinInformation(CompatibilityVersions compatibilityVersions, Set<String> features, ActionListener<Void> listener) {}

        private final Map<DiscoveryNode, JoinInformation> joinRequestAccumulator = new HashMap<>();
        boolean closed;

        @Override
        public void handleJoinRequest(
            DiscoveryNode sender,
            CompatibilityVersions compatibilityVersions,
            Set<String> features,
            ActionListener<Void> joinListener
        ) {
            assert closed == false : "CandidateJoinAccumulator closed";
            var prev = joinRequestAccumulator.put(sender, new JoinInformation(compatibilityVersions, features, joinListener));
            if (prev != null) {
                prev.listener().onFailure(new CoordinationStateRejectedException("received a newer join from " + sender));
            }
        }
 }
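
The three accumulators differ only in what they do with an incoming join, which the following standalone model tries to make explicit. It is a rough sketch under simplifying assumptions: nodes are plain strings, and a Consumer<String> stands in for ActionListener<Void>. All names in it (JoinAccumulatorSketch, handleJoin, and so on) are hypothetical and do not exist in Elasticsearch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the mode-dependent join handling (not ES code).
final class JoinAccumulatorSketch {
    interface Accumulator {
        // The listener receives a rejection message when the join is refused.
        void handleJoin(String sender, Consumer<String> listener);
    }

    // Leader: every join becomes a task on the join queue.
    static final class Leader implements Accumulator {
        final List<String> queuedTasks = new ArrayList<>();

        @Override
        public void handleJoin(String sender, Consumer<String> listener) {
            queuedTasks.add("node-join:" + sender);
        }
    }

    // Follower: joins are always rejected.
    static final class Follower implements Accumulator {
        @Override
        public void handleJoin(String sender, Consumer<String> listener) {
            listener.accept("rejected: join target is a follower");
        }
    }

    // Candidate: joins are held back; a newer join from the same sender
    // replaces the older one, whose listener is failed.
    static final class Candidate implements Accumulator {
        final Map<String, Consumer<String>> pending = new HashMap<>();

        @Override
        public void handleJoin(String sender, Consumer<String> listener) {
            Consumer<String> previous = pending.put(sender, listener);
            if (previous != null) {
                previous.accept("rejected: received a newer join from " + sender);
            }
        }
    }

    public static void main(String[] args) {
        Candidate candidate = new Candidate();
        candidate.handleJoin("node-1", msg -> System.out.println("first listener: " + msg));
        candidate.handleJoin("node-1", msg -> System.out.println("second listener: " + msg));
        new Follower().handleJoin("node-2", msg -> System.out.println("follower: " + msg));
    }
}
```

The sketch leaves out what happens to the accumulated joins when a candidate changes mode (the close(Mode newMode) hook of the real interface).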

Publishing the cluster state

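The elected master is the only node in a cluster that can make changes to the cluster state.

The elected master processes one batch of cluster state updates at a time, computing the required changes and publishing the updated cluster state to all the other nodes in the cluster. Each publication starts with the elected master broadcasting the updated cluster state to all nodes in the cluster. Each node responds with an acknowledgement but does not yet apply the newly received state. Once the elected master has collected acknowledgements from enough master-eligible nodes, the new cluster state is said to be committed, and the master broadcasts another message instructing nodes to apply the now-committed state. Each node receives this message, applies the updated state, and then sends a second acknowledgement back to the elected master.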
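
The two rounds described above (publish and acknowledge, then commit and apply) can be sketched as a tiny synchronous simulation. This is only an illustration under loud assumptions: "enough master-eligible nodes" is modelled as a strict majority of all master-eligible nodes, whereas the real check is made against the voting configurations, and the real protocol is asynchronous and bounded by timeouts. TwoPhasePublishSketch and its members are hypothetical names.

```java
import java.util.List;

// Hypothetical, synchronous model of publish-then-commit (not ES code).
final class TwoPhasePublishSketch {
    static final class Node {
        final String id;
        final boolean masterEligible;
        final boolean reachable;
        long acceptedVersion = 0; // received and acknowledged, not yet applied
        long appliedVersion = 0;  // applied after the commit message

        Node(String id, boolean masterEligible, boolean reachable) {
            this.id = id;
            this.masterEligible = masterEligible;
            this.reachable = reachable;
        }

        // Round 1: accept a newer state and acknowledge it.
        boolean handlePublish(long version) {
            if (!reachable || version <= acceptedVersion) {
                return false;
            }
            acceptedVersion = version;
            return true;
        }

        // Round 2: apply the state only if it is the one accepted earlier.
        boolean handleCommit(long version) {
            if (!reachable || version != acceptedVersion) {
                return false;
            }
            appliedVersion = version;
            return true;
        }
    }

    // Returns true if the version was committed.
    static boolean publish(List<Node> nodes, long version) {
        long masterEligible = nodes.stream().filter(n -> n.masterEligible).count();
        long acks = 0;
        for (Node node : nodes) {
            if (node.handlePublish(version) && node.masterEligible) {
                acks++;
            }
        }
        if (acks * 2 <= masterEligible) {
            return false; // assumption: commit needs a strict majority
        }
        for (Node node : nodes) {
            node.handleCommit(version);
        }
        return true;
    }

    public static void main(String[] args) {
        List<Node> nodes = List.of(
            new Node("m1", true, true),
            new Node("m2", true, true),
            new Node("m3", true, false),
            new Node("d1", false, true)
        );
        System.out.println("committed: " + publish(nodes, 1));
    }
}
```

Note that a node can have accepted a version without ever applying it: if the majority is not reached, acceptedVersion moves forward while appliedVersion stays where it was.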

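The elected master allows a limited amount of time for each cluster state update to be completely published to all nodes. This is defined by the cluster.publish.timeout setting, which defaults to 30 seconds, measured from the time the publication started. If this time is reached before the new cluster state is committed, the cluster state change is rejected and the elected master considers itself to have failed. It then stands down and starts trying to elect a new master.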

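If the new cluster state is committed before cluster.publish.timeout has elapsed, the elected master considers the change to have succeeded. It waits until the timeout has elapsed, or until it has received acknowledgements that every node in the cluster has applied the updated state, and then starts processing and publishing the next cluster state update. If some acknowledgements have not been received (that is, some nodes have not yet confirmed that they applied the current update), these nodes are said to be lagging, because their cluster state has fallen behind the elected master's latest state. The elected master waits for the lagging nodes to catch up for a further period, cluster.follower_lag.timeout, which defaults to 90 seconds. If a node has still not successfully applied the cluster state update within this time, it is considered to have failed and the elected master removes it from the cluster.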
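
The lagging-node rule can be written down as a small pure function. This is a minimal sketch, assuming the master tracks, per node, the last version that node confirmed applying, and that the clock is measured from the moment lag tracking started. LagDetectorSketch and nodesToRemove are hypothetical names, and the 90 second constant is the default quoted above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the follower-lag rule (not ES code).
final class LagDetectorSketch {
    // Default of cluster.follower_lag.timeout as stated in the text.
    static final Duration FOLLOWER_LAG_TIMEOUT = Duration.ofSeconds(90);

    // appliedVersions: node id -> last version the node confirmed applying.
    // Returns the nodes to remove, in node id order.
    static List<String> nodesToRemove(Map<String, Long> appliedVersions, long committedVersion, Duration waited) {
        List<String> lagging = new ArrayList<>();
        if (waited.compareTo(FOLLOWER_LAG_TIMEOUT) < 0) {
            return lagging; // still within the grace period
        }
        for (Map.Entry<String, Long> entry : new TreeMap<String, Long>(appliedVersions).entrySet()) {
            if (entry.getValue() < committedVersion) {
                lagging.add(entry.getKey());
            }
        }
        return lagging;
    }

    public static void main(String[] args) {
        Map<String, Long> applied = Map.of("node-1", 5L, "node-2", 4L);
        System.out.println(nodesToRemove(applied, 5L, Duration.ofSeconds(30)));
        System.out.println(nodesToRemove(applied, 5L, Duration.ofSeconds(90)));
    }
}
```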

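Cluster state updates are typically published as diffs against the previous cluster state, which reduces the time and network bandwidth needed to publish an update. For example, when the mappings of only a subset of the indices in the cluster state are updated, only the updates for those indices need to be published to the nodes in the cluster, as long as those nodes have the previous cluster state. If a node is missing the previous cluster state, for instance when it rejoins the cluster, the elected master publishes the full cluster state to that node so that it can receive future updates as diffs.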
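
To make the diff-versus-full decision concrete, here is a small model in which a cluster state is just a version plus a map from index name to mapping. It is a sketch under stated assumptions, not the real diff mechanism of ClusterState: DiffPublishSketch, State, Diff and deliver are hypothetical names, and a null value in the diff is used to mean "index removed".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of diff-based publication (not ES code).
final class DiffPublishSketch {
    record State(long version, Map<String, String> mappings) {}

    // changed: index -> new mapping; a null value marks a removed index.
    record Diff(long baseVersion, long newVersion, Map<String, String> changed) {}

    static Diff diff(State prev, State next) {
        Map<String, String> changed = new HashMap<>();
        next.mappings().forEach((index, mapping) -> {
            if (!mapping.equals(prev.mappings().get(index))) {
                changed.put(index, mapping);
            }
        });
        for (String index : prev.mappings().keySet()) {
            if (!next.mappings().containsKey(index)) {
                changed.put(index, null);
            }
        }
        return new Diff(prev.version(), next.version(), changed);
    }

    static State apply(State local, Diff diff) {
        if (local == null || local.version() != diff.baseVersion()) {
            throw new IllegalStateException("diff does not match local state, full state needed");
        }
        Map<String, String> mappings = new HashMap<>(local.mappings());
        diff.changed().forEach((index, mapping) -> {
            if (mapping == null) {
                mappings.remove(index);
            } else {
                mappings.put(index, mapping);
            }
        });
        return new State(diff.newVersion(), mappings);
    }

    // Sender side: use the diff when the receiver holds the previous state,
    // otherwise fall back to sending the full state.
    static State deliver(State receiverState, State prev, State next) {
        if (receiverState != null && receiverState.version() == prev.version()) {
            return apply(receiverState, diff(prev, next));
        }
        return next;
    }

    public static void main(String[] args) {
        State prev = new State(1, Map.of("logs", "m1", "users", "m1"));
        State next = new State(2, Map.of("logs", "m2", "users", "m1"));
        System.out.println("entries in diff: " + diff(prev, next).changed().size());
        System.out.println("rejoining node gets full state: " + deliver(null, prev, next).equals(next));
    }
}
```

In this toy example only the changed index travels in the diff, while a node without the previous state receives everything.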

First, look at the relevant part of the TLA+ specification:

\* handle publish request m on node n
HandlePublishRequest(n, m) ==
  /\ m.method = PublishRequest
  /\ m.term = currentTerm[n]
  /\ (m.term = lastAcceptedTerm[n]) => (m.version > lastAcceptedVersion[n])
  /\ lastAcceptedTerm' = [lastAcceptedTerm EXCEPT ![n] = m.term]
  /\ lastAcceptedVersion' = [lastAcceptedVersion EXCEPT ![n] = m.version]
  /\ lastAcceptedValue' = [lastAcceptedValue EXCEPT ![n] = m.value]
  /\ lastAcceptedConfiguration' = [lastAcceptedConfiguration EXCEPT ![n] = m.config]
  /\ lastCommittedConfiguration' = [lastCommittedConfiguration EXCEPT ![n] = m.commConf] 
  /\ LET
       response == [method   |-> PublishResponse,
                    source   |-> n,
                    dest     |-> m.source,
                    term     |-> m.term,
                    version  |-> m.version]
     IN
       /\ messages' = messages \cup {response}
       /\ UNCHANGED <<startedJoinSinceLastReboot, currentTerm, descendant, lastPublishedConfiguration,
                      electionWon, lastPublishedVersion, joinVotes, publishVotes, initialConfiguration,
                      initialValue, initialAcceptedVersion>>

\* node n commits a change
HandlePublishResponse(n, m) ==
  /\ m.method = PublishResponse
  /\ electionWon[n]
  /\ m.term = currentTerm[n]
  /\ m.version = lastPublishedVersion[n]
  /\ publishVotes' = [publishVotes EXCEPT ![n] = @ \cup {m.source}]
  /\ IF
       IsPublishQuorum(n, publishVotes'[n])
     THEN
       LET
         commitRequests == { [method   |-> Commit,
                              source   |-> n,
                              dest     |-> ns,
                              term     |-> currentTerm[n],
                              version  |-> lastPublishedVersion[n]] : ns \in Nodes }
       IN
         /\ messages' = messages \cup commitRequests
     ELSE
       UNCHANGED <<messages>>
  /\ UNCHANGED <<startedJoinSinceLastReboot, lastCommittedConfiguration, currentTerm, electionWon, descendant,
                 lastAcceptedVersion, lastAcceptedValue, lastAcceptedTerm, lastAcceptedConfiguration,
                 lastPublishedVersion, lastPublishedConfiguration, joinVotes, initialConfiguration,
                 initialValue, initialAcceptedVersion>>


\* apply committed configuration to node n
HandleCommit(n, m) ==
  /\ m.method = Commit
  /\ m.term = currentTerm[n]
  /\ m.term = lastAcceptedTerm[n]
  /\ m.version = lastAcceptedVersion[n]
  /\ (electionWon[n] => lastAcceptedVersion[n] = lastPublishedVersion[n])
  /\ lastCommittedConfiguration' = [lastCommittedConfiguration EXCEPT ![n] = lastAcceptedConfiguration[n]]
  /\ UNCHANGED <<currentTerm, joinVotes, messages, lastAcceptedTerm, lastAcceptedValue, startedJoinSinceLastReboot, descendant,
                 electionWon, lastAcceptedConfiguration, lastAcceptedVersion, lastPublishedVersion, publishVotes,
                 lastPublishedConfiguration, initialConfiguration, initialValue, initialAcceptedVersion>>

Now map this onto the Java code:

  • PublishRequest is simple: it wraps a ClusterState

  • PublishResponse wraps the term and version; PublishWithJoinResponse adds an optional Join

  • MasterService.executeAndPublishBatch consumes the tasks that update the cluster state

    • For example, when a node joins or is removed:

      • A task is submitted to joinTaskQueue / nodeLeftQueue, both of which are BatchingTaskQueue instances

      • BatchingTaskQueue wraps ConcurrentLinkedQueue<Entry> queue = new ConcurrentLinkedQueue<>();

      • BatchingTaskQueue.Processor processes the queued tasks

        • Its consumer function is MasterService's executeAndPublishBatch

        • publishClusterStateUpdate => publish, passing a ClusterStatePublicationEvent

        • Finally Coordinator.publish builds the publishRequest

        • Publication.start drives the state publication and calls sendPublishRequest

        • publicationTargets come from publishRequest.getAcceptedState().getNodes()

          • that is, the DiscoveryNodes of the accepted state
public abstract class Publication {

    protected final Logger logger = LogManager.getLogger(getClass());

    private final List<PublicationTarget> publicationTargets;
    private final PublishRequest publishRequest;
    private final AckListener ackListener;
    private final LongSupplier currentTimeSupplier;
    private final long startTime;

    private Optional<SubscribableListener<ApplyCommitRequest>> applyCommitRequest; // set when state is committed
    private boolean isCompleted; // set when publication is completed
    private boolean cancelled; // set when publication is cancelled
    ......
}
public class PublishRequest {

    private final ClusterState acceptedState;

    public PublishRequest(ClusterState acceptedState) {
        this.acceptedState = acceptedState;
    }
    ......
}
public class PublishWithJoinResponse extends TransportResponse {
    private final PublishResponse publishResponse;
    private final Optional<Join> optionalJoin;
    ......
}
public class PublishResponse implements Writeable {

    private final long term;
    private final long version;
    ......
}
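
The batching idea behind the task queue described above can be shown in isolation. This is a simplified sketch, assuming a single processing thread that drains whatever has been queued and hands it to a consumer as one batch. BatchingQueueSketch and processPending are hypothetical names, and the real BatchingTaskQueue in MasterService does considerably more (priorities, timeouts, executors).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of a batching task queue (not ES code).
final class BatchingQueueSketch<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<List<T>> batchConsumer;

    BatchingQueueSketch(Consumer<List<T>> batchConsumer) {
        this.batchConsumer = batchConsumer;
    }

    // Producers may call this from any thread.
    void submitTask(T task) {
        queue.add(task);
    }

    // Drains everything queued so far and passes it on as a single batch.
    // Returns the number of tasks in the batch.
    int processPending() {
        List<T> batch = new ArrayList<>();
        T task;
        while ((task = queue.poll()) != null) {
            batch.add(task);
        }
        if (!batch.isEmpty()) {
            batchConsumer.accept(batch);
        }
        return batch.size();
    }

    public static void main(String[] args) {
        BatchingQueueSketch<String> joins = new BatchingQueueSketch<>(
            batch -> System.out.println("executing batch of " + batch.size() + ": " + batch)
        );
        joins.submitTask("node-join:node-1");
        joins.submitTask("node-join:node-2");
        joins.processPending();
    }
}
```

Batching is what lets several joins that arrive close together end up in one cluster state update, and therefore one publication.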

//CoordinationState.java
 /**
     * May be called in order to prepare publication of the given cluster state
     *
     * @param clusterState The cluster state to publish.
     * @return A PublishRequest to publish the given cluster state
     * @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object.
     */
    public PublishRequest handleClientValue(ClusterState clusterState) {
        if (electionWon == false) {
            logger.debug("handleClientValue: ignored request as election not won");
            throw new CoordinationStateRejectedException("election not won");
        }
        if (lastPublishedVersion != getLastAcceptedVersion()) {
            logger.debug("handleClientValue: cannot start publishing next value before accepting previous one");
            throw new CoordinationStateRejectedException("cannot start publishing next value before accepting previous one");
        }
        if (clusterState.term() != getCurrentTerm()) {
            logger.debug(
                "handleClientValue: ignored request due to term mismatch "
                    + "(expected: [term {} version >{}], actual: [term {} version {}])",
                getCurrentTerm(),
                lastPublishedVersion,
                clusterState.term(),
                clusterState.version()
            );
            throw new CoordinationStateRejectedException(
                "incoming term " + clusterState.term() + " does not match current term " + getCurrentTerm()
            );
        }
        if (clusterState.version() <= lastPublishedVersion) {
            logger.debug(
                "handleClientValue: ignored request due to version mismatch "
                    + "(expected: [term {} version >{}], actual: [term {} version {}])",
                getCurrentTerm(),
                lastPublishedVersion,
                clusterState.term(),
                clusterState.version()
            );
            throw new CoordinationStateRejectedException(
                "incoming cluster state version "
                    + clusterState.version()
                    + " lower or equal to last published version "
                    + lastPublishedVersion
            );
        }

        if (electionStrategy.isInvalidReconfiguration(clusterState, getLastAcceptedConfiguration(), getLastCommittedConfiguration())) {
            logger.debug("handleClientValue: only allow reconfiguration while not already reconfiguring");
            throw new CoordinationStateRejectedException("only allow reconfiguration while not already reconfiguring");
        }
        if (joinVotesHaveQuorumFor(clusterState.getLastAcceptedConfiguration()) == false) {
            logger.debug("handleClientValue: only allow reconfiguration if joinVotes have quorum for new config");
            throw new CoordinationStateRejectedException("only allow reconfiguration if joinVotes have quorum for new config");
        }

        assert clusterState.getLastCommittedConfiguration().equals(getLastCommittedConfiguration())
            : "last committed configuration should not change";

        lastPublishedVersion = clusterState.version();
        lastPublishedConfiguration = clusterState.getLastAcceptedConfiguration();
        publishVotes = new VoteCollection();

        logger.trace("handleClientValue: processing request for version [{}] and term [{}]", lastPublishedVersion, getCurrentTerm());

        return new PublishRequest(clusterState);
    }
  • When is the commit request sent?

    • Publication.handlePublishResponse, triggered after the leader receives the PublishResponse returned by another node

    • Publication.sendApplyCommit

    • Coordinator.handleApplyCommit handles the commit message

      • CoordinationState is responsible for persisting the commit
private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener<Void> applyListener) {
        synchronized (mutex) {
            logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);

            coordinationState.get().handleCommit(applyCommitRequest); // persist the commit
            final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
            applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(committedState) : committedState;
            updateSingleNodeClusterChecker(); // in case nodes increase/decrease, possibly update the single-node checker
            if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
                // master node applies the committed state at the end of the publication process, not here.
                applyListener.onResponse(null);
            } else {
                // hand the state to the applier, which submits a task to its executor
                clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState, applyListener.map(r -> {
                    onClusterStateApplied();
                    return r;
                }));
            }
        }
    }
// CoordinationState.java
public void handleCommit(ApplyCommitRequest applyCommit) {
        if (applyCommit.getTerm() != getCurrentTerm()) {
            logger.debug(
                "handleCommit: ignored commit request due to term mismatch "
                    + "(expected: [term {} version {}], actual: [term {} version {}])",
                getLastAcceptedTerm(),
                getLastAcceptedVersion(),
                applyCommit.getTerm(),
                applyCommit.getVersion()
            );
            throw new CoordinationStateRejectedException(
                "incoming term " + applyCommit.getTerm() + " does not match current term " + getCurrentTerm()
            );
        }
        if (applyCommit.getTerm() != getLastAcceptedTerm()) {
            logger.debug(
                "handleCommit: ignored commit request due to term mismatch "
                    + "(expected: [term {} version {}], actual: [term {} version {}])",
                getLastAcceptedTerm(),
                getLastAcceptedVersion(),
                applyCommit.getTerm(),
                applyCommit.getVersion()
            );
            throw new CoordinationStateRejectedException(
                "incoming term " + applyCommit.getTerm() + " does not match last accepted term " + getLastAcceptedTerm()
            );
        }
        if (applyCommit.getVersion() != getLastAcceptedVersion()) {
            logger.debug(
                "handleCommit: ignored commit request due to version mismatch (term {}, expected: [{}], actual: [{}])",
                getLastAcceptedTerm(),
                getLastAcceptedVersion(),
                applyCommit.getVersion()
            );
            throw new CoordinationStateRejectedException(
                "incoming version " + applyCommit.getVersion() + " does not match current version " + getLastAcceptedVersion()
            );
        }

        logger.trace(
            "handleCommit: applying commit request for term [{}] and version [{}]",
            applyCommit.getTerm(),
            applyCommit.getVersion()
        );

        assert getLastAcceptedTerm() == applyCommit.getTerm() && getLastAcceptedVersion() == applyCommit.getVersion();
        persistedState.markLastAcceptedStateAsCommitted();
        assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
    }
//ClusterApplierService.java
public void onNewClusterState(
        final String source,
        final Supplier<ClusterState> clusterStateSupplier,
        final ActionListener<Void> listener
    ) {
        submitStateUpdateTask(source, Priority.HIGH, currentState -> {
            ClusterState nextState = clusterStateSupplier.get();
            if (nextState != null) {
                return nextState;
            } else {
                return currentState;
            }
        }, listener);
    }
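  • Pay attention to the onClusterStateApplied() method here

    • A completed commit means the election is over, and NodeConnectionsService now manages the connections between nodes

      • so the connections opened for joining are released via joinHelper.onClusterStateApplied();
    • The election scheduler is closed via closeElectionScheduler

    • If the node is master-eligible, joinReasonService also re-tracks the DiscoveryNodes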

private void onClusterStateApplied() {
        assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
        synchronized (mutex) {
            if (mode != Mode.CANDIDATE) {
                joinHelper.onClusterStateApplied();
                closeElectionScheduler();
                peerFinder.closePeers();
            }
        }
        if (getLocalNode().isMasterNode()) {
            joinReasonService.onClusterStateApplied(applierState.nodes());
        }
    }
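  • closeElectionScheduler is called above, so when does the election scheduler start again?

  • When the set of nodes changes 👆

    • Coordinator.onFoundPeersUpdated
    • A node being added (establishConnection) or removed (handleWakeUp) leads to startElectionScheduler being called again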

Adding and removing master-eligible nodes

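This triggers an election again. The flow is similar to the one above, so it is not repeated here. The re-sharding caused by node changes is left for a later article.

Summary

The ES code base is huge. Reading it alongside the corresponding TLA+ specification makes the coordination process easier to follow.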


References

Official documentation

Source code, 8.13

Issue discussions