Elasticsearch Source Code - Index Recovery

The goals of this series:

  • How is an Elasticsearch index sharded?
  • How does an Elasticsearch index persist data?
  • How does an Elasticsearch index recover from failures?
  • How does Elasticsearch rebalance index shards?

Source version: 8.13

Index Recovery

When recovery is triggered

  • Recreates a shard lost during node failure
  • Relocates a shard to another node due to a cluster rebalance or changes to the shard allocation settings
  • Manually triggered via API

Local recovery

IndexShard.recoverLocallyUpToGlobalCheckpoint => doLocalRecovery

  • Check for a safe commit point based on the globalCheckpoint

  • safeCommit = store.findSafeIndexCommit(globalCheckpoint) compares maxSeqNo (a standalone sketch of the selection rule follows the code below)

    public Optional<SequenceNumbers.CommitInfo> findSafeIndexCommit(long globalCheckpoint) throws IOException {
            final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
            assert commits.isEmpty() == false : "no commit found";
            final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
            final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
            // all operations of the safe commit must be at most the global checkpoint.
            if (commitInfo.maxSeqNo <= globalCheckpoint) {
                return Optional.of(commitInfo);
            } else {
                return Optional.empty();
            }
        }
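
The selection rule boils down to: walk the commits from newest to oldest and keep the first one whose maxSeqNo is covered by the global checkpoint, because every operation in a safe commit must already be durable cluster-wide. A minimal standalone sketch of that rule; CommitPoint and the sample values here are hypothetical stand-ins for the seqno metadata kept in Lucene's commit user data:

    import java.util.List;
    import java.util.Optional;

    public class SafeCommitDemo {

        // Hypothetical stand-in for the seqno metadata stored in a Lucene commit's user data.
        record CommitPoint(long generation, long maxSeqNo) {}

        // Newest-first scan: the first commit fully covered by the global checkpoint is safe.
        static Optional<CommitPoint> findSafeCommit(List<CommitPoint> commits, long globalCheckpoint) {
            for (int i = commits.size() - 1; i >= 0; i--) {
                if (commits.get(i).maxSeqNo() <= globalCheckpoint) {
                    return Optional.of(commits.get(i));
                }
            }
            return Optional.empty(); // nothing is safe; fall back to a full file-based recovery
        }

        public static void main(String[] args) {
            List<CommitPoint> commits = List.of(
                new CommitPoint(1, 90),    // oldest
                new CommitPoint(2, 120),
                new CommitPoint(3, 200));  // newest
            // With globalCheckpoint = 150, commit 3 contains ops (151..200) that may not be
            // durable everywhere yet, so commit 2 is the safe starting point.
            System.out.println(findSafeCommit(commits, 150)); // Optional[CommitPoint[generation=2, maxSeqNo=120]]
        }
    }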
    
  • Set currentEngineReference: start a temporary engine so the local translog can be recovered up to the given checkpoint

     synchronized (engineMutex) {
              assert currentEngineReference.get() == null : "engine is running";
              verifyNotClosed();
              // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
              final Engine newEngine = createEngine(config);
              onNewEngine(newEngine);
              currentEngineReference.set(newEngine);
              // We set active because we are now writing operations to the engine; this way,
              // we can flush if we go idle after some time and become inactive.
              active.set(true);
          }
    
  • Replay the operations in the translog with runTranslogRecovery and update the recovery state

    final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
            recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
            final int recoveredOps = runTranslogRecovery(
                engine,
                snapshot,
                Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
                recoveryState.getTranslog()::incrementRecoveredOperations
            );
            recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
            return recoveredOps;
        };
        innerOpenEngineAndTranslog(() -> globalCheckpoint);
        getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint, recoveryCompleteListener.map(v -> {
            logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
            return v;
        }));
    
  • InternalEngine.recoverFromTranslogInternal reads a translog snapshot and runs the translogRecoveryRunner above to replay the operations

    private void recoverFromTranslogInternal(
            TranslogRecoveryRunner translogRecoveryRunner,
            long recoverUpToSeqNo,
            ActionListener<Void> listener
        ) {
            ActionListener.run(listener, l -> {
                final int opsRecovered;
                final long localCheckpoint = getProcessedLocalCheckpoint();
                if (localCheckpoint < recoverUpToSeqNo) {
                    try (Translog.Snapshot snapshot = newTranslogSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
                        opsRecovered = translogRecoveryRunner.run(this, snapshot);
                    } catch (Exception e) {
                        throw new EngineException(shardId, "failed to recover from translog", e);
                    }
                } else {
                    opsRecovered = 0;
                }
                // flush if we recovered something or if we have references to older translogs
                // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
                assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
                pendingTranslogRecovery.set(false); // we are good - now we can commit
                logger.trace(
                    () -> format(
                        "flushing post recovery from translog: ops recovered [%s], current translog generation [%s]",
                        opsRecovered,
                        translog.currentFileGeneration()
                    )
                );
    
                // flush might do something async and complete the listener on a different thread, from which we must fork back to a generic
                // thread to continue with recovery, but if it doesn't do anything async then there's no need to fork, hence why we use a
                // SubscribableListener here
                final var flushListener = new SubscribableListener<FlushResult>();
                flush(false, true, flushListener);
                flushListener.addListener(l.delegateFailureAndWrap((ll, r) -> {
                    translog.trimUnreferencedReaders();
                    ll.onResponse(null);
                }), engineConfig.getThreadPool().generic(), null);
            });
        }
    
  • Reading through a translog snapshot avoids touching the original translog files

Local-translog recovery only fixes node-local problems; it cannot fully resolve cluster-level failures.

For example, a replica shard additionally needs peer recovery, covered next.

Peer recovery

Peer recovery runs automatically when Elasticsearch:

  • Recreates a shard lost during node failure
  • Relocates a shard to another node due to a cluster rebalance or changes to the shard allocation settings

IndexShard.startRecovery drives the recovery of a specific shard, dispatching on the shard's recovery source type:

public void startRecovery(
        RecoveryState recoveryState,
        PeerRecoveryTargetService recoveryTargetService,
        PeerRecoveryTargetService.RecoveryListener recoveryListener,
        RepositoriesService repositoriesService,
        BiConsumer<MappingMetadata, ActionListener<Void>> mappingUpdateConsumer,
        IndicesService indicesService,
        long clusterStateVersion
    ) {
        // TODO: Create a proper object to encapsulate the recovery context
        // all of the current methods here follow a pattern of:
        // resolve context which isn't really dependent on the local shards and then async
        // call some external method with this pointer.
        // with a proper recovery context object we can simply change this to:
        // startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {
        // markAsRecovery("from " + source.getShortDescription(), recoveryState);
        // threadPool.generic().execute() {
        // onFailure () { listener.failure() };
        // doRun() {
        // if (source.recover(this)) {
        // recoveryListener.onRecoveryDone(recoveryState);
        // }
        // }
        // }}
        // }
        assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
        switch (recoveryState.getRecoverySource().getType()) {
            case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
            case PEER -> {
                try {
                    markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
                    recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), clusterStateVersion, recoveryListener);
                } catch (Exception e) {
                    failShard("corrupted preexisting index", e);
                    recoveryListener.onRecoveryFailure(new RecoveryFailedException(recoveryState, null, e), true);
                }
            }
            case SNAPSHOT -> {
                final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
                executeRecovery(
                    "from snapshot",
                    recoveryState,
                    recoveryListener,
                    l -> restoreFromRepository(repositoriesService.repository(repo), l)
                );
            }
            case LOCAL_SHARDS -> {
                final IndexMetadata indexMetadata = indexSettings().getIndexMetadata();
                final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();
                final List<IndexShard> startedShards = new ArrayList<>();
                final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);
                final Set<ShardId> requiredShards;
                final int numShards;
                if (sourceIndexService != null) {
                    requiredShards = IndexMetadata.selectRecoverFromShards(
                        shardId().id(),
                        sourceIndexService.getMetadata(),
                        indexMetadata.getNumberOfShards()
                    );
                    for (IndexShard shard : sourceIndexService) {
                        if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {
                            startedShards.add(shard);
                        }
                    }
                    numShards = requiredShards.size();
                } else {
                    numShards = -1;
                    requiredShards = Collections.emptySet();
                }
                if (numShards == startedShards.size()) {
                    assert requiredShards.isEmpty() == false;
                    executeRecovery(
                        "from local shards",
                        recoveryState,
                        recoveryListener,
                        l -> recoverFromLocalShards(
                            mappingUpdateConsumer,
                            startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).toList(),
                            l
                        )
                    );
                } else {
                    final RuntimeException e;
                    if (numShards == -1) {
                        e = new IndexNotFoundException(resizeSourceIndex);
                    } else {
                        e = new IllegalStateException(
                            "not all required shards of index "
                                + resizeSourceIndex
                                + " are started yet, expected "
                                + numShards
                                + " found "
                                + startedShards.size()
                                + " can't recover shard "
                                + shardId()
                        );
                    }
                    throw e;
                }
            }
            default -> throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
        }
    }
  • EMPTY_STORE, EXISTING_STORE: omitted here
  • PEER: the replica requests recovery from the primary
  • SNAPSHOT: restore from a snapshot
  • LOCAL_SHARDS: recover from the started shards of the resize-source index (shrink/split/clone) on the same node

The flow is controlled by a state machine:

 /**
     * Marks the shard as recovering based on a recovery state, fails with exception is recovering is not allowed to be set.
     */
    public IndexShardState markAsRecovering(String reason, RecoveryState recoveryState) throws IndexShardStartedException,
        IndexShardRelocatedException, IndexShardRecoveringException, IndexShardClosedException {
        synchronized (mutex) {
            if (state == IndexShardState.CLOSED) {
                throw new IndexShardClosedException(shardId);
            }
            if (state == IndexShardState.STARTED) {
                throw new IndexShardStartedException(shardId);
            }
            if (state == IndexShardState.RECOVERING) {
                throw new IndexShardRecoveringException(shardId);
            }
            if (state == IndexShardState.POST_RECOVERY) {
                throw new IndexShardRecoveringException(shardId);
            }
            this.recoveryState = recoveryState;
            return changeState(IndexShardState.RECOVERING, reason);
        }
    }

The focus here is the PEER type.

The startRecovery process:

  • tryAcquireSnapshotDownloadPermits: concurrency control for snapshot-based file downloads (see the sketch after the code below)

  • onGoingRecoveries.startRecovery creates a new RecoveryTarget

    public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler {
    
    private final Logger logger;
    
    private static final AtomicLong idGenerator = new AtomicLong();
    
    private static final String RECOVERY_PREFIX = "recovery.";
    
    private final ShardId shardId; // ID of the shard being recovered
    private final long recoveryId; // unique ID of this recovery, generated by idGenerator
    private final IndexShard indexShard; // the target shard instance
    private final DiscoveryNode sourceNode; // the source node (where the primary shard lives)
    private final long clusterStateVersion; // cluster state version, used to keep the recovery consistent with the cluster state
    private final SnapshotFilesProvider snapshotFilesProvider; // provides files that can be downloaded from snapshots
    private volatile MultiFileWriter multiFileWriter; // manages writing the many files copied from the primary into the target shard
    private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); // tracks the state of each request made during the recovery
    private final Store store; // the shard's data store
    private final PeerRecoveryTargetService.RecoveryListener listener; // together with the RecoveryRequestTracker above, observes state changes so the recovery proceeds as expected
    
    private final AtomicBoolean finished = new AtomicBoolean();
    
    private final CancellableThreads cancellableThreads;
    
    // last time this status was accessed
    private volatile long lastAccessTime = System.nanoTime();
    
    private final AtomicInteger recoveryMonitorBlocks = new AtomicInteger();
    
    @Nullable // if we're not downloading files from snapshots in this recovery or we're retrying
    private volatile Releasable snapshotFileDownloadsPermit; // permit gating snapshot file downloads
    
    // latch that can be used to blockingly wait for RecoveryTarget to be closed
    private final CountDownLatch closedLatch = new CountDownLatch(1);
    ......
    }
    
  • threadPool.generic().execute(new RecoveryRunner(recoveryId)) runs the recovery

//PeerRecoveryTargetService.java

 public void startRecovery(
        final IndexShard indexShard,
        final DiscoveryNode sourceNode,
        final long clusterStateVersion,
        final RecoveryListener listener
    ) {
        final Releasable snapshotFileDownloadsPermit = tryAcquireSnapshotDownloadPermits();
        // create a new recovery status, and process...
        final long recoveryId = onGoingRecoveries.startRecovery(
            indexShard,
            sourceNode,
            clusterStateVersion,
            snapshotFilesProvider,
            listener,
            recoverySettings.activityTimeout(),
            snapshotFileDownloadsPermit
        );
        // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
        // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool.
        threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
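
The snapshot-download permits bound how many recoveries on a node may pull files from a snapshot repository at once; when no permit is available the recovery simply falls back to copying files from the peer. A minimal sketch of that pattern with a Semaphore (illustrative only; the real logic lives in RecoverySettings and hands out a Releasable, or null when over the limit):

    import java.util.concurrent.Semaphore;

    public class SnapshotDownloadPermits {

        private final Semaphore permits;

        SnapshotDownloadPermits(int maxConcurrentSnapshotFileDownloads) {
            this.permits = new Semaphore(maxConcurrentSnapshotFileDownloads);
        }

        // Non-blocking, like tryAcquireSnapshotDownloadPermits: returns a release handle,
        // or null to signal "don't use snapshots for this recovery, copy from the peer".
        AutoCloseable tryAcquire() {
            if (permits.tryAcquire() == false) {
                return null;
            }
            return permits::release;
        }
    }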

The recovery can be divided into stages:

  1. indexShard::preRecovery: the preparation stage; for searchable snapshots this includes loadSnapshot

    private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
         final var store = indexShard.store();
         final SearchableSnapshotDirectory directory = unwrapDirectory(store.directory());
         assert directory != null;
         final ListenableFuture<Void> preWarmListener = new ListenableFuture<>();
         final boolean success = directory.loadSnapshot(indexShard.recoveryState(), store::isClosing, preWarmListener);
         final ShardRouting shardRouting = indexShard.routingEntry();
         if (success && shardRouting.isRelocationTarget()) {
             final Runnable preWarmCondition = indexShard.addCleanFilesDependency();
             preWarmListener.addListener(ActionListener.wrap(v -> preWarmCondition.run(), e -> {
                 logger.warn(
                     () -> format(
                         "pre-warm operation failed for [%s] while it was the target of primary relocation [%s]",
                         shardRouting.shardId(),
                         shardRouting
                     ),
                     e
                 );
                 preWarmCondition.run();
             }));
         }
         assert directory.listAll().length > 0 : "expecting directory listing to be non-empty";
         assert success || indexShard.routingEntry().recoverySource().getType() == RecoverySource.Type.PEER
             : "loading snapshot must not be called twice unless we are retrying a peer recovery";
     }
    
  2. prepareForIndexRecovery: final preparation for the recovery; sets the state

  3. recoverFromStore / recoverLocallyUpToGlobalCheckpoint (shards that cannot be promoted to primary instead take the openEngineAndSkipTranslogRecovery path, not expanded here); recoverLocallyUpToGlobalCheckpoint follows the same logic as local recovery above, and recoverFromStore is expanded in detail below

  4. markRecoveryAsDone: cleans up the recoveryId

  5. sendResponse: returns the recovery result
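
These steps are reflected in the stage field of RecoveryState (the same stage names that _cat/recovery reports), which only ever moves forward; in doRecovery below you can see setStage calls deliberately skipping intermediate stages. A simplified sketch of that progression (the tracker below is illustrative, not the actual RecoveryState implementation):

    // The stages a recovery walks through, as reported by RecoveryState / _cat/recovery.
    enum Stage { INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE }

    // Illustrative forward-only tracker; skipping ahead is allowed, moving back is not.
    class StageTracker {
        private Stage stage = Stage.INIT;

        synchronized void setStage(Stage next) {
            if (next.ordinal() < stage.ordinal()) {
                throw new IllegalStateException("can't move back from " + stage + " to " + next);
            }
            stage = next;
        }
    }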

private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
        final RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId);
        if (recoveryRef == null) {
            logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
            return;
        }
        final RecoveryTarget recoveryTarget = recoveryRef.target();
        assert recoveryTarget.sourceNode() != null : "cannot do a recovery without a source node";
        final RecoveryState recoveryState = recoveryTarget.state();
        final RecoveryState.Timer timer = recoveryState.getTimer();
        final IndexShard indexShard = recoveryTarget.indexShard();
        final Releasable onCompletion = Releasables.wrap(recoveryTarget.disableRecoveryMonitor(), recoveryRef);

        // async version of the catch/finally structure we need, but this does nothing with successes so needs further modification below
        final var cleanupOnly = ActionListener.notifyOnce(ActionListener.runBefore(ActionListener.noop().delegateResponse((l, e) -> {
            // this will be logged as warning later on...
            logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
            onGoingRecoveries.failRecovery(
                recoveryId,
                new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e),
                true
            );
        }), onCompletion::close));

        if (indexShard.routingEntry().isPromotableToPrimary() == false) {
            assert preExistingRequest == null;
            assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
            ActionListener.run(cleanupOnly.map(v -> {
                logger.trace("{} preparing unpromotable shard for recovery", recoveryTarget.shardId());
                indexShard.prepareForIndexRecovery();
                // Skip unnecessary intermediate stages
                recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
                recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
                indexShard.openEngineAndSkipTranslogRecovery();
                recoveryState.getIndex().setFileDetailsComplete();
                recoveryState.setStage(RecoveryState.Stage.FINALIZE);
                onGoingRecoveries.markRecoveryAsDone(recoveryId);
                return null;
            }), indexShard::preRecovery);
            return;
        }

        if (indexShard.routingEntry().isSearchable() == false && recoveryState.getPrimary()) {
            assert preExistingRequest == null;
            assert indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot() == false;
            try (onCompletion) {
                client.execute(
                    StatelessPrimaryRelocationAction.TYPE,
                    new StatelessPrimaryRelocationAction.Request(
                        recoveryId,
                        indexShard.shardId(),
                        transportService.getLocalNode(),
                        indexShard.routingEntry().allocationId().getId(),
                        recoveryTarget.clusterStateVersion()
                    ),
                    new ActionListener<>() {
                        @Override
                        public void onResponse(ActionResponse.Empty ignored) {
                            onGoingRecoveries.markRecoveryAsDone(recoveryId);
                        }

                        @Override
                        public void onFailure(Exception e) {
                            final var cause = ExceptionsHelper.unwrapCause(e);
                            final var sendShardFailure =
                                // these indicate the source shard has already failed, which will independently notify the master and fail
                                // the target shard
                                false == (cause instanceof ShardNotFoundException
                                    || cause instanceof IndexNotFoundException
                                    || cause instanceof AlreadyClosedException);

                            // TODO retries? See RecoveryResponseHandler#handleException
                            onGoingRecoveries.failRecovery(
                                recoveryId,
                                new RecoveryFailedException(recoveryState, null, e),
                                sendShardFailure
                            );
                        }
                    }
                );
                return;
            }
        }

        record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, String actionName, TransportRequest requestToSend) {}
        final ActionListener<StartRecoveryRequestToSend> toSendListener = cleanupOnly.map(r -> {
            logger.trace(
                "{} [{}]: recovery from {}",
                r.startRecoveryRequest().shardId(),
                r.actionName(),
                r.startRecoveryRequest().sourceNode()
            );
            transportService.sendRequest(
                r.startRecoveryRequest().sourceNode(),
                r.actionName(),
                r.requestToSend(),
                new RecoveryResponseHandler(r.startRecoveryRequest(), timer)
            );
            return null;
        });

        if (preExistingRequest == null) {
            SubscribableListener
                // run pre-recovery activities
                .newForked(indexShard::preRecovery)
                // recover the shard as far as possible based on data held locally
                .<Long>andThen((l, v) -> {
                    logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
                    indexShard.prepareForIndexRecovery();
                    if (indexShard.indexSettings().getIndexMetadata().isSearchableSnapshot()) {
                        // for searchable snapshots, peer recovery is treated similarly to recovery from snapshot
                        indexShard.getIndexEventListener().afterFilesRestoredFromRepository(indexShard);
                        final Store store = indexShard.store();
                        store.incRef();
                        try {
                            StoreRecovery.bootstrap(indexShard, store);
                        } finally {
                            store.decRef();
                        }
                    }
                    indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
                })
                // now construct the start-recovery request
                .andThenApply(startingSeqNo -> {
                    assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
                        : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
                    final var startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
                    return new StartRecoveryRequestToSend(startRequest, PeerRecoverySourceService.Actions.START_RECOVERY, startRequest);
                })
                // finally send the start-recovery request
                .addListener(toSendListener);
        } else {
            toSendListener.onResponse(
                new StartRecoveryRequestToSend(
                    preExistingRequest,
                    PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY,
                    new ReestablishRecoveryRequest(recoveryId, preExistingRequest.shardId(), preExistingRequest.targetAllocationId())
                )
            );
        }
    }

Where does the recovery data come from? See RecoverySourceHandler.recoverToTarget.

If you had to implement this yourself, how would you do it? Probably something like the following (sketched in code after this list):

  • Flush the index & translog to disk
  • Create a local copy, i.e. a snapshot - an expensive operation
  • Send the snapshot to the remote node from the offset in the request - also expensive, so the bytes sent via sendfile should be minimized
  • Append updates: handle the increment that accumulates after the snapshot
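
A sketch of that naive plan in code form (every type and name here is invented for illustration; none of this is Elasticsearch API):

    // Hypothetical outline of the naive recovery plan above.
    interface NaiveRecovery {

        void flush();                                 // 1. fsync index & translog to disk
        Snapshot snapshot();                          // 2. point-in-time local copy (expensive)
        void sendFiles(Snapshot s, long fromOffset);  // 3. ship only bytes the target is missing (expensive)
        void replayIncrements(long fromSeqNo);        // 4. replay operations accepted after the snapshot

        interface Snapshot extends AutoCloseable {}

        default void recover(long targetOffset, long targetSeqNo) {
            flush();
            try (Snapshot s = snapshot()) {           // the snapshot pins files so they can't change mid-copy
                sendFiles(s, targetOffset);
            } catch (Exception e) {
                throw new RuntimeException("recovery failed", e);
            }
            replayIncrements(targetSeqNo);            // catch up on writes that arrived during the copy
        }
    }

Elasticsearch's actual flow maps closely onto this outline: the safe-commit snapshot plus phase1 cover steps 1-3, and the translog replay in phase2 covers step 4.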

Now let's see how Elasticsearch actually does it.

  1. Decide whether the recovery can be sequence-number-based: primary local checkpoint <= request.startingSeqNo

    • Local checkpoint: the checkpoint up to which the primary has committed & persisted operations
    • For this to hold, the requester's local offset must be above the primary's local checkpoint
 final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
            && isTargetSameHistory()
            && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
            && ((retentionLease == null && shard.useRetentionLeasesInPeerRecovery() == false)
                || (retentionLease != null && retentionLease.retainingSequenceNumber() <= request.startingSeqNo()));
// InternalEngine.java
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
        return getMinRetainedSeqNo() <= startingSeqNo;
    }
/*
             * This policy retains operations for two purposes: peer-recovery and querying changes history.
             *  - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
             *    then sends operations after the local checkpoint of that commit. This requires keeping all ops after
             *    localCheckpointOfSafeCommit.
             *  - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we
             *    prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global
             *    checkpoint are exposed in the changes APIs.
             */
public final long getMinRetainedSeqNo() {
        return softDeletesPolicy.getMinRetainedSeqNo();
    }
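
A worked example of this check: if soft-deletes retention still holds every operation from seqno 1000 upward (getMinRetainedSeqNo() == 1000), a target asking to start at seqno 1500 can be caught up purely by replaying operations, while a target asking for seqno 800 cannot, because ops 800-999 are gone and phase1 must copy files. In sketch form (simplified; the real check also verifies the history UUID and retention leases, as the snippet above shows):

    public class OpsBasedRecoveryCheck {
        static final long UNASSIGNED_SEQ_NO = -2; // as in SequenceNumbers.UNASSIGNED_SEQ_NO

        // Simplified: ops-based recovery needs every op from startingSeqNo onward to still be retained.
        static boolean canUseOpsBasedRecovery(long minRetainedSeqNo, long requestedStartingSeqNo) {
            return requestedStartingSeqNo != UNASSIGNED_SEQ_NO
                && minRetainedSeqNo <= requestedStartingSeqNo;
        }

        public static void main(String[] args) {
            System.out.println(canUseOpsBasedRecovery(1000, 1500)); // true  -> skip phase1, replay ops only
            System.out.println(canUseOpsBasedRecovery(1000, 800));  // false -> phase1 file copy required
        }
    }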
  2. If the recovery is seqno-based, phase1 is skipped; otherwise take a snapshot & run phase1

  3. Take the snapshot (not expanded in detail): a snapshot of the index's local safe commit

        final Engine.IndexCommitRef safeCommitRef;
        try {
            safeCommitRef = acquireSafeCommit(shard);
            resources.add(safeCommitRef);
        } catch (final Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
        }

    // IndexShard.java
    /**
     * Snapshots the most recent safe index commit from the currently running engine.
     * All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
     */
    public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
        final IndexShardState state = this.state; // one time volatile read
        // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
        if (state == IndexShardState.STARTED || state == IndexShardState.CLOSED) {
            return getEngine().acquireSafeIndexCommit();
        } else {
            throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
        }
    }
    
  4. Phase1: sync the index files

    • Compute the scope: estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo), counting every operation from startingSeqNo onward

    • If the two shards (source/target) carry the same syncId, no files need to be sent

    • Otherwise send the required file contents:

      • Compute the diff between the two shards and build a ShardRecoveryPlan (a sketch of the diff bucketing follows the phase1 code below)
      • Send the contents according to the ShardRecoveryPlan
 /**
     * Perform phase1 of the recovery operations. Once this {@link IndexCommit}
     * snapshot has been performed no commit operations (files being fsync'd)
     * are effectively allowed on this index until all recovery phases are done
     * <p>
     * Phase1 examines the segment files on the target node and copies over the
     * segments that are missing. Only segments that have the same size and
     * checksum can be reused
     */
    void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {
        // ... long implementation elided ...
    }
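
The diff mentioned above is computed by comparing per-file metadata (name, length, checksum) and bucketing files into identical / different / missing, in the spirit of Store.MetadataSnapshot#recoveryDiff; only the last two buckets have to travel over the wire. A rough standalone sketch of that bucketing (FileMeta and Diff are hypothetical stand-ins):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class RecoveryDiffDemo {

        // Hypothetical stand-in for Store's per-file metadata (name, length, checksum).
        record FileMeta(String name, long length, String checksum) {}

        record Diff(List<FileMeta> identical, List<FileMeta> different, List<FileMeta> missing) {}

        // Buckets source files against the target's files, like Store.MetadataSnapshot#recoveryDiff.
        static Diff diff(Map<String, FileMeta> source, Map<String, FileMeta> target) {
            List<FileMeta> identical = new ArrayList<>();
            List<FileMeta> different = new ArrayList<>();
            List<FileMeta> missing = new ArrayList<>();
            for (FileMeta src : source.values()) {
                FileMeta tgt = target.get(src.name());
                if (tgt == null) {
                    missing.add(src);                       // target lacks the file entirely
                } else if (tgt.length() == src.length() && tgt.checksum().equals(src.checksum())) {
                    identical.add(src);                     // reusable as-is, no bytes sent
                } else {
                    different.add(src);                     // same name, divergent content: resend
                }
            }
            return new Diff(identical, different, missing); // only different + missing are transferred
        }
    }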
  5. prepareTargetForTranslog

    • Prepare the engine on the target node that will handle the recovery

    • Generate the phase2 translog snapshot based on startingSeqNo

      • LuceneChangesSnapshot creates a new "translog" snapshot sourced from Lucene
      void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
          StopWatch stopWatch = new StopWatch().start();
          final ActionListener<Void> wrappedListener = ActionListener.wrap(nullVal -> {
              stopWatch.stop();
              final TimeValue tookTime = stopWatch.totalTime();
              logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
              listener.onResponse(tookTime);
          }, e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
          // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
          // garbage collection (not the JVM's GC!) of tombstone deletes.
          logger.trace("recovery [phase1]: prepare remote engine for translog");
          cancellableThreads.checkForCancel();
          recoveryTarget.prepareForTranslogOperations(totalTranslogOps, wrappedListener);
      }
      // generate the phase2 snapshot
      final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
                          "peer-recovery",
                          startingSeqNo,
                          Long.MAX_VALUE,
                          false,
                          false,
                          true
                      );
      
  6. Phase2

    • Take a snapshot of the current translog (a point-in-time view of it), then send each translog operation to the target node so the operations can be replayed on the new shard
      final OperationBatchSender sender = new OperationBatchSender(
             startingSeqNo,
             endingSeqNo,
             snapshot,
             maxSeenAutoIdTimestamp,
             maxSeqNoOfUpdatesOrDeletes,
             retentionLeases,
             mappingVersion,
             sendListener
         );
         sendListener.addListener(listener.delegateFailureAndWrap((delegate, ignored) -> {
             final long skippedOps = sender.skippedOps.get();
             final int totalSentOps = sender.sentOps.get();
             final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();
             assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps
                 : String.format(
                     Locale.ROOT,
                     "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
                     snapshot.totalOperations(),
                     snapshot.skippedOperations(),
                     skippedOps,
                     totalSentOps
                 );
             stopWatch.stop();
             final TimeValue tookTime = stopWatch.totalTime();
             logger.trace("recovery [phase2]: took [{}]", tookTime);
             delegate.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));
         }));
         sender.start();
    
  7. finalizeRecovery: the target node trims all operations with sequence number >= startingSeqNo, since all of them have been re-sent in phase2

We haven't yet seen how the appended updates are handled. What does ES do?

  • In phase1, an IndexCommit snapshot is taken; until all recovery phases finish, no commit operations (files being fsync'd) are effectively allowed on this index, so it is commits, not individual writes, that are held back during this window
  • In phase2, the target shard can already handle new write requests concurrently

This raises another question: the recovered data is stale, so how is inconsistency avoided when the operations are replayed?

A version is stored in the Lucene index.

Whether data is new or old is decided by this version, which resolves the problem above (a sketch follows).
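
In other words, when a stale replayed operation races with a newer write that the target accepted during recovery, the per-document versioning metadata indexed in Lucene decides the winner: an incoming operation that is not newer than what is already indexed is dropped. A minimal sketch of that resolution rule (hypothetical types; the real engine logic also involves sequence numbers and primary terms):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class VersionedReplayDemo {

        record Doc(String id, long version, String source) {}

        private final Map<String, Doc> index = new ConcurrentHashMap<>();

        // Apply an operation only if it is newer than what is already indexed.
        boolean apply(Doc incoming) {
            Doc current = index.get(incoming.id());
            if (current != null && incoming.version() <= current.version()) {
                return false; // stale op replayed by recovery: the live write already won
            }
            index.put(incoming.id(), incoming);
            return true;
        }

        public static void main(String[] args) {
            var shard = new VersionedReplayDemo();
            shard.apply(new Doc("1", 200, "live write accepted during recovery"));
            boolean applied = shard.apply(new Doc("1", 150, "older op replayed in phase2"));
            System.out.println(applied); // false: the stale op is discarded
        }
    }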

References

Elasticsearch source code, 8.13

ChatGPT