6.源码分析—和dubbo相比SOFARPC是如何实现负载均衡的?

官方目前建议使用的负载均衡包括以下几种:

  1. random(随机算法)
  2. localPref(本地优先算法)
  3. roundRobin(轮询算法)
  4. consistentHash(一致性hash算法)

所以我们接下来分析以下以上四种负载均衡的源码是怎样的。

随机算法

我们先看一下SOFARPC的源码实现:

@Override
public ProviderInfo doSelect(SofaRequest invocation, List<ProviderInfo> providerInfos) {
    ProviderInfo providerInfo = null;
    int size = providerInfos.size(); // 总个数
    int totalWeight = 0; // 总权重
    boolean isWeightSame = true; // 权重是否都一样
    for (int i = 0; i < size; i++) {
        int weight = getWeight(providerInfos.get(i));
        totalWeight += weight; // 累计总权重
        if (isWeightSame && i > 0 && weight != getWeight(providerInfos.get(i - 1))) {
            isWeightSame = false; // 计算所有权重是否一样
        }
    }
    if (totalWeight > 0 && !isWeightSame) {
        // 如果权重不相同且权重大于0则按总权重数随机
        int offset = random.nextInt(totalWeight);
        // 并确定随机值落在哪个片断上
        for (int i = 0; i < size; i++) {
            offset -= getWeight(providerInfos.get(i));
            if (offset < 0) {
                providerInfo = providerInfos.get(i);
                break;
            }
        }
    } else {
        // 如果权重相同或权重为0则均等随机
        providerInfo = providerInfos.get(random.nextInt(size));
    }
    return providerInfo;
}

上面主要做了几件事:

  1. 获取所有的provider
  2. 遍历provier,如果当前的provider的权重和上一个provider的权重不一样,那么就做个标记
  3. 如果权重不相同那么就随机取一个0到总权重之间的值,遍历provider去减随机数,如果减到小于0,那么就返回那个provider
  4. 如果没有权重相同,那么用随机函数取一个provider

