SpringCloud通过Ribbon实现客户端负载均衡,通过在 RestTemplate
Bean上注解 LoadBalance
实现
/**
* Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
* @author Spencer Gibb
*/
public @interface LoadBalanced {
}
通过上面的注解我们找到LoadBalancerClient
这个类
这是SpringCloud定义的一个接口

从
LoadBalancerClient
这个类的所属包中我们可以看到
整理出如下关系,

从该类的解释
Auto configuration for Ribbon (client side load balancing).
可以看到LoadBalancerAutoConfiguration
为自动化配置Ribbon客户端负载均衡的类
LoadBalancerInterceptor
拦截器会调用LoadBalancerClient
的reconstructURI
方法转变成可以发送的实际URI(将服务名替换成Host:Port格式的请求)
我们继续找到LoadBalancerClient
的实现类RibbonLoadBalancerClient

查看
RibbonLoadBalancerClient
的源码可以看到
执行总共有分下面几步:
1. 得到客户端负载均衡器
2. 得到Server,并封装成RibbonServer
3. 回调LoadBalancerRequest
执行请求 request.apply
1. 得到客户端负载均衡器
ILoadBalancer
是选择哪个客户端负载均衡器的接口
具体实现有:

ILoadBalancer (com.netflix.loadbalancer)
AbstractLoadBalancer (com.netflix.loadbalancer)
NoOpLoadBalancer (com.netflix.loadbalancer)
BaseLoadBalancer (com.netflix.loadbalancer)
DynamicServerListLoadBalancer (com.netflix.loadbalancer)
ZoneAwareLoadBalancer (com.netflix.loadbalancer)
可以看到都是来自netflix 原生的代码,默认的Ribbon配置的是哪个LoadBalancer呢?
通过RibbonClientConfiguration
我们可以看到
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping) {
ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
.withClientConfig(config).withRule(rule).withPing(ping)
.withServerListFilter(serverListFilter).withDynamicServerList(serverList)
.buildDynamicServerListLoadBalancer();
return balancer;
}
Ribbon默认是 配置ZoneAwareLoadBalancer
这个负载均衡器
2. 得到Server,并封装成RibbonServer
继续分析 RibbonLoadBalancerClient.execute
根据loadBalancer得到Server ,将Server封装成RibbonServer
RibbonServer 比Server多了一些信息,并且是一个 ServiceInstance
实例
protected static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
3. 回调LoadBalancerRequest
执行请求 request.apply
在执行Request.apply的时候会执行 LoadBalancerInterceptor中定义的方法如下

其中
ServiceRequestWrapper
里有个方法通过调用负载均衡reconstructURI
的方法实现了服务名替换URI的功能
@Override
public URI getURI() {
URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
this.instance, getRequest().getURI());
return uri;
}
那 execution.execute(serviceRequest, body);
在哪里执行的呢?
查看其实现类可以发现是在InterceptingClientHttpRequest
这个类里面的内部类InterceptingRequestExecution implements ClientHttpRequestExecution
源码如下:
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
delegate.getHeaders().putAll(request.getHeaders());
if (body.length > 0) {
StreamUtils.copy(body, delegate.getBody());
}
return delegate.execute();
}
}
其中 requestFactory.createRequest(request.getURI()
的request.getURI() 会调用ServiceRequestWrapper
的getURI
方法,getURI又是调用RibbonLoadBalancerClient.reconstructURI
方法
负载均衡器
ILoadBalancer (com.netflix.loadbalancer)
netflix原生的接口
AbstractLoadBalancer (com.netflix.loadbalancer)
定义了 ServerGroup
枚举类
和另外两个方法
NoOpLoadBalancer (com.netflix.loadbalancer)
啥都是空的
BaseLoadBalancer (com.netflix.loadbalancer)
这个是负载均衡的基础实现类
- 定义了两个易变的变量以维护服务器清单,一个是所有的,一个是可用的
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
- 定义了用来存储负载均衡器各服务实例属性和统计信息的
LoadBalancerStats
对象
protected LoadBalancerStats lbStats;
- 定义了心跳检测的相关变量,IPing是要构造的时候传入,IPingStrategy
默认是用SerialPingStrategy 实现类,
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
SerialPingStrategy 代码如下,注意其中的注释说明,当IPing速度不理想时应该怎么办...
/**
* Default implementation for <c>IPingStrategy</c>, performs ping
* serially, which may not be desirable, if your <c>IPing</c>
* implementation is slow, or you have large number of servers.
*/
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
if (logger.isDebugEnabled()) {
logger.debug("LoadBalancer: PingTask executing ["
+ numCandidates + "] servers configured");
}
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// NOTE: IFF we were doing a real ping
// assuming we had a large set of servers (say 15)
// the logic below will run them serially
// hence taking 15 times the amount of time it takes
// to ping each server
// A better method would be to put this in an executor
// pool
// But, at the time of this writing, we dont REALLY
// use a Real Ping (its mostly in memory eureka call)
// hence we can afford to simplify this design and run
// this
// serially
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Throwable t) {
logger.error("Exception while pinging Server:"
+ servers[i], t);
}
}
return results;
}
}
- 定义了负载均衡的处理规则IRule对象
protected IRule rule = DEFAULT_RULE;
默认是用RoundRobinRule
,实现了线性负载均衡规则
private final static IRule DEFAULT_RULE = new RoundRobinRule();
- 启动Ping任务,默认每隔十秒对Server做一次健康检查
image.png
protected int pingIntervalSeconds = 10;
-
markServerDown
标记一个Server暂停服务
DynamicServerListLoadBalancer (com.netflix.loadbalancer)
对 BaseLoadBalancer
的扩展,实现了实例清单在运行期动态更新的能力,还具备对服务清单的过滤功能
- ServerList
image.png
image.png
默认配置的是DomainExtractingServerList
,从EurekaRibbonClientConfiguration
中可得到
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config) {
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
DiscoveryEnabledNIWSServerList
-
ServerListUpdater
获取服务清单以及更新本地服务清单
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;

ServerListFilter

ZoneAwareLoadBalancer (com.netflix.loadbalancer)
解决了多区域部署时,DynamicServerListLoadBalancer 可能会产生的性能问题,因为跨区域会产生更高的延迟
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
负载均衡策略
IRule 的实现类有

RandomRule
随机取一个实例,如果取不到可能会有BUG
RoundRobinRule
线性轮询的方式去,超过10次取不到就报警告退出
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
RetryRule
该类内部定义了个IRule对象,如下
public class RetryRule extends AbstractLoadBalancerRule {
IRule subRule = new RoundRobinRule();
long maxRetryMillis = 500;
默认是用 RoundRobinRule
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;
Server answer = null;
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
网友评论