摘要:而存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。
2.7大揭秘——服務(wù)端處理請求過程
目標(biāo):從源碼的角度分析服務(wù)端接收到請求后的一系列操作,最終把客戶端需要的值返回。前言
上一篇講到了消費(fèi)端發(fā)送請求的過程,該篇就要將服務(wù)端處理請求的過程。也就是當(dāng)服務(wù)端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請求的時(shí)候已經(jīng)做了編碼,所以我們也需要在服務(wù)端接收到數(shù)據(jù)包后,對協(xié)議頭和協(xié)議體進(jìn)行解碼。不過本篇不講解如何解碼。有興趣的可以翻翻我以前的文章,有講到關(guān)于解碼的邏輯。接下來就開始講解服務(wù)端收到請求后的邏輯。
處理過程假設(shè)遠(yuǎn)程通信的實(shí)現(xiàn)還是用netty4,解碼器將數(shù)據(jù)包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會(huì)收到這個(gè)對象,所以第一步就是NettyServerHandler的channelRead。
(一)NettyServerHandler的channelRead可以參考《dubbo源碼解析(十七)遠(yuǎn)程通信——Netty4》的(三)NettyServerHandler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 看是否在緩存中命中,如果沒有命中,則創(chuàng)建NettyChannel并且緩存。 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { // 接受消息 handler.received(channel, msg); } finally { // 如果通道不活躍或者斷掉,則從緩存中清除 NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
NettyServerHandler是基于netty4實(shí)現(xiàn)的服務(wù)端通道處理實(shí)現(xiàn)類,而該方法就是用來接收請求,接下來就是執(zhí)行AbstractPeer的received。
(二)AbstractPeer的received可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(一)AbstractPeer
public void received(Channel ch, Object msg) throws RemotingException { // 如果通道已經(jīng)關(guān)閉,則直接返回 if (closed) { return; } handler.received(ch, msg); }
該方法比較簡單,之前也講過AbstractPeer類就做了裝飾模式中裝飾角色,只是維護(hù)了通道的正在關(guān)閉和關(guān)閉完成兩個(gè)狀態(tài)。然后到了MultiMessageHandler的received
(三)MultiMessageHandler的received可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(八)MultiMessageHandler
public void received(Channel channel, Object message) throws RemotingException { // 如果消息是MultiMessage類型的,也就是多消息類型 if (message instanceof MultiMessage) { // 強(qiáng)制轉(zhuǎn)化為MultiMessage MultiMessage list = (MultiMessage) message; // 把各個(gè)消息進(jìn)行發(fā)送 for (Object obj : list) { handler.received(channel, obj); } } else { // 直接發(fā)送 handler.received(channel, message); } }
該方法也比較簡單,就是對于多消息的處理。
(四)HeartbeatHandler的received可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(二十)HeartbeatHandler。其中就是對心跳事件做了處理。如果不是心跳請求,那么接下去走到AllChannelHandler的received。
(五)AllChannelHandler的received可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(十一)AllChannelHandler。該類處理的是連接、斷開連接、捕獲異常以及接收到的所有消息都分發(fā)到線程池。所以這里的received方法就是把請求分發(fā)到線程池,讓線程池去執(zhí)行該請求。
還記得我在之前文章里面講到到Dispatcher接口嗎,它是一個(gè)線程派發(fā)器。分別有五個(gè)實(shí)現(xiàn):
Dispatcher實(shí)現(xiàn)類 | 對應(yīng)的handler | 用途 |
---|---|---|
AllDispatcher | AllChannelHandler | 所有消息都派發(fā)到線程池,包括請求,響應(yīng),連接事件,斷開事件等 |
ConnectionOrderedDispatcher | ConnectionOrderedChannelHandler | 在 IO 線程上,將連接和斷開事件放入隊(duì)列,有序逐個(gè)執(zhí)行,其它消息派發(fā)到線程池 |
DirectDispatcher | 無 | 所有消息都不派發(fā)到線程池,全部在 IO 線程上直接執(zhí)行 |
ExecutionDispatcher | ExecutionChannelHandler | 只有請求消息派發(fā)到線程池,不含響應(yīng)。其它消息均在 IO 線程上執(zhí)行 |
MessageOnlyDispatcher | MessageOnlyChannelHandler | 只有請求和響應(yīng)消息派發(fā)到線程池,其它消息均在 IO 線程上執(zhí)行 |
這些Dispatcher的實(shí)現(xiàn)類以及對應(yīng)的Handler都可以在《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》中查看相關(guān)實(shí)現(xiàn)。dubbo默認(rèn)all為派發(fā)策略。所以我在這里講了AllChannelHandler的received。把消息送到線程池后,可以看到首先會(huì)創(chuàng)建一個(gè)ChannelEventRunnable實(shí)體。那么接下來就是線程接收并且執(zhí)行任務(wù)了。
(六)ChannelEventRunnable的runChannelEventRunnable實(shí)現(xiàn)了Runnable接口,主要是用來接收消息事件,并且根據(jù)事件的種類來分別執(zhí)行不同的操作。來看看它的run方法:
public void run() { // 如果是接收的消息 if (state == ChannelState.RECEIVED) { try { // 直接調(diào)用下一個(gè)received handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { //如果是連接事件請求 case CONNECTED: try { // 執(zhí)行連接 handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; // 如果是斷開連接事件請求 case DISCONNECTED: try { // 執(zhí)行斷開連接 handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; // 如果是發(fā)送消息 case SENT: try { // 執(zhí)行發(fā)送消息 handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; // 如果是異常 case CAUGHT: try { // 執(zhí)行異常捕獲 handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } }
可以看到把消息分為了幾種類別,因?yàn)檎埱蠛晚憫?yīng)消息出現(xiàn)頻率明顯比其他類型消息高,也就是RECEIVED,所以多帶帶先做處理,根據(jù)不同的類型的消息,會(huì)被執(zhí)行不同的邏輯,我們這里主要看state為RECEIVED的,那么如果是RECEIVED,則會(huì)執(zhí)行下一個(gè)received方法。
(七)DecodeHandler的received可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(七)DecodeHandler。可以看到received方法中根據(jù)消息的類型進(jìn)行不同的解碼。而DecodeHandler 存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到HeaderExchangeHandler的received。
(八)HeaderExchangeHandler的receivedpublic void received(Channel channel, Object message) throws RemotingException { // 設(shè)置接收到消息的時(shí)間戳 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 獲得通道 final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { // 如果消息是Request類型 if (message instanceof Request) { // handle request. // 強(qiáng)制轉(zhuǎn)化為Request Request request = (Request) message; // 如果該請求是事件心跳事件或者只讀事件 if (request.isEvent()) { // 執(zhí)行事件 handlerEvent(channel, request); } else { // 如果是正常的調(diào)用請求,且需要響應(yīng) if (request.isTwoWay()) { // 處理請求 handleRequest(exchangeChannel, request); } else { // 如果不需要響應(yīng),則繼續(xù)下一步 handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { // 處理響應(yīng) handleResponse(channel, (Response) message); } else if (message instanceof String) { // 如果是telnet相關(guān)的請求 if (isClientSide(channel)) { // 如果是客戶端側(cè),則直接拋出異常,因?yàn)榭蛻舳藗?cè)不支持telnet Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { // 如果是服務(wù)端側(cè),則執(zhí)行telnet命令 String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { // 如果都不是,則繼續(xù)下一步 handler.received(exchangeChannel, message); } } finally { // 移除關(guān)閉或者不活躍的通道 HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
該方法中就對消息進(jìn)行了細(xì)分,事件請求、正常的調(diào)用請求、響應(yīng)、telnet命令請求等,并且針對不同的消息類型做了不同的邏輯調(diào)用。我們這里主要看正常的調(diào)用請求。見下一步。
(九)HeaderExchangeHandler的handleRequestvoid handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // 創(chuàng)建一個(gè)Response實(shí)例 Response res = new Response(req.getId(), req.getVersion()); // 如果請求被破壞了 if (req.isBroken()) { // 獲得請求的數(shù)據(jù)包 Object data = req.getData(); String msg; // 如果數(shù)據(jù)為空 if (data == null) { //消息設(shè)置為空 msg = null; // 如果在這之前已經(jīng)出現(xiàn)異常,也就是數(shù)據(jù)為Throwable類型 } else if (data instanceof Throwable) { // 響應(yīng)消息把異常信息返回 msg = StringUtils.toString((Throwable) data); } else { // 返回請求數(shù)據(jù) msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); // 設(shè)置錯(cuò)誤請求的狀態(tài)碼 res.setStatus(Response.BAD_REQUEST); // 發(fā)送該消息 channel.send(res); return; } // find handler by message class. // 獲得請求數(shù)據(jù) 也就是 RpcInvocation 對象 Object msg = req.getData(); try { // 繼續(xù)向下調(diào)用 返回一個(gè)future CompletionStage
該方法是處理正常的調(diào)用請求,主要做了正常、異常調(diào)用的情況處理,并且加入了狀態(tài)碼,然后發(fā)送執(zhí)行后的結(jié)果給客戶端。接下來看下一步。
(十)DubboProtocol的requestHandler實(shí)例的reply這里我默認(rèn)是使用dubbo協(xié)議,所以執(zhí)行的是DubboProtocol的requestHandler的reply方法。可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(三)DubboProtocol
public CompletableFuture
上述代碼有些變化,是最新的代碼,加入了CompletableFuture,這個(gè)我會(huì)在后續(xù)的異步化改造中講到。這里主要關(guān)注的是又開始跟客戶端發(fā)請求一樣執(zhí)行invoke調(diào)用鏈了。
(十一)ProtocolFilterWrapper的CallbackRegistrationInvoker的invoke可以直接參考《dubbo源碼解析(四十六)消費(fèi)端發(fā)送請求過程》的(六)ProtocolFilterWrapper的內(nèi)部類CallbackRegistrationInvoker的invoke。
(十二)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實(shí)例的invoke方法。可以直接參考《dubbo源碼解析(四十六)消費(fèi)端發(fā)送請求過程》的(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實(shí)例的invoke方法。
(十三)EchoFilter的invoke可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(八)EchoFilter
public Result invoke(Invoker> invoker, Invocation inv) throws RpcException { // 如果調(diào)用的方法是回聲測試的方法 則直接返回結(jié)果,否則 調(diào)用下一個(gè)調(diào)用鏈 if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) { // 創(chuàng)建一個(gè)默認(rèn)的AsyncRpcResult返回 return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv); } return invoker.invoke(inv); }
該過濾器就是對回聲測試的調(diào)用進(jìn)行攔截,上述源碼跟連接中源碼唯一區(qū)別就是改為了AsyncRpcResult。
(十四)ClassLoaderFilter的invoke可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(三)ClassLoaderFilter,用來做類加載器的切換。
(十五)GenericFilter的invoke可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(十一)GenericFilter。
public Result invoke(Invoker> invoker, Invocation inv) throws RpcException { // 如果是泛化調(diào)用 if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() != null && inv.getArguments().length == 3 && !GenericService.class.isAssignableFrom(invoker.getInterface())) { // 獲得請求名字 String name = ((String) inv.getArguments()[0]).trim(); // 獲得請求參數(shù)類型 String[] types = (String[]) inv.getArguments()[1]; // 獲得請求參數(shù) Object[] args = (Object[]) inv.getArguments()[2]; try { // 獲得方法 Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types); // 獲得該方法的參數(shù)類型 Class>[] params = method.getParameterTypes(); if (args == null) { args = new Object[params.length]; } // 獲得附加值 String generic = inv.getAttachment(GENERIC_KEY); if (StringUtils.isBlank(generic)) { generic = RpcContext.getContext().getAttachment(GENERIC_KEY); } // 如果附加值為空,在用上下文攜帶的附加值 if (StringUtils.isEmpty(generic) || ProtocolUtils.isDefaultGenericSerialization(generic)) { // 直接進(jìn)行類型轉(zhuǎn)化 args = PojoUtils.realize(args, params, method.getGenericParameterTypes()); } else if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (byte[].class == args[i].getClass()) { try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) { // 使用nativejava方式反序列化 args[i] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA) .deserialize(null, is).readObject(); } catch (Exception e) { throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e); } } else { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_NATIVE_JAVA + "] only support message type " + byte[].class + " and your message type is " + args[i].getClass()); } } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (args[i] instanceof JavaBeanDescriptor) { // 用JavaBean方式反序列化 args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]); } else { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_BEAN + "] only support message type " + JavaBeanDescriptor.class.getName() + " and your message type is " + args[i].getClass().getName()); } } } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) { // as proto3 only accept one protobuf parameter if (args.length == 1 && args[0] instanceof String) { try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) { // 用protobuf-json進(jìn)行反序列化 args[0] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension("" + GENERIC_SERIALIZATION_PROTOBUF) .deserialize(null, is).readObject(method.getParameterTypes()[0]); } catch (Exception e) { throw new RpcException("Deserialize argument failed.", e); } } else { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_PROTOBUF + "] only support one" + String.class.getName() + " argument and your message size is " + args.length + " and type is" + args[0].getClass().getName()); } } return invoker.invoke(new RpcInvocation(method, args, inv.getAttachments())); } catch (NoSuchMethodException e) { throw new RpcException(e.getMessage(), e); } catch (ClassNotFoundException e) { throw new RpcException(e.getMessage(), e); } } return invoker.invoke(inv); } static class GenericListener implements Listener { @Override public void onResponse(Result appResponse, Invoker> invoker, Invocation inv) { // 如果是泛化調(diào)用 if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() != null && inv.getArguments().length == 3 && !GenericService.class.isAssignableFrom(invoker.getInterface())) { // 獲得序列化方式 String generic = inv.getAttachment(GENERIC_KEY); // 如果為空,默認(rèn)獲取會(huì)話域中的配置 if (StringUtils.isBlank(generic)) { generic = RpcContext.getContext().getAttachment(GENERIC_KEY); } // 如果回調(diào)有異常,直接設(shè)置異常 if (appResponse.hasException() && !(appResponse.getException() instanceof GenericException)) { appResponse.setException(new GenericException(appResponse.getException())); } // 如果是native java形式序列化 if (ProtocolUtils.isJavaGenericSerialization(generic)) { try { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512); // 使用native java形式序列化 ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).serialize(null, os).writeObject(appResponse.getValue()); // 加入結(jié)果 appResponse.setValue(os.toByteArray()); } catch (IOException e) { throw new RpcException( "Generic serialization [" + GENERIC_SERIALIZATION_NATIVE_JAVA + "] serialize result failed.", e); } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { // 用JavaBean方式序列化 appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD)); } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) { try { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512); // 用protobuf-json進(jìn)行序列化 ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(GENERIC_SERIALIZATION_PROTOBUF) .serialize(null, os).writeObject(appResponse.getValue()); appResponse.setValue(os.toString()); } catch (IOException e) { throw new RpcException("Generic serialization [" + GENERIC_SERIALIZATION_PROTOBUF + "] serialize result failed.", e); } } else { // 直接進(jìn)行類型轉(zhuǎn)化并且設(shè)置值 appResponse.setValue(PojoUtils.generalize(appResponse.getValue())); } } } @Override public void onError(Throwable t, Invoker> invoker, Invocation invocation) { } }
跟連接內(nèi)的代碼對比,第一個(gè)就是對過濾器設(shè)計(jì)發(fā)生了變化,這個(gè)我在異步化改造里面會(huì)講到,第二個(gè)是新增了protobuf-json的泛化序列化方式。
(十六)ContextFilter的invoke可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(六)ContextFilter,最新代碼幾乎差不多,除了因?yàn)閷ilter的設(shè)計(jì)做了修改以外,還有新增了tag路由的相關(guān)邏輯,tag相關(guān)部分我會(huì)在后續(xù)文章中講解,該類主要是做了初始化rpc上下文。
(十七)TraceFilter的invoke可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(十三)TraceFilter,該過濾器是增強(qiáng)的功能是通道的跟蹤,會(huì)在通道內(nèi)把最大的調(diào)用次數(shù)和現(xiàn)在的調(diào)用數(shù)量放進(jìn)去。方便使用telnet來跟蹤服務(wù)的調(diào)用次數(shù)等。
(十八)TimeoutFilter的invoke可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(十三)TimeoutFilter,該過濾器是當(dāng)服務(wù)調(diào)用超時(shí)的時(shí)候,記錄告警日志。
(十九)MonitorFilter的invokepublic Result invoke(Invoker> invoker, Invocation invocation) throws RpcException { // 如果開啟監(jiān)控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 設(shè)置開始監(jiān)控時(shí)間 invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); // 對同時(shí)在線數(shù)量加1 getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain }
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker> invoker, Invocation invocation) { // 如果開啟監(jiān)控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 收集監(jiān)控對數(shù)據(jù),并且更新監(jiān)控?cái)?shù)據(jù) collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); // 同時(shí)在線監(jiān)控?cái)?shù)減1 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 收集監(jiān)控對數(shù)據(jù),并且更新監(jiān)控?cái)?shù)據(jù) collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); // 同時(shí)在線監(jiān)控?cái)?shù)減1 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } private void collect(Invoker> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { try { // 獲得監(jiān)控的url URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY); // 通過該url獲得Monitor實(shí)例 Monitor monitor = monitorFactory.getMonitor(monitorUrl); if (monitor == null) { return; } // 創(chuàng)建一個(gè)統(tǒng)計(jì)的url URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error); // 把收集的信息更新并且發(fā)送信息 monitor.collect(statisticsURL); } catch (Throwable t) { logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); } } private URL createStatisticsUrl(Invoker> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { // ---- service statistics ---- // 調(diào)用服務(wù)消耗的時(shí)間 long elapsed = System.currentTimeMillis() - start; // invocation cost // 獲得同時(shí)監(jiān)控的數(shù)量 int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count String application = invoker.getUrl().getParameter(APPLICATION_KEY); // 獲得服務(wù)名 String service = invoker.getInterface().getName(); // service name // 獲得調(diào)用的方法名 String method = RpcUtils.getMethodName(invocation); // method name // 獲得組 String group = invoker.getUrl().getParameter(GROUP_KEY); // 獲得版本號 String version = invoker.getUrl().getParameter(VERSION_KEY); int localPort; String remoteKey, remoteValue; // 如果是消費(fèi)者端的監(jiān)控 if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) { // ---- for service consumer ---- // 本地端口為0 localPort = 0; // key為provider remoteKey = MonitorService.PROVIDER; // value為服務(wù)ip remoteValue = invoker.getUrl().getAddress(); } else { // ---- for service provider ---- // 端口為服務(wù)端口 localPort = invoker.getUrl().getPort(); // key為consumer remoteKey = MonitorService.CONSUMER; // value為遠(yuǎn)程地址 remoteValue = remoteHost; } String input = "", output = ""; if (invocation.getAttachment(INPUT_KEY) != null) { input = invocation.getAttachment(INPUT_KEY); } if (result != null && result.getAttachment(OUTPUT_KEY) != null) { output = result.getAttachment(OUTPUT_KEY); } // 返回一個(gè)url return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version); } }
在invoke里面只是做了記錄開始監(jiān)控的時(shí)間以及對同時(shí)監(jiān)控的數(shù)量加1操作,當(dāng)結(jié)果回調(diào)時(shí),會(huì)對結(jié)果數(shù)據(jù)做搜集計(jì)算,最后通過監(jiān)控服務(wù)記錄和發(fā)送最新信息。
(二十)ExceptionFilter的invoke可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(九)ExceptionFilter,該過濾器主要是對異常的處理。
(二十一)InvokerWrapper的invoke可以參考《dubbo源碼解析(二十二)遠(yuǎn)程調(diào)用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過并沒有實(shí)現(xiàn)實(shí)際的功能增強(qiáng)。
(二十二)DelegateProviderMetaDataInvoker的invokepublic Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
該類也是用了裝飾模式,不過該類是invoker和配置中心的適配類,其中也沒有進(jìn)行實(shí)際的功能增強(qiáng)。
(二十三)AbstractProxyInvoker的invoke可以參考《dubbo源碼解析(二十三)遠(yuǎn)程調(diào)用——Proxy》的(二)AbstractProxyInvoker。不過代碼已經(jīng)有更新了,下面貼出最新的代碼。
public Result invoke(Invocation invocation) throws RpcException { try { // 執(zhí)行下一步 Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); // 把返回結(jié)果用CompletableFuture包裹 CompletableFuture
這里主要是因?yàn)楫惒交脑於霈F(xiàn)的代碼變化,我會(huì)在異步化改造中講到這部分?,F(xiàn)在主要來看下一步。
(二十四)JavassistProxyFactory的getInvoker方法中匿名類的doInvoke這里默認(rèn)代理實(shí)現(xiàn)方式是Javassist??梢詤⒖肌禿ubbo源碼解析(二十三)遠(yuǎn)程調(diào)用——Proxy》的(六)JavassistProxyFactory。其中Wrapper 是一個(gè)抽象類,其中 invokeMethod 是一個(gè)抽象方法。dubbo 會(huì)在運(yùn)行時(shí)通過 Javassist 框架為 Wrapper 生成實(shí)現(xiàn)類,并實(shí)現(xiàn) invokeMethod 方法,該方法最終會(huì)根據(jù)調(diào)用信息調(diào)用具體的服務(wù)。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
/** Wrapper0 是在運(yùn)行時(shí)生成的,大家可使用 Arthas 進(jìn)行反編譯 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; // 省略其他方法 public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { // 類型轉(zhuǎn)換 demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { // 根據(jù)方法名調(diào)用指定的方法 if ("sayHello".equals(string) && arrclass.length == 1) { return demoService.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method "").append(string).append("" in class com.alibaba.dubbo.demo.DemoService.").toString()); } }
然后就是直接調(diào)用的是對應(yīng)的方法了。到此,方法執(zhí)行完成了。
結(jié)果返回可以看到我上述講到的(八)HeaderExchangeHandler的received和(九)HeaderExchangeHandler的handleRequest有好幾處channel.send方法的調(diào)用,也就是當(dāng)結(jié)果返回的返回的時(shí)候,會(huì)主動(dòng)發(fā)送執(zhí)行結(jié)果給客戶端。當(dāng)然發(fā)送的時(shí)候還是會(huì)對結(jié)果Response 對象進(jìn)行編碼,編碼邏輯我就先不在這里闡述。
當(dāng)客戶端接收到這個(gè)返回的消息時(shí)候,進(jìn)行解碼后,識別為Response 對象,將該對象派發(fā)到線程池中,該過程跟服務(wù)端接收到調(diào)用請求到邏輯是一樣的,可以參考上述的解析,區(qū)別在于到(八)HeaderExchangeHandler的received方法的時(shí)候,執(zhí)行的是handleResponse方法。
(九)HeaderExchangeHandler的handleResponsestatic void handleResponse(Channel channel, Response response) throws RemotingException { // 如果響應(yīng)不為空,并且不是心跳事件的響應(yīng),則調(diào)用received if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }(十)DefaultFuture的received
可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(七)DefaultFuture,不過該類的繼承的是CompletableFuture,因?yàn)閷Ξ惒交母脑?,該類已?jīng)做了一些變化。
public static void received(Channel channel, Response response) { received(channel, response, false); } public static void received(Channel channel, Response response, boolean timeout) { try { // future集合中移除該請求的future,(響應(yīng)id和請求id一一對應(yīng)的) DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { //獲得超時(shí) Timeout t = future.timeoutCheckTask; // 如果沒有超時(shí),則取消timeoutCheckTask if (!timeout) { // decrease Time t.cancel(); } // 接收響應(yīng)結(jié)果 future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { // 通道集合移除該請求對應(yīng)的通道,代表著這一次請求結(jié)束 CHANNELS.remove(response.getId()); } }
該方法中主要是對超時(shí)的處理,還有對應(yīng)的請求和響應(yīng)的匹配,也就是返回相應(yīng)id的future。
(十一)DefaultFuture的doReceived可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(七)DefaultFuture,不過因?yàn)檫\(yùn)用了CompletableFuture,所以該方法完全重寫了。這部分的改造我也會(huì)在異步化改造中講述到。
private void doReceived(Response res) { // 如果結(jié)果為空,則拋出異常 if (res == null) { throw new IllegalStateException("response cannot be null"); } // 如果結(jié)果的狀態(tài)碼為ok if (res.getStatus() == Response.OK) { // 則future調(diào)用完成 this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 如果超時(shí),則返回一個(gè)超時(shí)異常 this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { // 否則返回一個(gè)RemotingException this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } }
隨后用戶線程即可從 DefaultFuture 實(shí)例中獲取到相應(yīng)結(jié)果。
后記該文章講解了dubbo調(diào)服務(wù)端接收到請求后一系列處理的所有步驟以及服務(wù)端怎么把結(jié)果發(fā)送給客戶端,是目前最新代碼的解析。因?yàn)槔锩嫔婕暗胶芏嗔鞒?,所以可以自己debug一步一步看一看整個(gè)過程??梢钥吹奖疚纳婕暗疆惒交脑煲呀?jīng)很多了,這也是我在講解異步化改造前想先講解這些過程的原因。只有弄清楚這些過程,才能更加理解對于異步化改造的意義。下一篇文將講解異步化改造。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/74869.html
摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請求過程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請求過程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:服務(wù)暴露過程目標(biāo)從源碼的角度分析服務(wù)暴露過程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠(yuǎn)程兩個(gè)過程。其中服務(wù)暴露的第八步已經(jīng)沒有了。將泛化調(diào)用版本號或者等信息加入獲得服務(wù)暴露地址和端口號,利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過程 目標(biāo):從源碼的角度分析服務(wù)暴露過程。 前言 本來這一篇一個(gè)寫異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...
摘要:大揭秘目標(biāo)了解的新特性,以及版本升級的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個(gè)的鍵值對,包含了一個(gè)服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標(biāo):了解2.7的新特性,以及版本升級的引導(dǎo)。 前言 我們知道Dubbo在2011年開源,停止更新了一段時(shí)間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...
摘要:服務(wù)引用過程目標(biāo)從源碼的角度分析服務(wù)引用過程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過程 目標(biāo):從源碼的角度分析服務(wù)引用過程。 前言 前面服務(wù)暴露過程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來進(jìn)行引用,這種方式更多的時(shí)候被用來做服務(wù)測試,不...
閱讀 3580·2021-10-18 13:33
閱讀 891·2019-08-30 14:20
閱讀 2683·2019-08-30 13:14
閱讀 2578·2019-08-29 18:38
閱讀 2938·2019-08-29 16:44
閱讀 1258·2019-08-29 15:23
閱讀 3591·2019-08-29 13:28
閱讀 1969·2019-08-28 18:00