我们再来看看dubbo是怎么实现的:

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // Number of invokers
    boolean sameWeight = true; // Every invoker has the same weight?
    int firstWeight = getWeight(invokers.get(0), invocation);
    int totalWeight = firstWeight; // The sum of weights
    for (int i = 1; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight; // Sum
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
  1. 获取invoker的数量
  2. 获取第一个invoker的权重,并复制给firstWeight
  3. 循环invoker集合,把它们的权重全部相加,并复制给totalWeight,如果权重不相等,那么sameWeight为false
  4. 如果invoker集合的权重并不是全部相等的,那么获取一个随机数在1到totalWeight之间,赋值给offset属性
  5. 循环遍历invoker集合,获取权重并与offset相减,当offset减到小于零,那么就返回这个inovker
  6. 如果权重相等,那么直接在invoker集合里面取一个随机数返回

从上面我们可以看到,基本上SOFARPC和dubbo的负载均衡实现是一致的。

本地优先算法

在负载均衡时使用保持本机优先。这个相信大家也比较好理解。在所有的可选地址中,找到本机发布的地址,然后进行调用。

@Override
public ProviderInfo doSelect(SofaRequest invocation, List<ProviderInfo> providerInfos) {
    String localhost = SystemInfo.getLocalHost();
    if (StringUtils.isEmpty(localhost)) {
        return super.doSelect(invocation, providerInfos);
    }
    List<ProviderInfo> localProviderInfo = new ArrayList<ProviderInfo>();
    for (ProviderInfo providerInfo : providerInfos) { // 解析IP,看是否和本地一致
        if (localhost.equals(providerInfo.getHost())) {
            localProviderInfo.add(providerInfo);
        }
    }
    if (CommonUtils.isNotEmpty(localProviderInfo)) { // 命中本机的服务端
        return super.doSelect(invocation, localProviderInfo);
    } else { // 没有命中本机上的服务端
        return super.doSelect(invocation, providerInfos);
    }
}
  1. 查看本机的host,如果为空,那么直接调用父类随机算法
  2. 遍历所有的provider,如果服务提供方的host和服务调用方的host一致,那么保存到集合里
  3. 如果存在服务提供方的host和服务调用方的host一致,那么就在这些集合中选取
  4. 如果不一致,那么就在所有provider中选取

轮询算法

我们首先来看看SOFARPC的轮训是怎么实现的:

private final ConcurrentMap<String, PositiveAtomicCounter> sequences = new ConcurrentHashMap<String, PositiveAtomicCounter>();

@Override
public ProviderInfo doSelect(SofaRequest request, List<ProviderInfo> providerInfos) {
    String key = getServiceKey(request); // 每个方法级自己轮询,互不影响
    int length = providerInfos.size(); // 总个数
    PositiveAtomicCounter sequence = sequences.get(key);
    if (sequence == null) {
        sequences.putIfAbsent(key, new PositiveAtomicCounter());
        sequence = sequences.get(key);
    }
    return providerInfos.get(sequence.getAndIncrement() % length);
}

private String getServiceKey(SofaRequest request) {
    StringBuilder builder = new StringBuilder();
    builder.append(request.getTargetAppName()).append("#")
        .append(request.getMethodName());
    return builder.toString();
}

从上面的代码我们可以看出,SOFARPC的轮询做的很直接简单。就是new了一个map,然后把每个服务的服务名拼上方法名存到map里面,然后每次value值自增1对provider取模。

我们再看dubbo的实现方式:

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
    ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
    if (map == null) {
        methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
        map = methodWeightMap.get(key);
    }
    int totalWeight = 0;
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    Invoker<T> selectedInvoker = null;
    WeightedRoundRobin selectedWRR = null;
    for (Invoker<T> invoker : invokers) {
        String identifyString = invoker.getUrl().toIdentityString();
        WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
        int weight = getWeight(invoker, invocation);
        if (weight < 0) {
            weight = 0;
        }
        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            weightedRoundRobin.setWeight(weight);
            map.putIfAbsent(identifyString, weightedRoundRobin);
            weightedRoundRobin = map.get(identifyString);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            //weight changed
            weightedRoundRobin.setWeight(weight);
        }
        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedInvoker = invoker;
            selectedWRR = weightedRoundRobin;
        }
        totalWeight += weight;
    }
    if (!updateLock.get() && invokers.size() != map.size()) {
        if (updateLock.compareAndSet(false, true)) {
            try {
                // copy -> modify -> update reference
                ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                newMap.putAll(map);
                Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, WeightedRoundRobin> item = it.next();
                    if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                        it.remove();
                    }
                }
                methodWeightMap.put(key, newMap);
            } finally {
                updateLock.set(false);
            }
        }
    }
    if (selectedInvoker != null) {
        selectedWRR.sel(totalWeight);
        return selectedInvoker;
    }
    // should not happen here
    return invokers.get(0);
}

dubbo的轮询的实现里面还加入了权重在里面,sofarpc的权重轮询是放到另外一个类当中去做的,因为性能太差了而被弃用了。

我们举个例子来简单看一下dubbo的加权轮询是怎么做的:

假定有3台dubbo provider:

10.0.0.1:20884, weight=2
10.0.0.1:20886, weight=3
10.0.0.1:20888, weight=4

totalWeight=9;

那么第一次调用的时候:
10.0.0.1:20884, weight=2    selectedWRR -> current = 2
10.0.0.1:20886, weight=3    selectedWRR -> current = 3
10.0.0.1:20888, weight=4    selectedWRR -> current = 4

selectedInvoker-> 10.0.0.1:20888 
调用 selectedWRR.sel(totalWeight); 
10.0.0.1:20888, weight=4    selectedWRR -> current = -5
返回10.0.0.1:20888这个实例

那么第二次调用的时候:
10.0.0.1:20884, weight=2    selectedWRR -> current = 4
10.0.0.1:20886, weight=3    selectedWRR -> current = 6
10.0.0.1:20888, weight=4    selectedWRR -> current = -1

selectedInvoker-> 10.0.0.1:20886 
调用 selectedWRR.sel(totalWeight); 
10.0.0.1:20886 , weight=4   selectedWRR -> current = -3
返回10.0.0.1:20886这个实例

