读源码学架构系列:Dubbo服务暴露与服务消费流程分析

TL;DR

Dubbo的服务暴露和服务的引用流程比较复杂,尤其是Dubbo框架对于扩展点实现了自动包装、自动装配、自适应和自动激活的这些特性,导致我们直接在看源码分析时,会出现很多地方的逻辑不清楚(因为有很多的类是在内存中动态生成的),这里我会把一些动态生成的关键的类的源码贴出来,这样有助于更清晰的理解Dubbo的服务暴露和和消费的整个过程。

之所以对服务的暴露和引用的流程做分析,一方面是想更深入的理解Dubbo框架,另一方便是想结合Dubbo的这些扩展点,研究一下Dubbo的微内核加插件机制的架构,学架构不就得理解别人的思想吗?

Dubbo官方把服务的暴露叫做导出,把服务的消费叫作引用,下面也会使用这两个词语来代替。

本文大纲:

  1. Dubbo的配置覆盖策略
  2. Dubbo框架分层
  3. Dubbo服务的导出分析
  4. Dubbo服务的引用分析
  5. 微内核加插件的设计模式

以下正文开始。

0x01 Dubbo的配置覆盖策略

Dubbo对配置参数的处理分两种情况,一种是启动时期的参数处理,这时会有一组默认的参数覆盖策略;另一种是运行期,这里会有另一组默认的参数覆盖策略。

先看启动时期Dubbo的参数配置覆盖策略:

  1. -D传递给JVM的参数优先级最高,比如-Ddubbo.protocol.port=20882
  2. 代码(基于注解)或XML配置优先级次之,比如在SpringXML文件中指定<dubbo:protocol port="20881"/>
  3. 配置文件优先级最低,比如dubbo.properties文件指定dubbo.protocol.port=20880

一般我们考虑使用dubbo.properties作为默认配置。

