Elasticsearch 源码 - Discovery and Cluster Part3

本文的目标:

  • 集群的fault detection

源码:8.13

  • ES代码量过大,仅关注核心部分,串起流程
  • 借鉴ES的PR信息

如何加速源码阅读?github的PR

A new cluster coordination layer

非常值得学习的PR 范例,👍开源社区

本文是Discovery and Cluster 这个系统的最后一部分 fault detection

Cluster Coordination

Cluster fault detection

选举出的leader会定期检查集群中的每个节点,以确保它们仍然连接并且健康。集群中的每个节点也会定期检查leader的健康状况。这些检查分别称为 Followers Checks 和 Leader Checks

FollowersChecker

FollowersChecker 负责让leader检查followers是否仍然连接且健康。如果某follower已经失败,领导者将其从集群中移除。允许多次检查失败后才认为follower有故障,以便在发生短暂的网络分区或长时间的GC周期时,不会触发节点移除及随之而来的分片重新分配-😭成本太高

Elasticsearch允许这些检查偶尔失败或超时而不采取任何行动。只有在连续多次检查失败后,它才会认为节点出现故障。你可以通过 cluster.fault_detection.* 设置来控制故障检测行为

leader检测到某个节点已断开连接,这种情况将被立即视为故障。leader会绕过超时和重试设置的值,尝试从集群中移除该节点

  • FollowersChecker启动:leader节点在Coordinator.publish发布state时启动此checker
leaderChecker.setCurrentNodes(publishNodes);
followersChecker.setCurrentNodes(publishNodes);//followerChecker.start();
lagDetector.setTrackedNodes(publishNodes);
  • FOLLOWER_CHECK_INTERVAL_SETTING/FOLLOWER_CHECK_TIMEOUT_SETTING/FOLLOWER_CHECK_RETRY_COUNT_SETTING配置,防止网络抖动带来的影响

  • scheduleNextWakeUp函数 使用FOLLOWER_CHECK_INTERVAL_SETTING 进行间隔调度

  • transportService 发送check action,timeout依据sendRequest

  • handleException函数 判断失败标准

    • failureCountSinceLastSuccess + timeoutCountSinceLastSuccess >= followerCheckRetryCount
public final class FollowersChecker {
    public static final String FOLLOWER_CHECK_ACTION_NAME = "internal:coordination/fault_detection/follower_check";
    // the time between checks sent to each node
    public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
        "cluster.fault_detection.follower_check.interval",
        TimeValue.timeValueMillis(1000),
        TimeValue.timeValueMillis(100),
        Setting.Property.NodeScope
    );

    // the timeout for each check sent to each node
    public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
        "cluster.fault_detection.follower_check.timeout",
        TimeValue.timeValueMillis(10000),
        TimeValue.timeValueMillis(1),
        Setting.Property.NodeScope
    );
    // the number of failed checks that must happen before the follower is considered to have failed.
    public static final Setting<Integer> FOLLOWER_CHECK_RETRY_COUNT_SETTING = Setting.intSetting(
        "cluster.fault_detection.follower_check.retry_count",
        3,
        1,
        Setting.Property.NodeScope
    );
     private final TimeValue followerCheckInterval;
    private final TimeValue followerCheckTimeout;
    private final int followerCheckRetryCount;
    private final BiConsumer<DiscoveryNode, String> onNodeFailure;
    private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;

    private final Object mutex = new Object(); // protects writes to this state; read access does not need sync
    private final Map<DiscoveryNode, FollowerChecker> followerCheckers = newConcurrentMap();
    private final Set<DiscoveryNode> faultyNodes = new HashSet<>();

    private final TransportService transportService;
    private final NodeHealthService nodeHealthService;
    private final Executor clusterCoordinationExecutor;
    private volatile FastResponseState fastResponseState;
    ......
    /**
     * A checker for an individual follower.
     */
    private class FollowerChecker {
        private final DiscoveryNode discoveryNode;
        private int failureCountSinceLastSuccess;
        private int timeoutCountSinceLastSuccess;
        ......
        void failNode(String reason) {
            ......
            protected void doRun() {
                    synchronized (mutex) {
                        if (running() == false) {
                            logger.trace("{} no longer running, not marking faulty", FollowerChecker.this);
                            return;
                        }
                        logger.debug("{} marking node as faulty", FollowerChecker.this);
                        faultyNodes.add(discoveryNode);
                        followerCheckers.remove(discoveryNode);
                    }
                    onNodeFailure.accept(discoveryNode, reason);
                }
        }
    }
}
  • Follower 检测

    • 每个follower节点配置一个FollowerChecker,计算失败和超时次数,达到阈值,将节点移入failNode👆代码示例
    • 如果node disconneced(不进行检测) 直接将node放入failNode
  • Follower收到request进行处理,满足下列情况抛出异常

    • statusInfo.getStatus() == UNHEALTHY => NodeHealthCheckFailureException
    • request.term < responder.term => CoordinationStateRejectedException

