使用 CompletableFuture 异步组装数据
最近在比赛一个项目 , 是给Dubbo写一个负载均衡接口 , 其实dubbo已经实现了下面四种, 所以他做的不是这个单面负载均衡, 需要做双向负载均衡 , 负载均衡的权重取决于服务端,所以有些时候我们不知道如何计算权重, 权重受到很多因素影响 ,所以就需要动态考虑了.
Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。
1. RandomLoadBalance
RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。
private <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int len = invokers.size();
int[] arr = new int[len];
int count = 0;
int totalWight = 0;
for (Invoker<T> invoker : invokers) {
int wight = invoker.getUrl().getParameter(org.apache.dubbo.common.Constants.WEIGHT_KEY, 1);
arr[count++] = wight;
totalWight += wight;
}
// 随机偏移量
int offset = ThreadLocalRandom.current().nextInt(totalWight);
for (int i = 0; i < arr.length; i++) {
// 比如 [2 , 3 , 5] , offset=7 , 此时 offset-2=5 , offset-3=2 , offset-5=-3 ,此时当他为负数时,说明就在这个区间内, 我们是左闭右开[0,2) ,[2,5) ,[5,10)
offset -= arr[i];
// 此时已经小于0了, 说明在这个区间内
if (offset < 0) {
// 返回就行了
return invokers.get(i);
}
}
return ...
}
2. LeastActiveLoadBalance
LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。
这个需要我们去维护一个最小连接数的计算, 配合加权 ,当连接数相同的时候,选择加权分最高的 …
3. ConsistentHashLoadBalance
一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 – 1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。
这个主要是靠hash算法 , 通过hash % 服务器数量 = 服务器索引
Java中可以使用这个 :
int identityHashCode = System.identityHashCode(invokers);
来获取
4. RoundRobinLoadBalance
加权轮询负载均衡的实现 RoundRobinLoadBalance , 我选择的就是这种
这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。
Java基础系列1:深入理解Java数据类型
因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。
下面有个表格 , 默认权重是 [5,1,1]
请求编号 | currentWeight 数组 | 选择结果 | 减去权重总和后的 currentWeight 数组 |
---|---|---|---|
1 | [5, 1, 1] | A | [-2, 1, 1] |
2 | [3, 2, 2] | A | [-4, 2, 2] |
3 | [1, 3, 3] | B | [1, -4, 3] |
4 | [6, -3, 4] | A | [-1, -3, 4] |
5 | [4, -2, 5] | C | [4, -2, -2] |
6 | [9, -1, -1] | A | [2, -1, -1] |
7 | [7, 0, 0] | A | [0, 0, 0] |
此时 7 次中A节点被选中的次数是 5 , B 是1 ,C是1 ,所以符合我们的需求
计算方法如下 , 首先有三个变量 , 记录了 currentWeight , effectiveWeight , totalWeight
private class Node {
private int currentWeight; // 当前权重
private int effectiveWeight; // 有效权重,初始化的时候等于当前权重
private int totalWeight; // 总权重
}
我们先看初始化 , 初始化时 ,计算权重
比如我们知道权重了 A : 5 , B : 1 , C : 1
初始化节点
A : new Node(5, 5, 7)
B : new Node(1, 1, 7)
C : new Node(1, 1, 7)
此时currentWeight的和是 : 7
当第一次的时候 , A节点权重最大 ,此时 5-7=-2, A节点变成了 Node(-2, 5, 7)
A : new Node(-2, 5, 7)
B : new Node(1, 1, 7)
C : new Node(1, 1, 7)
此时currentWeight的和是 : 0
然后重新回归, 回归需要currentWeight= currentWeight+effectiveWeight
A : new Node(3, 5, 7)
B : new Node(2, 1, 7)
C : new Node(2, 1, 7)
此时currentWeight的和是 : 7 // 所以又回来了 .. ..
怎样做呢 ?
此时我们用一个 PriorityQueue<Pair<String, Node>>
维护所有节点的信息 , 同时使用 Pair<String, Node>
维护单个节点
1. 初始化队列
PriorityBlockingQueue<Pair<String, Node>> weightQueue = new PriorityBlockingQueue<>(3, (o1, o2) -> o2.getValue().getCurrentWeight() - o1.getValue().getCurrentWeight());
2. 遍历放入权重
weightQueue.add(new Pair<>(key, new Node(currentWeight, effectiveWeight, totalWeight)));
3. 当放入以后, 此时就可以拿到一个当前权重最大的节点 , 如果不想使用优先队列, 可以自己实现一个大顶堆 ,很简单的.
// 获取当前节点权重最大的 ,
Pair<String, Node> hPair = weightQueue.take();
// 此时hPair节点的值应当是 5 - 7 = -2
int afterCurrentWeight = value.getCurrentWeight() - value.getTotalWeight();
// 设置值
hNode.setCurrentWeight(afterCurrentWeight);
// 遍历
for (Pair<String, Node> stringNodePair : weightQueue) {
//获取节点
Node node = stringNodePair.getValue();
// 剩余每个节点值 + 有效值
int after = node.getCurrentWeight() + node.getEffectiveWeight();
// 设置节点值
node.setCurrentWeight(after);
}
// 由于我们拿出来的节点还没有 重新计算, 还要计算
hNode.setCurrentWeight(value.getCurrentWeight() + value.getEffectiveWeight());
// 重新放入节点. ... ,重新排序
weightQueue.add(hPair);
其实自己维护的话可以进行 heapfy的 , 我们只能依赖JDK提供的数据结构进行的这种取巧方式
Linux系统实时数据同步inotify+rsync