成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

dubbo源碼解析(四十七)服務(wù)端處理請求過程

yzzz / 3514人閱讀

摘要:而存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。

2.7大揭秘——服務(wù)端處理請求過程
目標(biāo):從源碼的角度分析服務(wù)端接收到請求后的一系列操作,最終把客戶端需要的值返回。
前言

上一篇講到了消費(fèi)端發(fā)送請求的過程,該篇就要將服務(wù)端處理請求的過程。也就是當(dāng)服務(wù)端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請求的時(shí)候已經(jīng)做了編碼,所以我們也需要在服務(wù)端接收到數(shù)據(jù)包后,對協(xié)議頭和協(xié)議體進(jìn)行解碼。不過本篇不講解如何解碼。有興趣的可以翻翻我以前的文章,有講到關(guān)于解碼的邏輯。接下來就開始講解服務(wù)端收到請求后的邏輯。

處理過程

假設(shè)遠(yuǎn)程通信的實(shí)現(xiàn)還是用netty4,解碼器將數(shù)據(jù)包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會(huì)收到這個(gè)對象,所以第一步就是NettyServerHandler的channelRead。

(一)NettyServerHandler的channelRead

可以參考《dubbo源碼解析(十七)遠(yuǎn)程通信——Netty4》的(三)NettyServerHandler

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 看是否在緩存中命中,如果沒有命中,則創(chuàng)建NettyChannel并且緩存。
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 接受消息
        handler.received(channel, msg);
    } finally {
        // 如果通道不活躍或者斷掉,則從緩存中清除
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

NettyServerHandler是基于netty4實(shí)現(xiàn)的服務(wù)端通道處理實(shí)現(xiàn)類,而該方法就是用來接收請求,接下來就是執(zhí)行AbstractPeer的received。

(二)AbstractPeer的received

可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(一)AbstractPeer

public void received(Channel ch, Object msg) throws RemotingException {
    // 如果通道已經(jīng)關(guān)閉,則直接返回
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}

該方法比較簡單,之前也講過AbstractPeer類就做了裝飾模式中裝飾角色,只是維護(hù)了通道的正在關(guān)閉和關(guān)閉完成兩個(gè)狀態(tài)。然后到了MultiMessageHandler的received

(三)MultiMessageHandler的received

可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(八)MultiMessageHandler

public void received(Channel channel, Object message) throws RemotingException {
    // 如果消息是MultiMessage類型的,也就是多消息類型
    if (message instanceof MultiMessage) {
        // 強(qiáng)制轉(zhuǎn)化為MultiMessage
        MultiMessage list = (MultiMessage) message;
        // 把各個(gè)消息進(jìn)行發(fā)送
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // 直接發(fā)送
        handler.received(channel, message);
    }
}

該方法也比較簡單,就是對于多消息的處理。

(四)HeartbeatHandler的received

可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(二十)HeartbeatHandler。其中就是對心跳事件做了處理。如果不是心跳請求,那么接下去走到AllChannelHandler的received。

(五)AllChannelHandler的received

可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(十一)AllChannelHandler。該類處理的是連接、斷開連接、捕獲異常以及接收到的所有消息都分發(fā)到線程池。所以這里的received方法就是把請求分發(fā)到線程池,讓線程池去執(zhí)行該請求。

還記得我在之前文章里面講到到Dispatcher接口嗎,它是一個(gè)線程派發(fā)器。分別有五個(gè)實(shí)現(xiàn):

Dispatcher實(shí)現(xiàn)類 對應(yīng)的handler 用途
AllDispatcher AllChannelHandler 所有消息都派發(fā)到線程池,包括請求,響應(yīng),連接事件,斷開事件等
ConnectionOrderedDispatcher ConnectionOrderedChannelHandler 在 IO 線程上,將連接和斷開事件放入隊(duì)列,有序逐個(gè)執(zhí)行,其它消息派發(fā)到線程池
DirectDispatcher 所有消息都不派發(fā)到線程池,全部在 IO 線程上直接執(zhí)行
ExecutionDispatcher ExecutionChannelHandler 只有請求消息派發(fā)到線程池,不含響應(yīng)。其它消息均在 IO 線程上執(zhí)行
MessageOnlyDispatcher MessageOnlyChannelHandler 只有請求和響應(yīng)消息派發(fā)到線程池,其它消息均在 IO 線程上執(zhí)行

這些Dispatcher的實(shí)現(xiàn)類以及對應(yīng)的Handler都可以在《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》中查看相關(guān)實(shí)現(xiàn)。dubbo默認(rèn)all為派發(fā)策略。所以我在這里講了AllChannelHandler的received。把消息送到線程池后,可以看到首先會(huì)創(chuàng)建一個(gè)ChannelEventRunnable實(shí)體。那么接下來就是線程接收并且執(zhí)行任務(wù)了。

(六)ChannelEventRunnable的run

ChannelEventRunnable實(shí)現(xiàn)了Runnable接口,主要是用來接收消息事件,并且根據(jù)事件的種類來分別執(zhí)行不同的操作。來看看它的run方法:

public void run() {
    // 如果是接收的消息
    if (state == ChannelState.RECEIVED) {
        try {
            // 直接調(diào)用下一個(gè)received
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
            //如果是連接事件請求
        case CONNECTED:
            try {
                // 執(zhí)行連接
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 如果是斷開連接事件請求
        case DISCONNECTED:
            try {
                // 執(zhí)行斷開連接
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 如果是發(fā)送消息
        case SENT:
            try {
                // 執(zhí)行發(fā)送消息
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
            // 如果是異常
        case CAUGHT:
            try {
                // 執(zhí)行異常捕獲
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

可以看到把消息分為了幾種類別,因?yàn)檎埱蠛晚憫?yīng)消息出現(xiàn)頻率明顯比其他類型消息高,也就是RECEIVED,所以多帶帶先做處理,根據(jù)不同的類型的消息,會(huì)被執(zhí)行不同的邏輯,我們這里主要看state為RECEIVED的,那么如果是RECEIVED,則會(huì)執(zhí)行下一個(gè)received方法。

(七)DecodeHandler的received

可以參考《dubbo源碼解析(九)遠(yuǎn)程通信——Transport層》的(七)DecodeHandler。可以看到received方法中根據(jù)消息的類型進(jìn)行不同的解碼。而DecodeHandler 存在的意義就是保證請求或響應(yīng)對象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到HeaderExchangeHandler的received。

(八)HeaderExchangeHandler的received
public void received(Channel channel, Object message) throws RemotingException {
    // 設(shè)置接收到消息的時(shí)間戳
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    // 獲得通道
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        // 如果消息是Request類型
        if (message instanceof Request) {
            // handle request.
            // 強(qiáng)制轉(zhuǎn)化為Request
            Request request = (Request) message;
            // 如果該請求是事件心跳事件或者只讀事件
            if (request.isEvent()) {
                // 執(zhí)行事件
                handlerEvent(channel, request);
            } else {
                // 如果是正常的調(diào)用請求,且需要響應(yīng)
                if (request.isTwoWay()) {
                    // 處理請求
                    handleRequest(exchangeChannel, request);
                } else {
                    // 如果不需要響應(yīng),則繼續(xù)下一步
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 處理響應(yīng)
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            // 如果是telnet相關(guān)的請求
            if (isClientSide(channel)) {
                // 如果是客戶端側(cè),則直接拋出異常,因?yàn)榭蛻舳藗?cè)不支持telnet
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                // 如果是服務(wù)端側(cè),則執(zhí)行telnet命令
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            // 如果都不是,則繼續(xù)下一步
            handler.received(exchangeChannel, message);
        }
    } finally {
        // 移除關(guān)閉或者不活躍的通道
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

該方法中就對消息進(jìn)行了細(xì)分,事件請求、正常的調(diào)用請求、響應(yīng)、telnet命令請求等,并且針對不同的消息類型做了不同的邏輯調(diào)用。我們這里主要看正常的調(diào)用請求。見下一步。

(九)HeaderExchangeHandler的handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 創(chuàng)建一個(gè)Response實(shí)例
    Response res = new Response(req.getId(), req.getVersion());
    // 如果請求被破壞了
    if (req.isBroken()) {
        // 獲得請求的數(shù)據(jù)包
        Object data = req.getData();

        String msg;
        // 如果數(shù)據(jù)為空
        if (data == null) {
            //消息設(shè)置為空
            msg = null;
            // 如果在這之前已經(jīng)出現(xiàn)異常,也就是數(shù)據(jù)為Throwable類型
        } else if (data instanceof Throwable) {
            // 響應(yīng)消息把異常信息返回
            msg = StringUtils.toString((Throwable) data);
        } else {
            // 返回請求數(shù)據(jù)
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        // 設(shè)置錯(cuò)誤請求的狀態(tài)碼
        res.setStatus(Response.BAD_REQUEST);

        // 發(fā)送該消息
        channel.send(res);
        return;
    }
    // find handler by message class.
    // 獲得請求數(shù)據(jù) 也就是 RpcInvocation 對象
    Object msg = req.getData();
    try {
        // 繼續(xù)向下調(diào)用 返回一個(gè)future
        CompletionStage future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    //設(shè)置調(diào)用結(jié)果狀態(tài)為成功
                    res.setStatus(Response.OK);
                    // 把結(jié)果放入響應(yīng)
                    res.setResult(appResult);
                } else {
                    // 如果服務(wù)調(diào)用有異常,則設(shè)置結(jié)果狀態(tài)碼為服務(wù)錯(cuò)誤
                    res.setStatus(Response.SERVICE_ERROR);
                    // 把報(bào)錯(cuò)信息放到響應(yīng)中
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 發(fā)送該響應(yīng)
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        // 如果在執(zhí)行中拋出異常,則也算服務(wù)異常
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

該方法是處理正常的調(diào)用請求,主要做了正常、異常調(diào)用的情況處理,并且加入了狀態(tài)碼,然后發(fā)送執(zhí)行后的結(jié)果給客戶端。接下來看下一步。

(十)DubboProtocol的requestHandler實(shí)例的reply

這里我默認(rèn)是使用dubbo協(xié)議,所以執(zhí)行的是DubboProtocol的requestHandler的reply方法。可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(三)DubboProtocol

public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {

    // 如果請求消息不屬于會(huì)話域,則拋出異常
    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    //強(qiáng)制類型轉(zhuǎn)化
    Invocation inv = (Invocation) message;
    // 獲得暴露的服務(wù)invoker
    Invoker invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it"s a callback
    // 如果是回調(diào)服務(wù)
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        // 獲得 方法定義
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        // 如果只有一個(gè)方法定義
        if (methodsStr == null || !methodsStr.contains(",")) {
            // 設(shè)置會(huì)話域中是否有一致的方法定義標(biāo)志
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            // 分割不同的方法
            String[] methods = methodsStr.split(",");
            // 如果方法不止一個(gè),則分割后遍歷查詢,找到了則設(shè)置為true
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        // 如果沒有該方法,則打印告警日志
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    // 設(shè)置遠(yuǎn)程地址
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // 調(diào)用下一個(gè)調(diào)用鏈
    Result result = invoker.invoke(inv);
    //  返回CompletableFuture
    return result.completionFuture().thenApply(Function.identity());
}

上述代碼有些變化,是最新的代碼,加入了CompletableFuture,這個(gè)我會(huì)在后續(xù)的異步化改造中講到。這里主要關(guān)注的是又開始跟客戶端發(fā)請求一樣執(zhí)行invoke調(diào)用鏈了。

(十一)ProtocolFilterWrapper的CallbackRegistrationInvoker的invoke

可以直接參考《dubbo源碼解析(四十六)消費(fèi)端發(fā)送請求過程》的(六)ProtocolFilterWrapper的內(nèi)部類CallbackRegistrationInvoker的invoke。

(十二)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實(shí)例的invoke方法。

可以直接參考《dubbo源碼解析(四十六)消費(fèi)端發(fā)送請求過程》的(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實(shí)例的invoke方法。

(十三)EchoFilter的invoke

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(八)EchoFilter

public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果調(diào)用的方法是回聲測試的方法 則直接返回結(jié)果,否則 調(diào)用下一個(gè)調(diào)用鏈
    if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // 創(chuàng)建一個(gè)默認(rèn)的AsyncRpcResult返回
        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
    }
    return invoker.invoke(inv);
}

該過濾器就是對回聲測試的調(diào)用進(jìn)行攔截,上述源碼跟連接中源碼唯一區(qū)別就是改為了AsyncRpcResult。

(十四)ClassLoaderFilter的invoke

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(三)ClassLoaderFilter,用來做類加載器的切換。

(十五)GenericFilter的invoke

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(十一)GenericFilter。

public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果是泛化調(diào)用
    if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // 獲得請求名字
        String name = ((String) inv.getArguments()[0]).trim();
        // 獲得請求參數(shù)類型
        String[] types = (String[]) inv.getArguments()[1];
        // 獲得請求參數(shù)
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // 獲得方法
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            // 獲得該方法的參數(shù)類型
            Class[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // 獲得附加值
            String generic = inv.getAttachment(GENERIC_KEY);

            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 如果附加值為空,在用上下文攜帶的附加值
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                // 直接進(jìn)行類型轉(zhuǎn)化
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                            // 使用nativejava方式反序列化
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        // 用JavaBean方式反序列化
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                // as proto3 only accept one protobuf parameter
                if (args.length == 1 && args[0] instanceof String) {
                    try (UnsafeByteArrayInputStream is =
                                 new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) {
                        // 用protobuf-json進(jìn)行反序列化
                        args[0] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                .getExtension("" + GENERIC_SERIALIZATION_PROTOBUF)
                                .deserialize(null, is).readObject(method.getParameterTypes()[0]);
                    } catch (Exception e) {
                        throw new RpcException("Deserialize argument failed.", e);
                    }
                } else {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_PROTOBUF +
                                    "] only support one" + String.class.getName() +
                                    " argument and your message size is " +
                                    args.length + " and type is" +
                                    args[0].getClass().getName());
                }
            }
            return invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    return invoker.invoke(inv);
}

static class GenericListener implements Listener {

    @Override
    public void onResponse(Result appResponse, Invoker invoker, Invocation inv) {
        // 如果是泛化調(diào)用
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {

            // 獲得序列化方式
            String generic = inv.getAttachment(GENERIC_KEY);
            // 如果為空,默認(rèn)獲取會(huì)話域中的配置
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 如果回調(diào)有異常,直接設(shè)置異常
            if (appResponse.hasException() && !(appResponse.getException() instanceof GenericException)) {
                appResponse.setException(new GenericException(appResponse.getException()));
            }
            // 如果是native java形式序列化
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 使用native java形式序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).serialize(null, os).writeObject(appResponse.getValue());
                    // 加入結(jié)果
                    appResponse.setValue(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_NATIVE_JAVA +
                                    "] serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                // 用JavaBean方式序列化
                appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD));
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 用protobuf-json進(jìn)行序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(GENERIC_SERIALIZATION_PROTOBUF)
                            .serialize(null, os).writeObject(appResponse.getValue());
                    appResponse.setValue(os.toString());
                } catch (IOException e) {
                    throw new RpcException("Generic serialization [" +
                            GENERIC_SERIALIZATION_PROTOBUF +
                            "] serialize result failed.", e);
                }
            } else {
                // 直接進(jìn)行類型轉(zhuǎn)化并且設(shè)置值
                appResponse.setValue(PojoUtils.generalize(appResponse.getValue()));
            }
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {

    }
}

跟連接內(nèi)的代碼對比,第一個(gè)就是對過濾器設(shè)計(jì)發(fā)生了變化,這個(gè)我在異步化改造里面會(huì)講到,第二個(gè)是新增了protobuf-json的泛化序列化方式。

(十六)ContextFilter的invoke

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(六)ContextFilter,最新代碼幾乎差不多,除了因?yàn)閷ilter的設(shè)計(jì)做了修改以外,還有新增了tag路由的相關(guān)邏輯,tag相關(guān)部分我會(huì)在后續(xù)文章中講解,該類主要是做了初始化rpc上下文。

(十七)TraceFilter的invoke

可以參考《dubbo源碼解析(二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議》的(十三)TraceFilter,該過濾器是增強(qiáng)的功能是通道的跟蹤,會(huì)在通道內(nèi)把最大的調(diào)用次數(shù)和現(xiàn)在的調(diào)用數(shù)量放進(jìn)去。方便使用telnet來跟蹤服務(wù)的調(diào)用次數(shù)等。

(十八)TimeoutFilter的invoke

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(十三)TimeoutFilter,該過濾器是當(dāng)服務(wù)調(diào)用超時(shí)的時(shí)候,記錄告警日志。

(十九)MonitorFilter的invoke
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 如果開啟監(jiān)控
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        // 設(shè)置開始監(jiān)控時(shí)間
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        // 對同時(shí)在線數(shù)量加1
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}
class MonitorListener implements Listener {

    @Override
    public void onResponse(Result result, Invoker invoker, Invocation invocation) {
        // 如果開啟監(jiān)控
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監(jiān)控對數(shù)據(jù),并且更新監(jiān)控?cái)?shù)據(jù)
            collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
            // 同時(shí)在線監(jiān)控?cái)?shù)減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監(jiān)控對數(shù)據(jù),并且更新監(jiān)控?cái)?shù)據(jù)
            collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
            // 同時(shí)在線監(jiān)控?cái)?shù)減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    private void collect(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        try {
            // 獲得監(jiān)控的url
            URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
            // 通過該url獲得Monitor實(shí)例
            Monitor monitor = monitorFactory.getMonitor(monitorUrl);
            if (monitor == null) {
                return;
            }
            // 創(chuàng)建一個(gè)統(tǒng)計(jì)的url
            URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
            // 把收集的信息更新并且發(fā)送信息
            monitor.collect(statisticsURL);
        } catch (Throwable t) {
            logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

    private URL createStatisticsUrl(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        // ---- service statistics ----
        // 調(diào)用服務(wù)消耗的時(shí)間
        long elapsed = System.currentTimeMillis() - start; // invocation cost
        // 獲得同時(shí)監(jiān)控的數(shù)量
        int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
        String application = invoker.getUrl().getParameter(APPLICATION_KEY);
        // 獲得服務(wù)名
        String service = invoker.getInterface().getName(); // service name
        // 獲得調(diào)用的方法名
        String method = RpcUtils.getMethodName(invocation); // method name
        // 獲得組
        String group = invoker.getUrl().getParameter(GROUP_KEY);
        // 獲得版本號
        String version = invoker.getUrl().getParameter(VERSION_KEY);

        int localPort;
        String remoteKey, remoteValue;
        // 如果是消費(fèi)者端的監(jiān)控
        if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
            // ---- for service consumer ----
            // 本地端口為0
            localPort = 0;
            // key為provider
            remoteKey = MonitorService.PROVIDER;
            // value為服務(wù)ip
            remoteValue = invoker.getUrl().getAddress();
        } else {
            // ---- for service provider ----
            // 端口為服務(wù)端口
            localPort = invoker.getUrl().getPort();
            // key為consumer
            remoteKey = MonitorService.CONSUMER;
            // value為遠(yuǎn)程地址
            remoteValue = remoteHost;
        }
        String input = "", output = "";
        if (invocation.getAttachment(INPUT_KEY) != null) {
            input = invocation.getAttachment(INPUT_KEY);
        }
        if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
            output = result.getAttachment(OUTPUT_KEY);
        }

        // 返回一個(gè)url
        return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
    }

}

在invoke里面只是做了記錄開始監(jiān)控的時(shí)間以及對同時(shí)監(jiān)控的數(shù)量加1操作,當(dāng)結(jié)果回調(diào)時(shí),會(huì)對結(jié)果數(shù)據(jù)做搜集計(jì)算,最后通過監(jiān)控服務(wù)記錄和發(fā)送最新信息。

(二十)ExceptionFilter的invoke

可以參考《dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter》的(九)ExceptionFilter,該過濾器主要是對異常的處理。

(二十一)InvokerWrapper的invoke

可以參考《dubbo源碼解析(二十二)遠(yuǎn)程調(diào)用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過并沒有實(shí)現(xiàn)實(shí)際的功能增強(qiáng)。

(二十二)DelegateProviderMetaDataInvoker的invoke
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

該類也是用了裝飾模式,不過該類是invoker和配置中心的適配類,其中也沒有進(jìn)行實(shí)際的功能增強(qiáng)。

(二十三)AbstractProxyInvoker的invoke

可以參考《dubbo源碼解析(二十三)遠(yuǎn)程調(diào)用——Proxy》的(二)AbstractProxyInvoker。不過代碼已經(jīng)有更新了,下面貼出最新的代碼。

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 執(zhí)行下一步
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        // 把返回結(jié)果用CompletableFuture包裹
        CompletableFuture future = wrapWithFuture(value, invocation);
        // 創(chuàng)建AsyncRpcResult實(shí)例
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            // 如果拋出異常
            if (t != null) {
                // 屬于CompletionException異常
                if (t instanceof CompletionException) {
                    // 設(shè)置異常信息
                    result.setException(t.getCause());
                } else {
                    // 直接設(shè)置異常
                    result.setException(t);
                }
            } else {
                // 如果沒有異常,則把結(jié)果放入異步結(jié)果內(nèi)
                result.setValue(obj);
            }
            // 完成
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這里主要是因?yàn)楫惒交脑於霈F(xiàn)的代碼變化,我會(huì)在異步化改造中講到這部分?,F(xiàn)在主要來看下一步。

(二十四)JavassistProxyFactory的getInvoker方法中匿名類的doInvoke

這里默認(rèn)代理實(shí)現(xiàn)方式是Javassist??梢詤⒖肌禿ubbo源碼解析(二十三)遠(yuǎn)程調(diào)用——Proxy》的(六)JavassistProxyFactory。其中Wrapper 是一個(gè)抽象類,其中 invokeMethod 是一個(gè)抽象方法。dubbo 會(huì)在運(yùn)行時(shí)通過 Javassist 框架為 Wrapper 生成實(shí)現(xiàn)類,并實(shí)現(xiàn) invokeMethod 方法,該方法最終會(huì)根據(jù)調(diào)用信息調(diào)用具體的服務(wù)。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

/** Wrapper0 是在運(yùn)行時(shí)生成的,大家可使用 Arthas 進(jìn)行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉(zhuǎn)換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據(jù)方法名調(diào)用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method "").append(string).append("" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

然后就是直接調(diào)用的是對應(yīng)的方法了。到此,方法執(zhí)行完成了。

結(jié)果返回

可以看到我上述講到的(八)HeaderExchangeHandler的received和(九)HeaderExchangeHandler的handleRequest有好幾處channel.send方法的調(diào)用,也就是當(dāng)結(jié)果返回的返回的時(shí)候,會(huì)主動(dòng)發(fā)送執(zhí)行結(jié)果給客戶端。當(dāng)然發(fā)送的時(shí)候還是會(huì)對結(jié)果Response 對象進(jìn)行編碼,編碼邏輯我就先不在這里闡述。

當(dāng)客戶端接收到這個(gè)返回的消息時(shí)候,進(jìn)行解碼后,識別為Response 對象,將該對象派發(fā)到線程池中,該過程跟服務(wù)端接收到調(diào)用請求到邏輯是一樣的,可以參考上述的解析,區(qū)別在于到(八)HeaderExchangeHandler的received方法的時(shí)候,執(zhí)行的是handleResponse方法。

(九)HeaderExchangeHandler的handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
    // 如果響應(yīng)不為空,并且不是心跳事件的響應(yīng),則調(diào)用received
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
(十)DefaultFuture的received

可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(七)DefaultFuture,不過該類的繼承的是CompletableFuture,因?yàn)閷Ξ惒交母脑?,該類已?jīng)做了一些變化。

public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        // future集合中移除該請求的future,(響應(yīng)id和請求id一一對應(yīng)的)
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //獲得超時(shí)
            Timeout t = future.timeoutCheckTask;
            // 如果沒有超時(shí),則取消timeoutCheckTask
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            // 接收響應(yīng)結(jié)果
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 通道集合移除該請求對應(yīng)的通道,代表著這一次請求結(jié)束
        CHANNELS.remove(response.getId());
    }
}

該方法中主要是對超時(shí)的處理,還有對應(yīng)的請求和響應(yīng)的匹配,也就是返回相應(yīng)id的future。

(十一)DefaultFuture的doReceived

可以參考《dubbo源碼解析(十)遠(yuǎn)程通信——Exchange層》的(七)DefaultFuture,不過因?yàn)檫\(yùn)用了CompletableFuture,所以該方法完全重寫了。這部分的改造我也會(huì)在異步化改造中講述到。

private void doReceived(Response res) {
    // 如果結(jié)果為空,則拋出異常
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // 如果結(jié)果的狀態(tài)碼為ok
    if (res.getStatus() == Response.OK) {
        // 則future調(diào)用完成
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        // 如果超時(shí),則返回一個(gè)超時(shí)異常
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        // 否則返回一個(gè)RemotingException
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

隨后用戶線程即可從 DefaultFuture 實(shí)例中獲取到相應(yīng)結(jié)果。

后記

該文章講解了dubbo調(diào)服務(wù)端接收到請求后一系列處理的所有步驟以及服務(wù)端怎么把結(jié)果發(fā)送給客戶端,是目前最新代碼的解析。因?yàn)槔锩嫔婕暗胶芏嗔鞒?,所以可以自己debug一步一步看一看整個(gè)過程??梢钥吹奖疚纳婕暗疆惒交脑煲呀?jīng)很多了,這也是我在講解異步化改造前想先講解這些過程的原因。只有弄清楚這些過程,才能更加理解對于異步化改造的意義。下一篇文將講解異步化改造。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/74869.html

相關(guān)文章

  • dubbo源碼解析四十八)異步化改造

    摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請求過程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評論0 收藏0
  • dubbo源碼解析四十六)消費(fèi)發(fā)送請求過程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請求過程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評論0 收藏0
  • dubbo源碼解析四十四)服務(wù)暴露過程

    摘要:服務(wù)暴露過程目標(biāo)從源碼的角度分析服務(wù)暴露過程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠(yuǎn)程兩個(gè)過程。其中服務(wù)暴露的第八步已經(jīng)沒有了。將泛化調(diào)用版本號或者等信息加入獲得服務(wù)暴露地址和端口號,利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過程 目標(biāo):從源碼的角度分析服務(wù)暴露過程。 前言 本來這一篇一個(gè)寫異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0
  • dubbo源碼解析四十三)2.7新特性

    摘要:大揭秘目標(biāo)了解的新特性,以及版本升級的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個(gè)的鍵值對,包含了一個(gè)服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標(biāo):了解2.7的新特性,以及版本升級的引導(dǎo)。 前言 我們知道Dubbo在2011年開源,停止更新了一段時(shí)間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • dubbo源碼解析四十五)服務(wù)引用過程

    摘要:服務(wù)引用過程目標(biāo)從源碼的角度分析服務(wù)引用過程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過程 目標(biāo):從源碼的角度分析服務(wù)引用過程。 前言 前面服務(wù)暴露過程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來進(jìn)行引用,這種方式更多的時(shí)候被用來做服務(wù)測試,不...

    xiaowugui666 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<