Leader checker

代码逻辑与follower类似,每个follower都会定时发启监测,leader进行ack

如果某个节点检测到leader已断开连接,这种情况也将被立即视为故障。该节点会绕过超时和重试设置,并重新启动其发现阶段以尝试找到或选举新的主节点

//LeaderChecker.java
void leaderFailed(Supplier<String> messageSupplier, Exception e) {
            if (isClosed.compareAndSet(false, true)) {
                transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() {
                        leaderFailureListener.onLeaderFailure(messageSupplier, e);
                    }

                    @Override
                    public void onRejection(Exception e2) {
                        e.addSuppressed(e2);
                        logger.debug("rejected execution of onLeaderFailure", e);
                        assert e2 instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
                    }

                    @Override
                    public void onFailure(Exception e2) {
                        e2.addSuppressed(e);
                        logger.error("failed execution of onLeaderFailure", e2);
                        assert false : e2;
                    }

                    @Override
                    public String toString() {
                        return "notification of leader failure: " + e.getMessage();
                    }
                });
            } else {
                logger.trace("already closed, not failing leader");
            }
        }

//Coordinator.java
private void onLeaderFailure(Supplier<String> message, Exception e) {
        synchronized (mutex) {
            if (mode != Mode.CANDIDATE) {
                assert lastKnownLeader.isPresent();
                if (logger.isDebugEnabled()) {
                    // TODO this is a workaround for log4j's Supplier. We should remove this, once using ES logging api
                    logger.info(() -> message.get(), e);
                } else {
                    logger.info(() -> message.get());
                }
            }
            becomeCandidate("onLeaderFailure");
        }
    }
  • 检测到leader fail👆,触发becomeCandidate,重新选举

此外,每个节点还会定期验证其数据路径是否健康,方法是在磁盘上写入一个小文件然后再删除它。如果节点发现其数据路径不健康,它将从集群中移除,直到数据路径恢复。你可以通过 monitor.fs.health 设置来控制这种行为,本文不再这里展开

Lag Detector

如果节点无法在合理时间内应用更新的集群状态,leader主节点也会将这些节点从集群中移除

例如,一次publication操作可以在所有节点应用并确认发布的状态之前成功完成;需要每个节点反馈要么应用发布的状态(或更晚的状态),要么从集群中移除。这个组件通过在超时后移除滞后的节点来实现这一点

  • startLagDetector 由Coordinator收到publication Response时启动

    • filter(t -> t.appliedVersionLessThan(version)) 过滤出version小于leader

    • 再加上时间判断CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING,默认2分钟

    • 超过这个时间的node进入checkForLag()

      • 调用HotThreadsLoggingLagListener.onLagDetected
      • 为hot threads保持连接之后 removeNode(node, "lagging")
//LagDetector.java
public void startLagDetector(final long version) {
        final List<NodeAppliedStateTracker> laggingTrackers = appliedStateTrackersByNode.values()
            .stream()
            .filter(t -> t.appliedVersionLessThan(version))
            .toList();

        if (laggingTrackers.isEmpty()) {
            logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values());
        } else {
            logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);

            threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, clusterCoordinationExecutor, new Runnable() {
                @Override
                public void run() {
                    laggingTrackers.forEach(t -> t.checkForLag(version));
                }

                @Override
                public String toString() {
                    return "lag detector for version " + version + " on " + laggingTrackers;
                }
            });
        }
    }
void checkForLag(final long version) {
            if (appliedStateTrackersByNode.get(discoveryNode) != this) {
                logger.trace("{} no longer active when checking version {}", this, version);
                return;
            }

            long appliedVersion = this.appliedVersion.get();
            if (version <= appliedVersion) {
                logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion);
                return;
            }

            logger.warn(
                "node [{}] is lagging at cluster state version [{}], although publication of cluster state version [{}] completed [{}] ago",
                discoveryNode,
                appliedVersion,
                version,
                clusterStateApplicationTimeout
            );
            lagListener.onLagDetected(discoveryNode, appliedVersion, version);
        }
public void onLagDetected(DiscoveryNode discoveryNode, long appliedVersion, long expectedVersion) {
    ......
     // we're removing the node from the cluster so we need to keep the connection open for the hot threads request
                transportService.connectToNode(discoveryNode, new ActionListener<>() {
                    @Override
                    public void onResponse(Releasable releasable) {
                        boolean success = false;
                        final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
                        try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
                            threadContext.markAsSystemContext();
                            client.execute(
                                TransportNodesHotThreadsAction.TYPE,
                                new NodesHotThreadsRequest(discoveryNode).threads(500),
                                ActionListener.runBefore(debugListener, () -> Releasables.close(releasable))
                            );
                            success = true;
                        } finally {
                            if (success == false) {
                                Releasables.close(releasable);
                            }
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        debugListener.onFailure(e);
                    }
                });
            } finally {
                // call delegate after transportService#connectToNode to keep existing connection open
                delegate.onLagDetected(discoveryNode, appliedVersion, expectedVersion);//removeNode(node, "lagging")👆
            }
}

