Elasticsearch 源码 - Index Shard Rebalancing

本系列的目标:

  • Elasticsearch 索引是如何分片?
  • Elasticsearch 索引持久化机制?
  • Elasticsearch 索引如何故障恢复?
  • Elasticsearch 索引shard rebalancing?**

源码:8.13

Shard Rebalancing

触发时机

类似于shard allocation

入口函数

balancer.balance()如下

    public void allocate(RoutingAllocation allocation) {
        assert allocation.ignoreDisable() == false;

        if (allocation.routingNodes().size() == 0) {
            failAllocationOfNewPrimaries(allocation);
            return;
        }
        final WeightFunction weightFunction = new WeightFunction(
            indexBalanceFactor,
            shardBalanceFactor,
            writeLoadBalanceFactor,
            diskUsageBalanceFactor
        );
        final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold);
        balancer.allocateUnassigned();
        balancer.moveShards();
        balancer.balance();
    }
//balancer.balance() => balanceByWeights()
/**
         * Balances the nodes on the cluster model according to the weight
         * function. The configured threshold is the minimum delta between the
         * weight of the maximum node and the minimum node according to the
         * {@link WeightFunction}. This weight is calculated per index to
         * distribute shards evenly per index. The balancer tries to relocate
         * shards only if the delta exceeds the threshold. In the default case
         * the threshold is set to {@code 1.0} to enforce gaining relocation
         * only, or in other words relocations that move the weight delta closer
         * to {@code 0.0}
         */
        private void balanceByWeights() {
            final AllocationDeciders deciders = allocation.deciders();
            final ModelNode[] modelNodes = sorter.modelNodes;
            final float[] weights = sorter.weights;
            for (String index : buildWeightOrderedIndices()) {
                IndexMetadata indexMetadata = metadata.index(index);

                // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to,
                // move these nodes to the front of modelNodes so that we can only balance based on these nodes
                int relevantNodes = 0;
                for (int i = 0; i < modelNodes.length; i++) {
                    ModelNode modelNode = modelNodes[i];
                    if (modelNode.getIndex(index) != null
                        || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
                        // swap nodes at position i and relevantNodes
                        modelNodes[i] = modelNodes[relevantNodes];
                        modelNodes[relevantNodes] = modelNode;
                        relevantNodes++;
                    }
                }

                if (relevantNodes < 2) {
                    continue;
                }

                sorter.reset(index, 0, relevantNodes);
                int lowIdx = 0;
                int highIdx = relevantNodes - 1;
                final float localThreshold = sorter.minWeightDelta() * threshold;
                while (true) {
                    final ModelNode minNode = modelNodes[lowIdx];
                    final ModelNode maxNode = modelNodes[highIdx];
                    advance_range: if (maxNode.numShards(index) > 0) {
                        final float delta = absDelta(weights[lowIdx], weights[highIdx]);
                        if (lessThan(delta, localThreshold)) {
                            if (lowIdx > 0
                                && highIdx - 1 > 0 // is there a chance for a higher delta?
                                && (absDelta(weights[0], weights[highIdx - 1]) > localThreshold) // check if we need to break at all
                            ) {
                                /* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
                                 * due to some allocation decider restrictions like zone awareness. if one zone has for instance
                                 * less nodes than another zone. so one zone is horribly overloaded from a balanced perspective but we
                                 * can't move to the "lighter" shards since otherwise the zone would go over capacity.
                                 *
                                 * This break jumps straight to the condition below were we start moving from the high index towards
                                 * the low index to shrink the window we are considering for balance from the other direction.
                                 * (check shrinking the window from MAX to MIN)
                                 * See #3580
                                 */
                                break advance_range;
                            }
                            if (logger.isTraceEnabled()) {
                                logger.trace(
                                    "Stop balancing index [{}]  min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
                                    index,
                                    maxNode.getNodeId(),
                                    weights[highIdx],
                                    minNode.getNodeId(),
                                    weights[lowIdx],
                                    delta
                                );
                            }
                            break;
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace(
                                "Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
                                maxNode.getNodeId(),
                                weights[highIdx],
                                minNode.getNodeId(),
                                weights[lowIdx],
                                delta
                            );
                        }
                        if (delta <= localThreshold) {
                            /*
                             * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the
                             * balance if we only achieve the same delta the relocation is useless
                             *
                             * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We
                             * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never
                             * hit this case anyway.
                             */
                            logger.trace(
                                "Couldn't find shard to relocate from node [{}] to node [{}]",
                                maxNode.getNodeId(),
                                minNode.getNodeId()
                            );
                        } else if (tryRelocateShard(minNode, maxNode, index)) {
                            /*
                             * TODO we could be a bit smarter here, we don't need to fully sort necessarily
                             * we could just find the place to insert linearly but the win might be minor
                             * compared to the added complexity
                             */
                            weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
                            weights[highIdx] = sorter.weight(modelNodes[highIdx]);
                            sorter.sort(0, relevantNodes);
                            lowIdx = 0;
                            highIdx = relevantNodes - 1;
                            continue;
                        }
                    }
                    if (lowIdx < highIdx - 1) {
                        /* Shrinking the window from MIN to MAX
                         * we can't move from any shard from the min node lets move on to the next node
                         * and see if the threshold still holds. We either don't have any shard of this
                         * index on this node of allocation deciders prevent any relocation.*/
                        lowIdx++;
                    } else if (lowIdx > 0) {
                        /* Shrinking the window from MAX to MIN
                         * now we go max to min since obviously we can't move anything to the max node
                         * lets pick the next highest */
                        lowIdx = 0;
                        highIdx--;
                    } else {
                        /* we are done here, we either can't relocate anymore or we are balanced */
                        break;
                    }
                }
            }
        }