再看运行期,一句话总结Dubbo的运行期配置覆盖策略:override > -D > Consumer > Provider(具体参考RegistryDirectory#mergeUrl()的源码)。

这个是Dubbo内部对配置的默认处理优先级,override最高,这个是通过override://xxx协议来操作的,主要是用来在dubbo-admin中对服务的参数进行调整,也可以理解为动态服务治理时使用该协议来对参数进行动态的调整。

其次是-D传递给JVM的参数,这个不难理解,因为启动时-D是优先级最高的,确切的说,这里应该是对启动完成时最终确定的参数。

再次就是Consumer的参数了,最后是Provider的参数。

Dubbo会按照这些规则对参数进行覆盖处理。

0x02 Dubbo框架分层

众所周知,Dubbo是分层良好的框架,乍一看上去挺复杂的,实际上也确实是挺复杂的,不过理解了Dubbo服务的导出和引用过程之后,你就不会再觉得这个分层复杂了。以下是Dubbo框架的分层,内容来自于Dubbo的文档:

  • config配置层:对外配置接口,以ServiceConfigReferenceConfig为中心,可以直接初始化配置类,也可能通过Spring解析配置生成配置类;
  • proxy服务代理层:服务接口透明代理,生成服务的客户端Stub和服务端Skeleton,以ServiceProxy为中心,扩展接口为ProxyFactory
  • registry注册中心层:封装服务地址的注册与发现,以服务URL为中心,扩展接口为RegistryFactoryRegistryRegistryService
  • cluster路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoker为中心,扩展接口为ClusterDirectoryRouterLoadBalance
  • monitor监控层:RPC调用次数和调用时间监控,以Statistics为中心, 扩展接口为MonitoryFactoryMonitorMonitorService
  • protocol远程调用层:封装RPC调用,以InvocationResult为中心,扩展接口为ProtocolInvokerExporter
  • exchange信息交换层:封装请求响应模式,同步转异步,以RequestResponse为中心,扩展接口为ExchangerExchangeChannelExchangeClientExchangeServer
  • transport网络传输层:抽象minanetty为统一接口,以Message为中心,扩展接口为ChannelTransporterClientServerCodec
  • serialize数据序列化层:可复用的一些工具,扩展接口为SerializationObjectInputObjectOutputThreadPool

关系说明:

  • RPC中,Protocol是核心层,也就是只要有Protocol + Invoker + Exporter就可以完成非透明的RPC调用,然后在Invoker的主过程上Filter拦截点;
  • Cluster是外围概念,所以Cluster的目的是将多个Invoker伪装成一个Invoker,这样其它人只要关注Protocol层的Invoker即可;
  • Proxy层封装了所有接口的透明化代理,而在其它层都以Invoker为中心,只有到了暴露给用户使用时,才用ProxyInvoker转成接口,或将接口实现转成Invoker,也就是去掉ProxyRPC也是可以运行的,只是不那么透明,看起来不像调用本地服务一样调远程服务;
  • Remoting实现是Dubbo协议的实现,如果选择RMI协议,整个Remoting层都不会用上;Remoting内部再划分为Transport传输层和Exchange信息交换层,Transport层只负责单向消息传输,是对MinaNettyGrizzly的抽象,它也可以扩展UDP传输,而Exchange层是在传输层之上封装了Request-Response语义;
  • RegistryMonitor实际上不算一层,而是一个独立的节点,只是为了全局概览,用层的方式画在一起;

0x03 Dubbo服务的导出分析

这里以之前SPI篇中写的示例来分析吧,EchoProvider的代码如下:

public class EchoProvider {

    public static void main(String[] args) throws InterruptedException {
        ServiceConfig<EchoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(EchoService.class);
        service.setRef(new EchoServiceImpl());
        service.setApplication(new ApplicationConfig("echo-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        service.setSerialization("protostuff");
        service.export();

        System.out.println("dubbo service started");
        new CountDownLatch(1).await();
    }
}

ServiceConfig类是API类型的,也就是说它是用来给使用者使用的。我们先创建了一个ServiceConfig对象,然后进行服务相关的配置,接着调用了service.export()方法来对服务进行导出,这个方法就是Dubbo服务导出的入口。

ServiceConfig会准备环境所需的config(检查用户配置,生成默认设置),并检查所需的环境配置,比如ApplicationConfigModuleConfigRegistryConfigMonitorConfigProtocolConfig等。

ServiceConfig初始化时会加载Protocol.classProxyFactory.class两个扩展点的自适应扩展,也就是说,实际上,这两个扩展加载时返回的是两个在内存动态生成的Apaptive对象:Protocol$AdaptiveProxyFactory$Adaptive,具体的过程在之前的SPI篇中详细的分析过dubbo的扩展加载特性;这两个对象一定要清楚,否则,后面的过程很多代码无法关联上;动态生成的这两个Adaptive类其实是一个包装类,它们的职责很简单,会根据url的参数动态的返回对应的扩展实现。下面是两个自适应类的代码:

Protocol$Adaptive

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;

public class Protocol$Adaptive
implements Protocol {
    public Invoker refer(Class class_, URL uRL) throws RpcException {
        String string;
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(").append(uRL2.toString()).append(") use keys([protocol])").toString());
        }
        Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
        return protocol.refer(class_, uRL);
    }

    public Exporter export(Invoker invoker) throws RpcException {
        String string;
        if (invoker == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        URL uRL = invoker.getUrl();
        String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(").append(uRL.toString()).append(") use keys([protocol])").toString());
        }
        Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
        return protocol.export(invoker);
    }

    @Override
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    @Override
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
}

ProxyFactory$Adaptive

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcException;

public class ProxyFactory$Adaptive
implements ProxyFactory {
    public Object getProxy(Invoker invoker) throws RpcException {
        if (invoker == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        URL uRL = invoker.getUrl();
        String string = uRL.getParameter("proxy", "javassist");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL.toString()).append(") use keys([proxy])").toString());
        }
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
        return proxyFactory.getProxy(invoker);
    }

    public Object getProxy(Invoker invoker, boolean bl) throws RpcException {
        if (invoker == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        URL uRL = invoker.getUrl();
        String string = uRL.getParameter("proxy", "javassist");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL.toString()).append(") use keys([proxy])").toString());
        }
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
        return proxyFactory.getProxy(invoker, bl);
    }

    public Invoker getInvoker(Object object, Class class_, URL uRL) throws RpcException {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("proxy", "javassist");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(").append(uRL2.toString()).append(") use keys([proxy])").toString());
        }
        ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
        return proxyFactory.getInvoker(object, class_, uRL);
    }
}

