美文网首页
dubbo的服务消费

dubbo的服务消费

作者: 剑道_7ffc | 来源:发表于2020-06-08 07:57 被阅读0次

入口

xml

<dubbo:reference id="xxxService" interface="xxx.xxx.Service"/>

注解

ReferenceAnnotationBeanPostProcessor->ReferenceBeanInvocationHandler.init->ReferenceConfig.get() 获得一个远程代理类

ReferenceBeanInvocationHandler#init

/**
 * Obtains the referenced object from {@code referenceBean} and stores it in {@code bean}.
 */
private void init() {
    // NOTE(review): per the call chain described in this article, referenceBean.get()
    // resolves to ReferenceConfig#get — confirm against the enclosing class.
    this.bean = referenceBean.get();
}

ReferenceConfig#get

/**
 * Returns the object held in {@code ref}, calling {@code init()} first when it is still unset.
 *
 * <p>{@code checkAndUpdateSubConfigs()} runs on every call, before the destroyed check.
 *
 * @return the value of {@code ref} after any required initialization
 * @throws IllegalStateException if this config has been marked as destroyed
 */
public synchronized T get() {
    checkAndUpdateSubConfigs();

    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }

    // Fast path: the reference was already created by an earlier call.
    if (ref != null) {
        return ref;
    }

    init();
    return ref;
}

ReferenceConfig#init

/**
 * Creates the proxy through {@code createProxy(map)} and stores it in {@code ref}.
 */
private void init() {
    // NOTE(review): the code that declares and populates map is not part of this excerpt.
    ref = createProxy(map);
}

ReferenceConfig#createProxy

因为URL是registry://ip:port,所以REF_PROTOCOL.refer调用的是RegistryProtocol的refer

/**
 * Creates the service proxy for this reference.
 *
 * <p>For every registry URL returned by {@code loadRegistries(false)}, the monitor URL (when one
 * is found) is stored in {@code map} under {@code MONITOR_KEY}, and the registry URL is appended
 * to {@code urls} with {@code map} encoded as its {@code REFER_KEY} parameter. The first entry of
 * {@code urls} is then referred through {@code REF_PROTOCOL} and the resulting invoker is wrapped
 * by {@code PROXY_FACTORY}.
 *
 * @param map reference parameters; may be mutated (a {@code MONITOR_KEY} entry can be added)
 * @return the proxy produced by {@code PROXY_FACTORY} for the referred invoker
 * @throws IllegalStateException if no registry URL is available to refer to
 */
private T createProxy(Map<String, String> map) {
    List<URL> us = loadRegistries(false);
    if (CollectionUtils.isNotEmpty(us)) {
        for (URL u : us) {
            URL monitorUrl = loadMonitor(u);
            if (monitorUrl != null) {
                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
            }
            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
        }
    }
    // Fail with a descriptive error instead of an IndexOutOfBoundsException from urls.get(0)
    // when no registry has been configured.
    if (urls.isEmpty()) {
        throw new IllegalStateException("No registry URL available to reference " + interfaceClass
                + ", please configure at least one registry");
    }
    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    // create service proxy
    return (T) PROXY_FACTORY.getProxy(invoker);
}

RegistryProtocol.refer