private String[] buildWeightOrderedIndices() {
            final String[] indices = allocation.routingTable().indicesRouting().keySet().toArray(String[]::new);
            final float[] deltas = new float[indices.length];
            for (int i = 0; i < deltas.length; i++) {
                sorter.reset(indices[i]);
                deltas[i] = sorter.delta();
            }
            new IntroSorter() {

                float pivotWeight;

                @Override
                protected void swap(int i, int j) {
                    final String tmpIdx = indices[i];
                    indices[i] = indices[j];
                    indices[j] = tmpIdx;
                    final float tmpDelta = deltas[i];
                    deltas[i] = deltas[j];
                    deltas[j] = tmpDelta;
                }

                @Override
                protected int compare(int i, int j) {
                    return Float.compare(deltas[j], deltas[i]);
                }

                @Override
                protected void setPivot(int i) {
                    pivotWeight = deltas[i];
                }

                @Override
                protected int comparePivot(int j) {
                    return Float.compare(deltas[j], pivotWeight);
                }
            }.sort(0, deltas.length);

            return indices;
        }

根据权重函数对集群模型中的节点进行平衡

  • 给定某节点上的当前分配,权重函数会为节点上的分片提供权重。具有高权重值的节点较之低权重的节点,不适合放置分片。比较不同节点上分片的权重,我们可以确定重新定位是否能够改善整体权重分布

  • 为作出重新均衡决策,Elasticsearch 要计算每个节点上每个索引的权重(buildWeightOrderedIndices函数),以及索引的最小和最大可能权重之间的差值 ( sorter.delta()👇代码)(这可以在索引级别完成,因为索引中的每个分片在 Elasticsearch 中都是平等处理的) 然后,按照最不均衡索引最先处理的顺序处理索引(comparePivot权重差异大的排在前面)

    public float delta() {
              return weights[weights.length - 1] - weights[0];
          }
    protected int comparePivot(int j) {
                      return Float.compare(deltas[j], pivotWeight);
                  }
              }.sort(0, deltas.length);
    
  • 分片移动是一项繁重的操作。Elasticsearch 会在重新均衡之前和之后对分片权重进行建模;仅当操作可实现权重的更均衡分布时才会重新定位分片canRebalance / canAllocate

  • 超过某个阈值后,移动分片的成本开始超过权重均衡的益处。在 Elasticsearch 中,此阈值当前是固定值,可通过动态设定 cluster.routing.allocation.balance.threshold 来配置。当索引的计算权重增量 – 跨节点的最小和最大权重之间的差值 – 小于该阈值时,该索引被认为是均衡的,

         final ModelNode minNode = modelNodes[lowIdx];
         final ModelNode maxNode = modelNodes[highIdx];
         final float localThreshold = sorter.minWeightDelta() * threshold;// 节点本地阈值计算,minWeightDelta留在 参数配置里介绍,详细见👇
         final float delta = absDelta(weights[lowIdx], weights[highIdx]);// min & max差异
         if (delta <= localThreshold) {
                            /*
                             * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the
                             * balance if we only achieve the same delta the relocation is useless
                             *
                             * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We
                             * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never
                             * hit this case anyway.
                             */
                            logger.trace(
                                "Couldn't find shard to relocate from node [{}] to node [{}]",
                                maxNode.getNodeId(),
                                minNode.getNodeId()
                            );
                        } else if (tryRelocateShard(minNode, maxNode, index)) {
                            //do rebalance
                        }
    