其实,在Dubbo的源码中可以看到有一些@Adaptive标注的类,但是不多,这些类的结构非常相似,可能是Dubbo的开发者后续发现了这些相似性,所以采用了自适应方式的设计来直接在内存中动态的生成,这样避免了大量重复结构的代码。

export()方法简单的处理后,直接调用了doExport()方法,doExport()方法做了大量的参数校验和初始化,然后调用doExportUrls()方法开始对服务做导出。

首先,它会加载注册中心:loadRegistries(true)。先解析配置的注册中心(可以是多个)到AbstractInterfaceConfig#registries列表中,然后循环registries列表,根据配置的参数来组装注册链接URL列表(dubbo的配置信息封装为URL对象)。

加载注册中心时会动态生成RegistryFactory扩展点的自适应对象,代码如下:

package com.alibaba.dubbo.registry;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.registry.Registry;
import com.alibaba.dubbo.registry.RegistryFactory;

public class RegistryFactory$Adaptive
implements RegistryFactory {
    @Override
    public Registry getRegistry(URL uRL) {
        String string;
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string2 = string = uRL2.getProtocol() == null ? "dubbo" : uRL2.getProtocol();
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(").append(uRL2.toString()).append(") use keys([protocol])").toString());
        }
        RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(string);
        return registryFactory.getRegistry(uRL);
    }
}

接下来,循环ServiceConfig#protocols列表,为配置的protocol组装配置URL并导出服务,在正式export前会组装参数map,同时会解析MethodConfigArgumentConfig的配置信息,处理完配置信息,接下来就是真正的export了。

export分为本地导出和远程导出,由参数scopescope的值有nonelocalremote三种)决定,没有指定为remote时会做本地导出,没有指定为local时会做local导出,如果指定为none,则不做导出;本地导出,使用的是InJvmProtocol#export来导出(需要注意的是,protocol扩展对象是在ServiceConfig初始化时,这个加载的过程在之前的SPI篇中已经分析过,这里要注意的是protocol扩展对象会被Wrapper实现包装,并且,在不同的地方还会根据url#protocol参数的变化来动态的加载不同的Protocol.class的扩展实现);接下来就是远程导出了,远程导出时会对每个配置的protocol去循环前面生成的registryURLs列表,也就是说会远程导出到所有配置的注册中心,这里也要注意,在这里会检查配置的proxy参数,根据配置参数的值加载对应的ProxyFactory.class扩展实现来生成服务的代理对象Invoker.classInvoker对象是服务的核心(我们暂时不关注具体的细节,后面专门来分析Invoker的核心逻辑),这里要留意在proxyFactory#getInvoker()时,registryURL将具体服务的URLdubbo://xxxx开头的URL参数)作为export参数加入到了registryURL中,同时,默认使用JavassistProxyFactory扩展实现,返回的结果是通过Wrapper类动态生成的一个包装类Wrapper0;另外,这里循环时的URLregistry协议的URL,所以被Protocol$Adaptive#export处理时,真正做protocol#export的是RegistryProtocol#export,该方法先调用了RegistryProtocol#doLocalExport()doLocalExport()方法中在第一次时又会通过protocol#export()来做真正的服务导出,这里传入的Invoker对象是一个委托类型的InvokerDelegete对象,该对象在构造方法中会传入服务参数的URL,也就是前面registryURL中的export参数,因为默认服务使用的是dubbo协议,这里的protocol对象是从ServiceConfig传入的,是同一个对象,所以又会调用Protocol$Adaptive#export()方法进行包装处理,而这时的协议为dubbo,所以里面真正做exportprotocol实现就是DubboProtocol的实现了。

Wrapper#getWrapper()返回的包装类对象是一个动态拼接,然后在内存中使用javassist编译后再实例化的对象,具体的代码如下:

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import me.zy.std.dubbo.spi.api.EchoService;

