TL;DR
Dubbo
的服务暴露和服务的引用流程比较复杂,尤其是Dubbo
框架对于扩展点实现了自动包装、自动装配、自适应和自动激活的这些特性,导致我们直接在看源码分析时,会出现很多地方的逻辑不清楚(因为有很多的类是在内存中动态生成的),这里我会把一些动态生成的关键的类的源码贴出来,这样有助于更清晰的理解Dubbo
的服务暴露和和消费的整个过程。
之所以对服务的暴露和引用的流程做分析,一方面是想更深入的理解Dubbo
框架,另一方便是想结合Dubbo
的这些扩展点,研究一下Dubbo
的微内核加插件机制的架构,学架构不就得理解别人的思想吗?
Dubbo
官方把服务的暴露叫做导出,把服务的消费叫作引用,下面也会使用这两个词语来代替。
本文大纲:
Dubbo
的配置覆盖策略Dubbo
框架分层Dubbo
服务的导出分析Dubbo
服务的引用分析- 微内核加插件的设计模式
以下正文开始。
0x01 Dubbo
的配置覆盖策略
Dubbo
对配置参数的处理分两种情况,一种是启动时期的参数处理,这时会有一组默认的参数覆盖策略;另一种是运行期,这里会有另一组默认的参数覆盖策略。
先看启动时期Dubbo
的参数配置覆盖策略:
-D
传递给JVM
的参数优先级最高,比如-Ddubbo.protocol.port=20882
;- 代码(基于注解)或
XML
配置优先级次之,比如在Spring
的XML
文件中指定<dubbo:protocol port="20881"/>
; - 配置文件优先级最低,比如
dubbo.properties
文件指定dubbo.protocol.port=20880
;
一般我们考虑使用dubbo.properties
作为默认配置。
再看运行期,一句话总结Dubbo
的运行期配置覆盖策略:override > -D > Consumer > Provider
(具体参考RegistryDirectory#mergeUrl()
的源码)。
这个是Dubbo
内部对配置的默认处理优先级,override
最高,这个是通过override://xxx
协议来操作的,主要是用来在dubbo-admin
中对服务的参数进行调整,也可以理解为动态服务治理时使用该协议来对参数进行动态的调整。
其次是-D
传递给JVM
的参数,这个不难理解,因为启动时-D
是优先级最高的,确切的说,这里应该是对启动完成时最终确定的参数。
再次就是Consumer
的参数了,最后是Provider
的参数。
Dubbo
会按照这些规则对参数进行覆盖处理。
0x02 Dubbo
框架分层
众所周知,Dubbo
是分层良好的框架,乍一看上去挺复杂的,实际上也确实是挺复杂的,不过理解了Dubbo
服务的导出和引用过程之后,你就不会再觉得这个分层复杂了。以下是Dubbo
框架的分层,内容来自于Dubbo
的文档:
config
配置层:对外配置接口,以ServiceConfig
、ReferenceConfig
为中心,可以直接初始化配置类,也可能通过Spring
解析配置生成配置类;proxy
服务代理层:服务接口透明代理,生成服务的客户端Stub
和服务端Skeleton
,以ServiceProxy
为中心,扩展接口为ProxyFactory
;registry
注册中心层:封装服务地址的注册与发现,以服务URL
为中心,扩展接口为RegistryFactory
、Registry
、RegistryService
;cluster
路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoker
为中心,扩展接口为Cluster
、Directory
、Router
、LoadBalance
;monitor
监控层:RPC
调用次数和调用时间监控,以Statistics
为中心, 扩展接口为MonitoryFactory
、Monitor
,MonitorService
;protocol
远程调用层:封装RPC
调用,以Invocation
、Result
为中心,扩展接口为Protocol
、Invoker
、Exporter
;exchange
信息交换层:封装请求响应模式,同步转异步,以Request
、Response
为中心,扩展接口为Exchanger
、ExchangeChannel
、ExchangeClient
、ExchangeServer
;transport
网络传输层:抽象mina
和netty
为统一接口,以Message
为中心,扩展接口为Channel
、Transporter
、Client
、Server
、Codec
;serialize
数据序列化层:可复用的一些工具,扩展接口为Serialization
、ObjectInput
、ObjectOutput
、ThreadPool
;
关系说明:
- 在
RPC
中,Protocol
是核心层,也就是只要有Protocol + Invoker + Exporter
就可以完成非透明的RPC
调用,然后在Invoker
的主过程上Filter
拦截点; Cluster
是外围概念,所以Cluster
的目的是将多个Invoker
伪装成一个Invoker
,这样其它人只要关注Protocol
层的Invoker
即可;Proxy
层封装了所有接口的透明化代理,而在其它层都以Invoker
为中心,只有到了暴露给用户使用时,才用Proxy
将Invoker
转成接口,或将接口实现转成Invoker
,也就是去掉Proxy
层RPC
也是可以运行的,只是不那么透明,看起来不像调用本地服务一样调远程服务;Remoting
实现是Dubbo
协议的实现,如果选择RMI
协议,整个Remoting
层都不会用上;Remoting
内部再划分为Transport
传输层和Exchange
信息交换层,Transport
层只负责单向消息传输,是对Mina
、Netty
、Grizzly
的抽象,它也可以扩展UDP
传输,而Exchange
层是在传输层之上封装了Request-Response
语义;Registry
和Monitor
实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起;
0x03 Dubbo
服务的导出分析
这里以之前SPI
篇中写的示例来分析吧,EchoProvider
的代码如下:
public class EchoProvider {
public static void main(String[] args) throws InterruptedException {
ServiceConfig<EchoServiceImpl> service = new ServiceConfig<>();
service.setInterface(EchoService.class);
service.setRef(new EchoServiceImpl());
service.setApplication(new ApplicationConfig("echo-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.setSerialization("protostuff");
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
ServiceConfig
类是API
类型的,也就是说它是用来给使用者使用的。我们先创建了一个ServiceConfig
对象,然后进行服务相关的配置,接着调用了service.export()
方法来对服务进行导出,这个方法就是Dubbo
服务导出的入口。
ServiceConfig
会准备环境所需的config
(检查用户配置,生成默认设置),并检查所需的环境配置,比如ApplicationConfig
、ModuleConfig
、RegistryConfig
、MonitorConfig
、ProtocolConfig
等。
ServiceConfig
初始化时会加载Protocol.class
和ProxyFactory.class
两个扩展点的自适应扩展,也就是说,实际上,这两个扩展加载时返回的是两个在内存动态生成的Apaptive
对象:Protocol$Adaptive
和ProxyFactory$Adaptive
,具体的过程在之前的SPI
篇中详细的分析过dubbo
的扩展加载特性;这两个对象一定要清楚,否则,后面的过程很多代码无法关联上;动态生成的这两个Adaptive
类其实是一个包装类,它们的职责很简单,会根据url
的参数动态的返回对应的扩展实现。下面是两个自适应类的代码:
Protocol$Adaptive
:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;
public class Protocol$Adaptive
implements Protocol {
public Invoker refer(Class class_, URL uRL) throws RpcException {
String string;
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(").append(uRL2.toString()).append(") use keys([protocol])").toString());
}
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
return protocol.refer(class_, uRL);
}
public Exporter export(Invoker invoker) throws RpcException {
String string;
if (invoker == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
URL uRL = invoker.getUrl();
String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(").append(uRL.toString()).append(") use keys([protocol])").toString());
}
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
return protocol.export(invoker);
}
@Override
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
@Override
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
}
ProxyFactory$Adaptive
:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;
public class ProxyFactory$Adaptive
implements ProxyFactory {
public Object getProxy(Invoker invoker) throws RpcException {
if (invoker == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
URL uRL = invoker.getUrl();
String string = uRL.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getProxy(invoker);
}
public Object getProxy(Invoker invoker, boolean bl) throws RpcException {
if (invoker == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
URL uRL = invoker.getUrl();
String string = uRL.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getProxy(invoker, bl);
}
public Invoker getInvoker(Object object, Class class_, URL uRL) throws RpcException {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getParameter("proxy", "javassist");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL2.toString()).append(") use keys([proxy])").toString());
}
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
return proxyFactory.getInvoker(object, class_, uRL);
}
}
其实,在Dubbo
的源码中可以看到有一些@Adaptive
标注的类,但是不多,这些类的结构非常相似,可能是Dubbo
的开发者后续发现了这些相似性,所以采用了自适应方式的设计来直接在内存中动态的生成,这样避免了大量重复结构的代码。
export()
方法简单的处理后,直接调用了doExport()
方法,doExport()
方法做了大量的参数校验和初始化,然后调用doExportUrls()
方法开始对服务做导出。
首先,它会加载注册中心:loadRegistries(true)
。先解析配置的注册中心(可以是多个)到AbstractInterfaceConfig#registries
列表中,然后循环registries
列表,根据配置的参数来组装注册链接URL
列表(dubbo
的配置信息封装为URL
对象)。
加载注册中心时会动态生成RegistryFactory
扩展点的自适应对象,代码如下:
package com.alibaba.dubbo.registry;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.registry.Registry;
import com.alibaba.dubbo.registry.RegistryFactory;
public class RegistryFactory$Adaptive
implements RegistryFactory {
@Override
public Registry getRegistry(URL uRL) {
String string;
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(").append(uRL2.toString()).append(") use keys([protocol])").toString());
}
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string);
return registryFactory.getRegistry(uRL);
}
}
接下来,循环ServiceConfig#protocols
列表,为配置的protocol
组装配置URL
并导出服务,在正式export
前会组装参数map
,同时会解析MethodConfig
及ArgumentConfig
的配置信息,处理完配置信息,接下来就是真正的export
了。
export
分为本地导出和远程导出,由参数scope
(scope
的值有none
、local
、remote
三种)决定,没有指定为remote
时会做本地导出,没有指定为local
时会做local
导出,如果指定为none
,则不做导出;本地导出,使用的是InJvmProtocol#export
来导出(需要注意的是,protocol
扩展对象是在ServiceConfig
初始化时,这个加载的过程在之前的SPI
篇中已经分析过,这里要注意的是protocol
扩展对象会被Wrapper
实现包装,并且,在不同的地方还会根据url#protocol
参数的变化来动态的加载不同的Protocol.class
的扩展实现);接下来就是远程导出了,远程导出时会对每个配置的protocol
去循环前面生成的registryURLs
列表,也就是说会远程导出到所有配置的注册中心,这里也要注意,在这里会检查配置的proxy
参数,根据配置参数的值加载对应的ProxyFactory.class
扩展实现来生成服务的代理对象Invoker.class
,Invoker
对象是服务的核心(我们暂时不关注具体的细节,后面专门来分析Invoker
的核心逻辑),这里要留意在proxyFactory#getInvoker()
时,registryURL
将具体服务的URL
(dubbo://xxxx
开头的URL
参数)作为export
参数加入到了registryURL
中,同时,默认使用JavassistProxyFactory
扩展实现,返回的结果是通过Wrapper
类动态生成的一个包装类Wrapper0
;另外,这里循环时的URL
是registry
协议的URL
,所以被Protocol$Adaptive#export
处理时,真正做protocol#export
的是RegistryProtocol#export
,该方法先调用了RegistryProtocol#doLocalExport()
,doLocalExport()
方法中在第一次时又会通过protocol#export()
来做真正的服务导出,这里传入的Invoker
对象是一个委托类型的InvokerDelegete
对象,该对象在构造方法中会传入服务参数的URL
,也就是前面registryURL
中的export
参数,因为默认服务使用的是dubbo
协议,这里的protocol
对象是从ServiceConfig
传入的,是同一个对象,所以又会调用Protocol$Adaptive#export()
方法进行包装处理,而这时的协议为dubbo
,所以里面真正做export
的protocol
实现就是DubboProtocol
的实现了。
Wrapper#getWrapper()
返回的包装类对象是一个动态拼接,然后在内存中使用javassist
编译后再实例化的对象,具体的代码如下:
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import me.zy.std.dubbo.spi.api.EchoService;
public class Wrapper0
extends Wrapper
implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
@Override
public String[] getPropertyNames() {
return pns;
}
@Override
public boolean hasProperty(String string) {
return pts.containsKey(string);
}
public Class getPropertyType(String string) {
return (Class)pts.get(string);
}
@Override
public String[] getMethodNames() {
return mns;
}
@Override
public String[] getDeclaredMethodNames() {
return dmns;
}
@Override
public void setPropertyValue(Object object, String string, Object object2) {
try {
EchoService echoService = (EchoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" filed or setter method in class me.zy.std.dubbo.spi.api.EchoService.").toString());
}
@Override
public Object getPropertyValue(Object object, String string) {
try {
EchoService echoService = (EchoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" filed or setter method in class me.zy.std.dubbo.spi.api.EchoService.").toString());
}
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
EchoService echoService;
try {
echoService = (EchoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("echo".equals(string) && arrclass.length == 1) {
return echoService.echo((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class me.zy.std.dubbo.spi.api.EchoService.").toString());
}
}
具体到DubboProtocol
协议中导出时,首先会根据服务的参数URL
生成服务的唯一标识serviceKey(url)
,然后就是openServer(url)
了,第一次会创建ExchangeServer
对象,这里会加载Transporter.class
扩展实现,默认使用netty
实现,ExchangeServer
对象是通过Exchangers.bind(url, requestHandler)
方法来生成的,在这个bind()
方法中,会根据URL
的exchanger
参数来加载Exchanger.class
的扩展实现,默认使用HeaderExchanger
的实现,然后调用HeaderExchanger#bind()
方法来返回HeaderExchangeServer
的ExchangeServer
实现,这里又会调用Transporters.bind()
来真正的创建Server
对象,这里通过自适应的方式来加载Transporter.class
扩展实现,默认是netty
实现NettyTransporter
,在NettyTransporter#bind()
方法中,返回了Server
的实现NettyServer
对象,这样就创建好Server
了。在Exchangers#bind()
方法创建Server
对象时,一同传入的还有一个ExchangeHandler
对象参数,该参数是一个抽象类ExchangeHandlerAdapter
的实现,该handler
在HeaderExchanger
中传入Transporters.bind()
方法之前会分别被HeaderExchangeHandler
和DecodeHandler
进行包装,然后在Transporters.bind()
中进一步被包装为ChannelHandlerDispatcher
,接着传给了NettyServer
,在NettyServer
中,又被ChannelHandlers#wrapInternal()
方法分别用HeartbeatHandler
和MultiMessageHandler
进行包装,在这里又会通过自适应的方式加载Dispatcher.class
扩展实现,这又是一个动态生成的自适应包装类Dispatcher$Adaptive
,该自适应类会动态的根据URL
中的dispatcher
参数来动态的调用对应的Dispather
的扩展实现来做dispatch()
操作,默认使用AllDispatcher
的实现,这里会再一次用AllChannelHandler
来对前面传入的handler
进行包装。在前面的步骤中,生成NettyServer
对象时,在NettyServer
的构造方法中调用了父类的构造方法,也就是AbstractServer
的构造方法,这个构造方法回调了子类的doOpen()
方法,也就是NettyServer#doOpen()
方法,这里就正式的启动了Netty
服务器。Netty
服务器会绑定Codec2
扩展实现,默认使用telnet
实现,但DubbpProtocol
中指定了codec
为dubbo
,所以这里会加载DubboCountCodec
的实现,在Netty
服务器中,codec
被封装到NettyCodecAdapter
适配器对象中,同时,该对象实现了Netty
的编码和解码器handler
,编解码器通过回调Codec2
中的encode()
和decode()
方法来做编解码处理,DubboCountCodec
又调用DubboCodec
的实现,DubboCodec
中会加载Serialization.class
扩展实现来进行不同协议的编解码操作,解码是在ExchangeCodec#decodeBody()
方法中,调用CodecSupport.deserialize()
方法时动态加载Serialization.class
的扩展实现的。
上面的Transporter
扩展点动态生成的自适应类,代码如下:
package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Client;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Server;
import com.alibaba.dubbo.remoting.Transporter;
public class Transporter$Adaptive
implements Transporter {
@Override
public Server bind(URL uRL, ChannelHandler channelHandler) throws RemotingException {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getParameter("server", uRL2.getParameter("transporter", "netty"));
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(").append(uRL2.toString()).append(") use keys([server, transporter])").toString());
}
Transporter transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string);
return transporter.bind(uRL, channelHandler);
}
@Override
public Client connect(URL uRL, ChannelHandler channelHandler) throws RemotingException {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getParameter("client", uRL2.getParameter("transporter", "netty"));
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(").append(uRL2.toString()).append(") use keys([client, transporter])").toString());
}
Transporter transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string);
return transporter.connect(uRL, channelHandler);
}
}
Dispatcher
扩展点的自适应类,这里可以注意一下获取多个配置参数的处理:
package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Dispatcher;
public class Dispatcher$Adaptive implements Dispatcher {
@Override
public ChannelHandler dispatch(ChannelHandler channelHandler, URL uRL) {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getParameter("dispatcher", uRL2.getParameter("dispather", uRL2.getParameter("channel.handler", "all")));
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(").append(uRL2.toString()).append(") use keys([dispatcher, dispather, channel.handler])").toString());
}
Dispatcher dispatcher = ExtensionLoader.getExtensionLoader(Dispatcher.class).getExtension(string);
return dispatcher.dispatch(channelHandler, uRL);
}
}
DubboProtocol#export()
完成后会回到RegistryProtocol#export()
方法中继续服务的注册流程,RegistryProtocol#getRegistry()
方法会根据registryUrl
的registry
参数传入的值来动态加载Registry
的扩展实现,这里是通过RegistryFactory
的扩展实现来创建Registry
对象的,RegistryFactory
是RegistryProtocol
扩展在实例化阶段通过ExtensionLoader#injectExtension()
方法注入的一个动态的自适应扩展包装类RegistryFactory$Adaptive
,该类会根据传入的registryUrl
对象的protocol
参数来动态加载Registry
的扩展实现,创建Registry
对象后,会生成registeredProviderUrl
对象,这是服务提供者的参数URL
对象,然后ProviderConsumerRegTable#registerProvider()
方法会将Invoker
、registryUrl
及providerUrl
对象包装为ProviderInvokerWrapper
对象,并缓存在本地,然后调用前面创建的Registry
对象实例来做注册操作,我们这里使用的是zookeeper
,所以此时会调用ZookeeperRegistry#doRegister()
方法来注册要导出的服务,本质上就是在ZooKeeper
上创建路径节点,ZookeeperRegistry
在实例化时,会动态获取ZookeeperTransporter
的扩展实现,默认为curator
的实现,ZookeeperTransporter
的创建同样也是通过自适应的方式生成自适应的包装类ZookeeperTransporter$Adaptive
,然后通过client
参数和transporter
参数来动态加载ZookeeperTransporter
的实现。注册成功后会将ProviderConsumerRegTable
中对应的ProviderInvokerWrapper
包装类的isReg
设置为true
。随后,还创建了OverrideListener
监听器来监听服务的重载,通过ZookeeperRegistry
在ZooKeeper
上创建节点来订阅服务端服务的变更通知,这样,当服务端发布的服务发生变更时,服务端通过ZooKeeper
上注册的对应的客户端列表来通知服务的变更,此时CuratorZookeeperClient
会接收到WatchedEvent
事件,然后回调注册的OverrideListener#notify()
方法,该方法又回调用RegistryProtocol#doChangeLocalExport()
方法,doChangeLocalExport()
方法会重新用newInvokerUrl
参数来重新注册变更的服务。这样,整个服务注册的流程基本上完全清晰了。
下面的图片是Dubbo
服务导出的时序图:
0x04 Dubbo
服务的引用分析
这里还是以SPI
篇中的示例来来分析:
public class EchoConsumer {
public static void main(String[] args) throws InterruptedException {
ReferenceConfig<EchoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("echo-service"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(EchoService.class);
EchoService service = reference.get();
String message = service.echo("dubbo");
System.out.println(message);
new CountDownLatch(1).await();
}
}
Dubbo
的服务引用逻辑与导出大致相似,首先也是通过ReferenceConfig
来配置服务引用的各种参数,如application config
、module config
、consumer config
等。
ReferenceConfig#init()
初始化各种参数,然后调用createProxy()
方法来创建代理对象,重点就在这个createProxy()
方法中了。
首先,它会判断调用的方式,这里有三种情况:同一JVM
内调用、通过指定的url
直连、通过注册中心调用。无论是哪种方式,最终都是返回一个Invoker
对象出来(Invoker
是Dubbo
的核心领域对象)。三种方式的入口都是Protocol$Adaptive#refer()
方法,这是一个动态生成的自适应类,refer()
方法会根据URL
的protocol
参数来动态加载Protocol
的扩展实现,默认是dubbo
的实现,通过注册中心调用时,首先会加载RegistryProtocol
的实现,这里会获取注册中心的实例Registry
,然后调用doRefer()
方法继续执行refer
的逻辑,doRefer()
方法中,首先会创建RegistryDirectory
实例,然后绑定registry
和protocol
,组装subscribeUrl
,接着会向注册中心注册registry.register(registeredConsumerUrl)
,注册的URL
也会绑定到directory
,再接下来会调用directory.subscribe()
方法来订阅providers
、configurators
、routers
的数据,这里的subscribe()
方法调用进去会调用到FailbackRegistry#subscribe()
方法,然后会回调到子类ZookeeperRegistry#doSubscribe()
方法,这里的逻辑和服务的导出类似,创建zookeeper
的路径节点,然后调用notify()
方法来通知监听器,notify()
方法又会调回父类,然后进一步调回AbstractRegistry#notify()
方法,再继续调回到RegistryDirectory#notify()
方法,在这里,会调用refreshInvoker()
方法刷新invoker
,进入到该方法中后,如果有提供者的invokerUrl
,则会调用toInvokers()
方法来进行转换操作,这里的转换是将invokerUrl
转换为具体的invoker
对象,这里会动态加载Protocol
的自适应对象,返回的是自适应对象Protocol$Adaptive
,它会根据protocol
参数来动态的返回具体的Protocol
实现,默认是dubbo
,也就是会返回DubboProtocol
实现,然后会有一个参数合并的过程,这里有根据默认的覆盖规则(override > -D >Consumer > Provider
)进行一些处理,然后再进一步做判断,之后会创建一个InvokerDelegate
对象,该对象的构造方法中的其中一个参数就是DubboProtocol#refer()
的返回值,到这里,服务引用的调用就到了DubboProtocol#refer()
方法,这里返回的是DubboInvoker
的包装对象,包装之前,会调用getClients()
方法来创建ExchangeClient
对象,这里面会判断是否共享连接,然后首次还会调用initClient()
方法来创建新的连接,initClient()
方法中会绑定client
参数(默认为netty
),codec
参数(默认为dubbo
),heartbeat
参数(默认为60000
),然后会生成Transporter$Adaptive
对象根据参数client
动态的加载Transporter
扩展,真正的连接是在Exchangers#connect()
方法中,先获取Exchanger
的扩展实现,然后再进行connect()
,这里的逻辑和前面服务导出部分的bind()
的逻辑非常相似,只不过一个是bind()
方法,一个是connect()
方法,经过一系列的包装,最终会创建NettyClient
对象,然后经过层层的包装后返回,到这里,invoker
对象还没包装完,这里只是处理完了directory#subscribe()
的逻辑,subscribe()
完成后,还会进一步使用Cluster
扩展进行包装,Cluster$Adaptive
自适应类会根据cluster
参数来动态加载具体的扩展实现,默认为failover
的实现,Cluster
扩展在加载时,也有一个Wrapper
会自动包装,它就是MockClusterWrapper
,这里会包装成一个MockClusterInvoker
返回(包装了服务降级的处理),里面才是调用FailoverCluster#join()
,这里又会包装成一个FailoverClusterInvoker
对象(包装了负载均衡的处理,即LoadBalance
扩展,以及失败重试的处理,默认最多调用三次),到这里,服务引用时的invoker
对象才算是真正的创建完成了。有了invoker
对象之后,还会调用ProviderConsumerRegTable#registerConsumer()
方法来将服务消费者注册到本地。有了invoker
对象,下一步就回到了ReferenceConfig#createProxy()
方法,这里还有最后一步,使用ProxyFactory#getProxy()
来获取代理对象,ProxyFactory$Adaptive#getProxy()
方法会根据proxy
参数动态的加载具体的扩展实现,默认是javassist
的实现,这里又回到前面分析过的JavassistProxyFactory#getProxy()
方法的调用了,它会返回包装好的代理对象。
一些相关的扩展点的自适应类的代码与服务导出时生成的是一样的,参考上面贴的代码就行了。
下面的图片是Dubbo
服务引用的时序图:
0x05 微内核加插件式的架构
到这里,服务的导出和服务的引用流程就分析完了,其实这里面还有很多的细节,比如,根据URL
的参数做一些动态的处理逻辑,光是内置的参数就已经很多了,而且很多的参数都是支持一些不同的策略的,所以这里面的一些处理逻辑很多。另外,无论是服务的导出,还是服务的引用,最后真正调用的都是一个代理对象,这个代理对象是对Invoker
对象的包装,这里面还包括了过滤器链的加载和处理,以及到具体的Transporter
实现时的各种Handler
的层层包装,整个逻辑比较复杂,Invoker
是Dubbo
框架的核心领域对象,这个后面会专门分析。
再回过头来看Dubbo
服务的导出和引用的时序图,我用绿色标出了Dubbo
的扩展点,你会发现,使用Dubbo
框架时,除了环境配置使用的是相关的API
对象之后,其它的流程全部是基于扩展点之前的调用,换句话说就是,Dubbo
真正的服务导出和服务引用全部是基于抽象(这些抽象全部都是Dubbo
框架的扩展点,可以随时自由扩展)的实现,这就是框架的高层设计,在这一点上,Dubbo
框架封装得非常漂亮,完全基于抽象之间的调用来组装框架的处理流程,而真正的逻辑处理则是通过插件(这里抽象的具体实现)的方式来实现的,在使用的过程中,按照自己的需要去配置好要使用的具体实现即可,就算你不配置,Dubbo
也会提供默认的实现,对使用者友好。Dubbo
这里基于扩展点的抽象设计组装主流程,具体实现交给插件实现的模式就是现在非常流行的微内核加插件的模式(microkernel + plugin
),对于我们开发一个框架,非常具有借鉴的意义。
0x06 总结
对于Dubbo
的服务导出和引用的过程分析,我这里没有贴出Dubbo
具体步骤的相关代码,一方面是贴出来代码太多了,另一方面,我已经把详细的调用链写出来了,看的时候只需要对照我上面说的入口和相关的方法调用的过程就已经很清晰了。
分析Dubbo
服务导出时,因为很多的控制类是在内存中动态处理的,还有一些是在内存中拼接源码字符串,然后在内存中编译,再反射创建对象的(比如Wrapper#getWrapper()
),对于这些类的源码看不到的时候,心里会有一种莫名的不踏实感,好在后来找到了阿里开源的Arthas
框架,通过arthas
可以动态的查找JVM
中生成的类、反编译这些类的源码、实时监控方法的返回值等,功能非常强大,太感谢阿里团队了。
也正是因为这些动态生成的类不太清楚的原因,所以在分析服务导出的时候浪费了一些时间,不过找到了解决办法之后,一切都变得简单轻快了。
其实那些Xxxx$Adaptive
自适应类生成出来的模式非常相似,本质上就是配置URL
中的参数值来动态加载不同的扩展实现,没看到源码的时候心里总不踏实,看多了之后基本上也就不用看了。
尤其是Wrapper#getWrapper()
那里,开始我是去拼出来的,后来还去看了javassist
的文档(不看文档没办法理解里面的一些特殊符号的意义),基本上把Wrapper0
的源码分析出来了,但是因为对javassist
不是特别熟悉,心里老纠结会不会没拼对,后来使用arthas
反编译之后发现拼对了,这才踏实了。
分析完导出和引用的流程后,对照Dubbo
文档画了一次这两个操作的时序图,然后顺序把扩展点用绿色标识了出来,现在你再回过头去看两张时序图,你会发现除了Dubbo
暴露给用户配置用的API
(Config
相关的对象)之外,其它的流程全部是通过扩展点来处理的,也就是说,Dubbo
通过抽象的扩展点来封装了框架的主流程,但具体的处理逻辑交给了扩展点的具体实现,这些实现可以是Dubbo
自己实现的,也可以是任何第三方实现的,这样就使得Dubbo
框架具有了非常良好的可扩展性。
此外,Dubbo
框架的源码中还使用了非常多的设计模式来解决不同的问题,比如代理模式、装饰模式、委托模式、过滤器链模式、模板方法模式等等,非常的多,这些东西都值得我们去分析学习。
到这里,Dubbo
框架的一些大致的流程和机制都分析清楚了,剩下还有Proxy
和Invoker
相关的核心领域对象没有完整的分析(其实在分析导出的时候,我没按捺住,追着代码已经分析了一部分了),后续会对这个核心域进行分析。
好了,这篇就到此为止了,欢迎各种反馈和交流!