/**
 * Refers a service through the registry addressed by {@code url}.
 *
 * <p>The URL's protocol is replaced by the value of its {@code REGISTRY_KEY} parameter (falling
 * back to {@code DEFAULT_REGISTRY}), and that parameter is removed, before the registry is looked
 * up. A request for {@code RegistryService} itself is answered with an invoker around the
 * registry; every other type is delegated to {@code doRefer}.
 *
 * @param type the service interface being referred
 * @param url  the registry URL carrying the concrete registry type as a parameter
 * @return an invoker for {@code type}
 * @throws RpcException declared by the protocol contract
 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Rewrite the URL to the concrete registry protocol, e.g. zookeeper://
    String registryProtocol = url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
    URL registryUrl = URLBuilder.from(url)
            .setProtocol(registryProtocol)
            .removeParameter(REGISTRY_KEY)
            .build();

    Registry registry = registryFactory.getRegistry(registryUrl);

    if (!RegistryService.class.equals(type)) {
        return doRefer(cluster, registry, type, registryUrl);
    }
    return proxyFactory.getInvoker((T) registry, type, registryUrl);
}

RegistryProtocol#doRefer

/**
 * Builds a {@code RegistryDirectory} for {@code type}, optionally registers the consumer URL,
 * subscribes the directory to the registry, and joins it into a cluster invoker.
 *
 * <p>Steps, in order: create the directory and attach the registry and protocol; derive
 * {@code subscribeUrl} from the directory URL's parameters; register the consumer URL unless the
 * service interface equals {@code ANY_VALUE} or the {@code REGISTER_KEY} parameter (default
 * {@code true}) is false; build the router chain; subscribe with a {@code CATEGORY_KEY} listing
 * the providers, configurators and routers categories; join the directory through
 * {@code cluster}; record the result in {@code ProviderConsumerRegTable}.
 *
 * @param cluster  the cluster used to join the directory into a single invoker
 * @param registry the registry to register with and subscribe to
 * @param type     the service interface being referred
 * @param url      the registry URL
 * @return the invoker returned by {@code cluster.join(directory)}
 */
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // Build the RegistryDirectory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // Build the consumer:// protocol URL; the REGISTER_IP_KEY entry is removed from the
    // parameters and passed to the URL constructor instead
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        // Consumer URL: stored on the directory, then registered with the registry
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    // Subscribe to changes of the providers, configurators and routers categories
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // Build the Invoker
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

Cluster是什么

Cluster是一个自适应扩展点,通过set方法依赖注入进来的

/**
 * Extension point that merges a {@link Directory} of invokers into a single {@link Invoker}.
 *
 * <p>The {@code @SPI} annotation names {@code FailoverCluster.NAME} as its value.
 */
@SPI(FailoverCluster.NAME)
public interface Cluster {
    /**
     * Joins the invokers of the given directory into one invoker.
     *
     * @param directory the directory to join
     * @return an invoker representing the directory
     * @throws RpcException declared by the contract
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

Cluster$Adaptive

import org.apache.dubbo.common.extension.ExtensionLoader;

/**
 * Adaptive {@code Cluster} that picks the concrete extension on each call.
 *
 * <p>The extension name is read from the {@code cluster} parameter of the directory's URL,
 * defaulting to {@code failover}, and the call is forwarded to that extension.
 */
public class Cluster$Adaptive implements org.apache.dubbo.rpc.cluster.Cluster {

    /**
     * Forwards {@code join} to the extension named by the directory URL.
     *
     * @param arg0 the directory to join; must be non-null and expose a non-null URL
     * @return the invoker produced by the selected extension
     * @throws IllegalArgumentException if {@code arg0} or its URL is null
     * @throws IllegalStateException    if no extension name can be determined from the URL
     */
    public org.apache.dubbo.rpc.Invoker join(org.apache.dubbo.rpc.cluster.Directory arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument getUrl() == null");
        }

        org.apache.dubbo.common.URL directoryUrl = arg0.getUrl();
        String clusterName = directoryUrl.getParameter("cluster", "failover");
        if (clusterName == null) {
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" + directoryUrl.toString() + ") use keys([cluster])");
        }

        // Look up the named Cluster extension and delegate to it.
        org.apache.dubbo.rpc.cluster.Cluster delegate = (org.apache.dubbo.rpc.cluster.Cluster)
                ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.cluster.Cluster.class).getExtension(clusterName);
        return delegate.join(arg0);
    }
}

MockClusterWrapper#join

/**
 * Joins the directory through the wrapped cluster and wraps the result in a
 * {@code MockClusterInvoker}.
 *
 * @param directory the directory to join
 * @return a {@code MockClusterInvoker} around the wrapped cluster's invoker
 * @throws RpcException declared by the {@code Cluster} contract
 */
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    Invoker<T> wrapped = this.cluster.join(directory);
    return new MockClusterInvoker<T>(directory, wrapped);
}

FailoverCluster#join

/**
 * Creates a {@code FailoverClusterInvoker} over the given directory.
 *
 * @param directory the directory to join
 * @return a new {@code FailoverClusterInvoker} for {@code directory}
 * @throws RpcException declared by the {@code Cluster} contract
 */
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    Invoker<T> failoverInvoker = new FailoverClusterInvoker<T>(directory);
    return failoverInvoker;
}

RegistryProtocol#doRefer

Invoker是MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory))的对象