public class Wrapper0
extends Wrapper
implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    @Override
    public String[] getPropertyNames() {
        return pns;
    }

    @Override
    public boolean hasProperty(String string) {
        return pts.containsKey(string);
    }

    public Class getPropertyType(String string) {
        return (Class)pts.get(string);
    }

    @Override
    public String[] getMethodNames() {
        return mns;
    }

    @Override
    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    @Override
    public void setPropertyValue(Object object, String string, Object object2) {
        try {
            EchoService echoService = (EchoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" filed or setter method in class me.zy.std.dubbo.spi.api.EchoService.").toString());
    }

    @Override
    public Object getPropertyValue(Object object, String string) {
        try {
            EchoService echoService = (EchoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" filed or setter method in class me.zy.std.dubbo.spi.api.EchoService.").toString());
    }

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        EchoService echoService;
        try {
            echoService = (EchoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            if ("echo".equals(string) && arrclass.length == 1) {
                return echoService.echo((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class me.zy.std.dubbo.spi.api.EchoService.").toString());
    }
}

具体到DubboProtocol协议中导出时,首先会根据服务的参数URL生成服务的唯一标识serviceKey(url),然后就是openServer(url)了,第一次会创建ExchangeServer对象,这里会加载Transporter.class扩展实现,默认使用netty实现,ExchangeServer对象是通过Exchangers.bind(url, requestHandler)方法来生成的,在这个bind()方法中,会根据URLexchanger参数来加载Exchanger.class的扩展实现,默认使用HeaderExchanger的实现,然后调用HeaderExchanger#bind()方法来返回HeaderExchangeServerExchangeServer实现,这里又会调用Transporters.bind()来真正的创建Server对象,这里通过自适应的方式来加载Transporter.class扩展实现,默认是netty实现NettyTransporter,在NettyTransporter#bind()方法中,返回了Server的实现NettyServer对象,这样就创建好Server了。在Exchangers#bind()方法创建Server对象时,一同传入的还有一个ExchangeHandler对象参数,该参数是一个抽象类ExchangeHandlerAdapter的实现,该handlerHeaderExchanger中传入Transporters.bind()方法之前会分别被HeaderExchangeHandlerDecodeHandler进行包装,然后在Transporters.bind()中进一步被包装为ChannelHandlerDispatcher,接着传给了NettyServer,在NettyServer中,又被ChannelHandlers#wrapInternal()方法分别用HeartbeatHandlerMultiMessageHandler进行包装,在这里又会通过自适应的方式加载Dispatcher.class扩展实现,这又是一个动态生成的自适应包装类Dispatcher$Adaptive,该自适应类会动态的根据URL中的dispatcher参数来动态的调用对应的Dispather的扩展实现来做dispatch()操作,默认使用AllDispatcher的实现,这里会再一次用AllChannelHandler来对前面传入的handler进行包装。在前面的步骤中,生成NettyServer对象时,在NettyServer的构造方法中调用了父类的构造方法,也就是AbstractServer的构造方法,这个构造方法回调了子类的doOpen()方法,也就是NettyServer#doOpen()方法,这里就正式的启动了Netty服务器。Netty服务器会绑定Codec2扩展实现,默认使用telnet实现,但DubbpProtocol中指定了codecdubbo,所以这里会加载DubboCountCodec的实现,在Netty服务器中,codec被封装到NettyCodecAdapter适配器对象中,同时,该对象实现了Netty的编码和解码器handler,编解码器通过回调Codec2中的encode()decode()方法来做编解码处理,DubboCountCodec又调用DubboCodec的实现,DubboCodec中会加载Serialization.class扩展实现来进行不同协议的编解码操作,解码是在ExchangeCodec#decodeBody()方法中,调用CodecSupport.deserialize()方法时动态加载Serialization.class的扩展实现的。

上面的Transporter扩展点动态生成的自适应类,代码如下:

package com.alibaba.dubbo.remoting;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Client;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Server;
import com.alibaba.dubbo.remoting.Transporter;

public class Transporter$Adaptive
implements Transporter {
    @Override
    public Server bind(URL uRL, ChannelHandler channelHandler) throws RemotingException {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("server", uRL2.getParameter("transporter", "netty"));
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(").append(uRL2.toString()).append(") use keys([server, transporter])").toString());
        }
        Transporter transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string);
        return transporter.bind(uRL, channelHandler);
    }

    @Override
    public Client connect(URL uRL, ChannelHandler channelHandler) throws RemotingException {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("client", uRL2.getParameter("transporter", "netty"));
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(").append(uRL2.toString()).append(") use keys([client, transporter])").toString());
        }
        Transporter transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string);
        return transporter.connect(uRL, channelHandler);
    }
}