那么第三次调用的时候:
10.0.0.1:20884, weight=2    selectedWRR -> current = 6
10.0.0.1:20886, weight=3    selectedWRR -> current = 0
10.0.0.1:20888, weight=4    selectedWRR -> current = 3

selectedInvoker-> 10.0.0.1:20884
调用 selectedWRR.sel(totalWeight); 
10.0.0.1:20884, weight=2    selectedWRR -> current = -3
返回10.0.0.1:20884这个实例

一致性hash算法

在SOFARPC中有两种方式实现一致性hash算法,一种是带权重的一种是不带权重的,我对比了一下,两边的代码基本上是一样的,所以我直接分析带权重的代码就好了。

下面我们来分析一下代码:

private final ConcurrentHashMap<String, Selector> selectorCache = new ConcurrentHashMap<String, Selector>();

@Override
public ProviderInfo doSelect(SofaRequest request, List<ProviderInfo> providerInfos) {
    String interfaceId = request.getInterfaceName();
    String method = request.getMethodName();
    String key = interfaceId + "#" + method;
    // 判断是否同样的服务列表
    int hashcode = providerInfos.hashCode();
    Selector selector = selectorCache.get(key);
    // 原来没有
    if (selector == null ||
        // 或者服务列表已经变化
        selector.getHashCode() != hashcode) {
        selector = new Selector(interfaceId, method, providerInfos, hashcode);
        selectorCache.put(key, selector);
    }
    return selector.select(request);
} 

上面的doSelect方法就是获取到相同服务的Selector,如果没有就新建一个。Selector是WeightConsistentHashLoadBalancer里面的内部类,我们接下来看看这个内部类的实现。


public Selector(String interfaceId, String method, List<ProviderInfo> actualNodes, int hashcode) {
    this.interfaceId = interfaceId;
    this.method = method;
    this.hashcode = hashcode;
    // 创建虚拟节点环 (provider创建虚拟节点数 =  真实节点权重 * 32)
    this.virtualNodes = new TreeMap<Long, ProviderInfo>();
    // 设置越大越慢,精度越高
    int num = 32;
    for (ProviderInfo providerInfo : actualNodes) {
        for (int i = 0; i < num * providerInfo.getWeight() / 4; i++) {
            byte[] digest = HashUtils.messageDigest(providerInfo.getHost() + providerInfo.getPort() + i);
            for (int h = 0; h < 4; h++) {
                long m = HashUtils.hash(digest, h);
                virtualNodes.put(m, providerInfo);
            }
        }
    }
}

Selector内部类中就是构建了一个TreeMap实例,然后遍历所有的provider,每个provider虚拟的节点数是(真实节点权重 * 32)个。

虚拟好节点后,我们直接调用Selector#select方法在hash环中得到相应的provider。

public ProviderInfo select(SofaRequest request) {
    String key = buildKeyOfHash(request.getMethodArgs());
    byte[] digest = HashUtils.messageDigest(key);
    return selectForKey(HashUtils.hash(digest, 0));
}

/**
 * 获取第一参数作为hash的key
 *
 * @param args the args
 * @return the string
 */
private String buildKeyOfHash(Object[] args) {
    if (CommonUtils.isEmpty(args)) {
        return StringUtils.EMPTY;
    } else {
        return StringUtils.toString(args[0]);
    }
}

/**
 * Select for key.
 *
 * @param hash the hash
 * @return the provider
 */
private ProviderInfo selectForKey(long hash) {
    Map.Entry<Long, ProviderInfo> entry = virtualNodes.ceilingEntry(hash);
    if (entry == null) {
        entry = virtualNodes.firstEntry();
    }
    return entry.getValue();
}

这上面主要是获取第一参数作为hash的key,然后对它进行hash。所以我感觉这里可能有一个问题就是如果一个某个服务里面很多个参数一样的服务,那么是不是都会打到那同一台机器上呢?

dubbo的实现方式也和SOFARPC类似,这里不再赘述。