Troubleshooting an unstable cluster

disconnected nodes

节点通常在关闭时会以“disconnected”的原因离开集群,但如果它们在不重启的情况下重新加入集群,则存在其他问题。

Elasticsearch设计用于运行在相对可靠的网络上。它在节点之间打开了许多TCP连接,并期望这些连接永远保持打开状态。如果连接关闭,Elasticsearch会尝试重新连接,因此即使受影响的节点短暂离开集群,偶尔的故障也应对集群影响有限。相比之下,反复断开的连接会严重影响其运行。

从选举出的主节点到集群中其他每个节点的连接尤其重要。选举出的主节点绝不会自发关闭其到其他节点的出站连接。同样地,一旦连接完全建立,除非节点正在关闭,否则节点绝不会自发关闭其入站连接。

如果您发现一个节点意外地以“disconnected”原因离开集群,很可能是Elasticsearch以外的某个原因导致连接关闭。一个常见的原因是防火墙配置错误,超时设置不当或其他与Elasticsearch不兼容的策略。也可能是由于一般的连接问题,例如由于硬件故障或网络拥堵导致的数据包丢失

下面是debug方式 logger.org.elasticsearch.transport.TcpTransport: DEBUG logger.org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport: DEBUG

lagging nodes

Elasticsearch 需要每个节点能够相当快速地处理集群状态更新。如果一个节点处理集群状态更新的时间过长,会对集群造成不利影响。主节点将因滞后原因移除这些节点。关于控制此机制的设置,请参阅“发现和集群形成设置”。

滞后通常是由被移除节点的性能问题引起的。然而,节点也可能因为严重的网络延迟而滞后。为排除网络延迟,请确保 net.ipv4.tcp_retries2 配置正确。包含“warn threshold”的日志消息可能会提供有关根本原因的更多信息。

关于这个参数,这里不再展开,请移步

logger.org.elasticsearch.cluster.coordination.LagDetector: DEBUG

  • follower check retry count exceeded nodes

节点有时会在关闭时以“follower检查重试计数超出”的原因离开集群,但如果它们重新加入集群而不重新启动,则存在其他问题。

任何检查超时,按如下方式缩小问题范围:

  • GC 暂停记录在 Elasticsearch 默认发出的 GC 日志中,并通常也由主节点日志中的 JvmMonitorService 记录。使用这些日志来确认节点是否出现了高堆使用率和长时间 GC 暂停。如果是这样,高堆使用率的故障排除指南提供了一些进一步调查建议,但通常您需要在堆使用率高的时候捕获堆转储和垃圾回收器日志,以充分理解问题。
  • VM 暂停还会影响同一主机上的其他进程。VM 暂停通常也会导致系统时钟的不连续性,Elasticsearch 会在其日志中报告。如果您看到其他进程在同一时间暂停的证据,或者出现了意外的时钟不连续性,请调查您运行 Elasticsearch 的基础设施。
  • 数据包捕获将显示系统级和网络级故障,特别是如果您同时在选定的主节点和故障节点处捕获网络流量。用于跟随者检查的连接不用于任何其他流量,因此即使使用了 TLS,它也可以仅从流量模式中轻松识别出来:几乎每秒都会有几百字节的双向传输,首先是主节点的请求,然后是跟随者的响应。您应该能够观察到该连接上的任何重传、丢包或其他延迟。
  • 长时间等待特定线程可用可以通过在相关日志消息之前几秒钟内对主 Elasticsearch 进程进行堆栈转储(例如,使用 jstack)或分析跟踪(例如,使用 Java Flight Recorder)来识别。
  • 节点热线程 API 有时会提供有用信息,但请注意,该 API 还需要集群中所有节点上的一些 transport_worker 和通用线程。该 API 可能受到您试图诊断的问题的影响。由于不需要任何 JVM 线程,jstack 更可靠。
  • 发现和集群成员身份验证中涉及的线程主要是 transport_worker 和 cluster_coordination 线程,这些线程不应该长时间等待。Elasticsearch 日志中可能还会有长时间等待线程的证据,特别是查看来自 org.elasticsearch.transport.InboundHandler 的警告日志

ShardLockObtainFailedException failures

logger.org.elasticsearch.env.NodeEnvironment: DEBUG

参考

[官方文档(]https://www.elastic.co/guide/en/elasticsearch/reference/master/cluster-fault-detection.html)

源代码 8.13

PR信息