Dispatcher扩展点的自适应类,这里可以注意一下获取多个配置参数的处理:

package com.alibaba.dubbo.remoting;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Dispatcher;

public class Dispatcher$Adaptive implements Dispatcher {
    @Override
    public ChannelHandler dispatch(ChannelHandler channelHandler, URL uRL) {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("dispatcher", uRL2.getParameter("dispather", uRL2.getParameter("channel.handler", "all")));
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(").append(uRL2.toString()).append(") use keys([dispatcher, dispather, channel.handler])").toString());
        }
        Dispatcher dispatcher = ExtensionLoader.getExtensionLoader(Dispatcher.class).getExtension(string);
        return dispatcher.dispatch(channelHandler, uRL);
    }
}

DubboProtocol#export()完成后会回到RegistryProtocol#export()方法中继续服务的注册流程,RegistryProtocol#getRegistry()方法会根据registryUrlregistry参数传入的值来动态加载Registry的扩展实现,这里是通过RegistryFactory的扩展实现来创建Registry对象的,RegistryFactoryRegistryProtocol扩展在实例化阶段通过ExtensionLoader#injectExtension()方法注入的一个动态的自适应扩展包装类RegistryFactory$Adaptive,该类会根据传入的registryUrl对象的protocol参数来动态加载Registry的扩展实现,创建Registry对象后,会生成registeredProviderUrl对象,这是服务提供者的参数URL对象,然后ProviderConsumerRegTable#registerProvider()方法会将InvokerregistryUrlproviderUrl对象包装为ProviderInvokerWrapper对象,并缓存在本地,然后调用前面创建的Registry对象实例来做注册操作,我们这里使用的是zookeeper,所以此时会调用ZookeeperRegistry#doRegister()方法来注册要导出的服务,本质上就是在ZooKeeper上创建路径节点,ZookeeperRegistry在实例化时,会动态获取ZookeeperTransporter的扩展实现,默认为curator的实现,ZookeeperTransporter的创建同样也是通过自适应的方式生成自适应的包装类ZookeeperTransporter$Adaptive,然后通过client参数和transporter参数来动态加载ZookeeperTransporter的实现。注册成功后会将ProviderConsumerRegTable中对应的ProviderInvokerWrapper包装类的isReg设置为true。随后,还创建了OverrideListener监听器来监听服务的重载,通过ZookeeperRegistryZooKeeper上创建节点来订阅服务端服务的变更通知,这样,当服务端发布的服务发生变更时,服务端通过ZooKeeper上注册的对应的客户端列表来通知服务的变更,此时CuratorZookeeperClient会接收到WatchedEvent事件,然后回调注册的OverrideListener#notify()方法,该方法又回调用RegistryProtocol#doChangeLocalExport()方法,doChangeLocalExport()方法会重新用newInvokerUrl参数来重新注册变更的服务。这样,整个服务注册的流程基本上完全清晰了。

下面的图片是Dubbo服务导出的时序图:

0x04 Dubbo服务的引用分析

这里还是以SPI篇中的示例来来分析:

public class EchoConsumer {