// Going by the MockClusterWrapper#join and FailoverCluster#join excerpts in this article, the
// result is a MockClusterInvoker wrapping a FailoverClusterInvoker over the directory.
// NOTE(review): this assumes the failover cluster is the one selected — confirm.
Invoker invoker = cluster.join(directory);

JavassistProxyFactory#getProxy

/**
 * Creates a proxy instance for the given interfaces whose calls are handled by an
 * {@code InvokerInvocationHandler} wrapping {@code invoker}.
 *
 * @param invoker    the invoker that the handler wraps
 * @param interfaces the interfaces passed to {@code Proxy.getProxy}
 * @return the new proxy instance, cast to {@code T}
 */
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // Proxy.getProxy(interfaces) yields the proxy factory object; newInstance binds the handler.
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

Proxy#getProxy

动态生成字节码文件,@Reference本质上是动态代理类,通过handler.invoke来返回结果,handler是InvokerInvocationHandler

/**
 * Generated proxy method: forwards the no-argument call to {@code handler} using the first
 * entry of {@code methods} and returns the result as a {@code String}.
 *
 * @return the handler's result cast to {@code java.lang.String}
 */
public java.lang.String queryPayList() {
    // The method takes no parameters, so an empty argument array is passed along.
    return (java.lang.String) handler.invoke(this, methods[0], new Object[0]);
}

InvokerInvocationHandler#invoke

/**
 * Dispatches a proxy method call.
 *
 * <p>Methods declared by {@code Object} are invoked reflectively on {@code invoker}.
 * No-argument {@code toString}/{@code hashCode} and single-argument {@code equals} are answered
 * by {@code invoker} directly. Every other call is wrapped in an {@code RpcInvocation}, sent
 * through {@code invoker}, and the result is unwrapped with {@code recreate()}.
 *
 * @param proxy  the proxy instance the method was called on (not used here)
 * @param method the method being called
 * @param args   the call arguments
 * @return the result of the local or remote invocation
 * @throws Throwable whatever the reflective call, the invoker, or {@code recreate()} throws
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (Object.class == method.getDeclaringClass()) {
        return method.invoke(invoker, args);
    }

    final String name = method.getName();
    final int arity = method.getParameterTypes().length;

    // Object-like methods redeclared on the interface are answered locally by the invoker.
    if (arity == 0) {
        if ("toString".equals(name)) {
            return invoker.toString();
        }
        if ("hashCode".equals(name)) {
            return invoker.hashCode();
        }
    } else if (arity == 1 && "equals".equals(name)) {
        return invoker.equals(args[0]);
    }

    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

相关文章

  • dubbo 学习笔记

    服务接口消费接口

  • 2018-06-20-dubbo基于注解配置

    1、配置springboot的dubbo服务【服务提供方】 2、dubbo接口【对外暴露的dubbo接口,方便消费...

  • Dubbo

    Dubbo 基础 Dubbo角色 provider:服务提供方 consumer:服务消费方 registry:注...

  • dubbo-Reference注解改进

    1.dubbo服务过多引用的问题 1.1 dubbo服务以xml配置消费者 由于引用多个dubbo服务时,在未使用...

  • 如何查看Zookeeper下Dubbo的Provider与Con

    查看Zookeeper下Dubbo的服务提供者与服务消费者信息:有些场景下我们需要查看ZK下dubbo的注册和消费...

  • dubbo 小结

    dubbo soa,rpc框架 提供服务和消费端模式 简单流程 provider 注册服务到zookeper,消费...

  • dubbo的服务消费

    入口 xml 注解 ReferenceAnnotationBeanPostProcessor->Reference...

  • 高可用

    1、zookeeper宕机与dubbo直连 现象:zookeeper注册中心宕机,还可以消费dubbo暴露的服务。...

  • Dubbo服务端

    Dubbo服务端的主要功能就是服务提供端向注册中心注册服务,这样消费端能够从注册中心获取相应的服务。 Dubbo ...

  • 2.Dubbo源码阅读-配置篇

    Dubbo的分支: 3.0Dubbo的服务提供者会将RPC服务的调用说明,导出到配置中心。然后服务的消费者向配置中...

网友评论

      本文标题:dubbo的服务消费

      本文链接:https://www.haomeiwen.com/subject/wbuctktx.html