Dubbo负载均衡之加权轮训


最近有个需求,需要对mq集群调用时做个权重分配,之前客户端对mq发送时都是普通轮询的策略,由于近期扩容,集群中部分服务器的性能不太一致,需要在mq发送时的策略做成带权重的轮询,让性能好的服务器处理更多的请求,也保护性能相对较差的服务器不被大流量压垮。实现此功能时参考了dubbo框架中的负载均衡轮询算法,在此做下笔记。

问题

在dubbo中,负载均衡的粒度可以细到方法级别。如图,在服务器A,服务器B和服务器C都提供hello()方法,消费者调用方法时需要选择其中一台服务器进行调用,当消费者对hello()进行大量调用时,如何将请求合理地分配到各服务器中,就是我们本文要探讨的问题。

006tNbRwly1fw96wkok1yj30b50bmjrh

解决

解决方法有很多,在dubbo中默认提供了四种负载均衡策略(随机、轮询、最少活跃调用数、一致性Hash),这里我们只讨论轮询策略。

无权重轮询

无权重的轮询非常简单,如上面的例子,只需依次循环调用各服务器就行。调用顺序如下:

[A, B, C, A, B, C, ...]

带权重轮询

一般来说,无权重轮询也够用了,但是如果A、B、C者三台服务器的性能不一样(假设A>B>C),我们希望更多的请求落在A服务器上,这时候该如何做呢?

我们假设权重比为A:B:C=6:3:1,有10个调用请求,那么比较直接想到的调用顺序如下:

[A, A, A, A, A, A, B, B, B, C]

请求按上面的顺序循环,的确是满足了按一定比例的分配,但是这样分配不太好,在一小段时间内,单台服务器连续处理多个请求会导致负载偏高,我们期望请求即要按权重分配,也要均匀地调用。

我们用以下表格来表示,表格的高度表示权重,对应列分别表示服务器A、B、C。均匀的调用,即从左向右,从上到下遍历表格,得到调用顺序为如下:

[A, B, C, A, B, A, B, A, A, A]

006tNbRwly1fw95hs8w5rj305k08wa9w

到目前dubbo最新的版本2.6.4为止,RoundRobinLoadBalance采用的算法如下:

  • 用sequences来记录每个method的调用次数
  • 计算出最大权重值maxWeight,即算出上图表格的高度
  • 计算出最小权重值minWeight,用来与maxWeight比较判断是否有权重差
  • 计算出权重和weightSum,用来与调用次数做计算取余
  • mod为当前调用次数与权重和的余数,即表格上的数标
  • 得到数标值也就确定了目标服务器
  • 源码的双层for循环就是遍历表格,得到目标服务器
  • 每次遍历没有命中时,v.decrement()是为了跳过表格中灰色部分
  • 每次遍历没用命中时,mod减1可理解为从表格数标0位置开始往下走了一步
  • 当mod == 0且v.getValue() > 0时,表示走到指定的位置,即选中的服务器

啰嗦了一点,其实算法时比较简单的,可以结合以下源码和表格理解。

public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) {
            int mod = currentSequence % weightSum;
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // Round robin
        return invokers.get(currentSequence % length);
    }

    private static final class IntegerWrapper {
        private int value;

        public IntegerWrapper(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public void decrement() {
            this.value--;
        }
    }

}

带权重轮询(优化版)

本以为上面的轮询算法就可以直接拿来借鉴,但是发现dubbo最近有个提交是优化该算法的,只是还没有并入迭代的版本,处于未正式发布状态。我们来看看相关的issue。

问题发起的issue:https://github.com/apache/incubator-dubbo/issues/2578

接受PR的issue:https://github.com/apache/incubator-dubbo/pull/2586

旧算法的时间复杂度是O(n*w),新算法的时间复杂度为O(n)。n为invokers数,w为weight。

在极端情况下,个别weight特别大时,会导致旧算法的性能较差。

还是用表格来表示,在新的算法中,10次请求得到的顺序,如数标所示,即:

[A, B, A, B, A, A, A, A, B, C]

由于轮训该顺序会不断的重复,其实与前面算法的结果是一样的,只是把前三位挪到了后面而已。

006tNbRwly1fw96wop4i4j30ac0atmx6

新版RoundRobinLoadBalance的算法基于原来的做了改良,特点如下:

  • 新增indexSeqs,与sequences集合使用。sequences意义与原来不同。
  • 算法中的index由indexSeqs与invokers的数量取余得到,可理解为表格的横坐标
  • 算法中的currentWeight由sequeces与maxWeight取余得到,可理解为表格的纵坐标
  • 用while单层遍历,每次判断当前元素的权重是否大于currentWeight,true即选中
  • 图中的数标即为遍历选中的顺序
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "roundrobin";

    private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> indexSeqs = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // Number of invokers
        int maxWeight = 0; // The maximum weight
        int minWeight = Integer.MAX_VALUE; // The minimum weight
        final List<Invoker<T>> nonZeroWeightedInvokers = new ArrayList<>();
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
            minWeight = Math.min(minWeight, weight); // Choose the minimum weight
            if (weight > 0) {
                nonZeroWeightedInvokers.add(invokers.get(i));
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }

        if (maxWeight > 0 && minWeight < maxWeight) {
            AtomicPositiveInteger indexSeq = indexSeqs.get(key);
            if (indexSeq == null) {
                indexSeqs.putIfAbsent(key, new AtomicPositiveInteger(-1));
                indexSeq = indexSeqs.get(key);
            }
            length = nonZeroWeightedInvokers.size();
            while (true) {
                int index = indexSeq.incrementAndGet() % length;
                int currentWeight;
                if (index == 0) {
                    currentWeight = sequence.incrementAndGet() % maxWeight;
                } else {
                    currentWeight = sequence.get() % maxWeight;
                }
                if (getWeight(nonZeroWeightedInvokers.get(index), invocation) > currentWeight) {
                    return nonZeroWeightedInvokers.get(index);
                }
            }
        }
        // Round robin
        return invokers.get(sequence.getAndIncrement() % length);
    }
}

性能测试

这里简单提供下我本地测试的数据,我模拟了100w次请求,造了10个invoker,权重分别为:

[100,100,200,200,300,300,400,400,500,50000]

故意让最后一个权重设置大一点,得到结果如下:

优化前耗时:548445 mills
请求分布:[2000,2000,4000,4000,6000,6000,7925,7925,9825,950325]

优化后耗时:1665 millis
请求分布:[1999,1999,3999,3999,5999,5999,7927,7927,9826,950326]

在以上条件,耗时竟相差330倍左右,看来这个优化后续很快就会纳入正式版中。

由于篇幅原因,就不贴测试代码与机器配置了,这里只是提供一些测试报告辅助说明。

总结

原本只是为了了解dubbo负载均衡轮询带权重的实现,却意外收获了该算法的优化。我们经常听说时间换空间,空间换时间,这里便是典型的优化案例,利用少量的空间换取时间,将时间复杂度从O(n*w)降到O(n)。

在以后的工作中,要多考虑性能上的优化,一般多重循环都有优化空间。