    public static void main(String[] args) throws InterruptedException {
        ReferenceConfig<EchoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("echo-service"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(EchoService.class);
        EchoService service = reference.get();
        String message = service.echo("dubbo");
        System.out.println(message);
        new CountDownLatch(1).await();
    }
}

Dubbo的服务引用逻辑与导出大致相似,首先也是通过ReferenceConfig来配置服务引用的各种参数,如application configmodule configconsumer config等。

ReferenceConfig#init()初始化各种参数,然后调用createProxy()方法来创建代理对象,重点就在这个createProxy()方法中了。

首先,它会判断调用的方式,这里有三种情况:同一JVM内调用、通过指定的url直连、通过注册中心调用。无论是哪种方式,最终都是返回一个Invoker对象出来(InvokerDubbo的核心领域对象)。三种方式的入口都是Protocol$Adaptive#refer()方法,这是一个动态生成的自适应类,refer()方法会根据URLprotocol参数来动态加载Protocol的扩展实现,默认是dubbo的实现,通过注册中心调用时,首先会加载RegistryProtocol的实现,这里会获取注册中心的实例Registry,然后调用doRefer()方法继续执行refer的逻辑,doRefer()方法中,首先会创建RegistryDirectory实例,然后绑定registryprotocol,组装subscribeUrl,接着会向注册中心注册registry.register(registeredConsumerUrl),注册的URL也会绑定到directory,再接下来会调用directory.subscribe()方法来订阅providersconfiguratorsrouters的数据,这里的subscribe()方法调用进去会调用到FailbackRegistry#subscribe()方法,然后会回调到子类ZookeeperRegistry#doSubscribe()方法,这里的逻辑和服务的导出类似,创建zookeeper的路径节点,然后调用notify()方法来通知监听器,notify()方法又会调回父类,然后进一步调回AbstractRegistry#notify()方法,再继续调回到RegistryDirectory#notify()方法,在这里,会调用refreshInvoker()方法刷新invoker,进入到该方法中后,如果有提供者的invokerUrl,则会调用toInvokers()方法来进行转换操作,这里的转换是将invokerUrl转换为具体的invoker对象,这里会动态加载Protocol的自适应对象,返回的是自适应对象Protocol$Adaptive,它会根据protocol参数来动态的返回具体的Protocol实现,默认是dubbo,也就是会返回DubboProtocol实现,然后会有一个参数合并的过程,这里有根据默认的覆盖规则(override > -D >Consumer > Provider)进行一些处理,然后再进一步做判断,之后会创建一个InvokerDelegate对象,该对象的构造方法中的其中一个参数就是DubboProtocol#refer()的返回值,到这里,服务引用的调用就到了DubboProtocol#refer()方法,这里返回的是DubboInvoker的包装对象,包装之前,会调用getClients()方法来创建ExchangeClient对象,这里面会判断是否共享连接,然后首次还会调用initClient()方法来创建新的连接,initClient()方法中会绑定client参数(默认为netty),codec参数(默认为dubbo),heartbeat参数(默认为60000),然后会生成Transporter$Adaptive对象根据参数client动态的加载Transporter扩展,真正的连接是在Exchangers#connect()方法中,先获取Exchanger的扩展实现,然后再进行connect(),这里的逻辑和前面服务导出部分的bind()的逻辑非常相似,只不过一个是bind()方法,一个是connect()方法,经过一系列的包装,最终会创建NettyClient对象,然后经过层层的包装后返回,到这里,invoker对象还没包装完,这里只是处理完了directory#subscribe()的逻辑,subscribe()完成后,还会进一步使用Cluster扩展进行包装,Cluster$Adaptive自适应类会根据cluster参数来动态加载具体的扩展实现,默认为failover的实现,Cluster扩展在加载时,也有一个Wrapper会自动包装,它就是MockClusterWrapper,这里会包装成一个MockClusterInvoker返回(包装了服务降级的处理),里面才是调用FailoverCluster#join(),这里又会包装成一个FailoverClusterInvoker对象(包装了负载均衡的处理,即LoadBalance扩展,以及失败重试的处理,默认最多调用三次),到这里,服务引用时的invoker对象才算是真正的创建完成了。有了invoker对象之后,还会调用ProviderConsumerRegTable#registerConsumer()方法来将服务消费者注册到本地。有了invoker对象,下一步就回到了ReferenceConfig#createProxy()方法,这里还有最后一步,使用ProxyFactory#getProxy()来获取代理对象,ProxyFactory$Adaptive#getProxy()方法会根据proxy参数动态的加载具体的扩展实现,默认是javassist的实现,这里又回到前面分析过的JavassistProxyFactory#getProxy()方法的调用了,它会返回包装好的代理对象。

一些相关的扩展点的自适应类的代码与服务导出时生成的是一样的,参考上面贴的代码就行了。

下面的图片是Dubbo服务引用的时序图:

0x05 微内核加插件式的架构

到这里,服务的导出和服务的引用流程就分析完了,其实这里面还有很多的细节,比如,根据URL的参数做一些动态的处理逻辑,光是内置的参数就已经很多了,而且很多的参数都是支持一些不同的策略的,所以这里面的一些处理逻辑很多。另外,无论是服务的导出,还是服务的引用,最后真正调用的都是一个代理对象,这个代理对象是对Invoker对象的包装,这里面还包括了过滤器链的加载和处理,以及到具体的Transporter实现时的各种Handler的层层包装,整个逻辑比较复杂,InvokerDubbo框架的核心领域对象,这个后面会专门分析。

再回过头来看Dubbo服务的导出和引用的时序图,我用绿色标出了Dubbo的扩展点,你会发现,使用Dubbo框架时,除了环境配置使用的是相关的API对象之后,其它的流程全部是基于扩展点之前的调用,换句话说就是,Dubbo真正的服务导出和服务引用全部是基于抽象(这些抽象全部都是Dubbo框架的扩展点,可以随时自由扩展)的实现,这就是框架的高层设计,在这一点上,Dubbo框架封装得非常漂亮,完全基于抽象之间的调用来组装框架的处理流程,而真正的逻辑处理则是通过插件(这里抽象的具体实现)的方式来实现的,在使用的过程中,按照自己的需要去配置好要使用的具体实现即可,就算你不配置,Dubbo也会提供默认的实现,对使用者友好。Dubbo这里基于扩展点的抽象设计组装主流程,具体实现交给插件实现的模式就是现在非常流行的微内核加插件的模式(microkernel + plugin),对于我们开发一个框架,非常具有借鉴的意义。

0x06 总结

对于Dubbo的服务导出和引用的过程分析,我这里没有贴出Dubbo具体步骤的相关代码,一方面是贴出来代码太多了,另一方面,我已经把详细的调用链写出来了,看的时候只需要对照我上面说的入口和相关的方法调用的过程就已经很清晰了。

分析Dubbo服务导出时,因为很多的控制类是在内存中动态处理的,还有一些是在内存中拼接源码字符串,然后在内存中编译,再反射创建对象的(比如Wrapper#getWrapper()),对于这些类的源码看不到的时候,心里会有一种莫名的不踏实感,好在后来找到了阿里开源的Arthas框架,通过arthas可以动态的查找JVM中生成的类、反编译这些类的源码、实时监控方法的返回值等,功能非常强大,太感谢阿里团队了。

也正是因为这些动态生成的类不太清楚的原因,所以在分析服务导出的时候浪费了一些时间,不过找到了解决办法之后,一切都变得简单轻快了。

其实那些Xxxx$Adaptive自适应类生成出来的模式非常相似,本质上就是配置URL中的参数值来动态加载不同的扩展实现,没看到源码的时候心里总不踏实,看多了之后基本上也就不用看了。

尤其是Wrapper#getWrapper()那里,开始我是去拼出来的,后来还去看了javassist的文档(不看文档没办法理解里面的一些特殊符号的意义),基本上把Wrapper0的源码分析出来了,但是因为对javassist不是特别熟悉,心里老纠结会不会没拼对,后来使用arthas反编译之后发现拼对了,这才踏实了。

分析完导出和引用的流程后,对照Dubbo文档画了一次这两个操作的时序图,然后顺序把扩展点用绿色标识了出来,现在你再回过头去看两张时序图,你会发现除了Dubbo暴露给用户配置用的APIConfig相关的对象)之外,其它的流程全部是通过扩展点来处理的,也就是说,Dubbo通过抽象的扩展点来封装了框架的主流程,但具体的处理逻辑交给了扩展点的具体实现,这些实现可以是Dubbo自己实现的,也可以是任何第三方实现的,这样就使得Dubbo框架具有了非常良好的可扩展性。

此外,Dubbo框架的源码中还使用了非常多的设计模式来解决不同的问题,比如代理模式、装饰模式、委托模式、过滤器链模式、模板方法模式等等,非常的多,这些东西都值得我们去分析学习。

到这里,Dubbo框架的一些大致的流程和机制都分析清楚了,剩下还有ProxyInvoker相关的核心领域对象没有完整的分析(其实在分析导出的时候,我没按捺住,追着代码已经分析了一部分了),后续会对这个核心域进行分析。

好了,这篇就到此为止了,欢迎各种反馈和交流!

References:
  1. Dubbo
  2. Arthas
  3. 示例代码仓库