算法介绍完(),下面是整个rebalance的过程抽象:

  • 平衡节点权重:根据每个节点的权重,尝试通过分片重新分配来平衡负载

  • 阈值判断:仅当最大节点和最小节点的权重差异超过预设阈值时才进行分片重新分配

    • 计算最大节点和最小节点之间的权重差异

    • 如果差异小于阈值,不进行重分配

    • 尝试重新分配分片,如果成功,更新权重并重新排序

      • tryRelocateShard(minNode, maxNode, index)

        • maxNode.removeShard(shard)

          public void removeShard(ShardRouting shard) {
              ModelIndex index = indices.get(shard.getIndexName());
              if (index != null) {
                  index.removeShard(shard);
                  if (index.numShards() == 0) {
                      indices.remove(shard.getIndexName());
                  }
              }
              IndexMetadata indexMetadata = metadata.index(shard.index());
              writeLoad -= writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
              diskUsageInBytes -= Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
              numShards--;
          }
          
        • minNode.addShard(routingNodes.relocateShard(...))

          public void addShard(ShardRouting shard) {
              indices.computeIfAbsent(shard.getIndexName(), t -> new ModelIndex()).addShard(shard);
              IndexMetadata indexMetadata = metadata.index(shard.index());
              writeLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
              diskUsageInBytes += Balancer.getShardDiskUsageInBytes(shard, indexMetadata, clusterInfo);
              numShards++;
          }
              /**
           * Relocate a shard to another node, adding the target initializing
          * shard as well as assigning it.
          *
          * @return pair of source relocating and target initializing shards.
          */
          public Tuple<ShardRouting, ShardRouting> relocateShard(
              ShardRouting startedShard,
              String nodeId,
              long expectedShardSize,
              RoutingChangesObserver changes
          ) {
              ensureMutable();
              relocatingShards++;
              ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
              ShardRouting target = source.getTargetRelocatingShard();
              updateAssigned(startedShard, source);
              node(target.currentNodeId()).add(target);
              assignedShardsAdd(target);
              addRecovery(target);
              changes.relocationStarted(startedShard, target);
              return Tuple.tuple(source, target);
          }
          
          • 最终执行relocate的操作通过一组Observer完成
           private final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.DelegatingRoutingChangesObserver(
          nodesChangedObserver,
          indexMetadataUpdater,
          restoreInProgressUpdater,
          resizeSourceIndexUpdater
          );
          

参数配置

  • cluster.routing.allocation.balance.index 0.55f
  • cluster.routing.allocation.balance.shard 0.45f
  • cluster.routing.allocation.balance.write_load 10.0f
  • cluster.routing.allocation.balance.disk_usage 2e-11f
  • cluster.routing.allocation.balance.threshold 1.0f

以上参数统统进入weight计算

节点本地阈值计算: localThreshold = sorter.minWeightDelta() * threshold; 详细见下面代码,minWeightDelta相对比较简单

 final WeightFunction weightFunction = new WeightFunction(
            indexBalanceFactor,
            shardBalanceFactor,
            writeLoadBalanceFactor,
            diskUsageBalanceFactor
        );
private static class WeightFunction {

        private final float theta0;
        private final float theta1;
        private final float theta2;
        private final float theta3;

        WeightFunction(float indexBalance, float shardBalance, float writeLoadBalance, float diskUsageBalance) {
            float sum = indexBalance + shardBalance + writeLoadBalance + diskUsageBalance;
            if (sum <= 0.0f) {
                throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
            }
            theta0 = shardBalance / sum;
            theta1 = indexBalance / sum;
            theta2 = writeLoadBalance / sum;
            theta3 = diskUsageBalance / sum;
        }

        float weight(Balancer balancer, ModelNode node, String index) {
            final float weightShard = node.numShards() - balancer.avgShardsPerNode();
            final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
            final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
            final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
            return theta0 * weightShard + theta1 * weightIndex + theta2 * ingestLoad + theta3 * diskUsage;
        }

        float minWeightDelta(Balancer balancer, String index) {
            return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index);
        }
    }

总结

ES给出了一个rebance的策略,结合节点weight、磁盘写入、reques 写入量,index & shard权重,来决定最终的策略执行

值得学习

参考

源码 8.13

ChatGPT