DUBBO泛化調(diào)用原理與設(shè)計(jì)思想
JAVA前線?
歡迎大家關(guān)注公眾號(hào)「JAVA前線」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場(chǎng)分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)
1 泛化調(diào)用實(shí)例
對(duì)于JAVA服務(wù)端開發(fā)者而言在使用Dubbo時(shí)并不經(jīng)常使用泛化調(diào)用,通常方法是在生產(chǎn)者發(fā)布服務(wù)之后,消費(fèi)者可以通過(guò)引入生產(chǎn)者提供的client進(jìn)行調(diào)用。那么泛化調(diào)用使用場(chǎng)景是什么呢?
第一種場(chǎng)景是消費(fèi)者不希望引入生產(chǎn)者提供的client依賴,只希望關(guān)注調(diào)用哪個(gè)方法,需要傳什么參數(shù)即可。第二種場(chǎng)景是消費(fèi)者不是使用Java語(yǔ)言,而是使用例如Python語(yǔ)言,那么如何調(diào)用使用Java語(yǔ)言生產(chǎn)者提供的服務(wù)呢?這時(shí)我們可以選擇泛化調(diào)用。
泛化調(diào)用使用方法并不復(fù)雜,下面我們編寫一個(gè)泛化調(diào)用實(shí)例。首先生產(chǎn)者發(fā)布服務(wù),這與普通服務(wù)發(fā)布沒(méi)有任何區(qū)別。
package?com.java.front.dubbo.demo.provider;
public?interface?HelloService?{
????public?String?sayHelloGeneric(Person?person,?String?message);
}
public?class?HelloServiceImpl?implements?HelloService?{
????@Override
????public?String?sayHelloGeneric(Person?person,?String?message)?throws?Exception?{
????????String?result?=?"hello["?+?person?+?"],message="?+?message;
????????return?result;
????}
}
Person類聲明:
package?com.java.front.dubbo.demo.provider.model;
public?class?Person?implements?Serializable?{
????private?String?name;
}
provider.xml文件內(nèi)容:
<beans?xmlns="http://www.springframework.org/schema/beans"?
????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?
????xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
????xsi:schemaLocation="http://www.springframework.org/schema/beans????????
????http://www.springframework.org/schema/beans/spring-beans-4.3.xsd????????
????http://code.alibabatech.com/schema/dubbo????????
????http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
?
?<dubbo:application?name="java-front-provider"?/>
?
?<dubbo:registry?address="zookeeper://127.0.0.1:2181"?/>
?
?<dubbo:protocol?name="dubbo"?port="9999"?/>
?
?
?<bean?id="helloService"?class="com.java.front.dubbo.demo.provider.HelloServiceImpl"?/>
?
?
?<dubbo:service?interface="com.java.front.dubbo.demo.provider.HelloService"?ref="helloService"?/>
beans>
消費(fèi)者代碼有所不同:
import?org.apache.dubbo.config.ApplicationConfig;
import?org.apache.dubbo.config.ReferenceConfig;
import?org.apache.dubbo.config.RegistryConfig;
import?org.apache.dubbo.rpc.RpcContext;
import?org.apache.dubbo.rpc.service.GenericService;
public?class?Consumer?{
????public?static?void?testGeneric()?{
????????ReferenceConfig?reference?=?new?ReferenceConfig();
????????reference.setApplication(new?ApplicationConfig("java-front-consumer"));
????????reference.setRegistry(new?RegistryConfig("zookeeper://127.0.0.1:2181"));
????????reference.setInterface("com.java.front.dubbo.demo.provider.HelloService");
????????reference.setGeneric(true);
????????GenericService?genericService?=?reference.get();
????????Map?person?=?new?HashMap();
????????person.put("name",?"微信公眾號(hào)「JAVA前線」");
????????String?message?=?"你好";
????????Object?result?=?genericService.$invoke("sayHelloGeneric",?new?String[]?{?"com.java.front.dubbo.demo.provider.model.Person",?"java.lang.String"?},?new?Object[]?{?person,?message?});
????????System.out.println(result);
????}
}
2 ?Invoker
我們通過(guò)源碼分析講解泛化調(diào)用原理,首先需要了解Invoker這個(gè)Dubbo重量級(jí)概念。
在生產(chǎn)者暴露服務(wù)流程總體分為兩步,第一步是接口實(shí)現(xiàn)類轉(zhuǎn)換為Invoker,第二步是Invoker轉(zhuǎn)換為Exporter并放入ExporterMap,我們首先分析生產(chǎn)者暴露服務(wù)流程:

生產(chǎn)者通過(guò)ProxyFactory.getInvoker方法創(chuàng)建Invoker(AbstractProxyInvoker):
public?class?JdkProxyFactory?extends?AbstractProxyFactory?{
????@Override
????public??Invoker?getInvoker(T?proxy,?Class?type,?URL?url) ? {
????????return?new?AbstractProxyInvoker(proxy,?type,?url)?{
????????????@Override
????????????protected?Object?doInvoke(T?proxy,?String?methodName,
??????????????????????????????????????Class>[]?parameterTypes,
??????????????????????????????????????Object[]?arguments)?throws?Throwable?{
????????????????//?proxy為被代理對(duì)象?->?com.java.front.dubbo.demo.provider.HelloServiceImpl
????????????????Method?method?=?proxy.getClass().getMethod(methodName,?parameterTypes);
????????????????return?method.invoke(proxy,?arguments);
????????????}
????????};
????}
}
我們?cè)俜治鱿M(fèi)者引用服務(wù)流程:

消費(fèi)者Invoker通過(guò)顯示實(shí)例化創(chuàng)建,例如本地暴露和遠(yuǎn)程暴露都是通過(guò)顯示初始化的方法創(chuàng)建Invoker(AbstractInvoker):
new?InjvmInvoker(serviceType,?url,?url.getServiceKey(),?exporterMap)
new?DubboInvoker(serviceType,?url,?getClients(url),?invokers)
再通過(guò)ProxyFactory.getProxy創(chuàng)建代理:
public?class?JdkProxyFactory?extends?AbstractProxyFactory?{
????@Override
????public??T?getProxy(Invoker?invoker,?Class>[]?interfaces) ?{
????????InvokerInvocationHandler?invokerInvocationHandler?=?new?InvokerInvocationHandler(invoker);
????????return?(T)?Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),?interfaces,?invokerInvocationHandler);
????}
}
無(wú)論是生產(chǎn)者還是消費(fèi)者的Invoker都實(shí)現(xiàn)自org.apache.dubbo.rpc.Invoker:
public?abstract?class?AbstractInvoker<T>?implements?Invoker<T>{
}
public?abstract?class?AbstractProxyInvoker<T>?implements?Invoker<T>?{
}3?責(zé)任鏈模式
為什么生產(chǎn)者和消費(fèi)者都要轉(zhuǎn)換為Invoker,而不是不直接調(diào)用呢?我認(rèn)為Invoker正是Dubbo設(shè)計(jì)精彩之處:真實(shí)調(diào)用都轉(zhuǎn)換為Invoker,這樣就可以通過(guò)責(zé)任鏈模式增強(qiáng)Invoker功能。
public?class?ProtocolFilterWrapper?implements?Protocol?{
????@Override
????public??Exporter?export(Invoker?invoker) ?throws?RpcException? {
????????if?(Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol()))?{
????????????return?protocol.export(invoker);
????????}
????????//?增加過(guò)濾器鏈
????????Invoker?invokerChain?=?buildInvokerChain(invoker,?Constants.SERVICE_FILTER_KEY,?Constants.PROVIDER);
????????return?protocol.export(invokerChain);
????}
????@Override
????public??Invoker?refer(Class?type,?URL?url) ?throws?RpcException? {
????????if?(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol()))?{
????????????return?protocol.refer(type,?url);
????????}
????????//?增加過(guò)濾器鏈
????????Invoker?invoker?=?protocol.refer(type,?url);
????????Invoker?result?=?buildInvokerChain(invoker,?Constants.REFERENCE_FILTER_KEY,?Constants.CONSUMER);
????????return?result;
????}
}
無(wú)論是生產(chǎn)者還是消費(fèi)者都會(huì)創(chuàng)建過(guò)濾器鏈,我們看看buildInvokerChain這個(gè)方法:
public?class?ProtocolFilterWrapper?implements?Protocol?{
????private?final?Protocol?protocol;
????public?ProtocolFilterWrapper(Protocol?protocol)?{
????????if?(protocol?==?null)?{
????????????throw?new?IllegalArgumentException("protocol?==?null");
????????}
????????this.protocol?=?protocol;
????}
????private?static??Invoker?buildInvokerChain(final?Invoker?invoker,?String?key,?String?group) ? {
????????Invoker?last?=?invoker;
????????//?(1)加載所有包含Activate注解的過(guò)濾器
????????//?(2)根據(jù)group過(guò)濾得到過(guò)濾器列表
????????//?(3)Invoker最終被放到過(guò)濾器鏈尾部
????????List?filters?=?ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(),?key,?group);
????????if?(!filters.isEmpty())?{
????????????for?(int?i?=?filters.size()?-?1;?i?>=?0;?i--)?{
????????????????final?Filter?filter?=?filters.get(i);
????????????????final?Invoker?next?=?last;
????????????????//?構(gòu)造一個(gè)簡(jiǎn)化Invoker
????????????????last?=?new?Invoker()?{
????????????????????@Override
????????????????????public?Class?getInterface()? {
????????????????????????return?invoker.getInterface();
????????????????????}
????????????????????@Override
????????????????????public?URL?getUrl()?{
????????????????????????return?invoker.getUrl();
????????????????????}
????????????????????@Override
????????????????????public?boolean?isAvailable()?{
????????????????????????return?invoker.isAvailable();
????????????????????}
????????????????????@Override
????????????????????public?Result?invoke(Invocation?invocation)?throws?RpcException?{
????????????????????????//?構(gòu)造過(guò)濾器鏈路
????????????????????????Result?result?=?filter.invoke(next,?invocation);
????????????????????????if?(result?instanceof?AsyncRpcResult)?{
????????????????????????????AsyncRpcResult?asyncResult?=?(AsyncRpcResult)?result;
????????????????????????????asyncResult.thenApplyWithContext(r?->?filter.onResponse(r,?invoker,?invocation));
????????????????????????????return?asyncResult;
????????????????????????}?else?{
????????????????????????????return?filter.onResponse(result,?invoker,?invocation);
????????????????????????}
????????????????????}
????????????????????@Override
????????????????????public?void?destroy()?{
????????????????????????invoker.destroy();
????????????????????}
????????????????????@Override
????????????????????public?String?toString()?{
????????????????????????return?invoker.toString();
????????????????????}
????????????????};
????????????}
????????}
????????return?last;
????}
}
加載所有包含Activate注解的過(guò)濾器,根據(jù)group過(guò)濾得到過(guò)濾器列表,Invoker最終被放到過(guò)濾器鏈尾部,生產(chǎn)者最終生成鏈路:
EchoFilter?->?ClassloaderFilter?->?GenericFilter?->?ContextFilter?->?TraceFilter?->?TimeoutFilter?->?MonitorFilter?->?ExceptionFilter?->?AbstractProxyInvoker
消費(fèi)者最終生成鏈路:
ConsumerContextFilter?->?FutureFilter?->?MonitorFilter?->?GenericImplFilter?->?DubboInvoker
4 泛化調(diào)用原理
我們終于看到了泛化調(diào)用核心原理,在生產(chǎn)者鏈路看到GenericFilter過(guò)濾器,消費(fèi)者鏈路看到GenericImplFilter過(guò)濾器,正是這兩個(gè)過(guò)濾器實(shí)現(xiàn)了泛化調(diào)用。
(1) GenericImplFilter
@Activate(group?=?Constants.CONSUMER,?value?=?Constants.GENERIC_KEY,?order?=?20000)
public?class?GenericImplFilter?implements?Filter?{
????@Override
????public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
????????//?方法名=$invoke
????????//?invocation.getArguments()=("sayHelloGeneric",?new?String[]?{?"com.java.front.dubbo.demo.provider.model.Person",?"java.lang.String"?},?new?Object[]?{?person,?"你好"?});
????????if?(invocation.getMethodName().equals(Constants.$INVOKE)
????????????????&&?invocation.getArguments()?!=?null
????????????????&&?invocation.getArguments().length?==?3
????????????????&&?ProtocolUtils.isGeneric(generic))?{
????????????//?第一個(gè)參數(shù)表示方法名
????????????//?第二個(gè)參數(shù)表示參數(shù)類型
????????????//?第三個(gè)參數(shù)表示參數(shù)值?->?[{name=微信公眾號(hào)「JAVA前線」},你好]
????????????Object[]?args?=?(Object[])?invocation.getArguments()[2];
????????????if?(ProtocolUtils.isJavaGenericSerialization(generic))?{
????????????????for?(Object?arg?:?args)?{
????????????????????if?(!(byte[].class?==?arg.getClass()))?{
????????????????????????error(generic,?byte[].class.getName(),?arg.getClass().getName());
????????????????????}
????????????????}
????????????}?else?if?(ProtocolUtils.isBeanGenericSerialization(generic))?{
????????????????for?(Object?arg?:?args)?{
????????????????????if?(!(arg?instanceof?JavaBeanDescriptor))?{
????????????????????????error(generic,?JavaBeanDescriptor.class.getName(),?arg.getClass().getName());
????????????????????}
????????????????}
????????????}
????????????//?附加參數(shù)generic值設(shè)置為true
????????????((RpcInvocation)?invocation).setAttachment(Constants.GENERIC_KEY,?invoker.getUrl().getParameter(Constants.GENERIC_KEY));
????????}
????????//?繼續(xù)執(zhí)行過(guò)濾器鏈路
????????return?invoker.invoke(invocation);
????}
}
(2) GenericFilter
@Activate(group?=?Constants.PROVIDER,?order?=?-20000)
public?class?GenericFilter?implements?Filter?{
????@Override
????public?Result?invoke(Invoker>?invoker,?Invocation?inv)?throws?RpcException?{
????????//?RpcInvocation[methodName=$invoke,?parameterTypes=[class?java.lang.String,?class?[Ljava.lang.String;,?class?[Ljava.lang.Object;],?arguments=[sayHelloGeneric,?[Ljava.lang.String;@14e77f6b,?[Ljava.lang.Object;@51e5f393],?attachments={path=com.java.front.dubbo.demo.provider.HelloService,?input=451,?dubbo=2.0.2,?interface=com.java.front.dubbo.demo.provider.HelloService,?version=0.0.0,?generic=true}]
????????if?(inv.getMethodName().equals(Constants.$INVOKE)
????????????????&&?inv.getArguments()?!=?null
????????????????&&?inv.getArguments().length?==?3
????????????????&&?!GenericService.class.isAssignableFrom(invoker.getInterface()))?{
????????????//?sayHelloGeneric
????????????String?name?=?((String)?inv.getArguments()[0]).trim();
????????????//?[com.java.front.dubbo.demo.provider.model.Person,?java.lang.String]
????????????String[]?types?=?(String[])?inv.getArguments()[1];
????????????//?[{name=微信公眾號(hào)「JAVA前線」},?你好]
????????????Object[]?args?=?(Object[])?inv.getArguments()[2];
????????????//?RpcInvocation[methodName=sayHelloGeneric,?parameterTypes=[class?com.java.front.dubbo.demo.provider.model.Person,?class?java.lang.String],?arguments=[Person(name=JAVA前線),?abc],?attachments={path=com.java.front.dubbo.demo.provider.HelloService,?input=451,?dubbo=2.0.2,?interface=com.java.front.dubbo.demo.provider.HelloService,?version=0.0.0,?generic=true}]
????????????RpcInvocation?rpcInvocation?=?new?RpcInvocation(method,?args,?inv.getAttachments());
????????????Result?result?=?invoker.invoke(rpcInvocation);
????????}
????}
}
5 文章總結(jié)
第一本文介紹了如何使用泛化調(diào)用,并引出泛化調(diào)用為什么生效這個(gè)問(wèn)題。
第二本文介紹了重量級(jí)概念I(lǐng)nvoker,并引出為什么Dubbo要?jiǎng)?chuàng)建Invoker這個(gè)問(wèn)題。
第三通過(guò)源碼分析知道了過(guò)濾器鏈增強(qiáng)了Invoker功能,并且是實(shí)現(xiàn)泛化調(diào)用的核心。
JAVA前線?
歡迎大家關(guān)注公眾號(hào)「JAVA前線」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場(chǎng)分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)
