Dubbo
Dubbo是有阿里巴巴开源的高性能RPC框架,主要用于分布式服务间调用。本文着重从下列这几方面Dubbo Provider的暴露过程 以及 Dubbo Consumer的引用过程,已经消费者侧的集群容错,以及Dubbo的线程模型分析。最后来个灵魂拷问你为什么使用dubbo。
Dubbo Provider的暴露过程
说到为何Dubbo Provider的暴露过程,首先我们观察dubbo的申明一个Provider,在使用Xml会直观的定义如下的配置:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
<bean id="moodService" class="org.moodfly.dubbo.impl.MoodServiceImpl"/>
<dubbo:service interface="org.moodfly.dubbo.impl.MoodService" ref="moodService"/>
然后根据读取xml根据dubbo定义的schema,解析为一个URL,Dubbo 就是采用 URL 的方式来作为约定的参数类型,被称为公共契约,就是我们都通过 URL 来交互。OK现有这个了解在脑海,我们把上述的配置解析的结果模拟下,
dubbo://129.139.93.23:20881/org.moodfly.dubbo.impl.MoodService?anyhost=true&application=moodFly&bean.name=ServiceBean:org.moodfly.dubbo.impl.MoodService&bind.ip=129.139.93.23&bind.port=20880&channel.readonly.sent=true&codec=dubbo&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&heartbeat=60000&interface=org.moodfly.dubbo.impl.MoodService&methods=hello&pid=39604&qos.enable=false®ister=true&release=chd-2.7.3-v1.0.4&service.filter=dubboLogFilter&side=provider&threadpool=fixed&threads=50×tamp=1611385596266
在服务提供方的一宿重要信息如交互的协议 服务提供方的ip 端口,以及服务方的工作线程模式,全部按照规则append到这个URL中。在眼尖的同学肯定看到了一个ServiceBean
,那么根据我们构建完成的URL就要去暴露了。我以和SpringBoot集成的场景下介绍,我们在application.yml,配置dubbo.application.name
还有dubbo的协议之类的 配置完毕,在程序启动类添加@EnableDubbo注解之后,自动扫描针对dubbo的@Service的Java类,将其中的所有方法暴露出去。 把被
Dubbo@Service注解标记的类是如何变成
ServiceBean,这里有的同学可能不是很清楚,主要是给予Dubbo实现了一个BeanDefinition的后置处理器,也就是BeanDefinitionRegistryPostProcessor,用于把对应的注解解析为SpringIOC中的单例Bean。然而此时也并未实现暴露,继续找寻源码在ServiceBean的实现接口中有基于
ContextRefreshedEvent`事件的监听,找到了export(),也就是说dubbo是在上下文刷新完毕,也就是bean注册完毕之后进行的暴露。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() != this.applicationContext) {
return;
}
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
找到了Provider的暴露入口。我先给个结论,就是通过构建的公共契约,层层暴露,层层解析,然后根据ip和端口根据一些默认的参数,创建一个NettyServer,同事注册多层handler,启动之后然后等待客户端链接发送消息,收到之后在channelHander#receive方法中处理
- 第一步接受上下文事件,开始校验构建URL契约的参数。
- 根据配置
delay
不为空或者大于0执行发布或者延时发布。 - 执行
doExport
方法可以了解到dubbo支持多注册中心和多协议 - 进入Invoker层;调用
doExportUrlsFor1Protocol
方法执行构建URL契约,然后分别进行本地inJvm协议暴露和远端暴露。针对远程暴露,获取判断食肉有监控配置,如果有的话会把url注册进去。紧接着就是获取url中的真是的接口实现,将其包装为一个Invoker对象,然后根据制定的协议转化为Protocol对象执行export方法上报。针对本地本地暴露而言则是转向为Javassist代理,用做本地调用。 - 然后进入到注册层;去做进一步的暴露。执行RegistryProtocol的export方法。在这里主要是把服务提供者列表,根据配置的注册中心,然后进行服务注册。然后拿到URL中指定的协议,进行经一步的暴露。执行
doLocalExport
方法获取到对应的协议实现进行暴露。URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }
- 进入协议层;以DubboProtocol为例,读取到Invoker对象中的URL,后续执行OpenServer来注册handler;
URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; }
- 来到Exchange信息交换层;根据Dubbo SPI加载HeaderExchanger,衔接协议层的bind方法执行,调用传输层同时把解序列化handler添加上去,调用
Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
进入传输层。去添加处理接受任务DisPather的handler。 - 来到transport 网络传输层:调用handlers去吧任务分发的handler添加进来。dubbo默认使用的是ALL的分发模式也就是对应的
AllDispatcher
来处理,这个处理模型是把 所有的链接都放在线程池当中执行请求,响应,连接事件,断开事件,心跳。这里就会读取我们在dubbo配置中设定的threads="100"
,添加Dispatcher完毕之后执行bind,也就是开启(默认netty4)NettyServer,开启一个channel在channel上顺序增加之前所有的ChannelHandler
,然后姐完活了 等待消费者调用事件的到来,当入站请求到来之后,先反序列化,然后进入AllDispatcher开启一个线程来执行后续的操作,包括拿到对应的invoker,然后通过代理类执行暴露的方法,然后回传给Client。
偷个懒就不画图了,借一下官方的图。
Dubbo Consumer的引用过程
在消费端使用@Reference注解来引用。具体的实现方式是通过实现 Spring的InitializingBean接口中的 afterPropertiesSet方法,容器通过调用 ReferenceBean的 afterPropertiesSet方法时引入服务。
- 当我们调用dubbo服务的方法的时候,发现对应的ReferenceBean中的ref值为null。执行init方法去拉取注册中心的数据,然后根据配置的dubbo配置来构建URL契约。
- 来到ReferenceConfig的createProxy方法,首先判断是本地还是远程调用。本地走的是inJvm协议。判断是远程的时候解析注册中心的url配置,通过调用RegistryProtocol#refer中向注册中心注册自己的信息,并且订阅 Provider 和配置信息,判断是集群还是非集群,集群采用构造StaticDirectory(处理多个Provider的情况)加入集群合并为一个invoker。
- 单机模式直接调用的时候通过AbstractProtocol#refer的调用最终拿到使用模版方法
protocolBindingRefer
桥接配置的交互协议;如Dubbo协议,然后通过getClient(url);来开启一个NettyClient连接到对应的Provider服务端;具体的细节和Provider暴露nettyServer一样也是进过信息交换层,传输层,最终开启。public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }```
- 最终将调用 return (T) proxyFactory.getProxy(invoker); 返回一个代理对象。
- 当执行代理对象的方法的时候,其实是执行DubboInvoker的doInvoke方法,实际上即使调用
currentClient.send(inv, isSent);
通过nettyClient把请求参数发送给Provider,然后返回一个CompletableFuture对象等待Provider结果的返回。
集群容错
FailfastClusterInvoker
这个 cluster 只会进行一次远程调用,如果失败后立即抛出异常,也就是快速失败,它适合于不支持幂等的一些调用。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
this.checkInvokers(invokers, invocation);
Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null);
try {
return invoker.invoke(invocation);
} catch (Throwable var6) {
if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
throw (RpcException)var6;
} else {
throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + this.getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6);
}
}
}
从代码可以看到,很简单还是通过负载均衡invoker选择一个来invoker,然后发起调用,如果失败了就抛错。
FailsafeClusterInvoker
这个 cluster 是一种失败安全的 cluster,也就是调用出错仅仅就日志记录一下,然后返回了一个空结果,适用于写入审计日志等操作。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
this.checkInvokers(invokers, invocation);
Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
return invoker.invoke(invocation);
} catch (Throwable var5) {
logger.error("Failsafe ignore exception: " + var5.getMessage(), var5);
return AsyncRpcResult.newDefaultAsyncResult((Object)null, (Throwable)null, invocation);
}
}
FailbackClusterInvoker
这个 cluster 会在调用失败后,记录下来这次调用,然后返回一个空结果给服务消费者,并且会通过定时任务对失败的调用进行重调。
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker invoker = null;
try {
this.checkInvokers(invokers, invocation);
invoker = this.select(loadbalance, invocation, invokers, (List)null);
return invoker.invoke(invocation);
} catch (Throwable var6) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + var6.getMessage() + ", ", var6);
this.addFailed(loadbalance, invocation, invokers, invoker);
return AsyncRpcResult.newDefaultAsyncResult((Object)null, (Throwable)null, invocation);
}
}
当调用出错的时候就返回空结果,并且加入到 failed 中,并且会有一个定时任务会定时的去调用 failed里面的调用,如果调用成功就从 failed 中移除这个调用。
ForkingClusterInvoker
这个 cluster 会在运行时把所有 invoker 都通过线程池进行并发调用,只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行,适合用在对实时性要求比较高读操作,根据URL上配置的fork参数生成去多个Provider做去重处理,然后发起异步发起调用,然后把结果放在阻塞队列里。在阻塞读队列的时候,只要有一个返回,就返回
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Result var19;
try {
this.checkInvokers(invokers, invocation);
int forks = this.getUrl().getParameter("forks", 2);
int timeout = this.getUrl().getParameter("timeout", 1000);
final Object selected;
if (forks > 0 && forks < invokers.size()) {
selected = new ArrayList();
for(int i = 0; i < forks; ++i) {
Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)selected);
if (!((List)selected).contains(invoker)) {
((List)selected).add(invoker);
}
}
} else {
selected = invokers;
}
RpcContext.getContext().setInvokers((List)selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue();
Iterator var9 = ((List)selected).iterator();
while(var9.hasNext()) {
final Invoker<T> invoker = (Invoker)var9.next();
this.executor.execute(new Runnable() {
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable var3) {
int value = count.incrementAndGet();
if (value >= ((List)selected).size()) {
ref.offer(var3);
}
}
}
});
}
try {
Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable)ret;
throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
var19 = (Result)ret;
} catch (InterruptedException var14) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + var14.getMessage(), var14);
}
} finally {
RpcContext.getContext().clearAttachments();
}
return var19;
}
BroadcastClusterInvoker
这个 cluster 会在运行时把所有 invoker 逐个同步调用,然后在最后判断如果有一个调用抛错的话,就抛出异常,适合通知所有提供者更新缓存或日志等本地资源信息的场景。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
this.checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers(invokers);
RpcException exception = null;
Result result = null;
Iterator var6 = invokers.iterator();
while(var6.hasNext()) {
Invoker invoker = (Invoker)var6.next();
try {
result = invoker.invoke(invocation);
} catch (RpcException var9) {
exception = var9;
logger.warn(var9.getMessage(), var9);
} catch (Throwable var10) {
exception = new RpcException(var10.getMessage(), var10);
logger.warn(var10.getMessage(), var10);
}
}
if (exception != null) {
throw exception;
} else {
return result;
}
}
}
负载均衡
Dubbo内置了4种负载均衡策略:
RandomLoadBalance:随机负载均衡。随机的选择一个。是Dubbo的默认负载均衡策略。
RoundRobinLoadBalance:轮询负载均衡。轮询选择一个。
LeastActiveLoadBalance:最少活跃调用数,相同活跃数的随机。活跃数指调用前后计数差。使慢的 Provider 收到更少请求,因为越慢的 Provider 的调用前后计数差会越大。
ConsistentHashLoadBalance:一致性哈希负载均衡。相同参数的请求总是落在同一台机器上。
ConsistentHashLoadBalance
默认在hash环中增加160个虚拟节点。吧所有的的节点放在TreeMap中,根据hash值取出对应的节点之后,在拿到真实的Provider;一致性Hash算法和缓存机制配合起来使用,设置了Hash算法后,相同的key的调用,都会发送到同一个 Provider。这个 Provider 上可以把用户数据在内存中进行缓存,减少访问数据库或分布式缓存的次数。如果业务上允许这部分数据有一段时间的不一致,可以考虑这种做法。减少对数据库,缓存等中间件的依赖和访问次数,同时减少了网络IO操作,提高系统性能;
http,dubbo使用场景的规范,http和dubbo的区别
HTTP是应用层协议,使用TCP/IP协议的DUBBO自然性能要比HTTP协议快
dubbo默认使用socket长连接,即首次访问建立连接以后,后续网络请求使用相同的网络通道
http1.1协议默认使用短连接,每次请求均需要进行三次握手;
选SpringCloud全家桶:社区支持强大,更新非常快,所以开发效率高。速度慢不是缺点,扩展性不强也不是缺点,http可靠传输,同时http传输很占带宽,同时使用http协议一般会使用JSON报文,消耗会更大; 入手快,普使度比较高,自由度高,但带来的问题是无法“强力约束接口规范”,
dubbo 由于要对外提供对外暴露的服务的接口依赖,所以天生就规定了接口的入参和返回值。dubbo的需要通过dubbofilter,拓展一些东西,很多东西没有,需要自己实现,如监控,如日志,如限流,如追踪。还有就是dubbo的分层太细了,阅读源码的过程,看是很顺利,实际上极高的扩展性导致了太臃肿。dubbo的网络消耗小于SpringCloud,但是在国内95%的公司内,网络消耗不是什么太大问题。
对于使用dubbo还是SpringCloud。我认为也无可厚非,基于自己公司的现有业务,社区成熟度,还有公司的技术栈而已 来做,如果你的公司主要技术栈是Java。而且服务化做的特别棒 那你可以试着选择Dubbo,不能说Dubbo是阿里开发的,阿里就不用SpringCloud 那他不后来也开源了个 SpringCloudAlibaba? 没有绝对的好坏、也没有真香可言,对于打工人来说,你都要会哦。