成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專(zhuān)欄INFORMATION COLUMN

Artemis的JMS客戶(hù)端中的CompletionHandler是如何在artemis core

Edison / 1046人閱讀

摘要:在公開(kāi)的方法中,為的設(shè)置了繼承于回調(diào)句柄。如此看來(lái),如果想要異步通信完畢后,處理一些回調(diào),則只需實(shí)現(xiàn),并在適當(dāng)?shù)奈恢迷O(shè)置到的的里。在其保護(hù)方法里,創(chuàng)建了對(duì)象,并傳入了。

ActiveMQChannelHandler

NettyConnector在公開(kāi)的start方法中,為Channel的pipeline設(shè)置了ActiveMQChannelHandler(繼承于io.netty.channel.ChannelDuplexHandler)回調(diào)句柄。
ActiveMQChannelHandler其構(gòu)造函數(shù)定義如下:

ActiveMQChannelHandler(final ChannelGroup group,
                       final BufferHandler handler,
                       final BaseConnectionLifeCycleListener listener)

可見(jiàn)它接收了一個(gè)BufferHandler對(duì)象。在其channelRead這個(gè)callback方法中,調(diào)用了這個(gè)BufferHandler對(duì)象bufferReceived方法。

如此看來(lái),如果想要Netty異步通信完畢后,處理一些回調(diào),則只需實(shí)現(xiàn)BufferHandler,并在適當(dāng)?shù)奈恢迷O(shè)置到Netty的Channel的pipeline里。

BufferHandler

ClientSessionFactoryImpl在其保護(hù)方法createConnector里,創(chuàng)建了NettyConnector對(duì)象,并傳入了DelegatingBufferHandler。
DelegatingBufferHandler實(shí)現(xiàn)了BufferHandler,可用來(lái)處理Netty回調(diào)。

DelegatingBufferHandler

DelegatingBufferHandler定義如下,它是定義在ClientSessionFactoryImpl類(lèi)里的:

private class DelegatingBufferHandler implements BufferHandler {

      @Override
      public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
         RemotingConnection theConn = connection;

         if (theConn != null && connectionID.equals(theConn.getID())) {
            theConn.bufferReceived(connectionID, buffer);
         } else {
            logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
         }
      }
   }

也就是說(shuō),在Netty執(zhí)行回調(diào)時(shí),會(huì)調(diào)用ClientSessionFactory中的成員對(duì)象connection(類(lèi)型:RemotingConnection)的bufferReceived方法來(lái)處理數(shù)據(jù)。

實(shí)際上RemotingConnection也是一種BufferHandler

RemotingConnection

RemotingConnection(Impl)實(shí)現(xiàn)了bufferReceived(connectionID, buffer)方法,該方法會(huì)根據(jù)傳入的buffer來(lái)decode出一個(gè)package。
bufferReceived
=> doBufferReceived (以下ChannelImpl對(duì)應(yīng)的實(shí)例,是根據(jù)decode出來(lái)的package對(duì)應(yīng)的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)

舉例:Artemis中實(shí)現(xiàn)的JMS規(guī)范下的Producer在異步投遞消息后的回調(diào)函數(shù)是如何被調(diào)用的

以ArtemisMQMessageProducer為例:

他的send方法中,最后是調(diào)用core api的ClientProducer的send方法的,傳入一個(gè)core api的handler —— CompletionListenerWrapper(繼承于SendAcknowledgementHandler類(lèi)型),它包裝了JMS的CompletionListener。

再轉(zhuǎn)到ClientProducer的send方法, 它又調(diào)用了doSend方法,

然后它又調(diào)用了sendRegularMessage方法,它又調(diào)用了sessionContext.sendFullMessage方法。

在sessionContext.sendFullMessage方法里,可以看到,handler被包裝到packet里了,并且傳給了sessionChannel.sendBatched(packet)方法去異步發(fā)送了。

在服務(wù)器返回的packet里,也會(huì)帶有這個(gè)handler,然后BufferHandler的實(shí)現(xiàn)者RemotingConnection(Impl)的bufferReceived方法會(huì)被回調(diào),它會(huì)解析服務(wù)器回傳的packet里的handler進(jìn)行執(zhí)行。

packet是SessionSendMessage類(lèi)型的消息的別名
sessionContext.sendFullMessage方法里負(fù)責(zé)將SendAcknowledgementHandler包裝到SessionSendMessage類(lèi)型的packet里,然后才發(fā)送至服務(wù)器
服務(wù)器返回的packet,也會(huì)首先被轉(zhuǎn)換成SessionSendMessage類(lèi)型,然后獲取里面包含的SendAcknowledgementHandler類(lèi)型的回調(diào)handler執(zhí)行回調(diào)。

CompletionListenerWrapper類(lèi)定義:
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler {

      private final CompletionListener completionListener;
      private final Message jmsMessage;
      private final ActiveMQMessageProducer producer;

      /**
       * @param jmsMessage
       * @param producer
       */
      private CompletionListenerWrapper(CompletionListener listener,
                                        Message jmsMessage,
                                        ActiveMQMessageProducer producer) {
         this.completionListener = listener;
         this.jmsMessage = jmsMessage;
         this.producer = producer;
      }

      @Override
      public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
         if (jmsMessage instanceof StreamMessage) {
            try {
               ((StreamMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }
         if (jmsMessage instanceof BytesMessage) {
            try {
               ((BytesMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }

         try {
            producer.connection.getThreadAwareContext().setCurrentThread(true);
            completionListener.onCompletion(jmsMessage);
         } finally {
            producer.connection.getThreadAwareContext().clearCurrentThread(true);
         }
      }

      @Override
      public String toString() {
         return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
      }
   }

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/66894.html

相關(guān)文章

  • Spring Boot 參考指南(消息傳遞)

    摘要:還自動(dòng)配置發(fā)送和接收消息所需的基礎(chǔ)設(shè)施。支持是一個(gè)輕量級(jí)的可靠的可伸縮的可移植的消息代理,基于協(xié)議,使用通過(guò)協(xié)議進(jìn)行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統(tǒng)集成提供了廣泛的支持,從使用JmsTemplate簡(jiǎn)化的JMS API到使用完整的基礎(chǔ)設(shè)施異步接收消息,Spring AMQP為高級(jí)消息隊(duì)列協(xié)議提供了類(lèi)似的特性集。Spring Boot還為RabbitTempla...

    Doyle 評(píng)論0 收藏0
  • ArtemisMQ“未消費(fèi)之謎”

    摘要:通過(guò)以上修改保證了客戶(hù)端連接能夠快速的斷開(kāi),在應(yīng)用重啟時(shí)不會(huì)持續(xù)往這邊發(fā)送消息,我使用進(jìn)行壓測(cè),重啟消費(fèi)者過(guò)程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我們開(kāi)發(fā)了兩個(gè)使用Artemis做消息隊(duì)列實(shí)現(xiàn)的積分模塊和PUSH推送模塊,在幾輪測(cè)試以后,大家信心滿(mǎn)滿(mǎn)的正式上線(xiàn)了,而且經(jīng)過(guò)...

    tomato 評(píng)論0 收藏0
  • 使用Spring/Spring Boot集成JMS陷阱

    摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各組件詳解里有組件介紹及如何正確使用的內(nèi)容。因此的做法會(huì)大大降低性能,并且將大部分的時(shí)間都花在反復(fù)重建這些對(duì)象上。提供的可以讓使用避免頻繁創(chuàng)建的問(wèn)題。至于使用的性能測(cè)試則留給同學(xué)自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各組件詳解里有Spring J...

    xcold 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<