Elasticsearch Source Code - Index Persistence

Goals of this series:

  • How is an Elasticsearch index sharded?
  • How does an Elasticsearch index persist data? (this post)
  • How does an Elasticsearch index recover from failures?
  • How does Elasticsearch rebalance index shards?

Source code version: 8.13

Index Persistence Mechanism

The InternalEngine Core Class

Key member fields

  • translog: durably records every write operation so that it can be replayed for recovery after a node crash
  • mergeScheduler: manages and schedules Lucene segment merges
  • indexWriter: the Lucene IndexWriter used to perform index writes
  • externalReaderManager / internalReaderManager: manage the external and internal readers of the index, providing a consistent view for reads
  • flushLock/optimizeLock: locks guarding flush and optimize (force merge) operations, keeping them thread safe
  • versionMap: tracks live per-document version information, keeping real-time reads consistent
  • lastUnsafeSegmentGenerationForGets/preCommitSegmentGeneration: track segment generations around commits, keeping real-time GETs safe
  • lastCommittedSegmentInfos: the segment infos recorded by the last commit
  • throttle: indexing throttle
  • localCheckpointTracker: tracks the local checkpoint, used to guarantee durability and consistency
  • flushListener: listens for flush events
  • translogSyncProcessor: handles translog syncing
  • inFlightDocCount: counts documents currently being added to the IndexWriter
  • ......

The InternalEngine class is a core piece of Elasticsearch: it implements the low-level indexing and search machinery, and relies on locks, counters, state tracking, and policy components to keep data reliable, consistent, and efficient.

The index write method: index()

ADD operations

  • APPEND ONLY

    • reduces the latency introduced by disk writes
    • simple to implement; fewer write conflicts to retry and less locking
    • autoGeneratedIdTimestamp: the timestamp is set when the document is generated and is preserved in the translog, so duplicate documents can be detected on retry
    • when a network interruption or other failure causes a document to be retried, Elasticsearch uses this timestamp to establish a happens-before relationship: if the new document's timestamp is greater than the currently recorded maximum (maxUnsafeAutoIdTimestamp), a plain add is performed; otherwise the write takes the update path to preserve consistency
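A simplified, self-contained sketch of that decision (the field and method names mirror InternalEngine, but the class itself is an illustration, not the exact source):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the append-only duplicate check, condensed from InternalEngine.
class AppendOnlySketch {
    // highest auto-generated-id timestamp that is no longer safe for a plain append
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    boolean mayHaveBeenIndexedBefore(long autoGeneratedIdTimestamp, boolean isRetry) {
        if (isRetry) {
            // a retried request may already be in the index: remember its timestamp so the
            // original request (which carries the same timestamp) also takes the update path
            maxUnsafeAutoIdTimestamp.accumulateAndGet(autoGeneratedIdTimestamp, Math::max);
            return true;
        }
        // happens-before check: only timestamps strictly newer than the recorded maximum are
        // guaranteed to be first-time writes and may use the cheap append path
        return autoGeneratedIdTimestamp <= maxUnsafeAutoIdTimestamp.get();
    }
}

The full index() entry point is shown below: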
public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (var ignored1 = acquireEnsureOpenRef()) {
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            int reservedDocs = 0;
            try (
                Releasable ignored = versionMap.acquireLock(index.uid().bytes());
                Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
            ) {
                lastWriteNanos = index.startTime();
                ......
                final IndexingStrategy plan = indexingStrategyForOperation(index);
                reservedDocs = plan.reservedDocs;

                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    assert index.origin() == Operation.Origin.PRIMARY : index.origin();
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else {
                    // generate or register sequence number
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(
                            index.uid(),
                            index.parsedDoc(),
                            generateSeqNoForOperationOnPrimary(index),
                            index.primaryTerm(),
                            index.version(),
                            index.versionType(),
                            index.origin(),
                            index.startTime(),
                            index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(),
                            index.getIfSeqNo(),
                            index.getIfPrimaryTerm()
                        );

                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
                        }
                    } else {
                        markSeqNoAsSeen(index.seqNo());
                    }

                    assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing,
                            index.primaryTerm(),
                            index.seqNo(),
                            plan.currentNotFoundOrDeleted,
                            index.id()
                        );
                    }
                }
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
                        final NoOp noOp = new NoOp(
                            indexResult.getSeqNo(),
                            index.primaryTerm(),
                            index.origin(),
                            index.startTime(),
                            indexResult.getFailure().toString()
                        );
                        location = innerNoOp(noOp).getTranslogLocation();
                    } else {
                        location = null;
                    }
                    indexResult.setTranslogLocation(location);
                }
                if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(
                        index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
                    );
                }
                localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
                if (indexResult.getTranslogLocation() == null) {
                    // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
                    assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
                    localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
                }
                indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
                indexResult.freeze();
                return indexResult;
            } finally {
                releaseInFlightDocs(reservedDocs);
            }
        } catch (RuntimeException | IOException e) {
            try {
                if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
                    failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                } else {
                    maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                }
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }

Persistence sequence

  • Write into the Lucene segment first
  • Then write into the translog

Durability

  • Lucene segments: asynchronous guarantee, with a default refresh interval of 1s
  • translog: synchronous guarantee, so nothing is lost during recovery (see the sketch after this list)
  • primary and replica copies across the cluster provide redundant storage
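Conceptually, acknowledging a write depends on the translog fsync roughly as follows (a minimal sketch assuming the default index.translog.durability of request; this is not the exact post-write code path):

import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;

// Minimal sketch: with durability=request the shard fsyncs the translog up to the write's
// location before the request is acknowledged; with durability=async it relies on the periodic
// background fsync (index.translog.sync_interval) instead.
class TranslogDurabilitySketch {
    void maybeSyncAfterWrite(IndexShard shard, Translog.Location location) {
        if (shard.getTranslogDurability() == Translog.Durability.REQUEST) {
            // only acknowledge the write once this translog location is durable on disk
            shard.sync(location, e -> { /* surface fsync failures to the caller */ });
        }
    }
}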

Consistency

  • Write-ack mechanism

    • when a write completes, Elasticsearch waits until both the Lucene write and the translog write have succeeded before acknowledging the client; see the code above 👆
  • If the Lucene write succeeds but the translog write fails: an exception is raised and the caller retries, so consistency is preserved

  • If both the Lucene write and the translog write succeed, but a power loss or crash wipes the in-memory segment:

    • when an Elasticsearch node starts or restarts, it recovers the known segments from the last commit point on disk and replays every change recorded in the translog after that commit

    • see the code below 👇

      • RecoveryTranslogOperationsRequest
      • PeerRecoveryTargetService.performTranslogOps
      • RecoveryTarget.indexTranslogOperations
  • Lucene's flush is asynchronous and hard to keep in lockstep across multiple nodes, so the consistency guarantee here is weaker: eventual consistency

Fault tolerance

  • Single shard: the translog is written synchronously, and after a node restart or crash Elasticsearch replays its operations so that every change not yet persisted in segments is recovered
  • Cluster: primary and replica copies keep redundant data, providing a degree of fault tolerance

Traditional relational databases and NoSQL stores do the opposite: they write the commit log (WAL, write-ahead log) first, using it for crash recovery and consistency guarantees.

Why?

Elasticsearch chose this order for performance reasons:

  • Writing to Lucene first is fast, because Lucene writes into memory first (segments are refreshed asynchronously)
  • Writing the translog afterwards requires synchronous I/O to persist to disk, so its latency is higher than the previous step. A further benefit of this order is that a successful Lucene write proves the data is valid, so the subsequent translog write does not store invalid entries and the recovery phase needs fewer checks
  • Segments are flushed to disk in batches, reducing I/O operations

On the search side Elasticsearch is NRT (near real-time) and does not guarantee real-time search, which is consistent with the product's design intent.

RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
        super(in);
        operations = Translog.readOperations(in, "recovery");
        totalTranslogOps = in.readVInt();
        maxSeenAutoIdTimestampOnPrimary = in.readZLong();
        maxSeqNoOfUpdatesOrDeletesOnPrimary = in.readZLong();
        retentionLeases = new RetentionLeases(in);
        mappingVersionOnPrimary = in.readVLong();
    }

//PeerRecoveryTargetService.java
 private class TranslogOperationsRequestHandler extends RecoveryRequestHandler<RecoveryTranslogOperationsRequest> {

        @Override
        protected void handleRequest(
            RecoveryTranslogOperationsRequest request,
            RecoveryTarget recoveryTarget,
            ActionListener<Void> listener
        ) {
            performTranslogOps(request, listener, recoveryTarget);
        }
         private void performTranslogOps(
            final RecoveryTranslogOperationsRequest request,
            final ActionListener<Void> listener,
            final RecoveryTarget recoveryTarget
        ) {
            ......
            recoveryTarget.indexTranslogOperations(
                request.operations(),
                request.totalTranslogOps(),
                request.maxSeenAutoIdTimestampOnPrimary(),
                request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
                request.retentionLeases(),
                request.mappingVersionOnPrimary(),
                ActionListener.wrap(checkpoint -> listener.onResponse(null), e -> {
                    // do not retry if the mapping on replica is at least as recent as the mapping
                    // that the primary used to index the operations in the request.
                    if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
                        retryOnMappingException.accept(e);
                    } else {
                        listener.onFailure(e);
                    }
                })
            );
            ......
        }
 }

 public void indexTranslogOperations(
        final List<Translog.Operation> operations,
        final int totalTranslogOps,
        final long maxSeenAutoIdTimestampOnPrimary,
        final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
        final RetentionLeases retentionLeases,
        final long mappingVersionOnPrimary,
        final ActionListener<Long> listener
    ) {
        ActionListener.completeWith(listener, () -> {
            final RecoveryState.Translog translog = state().getTranslog();
            translog.totalOperations(totalTranslogOps);
            assert indexShard().recoveryState() == state();
            if (indexShard().state() != IndexShardState.RECOVERING) {
                throw new IndexShardNotRecoveringException(shardId, indexShard().state());
            }
            /*
             * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
             * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
             * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
             * replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
             */
            indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
            /*
             * Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
             * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
             */
            indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
            /*
             * We have to update the retention leases before we start applying translog operations to ensure we are retaining according to
             * the policy.
             */
            indexShard().updateRetentionLeasesOnReplica(retentionLeases);
            for (Translog.Operation operation : operations) {
                Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
                if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
                    throw new MapperException("mapping updates are not allowed [" + operation + "]");
                }
                if (result.getFailure() != null) {
                    if (Assertions.ENABLED && result.getFailure() instanceof MapperException == false) {
                        throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
                    }
                    ExceptionsHelper.reThrowIfNotNull(result.getFailure());
                }
            }
            // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
            translog.incrementRecoveredOperations(operations.size());
            indexShard().sync();
            // roll over / flush / trim if needed
            indexShard().afterWriteOperation();
            return indexShard().getLocalCheckpoint();
        });
    }

Update/Delete Operation

  1. Update: delete the old doc first, then index the new version

indexIntoLucene => updateDocs => IndexWriter.softUpdateDocuments

public long softUpdateDocuments(
      Term term, Iterable<? extends Iterable<? extends IndexableField>> docs, Field... softDeletes)
      throws IOException {
    if (term == null) {
      throw new IllegalArgumentException("term must not be null");
    }
    if (softDeletes == null || softDeletes.length == 0) {
      throw new IllegalArgumentException("at least one soft delete must be present");
    }
    return updateDocuments(
        DocumentsWriterDeleteQueue.newNode(buildDocValuesUpdate(term, softDeletes)), docs);
  }
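For context, the InternalEngine caller one level up looks roughly like this; a condensed sketch in which softDeletesField (the engine's soft-deletes marker field) and numDocUpdates (a metrics counter) are the engine's own fields, and the body is simplified rather than verbatim:

// Condensed sketch of InternalEngine#updateDocs (illustrative, not verbatim): an update is a
// soft delete of the previous version plus an add of the new one, applied atomically by Lucene.
private void updateDocs(final BytesRef uid, final List<LuceneDocument> docs, final IndexWriter indexWriter)
    throws IOException {
    final Term uidTerm = new Term(IdFieldMapper.NAME, uid);
    if (docs.size() > 1) {
        // nested documents: update the whole block that shares the same uid term
        indexWriter.softUpdateDocuments(uidTerm, docs, softDeletesField);
    } else {
        indexWriter.softUpdateDocument(uidTerm, docs.get(0), softDeletesField);
    }
    numDocUpdates.inc(docs.size());
}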
  2. The role of the Version Map
  • The Version Map tracks version information for every document. When documents are updated or deleted concurrently, the engine checks the Version Map to make sure each operation is based on the latest version, preventing lost writes, i.e. an older version of a document overwriting a newer one

Code inside the index() method:

if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(
                        index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
                    );
                }

Important VersionValue attributes

    final long version;//counter
    /** the seq number of the operation that last changed the associated uuid */
    final long seqNo;
    /** the term of the operation that last changed the associated uuid */
    final long term;
  • When serving real-time read requests (for example real-time GETs), the Version Map lets Elasticsearch find the latest version of a document quickly, without reading the index files on disk, which improves real-time lookup performance

getVersionFromMap

protected GetResult realtimeGetUnderLock(
        Get get,
        MappingLookup mappingLookup,
        DocumentParser documentParser,
        Function<Engine.Searcher, Engine.Searcher> searcherWrapper,
        boolean getFromSearcher
    ) {
        assert isDrainedForClose() == false;
        assert get.realtime();
        final VersionValue versionValue;
        try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
            // we need to lock here to access the version map to do this truly in RT
            versionValue = getVersionFromMap(get.uid().bytes());
        }
        ......
    }
  • The Version Map is also used to manage GC deletes, i.e. deletes that are retained for a time window after they happen. On the primary, a delete is kept for one GC cycle so that users can still observe it; on replicas, the Version Map prunes stale deletes based on the local checkpoint
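A condensed sketch of that pruning (the names mirror InternalEngine and LiveVersionMap, but the body is simplified, not verbatim source):

// Condensed sketch (illustrative): tombstones older than the index.gc_deletes window, whose
// seq_no is already below the processed local checkpoint, are pruned from the version map.
private void pruneDeletedTombstones() {
    final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
    final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
    versionMap.pruneTombstones(maxTimestampToPrune, localCheckpointTracker.getProcessedCheckpoint());
    lastDeleteVersionPruneTimeMSec = timeMSec;
}

The version map's RAM usage also feeds into indexing-buffer management; the writeIndexingBuffer excerpt below shows how the engine chooses between refreshing the version map and flushing the IndexWriter's buffers: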
public void writeIndexingBuffer() throws IOException {
    // bytes in the versionMap that could be reclaimed by refreshing
    final long reclaimableVersionMapBytes = versionMap.reclaimableRefreshRamBytes();
    // bytes currently used by the IndexWriter, minus the bytes already being flushed
    final long indexWriterBytesUsed = indexWriter.ramBytesUsed() - indexWriter.getFlushingBytes();

    // if refreshing would reclaim at least as much memory from the versionMap as the IndexWriter is using
    if (reclaimableVersionMapBytes >= indexWriterBytesUsed) {
        // refresh to reclaim the versionMap memory
        reclaimVersionMapMemory();
    } else {
        // otherwise, write the largest pending segment to disk
        indexWriter.flushNextBuffer();
    }
}

//LiveVersionMap.java
  /**
     * Returns how much RAM could be reclaimed from the version map.
     * <p>
     * In stateful, this is the RAM usage of the current version map, and could be reclaimed by refreshing. It doesn't include tombstones
     * since they don't get cleared on refresh, nor the old version map that is being reclaimed.
     * <p>
     * In stateless, this is the RAM usage of current and old version map plus the RAM usage of the parts of the archive that require
     * a new unpromotable refresh. To reclaim all three components we need to refresh AND flush.
     */
    long reclaimableRefreshRamBytes() {
        return archive == LiveVersionMapArchive.NOOP_ARCHIVE
            ? maps.current.ramBytesUsed.get()
            : maps.ramBytesUsed() + archive.getReclaimableRamBytes();
    }

Elasticsearch has to manage memory carefully so that heavy load does not degrade performance or crash the node through memory exhaustion. By comparing the memory used by the versionMap with the memory used by the IndexWriter, it decides whether to free memory by refreshing the versionMap or by writing the IndexWriter's buffer to disk.

Merge Operation

Merging is an important index operation: it combines several smaller segments into a larger one, reducing the segment count and improving query efficiency, while also discarding docs in the deleted state and reducing redundant data in the files.
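Which segments get picked for a merge is decided by Lucene's merge policy; Elasticsearch wraps TieredMergePolicy and exposes its knobs via the index.merge.policy.* settings. A minimal illustration of those knobs directly on the Lucene policy (the values here are illustrative, not Elasticsearch's exact defaults):

import org.apache.lucene.index.TieredMergePolicy;

// Minimal illustration of the Lucene merge-policy knobs behind index.merge.policy.* settings.
class MergePolicySketch {
    TieredMergePolicy buildPolicy() {
        TieredMergePolicy mergePolicy = new TieredMergePolicy();
        mergePolicy.setSegmentsPerTier(10.0);        // how many similar-sized segments per tier before a merge
        mergePolicy.setMaxMergeAtOnce(10);           // maximum number of segments merged at once
        mergePolicy.setMaxMergedSegmentMB(5 * 1024); // cap the size of a merged segment (~5 GB)
        mergePolicy.setFloorSegmentMB(2);            // treat very small segments as at least this size
        mergePolicy.setDeletesPctAllowed(20.0);      // tolerated percentage of deleted docs before merging them away
        return mergePolicy;
    }
}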

  1. Triggered via the API

InternalEngine.java

 public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final String forceMergeUUID)
        throws EngineException, IOException {
        if (onlyExpungeDeletes && maxNumSegments >= 0) {
            throw new IllegalArgumentException("only_expunge_deletes and max_num_segments are mutually exclusive");
        }
        /*
         * We do NOT acquire the readlock here since we are waiting on the merges to finish
         * that's fine since the IW.rollback should stop all the threads and trigger an IOException
         * causing us to fail the forceMerge
         */
        optimizeLock.lock();
        try {
            ensureOpen();
            store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
            try {
                if (onlyExpungeDeletes) {
                    indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/);
                } else if (maxNumSegments <= 0) {
                    indexWriter.maybeMerge();
                } else {
                    indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
                    this.forceMergeUUID = forceMergeUUID;
                }
                if (flush) {
                    // TODO: Migrate to using async apis
                    flush(false, true);

                    // If any merges happened then we need to release the unmerged input segments so they can be deleted. A periodic refresh
                    // will do this eventually unless the user has disabled refreshes or isn't searching this shard frequently, in which
                    // case we should do something here to ensure a timely refresh occurs. However there's no real need to defer it nor to
                    // have any should-we-actually-refresh-here logic: we're already doing an expensive force-merge operation at the user's
                    // request and therefore don't expect any further writes so we may as well do the final refresh immediately and get it
                    // out of the way.
                    refresh("force-merge");
                }
            } finally {
                store.decRef();
            }
        } catch (AlreadyClosedException ex) {
            /* in this case we first check if the engine is still open. If so this exception is just fine
             * and expected. We don't hold any locks while we block on forceMerge otherwise it would block
             * closing the engine as well. If we are not closed we pass it on to failOnTragicEvent which ensures
             * we are handling a tragic even exception here */
            ensureOpen(ex);
            failOnTragicEvent(ex);
            throw ex;
        } catch (Exception e) {
            try {
                maybeFailEngine("force merge", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        } finally {
            optimizeLock.unlock();
        }
    }
  2. Triggered in the background by the merge scheduler

ElasticsearchConcurrentMergeScheduler is Elasticsearch's extension of Lucene's ConcurrentMergeScheduler

//ElasticsearchConcurrentMergeScheduler.java
protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
    int totalNumDocs = merge.totalNumDocs();
    long totalSizeInBytes = merge.totalBytesSize();
    long timeNS = System.nanoTime();  // current time in nanoseconds
    currentMerges.inc();
    currentMergesNumDocs.inc(totalNumDocs);
    currentMergesSizeInBytes.inc(totalSizeInBytes);

    OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
    onGoingMerges.add(onGoingMerge);

    if (logger.isTraceEnabled()) {
        logger.trace(
            "merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size",
            getSegmentName(merge),
            merge.segments.size(),
            totalNumDocs,
            ByteSizeValue.ofBytes(totalSizeInBytes),
            ByteSizeValue.ofBytes(merge.estimatedMergeBytes)
        );
    }
    try {
        beforeMerge(onGoingMerge);  // hook that runs before the merge starts
        super.doMerge(mergeSource, merge);  // delegate to the parent class to perform the actual merge
    } finally {
        ......
    }
}

The detailed merge logic lives inside Lucene, so it is not expanded here.

Others

How is an oversized translog handled?

  • IndexShard.shouldRollTranslogGeneration() decides, based on the index settings, whether to roll to a new generation
  • Translog.rollGeneration() creates a new translog file; note that a flush also triggers rollGeneration()
  • translog.trimUnreferencedReaders() cleans up translog generations that are no longer referenced
  • afterWriteOperation() is triggered during replication, recovery, and flush
//IndexShard.java
/**
     * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
     * executed asynchronously on the flush thread pool.
     */
public void afterWriteOperation() {
    ....
 if (shouldRollTranslogGeneration()) {
                    logger.debug("submitting async roll translog generation request");
                    final AbstractRunnable roll = new AbstractRunnable() {
                        @Override
                        public void onFailure(final Exception e) {
                            if (state != IndexShardState.CLOSED) {
                                logger.warn("failed to roll translog generation", e);
                            }
                        }

                        @Override
                        protected void doRun() {
                            rollTranslogGeneration();
                        }

                        @Override
                        public void onAfter() {
                            flushOrRollRunning.compareAndSet(true, false);
                            afterWriteOperation();
                        }
.....
}
//Translog.java
/**
     * Tests whether or not the translog generation should be rolled to a new generation. This test
     * is based on the size of the current generation compared to the configured generation
     * threshold size.
     *
     * @return {@code true} if the current generation should be rolled to a new generation
     */
    public boolean shouldRollGeneration() {
        final long threshold = this.indexSettings.getGenerationThresholdSize().getBytes();
        try (ReleasableLock ignored = readLock.acquire()) {
            return this.current.sizeInBytes() > threshold;
        }
    }

//InternalEngine.java
 public void rollTranslogGeneration() throws EngineException {
        try (var ignored = acquireEnsureOpenRef()) {
            translog.rollGeneration();
            translog.trimUnreferencedReaders();
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("translog trimming failed", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new EngineException(shardId, "failed to roll translog", e);
        }
    }
//Translog.java
 /**
     * Roll the current translog generation into a new generation if it's not empty. This does not commit the translog.
     *
     * @throws IOException if an I/O exception occurred during any file operations
     */
    public void rollGeneration() throws IOException {
        syncBeforeRollGeneration();
        if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
            return;
        }
        try (Releasable ignored = writeLock.acquire()) {
            ensureOpen();
            try {
                final TranslogReader reader = current.closeIntoReader();
                readers.add(reader);
                assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();
                copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
                // create a new translog file; this will sync it and update the checkpoint data;
                current = createWriter(current.getGeneration() + 1);
                logger.trace("current translog set to [{}]", current.getGeneration());
            } catch (final Exception e) {
                tragedy.setTragicException(e);
                closeOnTragicEvent(e);
                throw e;
            }
        }
    }

Summary

The above walks through the main write-path logic of InternalEngine.

Elasticsearch is a useful example of how to wrap a third-party framework such as Lucene, how to add a reliability layer like the translog, and how to manage commits and checkpoints.

Along the way it also shows where the time-consuming persistence I/O happens.

References

Elasticsearch source code 8.13

ChatGPT