成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

JMS 在 SpringBoot 中的使用

Michael_Ding / 2567人閱讀

摘要:本文主要講述消息服務(wù)在中的使用。所以需要一個(gè)監(jiān)聽容器工廠的概念,即接口,它會(huì)引用上面創(chuàng)建好的與的連接工廠,由它來負(fù)責(zé)接收消息以及將消息分發(fā)給指定的監(jiān)聽器。為了消費(fèi)消息,訂閱者必須保持運(yùn)行的狀態(tài)。

JMS 在 SpringBoot 中的使用

摘要:本文屬于原創(chuàng),歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)保留出處:https://github.com/jasonGeng88/blog>

本文所有服務(wù)均采用docker容器化方式部署

當(dāng)前環(huán)境

Mac OS 10.11.x

docker 1.12.1

JDK 1.8

SpringBoot 1.5

前言

基于之前一篇“一個(gè)故事告訴你什么是消息隊(duì)列”,了解了消息隊(duì)列的使用場(chǎng)景以及相關(guān)的特性。本文主要講述消息服務(wù)在 JAVA 中的使用。

市面上的有關(guān)消息隊(duì)列的技術(shù)選型非常多,如果我們的代碼框架要支持不同的消息實(shí)現(xiàn),在保證框架具有較高擴(kuò)展性的前提下,我們勢(shì)必要進(jìn)行一定的封裝。

在 JAVA 中,大可不必如此。因?yàn)?JAVA 已經(jīng)制定了一套標(biāo)準(zhǔn)的 JMS 規(guī)范。該規(guī)范定義了一套通用的接口和相關(guān)語(yǔ)義,提供了諸如持久、驗(yàn)證和事務(wù)的消息服務(wù),其最主要的目的是允許Java應(yīng)用程序訪問現(xiàn)有的消息中間件。就和 JDBC 一樣。

基本概念

在介紹具體的使用之前,先簡(jiǎn)單介紹一下 JMS 的一些基本知識(shí)。這里我打算分為 3 部分來介紹,即 消息隊(duì)列(MQ)的連接、消息發(fā)送與消息接收。

這里我們的技術(shù)選型是 SpringBoot、JMS、ActiveMQ

為了更好的理解 JMS,這里沒有使用 SpringBoot 零配置來搭建項(xiàng)目

MQ 的連接

使用 MQ 的第一步一定是先連接 MQ。因?yàn)檫@里使用的是 JMS 規(guī)范,對(duì)于任何遵守 JMS 規(guī)范的 MQ 來說,都會(huì)實(shí)現(xiàn)相應(yīng)的ConnectionFactory接口,因此我們只需要?jiǎng)?chuàng)建一個(gè)ConnectionFactory工廠類,由它來實(shí)現(xiàn) MQ 的連接,以及封裝一系列特性的 MQ 參數(shù)。

例子:這里我們以 ActiveMQ 為例,

maven 依賴:


    org.springframework.boot
    spring-boot-starter-parent
    1.5.3.RELEASE



    
        org.springframework.boot
        spring-boot-starter-activemq
    

創(chuàng)建 ActiveMQ 連接工廠:

@Bean
public ConnectionFactory connectionFactory(){

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(ActiveMQ_URL);
    connectionFactory.setUserName(ActiveMQ_USER);
    connectionFactory.setPassword(ActiveMQ_PASSWORD);
    return connectionFactory;
    
}
消息發(fā)送

關(guān)于消息的發(fā)送,是通過 JMS 核心包中的JmsTemplate類來實(shí)現(xiàn)的,它簡(jiǎn)化了 JMS 的使用,因?yàn)樵诎l(fā)送或同步接收消息時(shí)它幫我們處理了資源的創(chuàng)建和釋放。從它的作用也不難推測(cè)出,它需要引用我們上面創(chuàng)建的連接工廠,具體代碼如下:

@Bean
public JmsTemplate jmsQueueTemplate(){

    return new JmsTemplate(connectionFactory());
    
}

JmsTemplate創(chuàng)建完成后,我們就可以調(diào)用它的方法來發(fā)送消息了。這里有兩個(gè)概念需要注意:

消息會(huì)發(fā)送到哪里?-> 即需要指定發(fā)送隊(duì)列的目的地(Destination),是可以在 JNDI 中進(jìn)行存儲(chǔ)和提取的 JMS 管理對(duì)象。

發(fā)送的消息體具體是什么?-> 實(shí)現(xiàn)了javax.jms.Message的對(duì)象,類似于 JAVA RMI 的 Remote 對(duì)象。

代碼示例:

@Autowired
private JmsTemplate jmsQueueTemplate;

/**
 * 發(fā)送原始消息 Message
 */
public void send(){

    jmsQueueTemplate.send("queue1", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage("我是原始消息");
        }
    });
    
}

優(yōu)化:當(dāng)然,我們不用每次都通過MessageCreator 匿名類的方式來創(chuàng)建Message對(duì)象,JmsTemplate類中提供了對(duì)象實(shí)體自動(dòng)轉(zhuǎn)換為Message對(duì)象的方法,convertAndSend(String destinationName, final Object message)。

優(yōu)化代碼示例:

/**
 * 發(fā)送消息自動(dòng)轉(zhuǎn)換成原始消息
 */
public void convertAndSend(){

    jmsQueueTemplate.convertAndSend("queue1", "我是自動(dòng)轉(zhuǎn)換的消息");
    
}

注:關(guān)于消息轉(zhuǎn)換,還可以通過實(shí)現(xiàn)MessageConverter接口來自定義轉(zhuǎn)換內(nèi)容

消息接收

講完了消息發(fā)送,我們最后來說說消息是如何接收的。消息既然是以Message對(duì)象的形式發(fā)送到指定的目的地,那么消息的接收勢(shì)必會(huì)去指定的目的地上去接收消息。這里采用的是監(jiān)聽者的方式來監(jiān)聽指定地點(diǎn)的消息,采用注解@JmsListener來設(shè)置監(jiān)聽方法。

代碼示例:

@Component
public class Listener1 {

    @JmsListener(destination = "queue1")
    public void receive(String msg){
        System.out.println("監(jiān)聽到的消息內(nèi)容為: " + msg);
    }
    
}

有了監(jiān)聽的目標(biāo)和方法后,監(jiān)聽器還得和 MQ 關(guān)聯(lián)起來,這樣才能運(yùn)作起來。這里的監(jiān)聽器可能不止一個(gè),如果每個(gè)都要和 MQ 建立連接,肯定不太合適。所以需要一個(gè)監(jiān)聽容器工廠的概念,即接口JmsListenerContainerFactory,它會(huì)引用上面創(chuàng)建好的與 MQ 的連接工廠,由它來負(fù)責(zé)接收消息以及將消息分發(fā)給指定的監(jiān)聽器。當(dāng)然也包括事務(wù)管理、資源獲取與釋放和異常轉(zhuǎn)換等。

代碼示例:

@Bean
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {
    
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    //設(shè)置連接數(shù)
    factory.setConcurrency("3-10");
    //重連間隔時(shí)間
    factory.setRecoveryInterval(1000L);
    return factory;
    
}
場(chǎng)景

代碼地址:https://github.com/jasonGeng88/springboot-jms

對(duì) JMS 有了基本的理解后,我們就來在具體的場(chǎng)景中使用一下。

首先,我們需要先啟動(dòng) ActiveMQ,這里我們以 Docker 容器化的方式進(jìn)行啟動(dòng)。

啟動(dòng)命令:

docker run -d -p 8161:8161 -p 61616:61616 --name activemq webcenter/activemq

啟動(dòng)成功后,在 ActiveMQ 可視化界面查看效果(http://localhost:8161):

點(diǎn)對(duì)點(diǎn)模式(單消費(fèi)者)

下面介紹消息隊(duì)列中最常用的一種場(chǎng)景,即點(diǎn)對(duì)點(diǎn)模式?;靖拍钊缦拢?/p>

每個(gè)消息只能被一個(gè)消費(fèi)者(Consumer)進(jìn)行消費(fèi)。一旦消息被消費(fèi)后,就不再在消息隊(duì)列中存在。

發(fā)送者和接收者之間在時(shí)間上沒有依賴性,也就是說當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運(yùn)行,它不會(huì)影響到消息被發(fā)送到隊(duì)列。

接收者在成功接收消息之后需向隊(duì)列應(yīng)答成功。

代碼實(shí)現(xiàn)(為簡(jiǎn)化代碼,部分代碼沿用上面所述):

啟動(dòng)文件(Application.java)

@SpringBootApplication
@EnableJms
public class Application {

    ...

    /**
     * JMS 隊(duì)列的模板類
     * connectionFactory() 為 ActiveMQ 連接工廠
     */
    @Bean
    public JmsTemplate jmsQueueTemplate(){
        return new JmsTemplate(connectionFactory());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

注解@EnableJms設(shè)置在@Configuration類上,用來聲明對(duì) JMS 注解的支持。

消息生產(chǎn)者(PtpProducer.java)

@Component
public class PtpProducer {

    @Autowired
    private JmsTemplate jmsQueueTemplate;

    /**
     * 發(fā)送消息自動(dòng)轉(zhuǎn)換成原始消息
     */
    public void convertAndSend(){
        jmsQueueTemplate.convertAndSend("ptp", "我是自動(dòng)轉(zhuǎn)換的消息");
    }
}

生產(chǎn)者調(diào)用類(PtpController.java)

@RestController
@RequestMapping(value = "/ptp")
public class PtpController {

    @Autowired
    private PtpProducer ptpProducer;

    @RequestMapping(value = "/convertAndSend")
    public Object convertAndSend(){
        ptpProducer.convertAndSend();
        return "success";
    }

}

消息監(jiān)聽容器工廠

@SpringBootApplication
@EnableJms
public class Application {

    ...

    /**
     * JMS 隊(duì)列的監(jiān)聽容器工廠
     */
    @Bean(name = "jmsQueueListenerCF")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        //設(shè)置連接數(shù)
        factory.setConcurrency("3-10");
        //重連間隔時(shí)間
        factory.setRecoveryInterval(1000L);
        return factory;
    }

   ...
   
}

消息監(jiān)聽器

@Component
public class PtpListener1 {

    /**
     * 消息隊(duì)列監(jiān)聽器
     * destination 隊(duì)列地址
     * containerFactory 監(jiān)聽器容器工廠, 若存在2個(gè)以上的監(jiān)聽容器工廠,需進(jìn)行指定
     */
    @JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF")
    public void receive(String msg){
    
        System.out.println("點(diǎn)對(duì)點(diǎn)模式1: " + msg);
        
    }
}
演示

啟動(dòng)項(xiàng)目啟動(dòng)后,通過 REST 接口的方式來調(diào)用消息生產(chǎn)者發(fā)送消息,請(qǐng)求如下:

curl -XGET 127.0.0.1:8080/ptp/convertAndSend

消費(fèi)者控制臺(tái)信息:

ActiveMQ 控制臺(tái)信息:

列表說明:

Name:隊(duì)列名稱。

Number Of Pending Messages:等待消費(fèi)的消息個(gè)數(shù)。

Number Of Consumers:當(dāng)前連接的消費(fèi)者數(shù)目,因?yàn)槲覀儾捎玫氖沁B接池的方式連接,初始連接數(shù)為 3,所以顯示數(shù)字為 3。

Messages Enqueued:進(jìn)入隊(duì)列的消息總個(gè)數(shù),包括出隊(duì)列的和待消費(fèi)的,這個(gè)數(shù)量只增不減。

Messages Dequeued:出了隊(duì)列的消息,可以理解為是已經(jīng)消費(fèi)的消息數(shù)量。

點(diǎn)對(duì)點(diǎn)模式(多消費(fèi)者)

基于上面一個(gè)消費(fèi)者消費(fèi)的模式,因?yàn)樯a(chǎn)者可能會(huì)有很多,同時(shí)像某個(gè)隊(duì)列發(fā)送消息,這時(shí)一個(gè)消費(fèi)者可能會(huì)成為瓶頸。所以需要多個(gè)消費(fèi)者來分?jǐn)傁M(fèi)壓力(消費(fèi)線程池能解決一定壓力,但畢竟在單機(jī)上,做不到分布式分布,所以多消費(fèi)者是有必要的),也就產(chǎn)生了下面的場(chǎng)景。

代碼實(shí)現(xiàn)

添加新的監(jiān)聽器

@Component
public class PtpListener2 {

    @JmsListener(destination = Constant.QUEUE_NAME, containerFactory = "jmsQueueListenerCF")
    public void receive(String msg){
    
        System.out.println("點(diǎn)對(duì)點(diǎn)模式2: " + msg);
        
    }
}
演示

這里我們發(fā)起 10 次請(qǐng)求,來觀察消費(fèi)者的消費(fèi)情況:

這里因?yàn)楸O(jiān)聽容器設(shè)置了線程池的緣故,在實(shí)際消費(fèi)過程中,監(jiān)聽器消費(fèi)的順序會(huì)有所差異。

發(fā)布訂閱模式

除了點(diǎn)對(duì)點(diǎn)模式,發(fā)布訂閱模式也是消息隊(duì)列中常見的一種使用。試想一下,有一個(gè)即時(shí)聊天群,你在群里發(fā)送一條消息。所有在這個(gè)群里的人(即訂閱了該群的人),都會(huì)收到你發(fā)送的信息。

基本概念:

每個(gè)消息可以有多個(gè)消費(fèi)者。

發(fā)布者和訂閱者之間有時(shí)間上的依賴性。針對(duì)某個(gè)主題(Topic)的訂閱者,它必須創(chuàng)建一個(gè)訂閱者之后,才能消費(fèi)發(fā)布者的消息。

為了消費(fèi)消息,訂閱者必須保持運(yùn)行的狀態(tài)。

代碼實(shí)現(xiàn)

修改 JmsTemplate 模板類,使其支持發(fā)布訂閱功能

@SpringBootApplication
@EnableJms
public class Application {

    ...

    @Bean
    public JmsTemplate jmsTopicTemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }
    
    ...

}

消息生產(chǎn)者(PubSubProducer.java)

@Component
public class PtpProducer {

    @Autowired
    private JmsTemplate jmsTopicTemplate;

    public void convertAndSend(){
        jmsTopicTemplate.convertAndSend("topic", "我是自動(dòng)轉(zhuǎn)換的消息");
    }
}

生產(chǎn)者調(diào)用類(PubSubController.java)

@RestController
@RequestMapping(value = "/pubsub")
public class PtpController {

    @Autowired
    private PubSubProducer pubSubProducer;

    @RequestMapping(value = "/convertAndSend")
    public String convertAndSend(){
        pubSubProducer.convertAndSend();
        return "success";
    }

}

修改 DefaultJmsListenerContainerFactory 類,使其支持發(fā)布訂閱功能

@SpringBootApplication
@EnableJms
public class Application {

    ...

    /**
     * JMS 隊(duì)列的監(jiān)聽容器工廠
     */
    @Bean(name = "jmsTopicListenerCF")
    public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1");
        factory.setPubSubDomain(true);
        return factory;
    }

   ...
   
}

消息監(jiān)聽器(這里設(shè)置2個(gè)訂閱者)

@Component
public class PubSubListener1 {

    @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF")
    public void receive(String msg){
        System.out.println("訂閱者1 - " + msg);
    }
}

@Component
public class PubSubListener2 {

    @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF")
    public void receive(String msg){
        System.out.println("訂閱者2 - " + msg);
    }
}
演示
curl -XGET 127.0.0.1:8080/pubSub/convertAndSend

消費(fèi)者控制臺(tái)信息:

ActiveMQ 控制臺(tái)信息:

總結(jié)

這里只是對(duì) SpringBoot 與 JMS 集成的簡(jiǎn)單說明與使用,詳細(xì)的介紹可以查看 Spring 的官方文檔,我這里也有幸參與 并發(fā)編程網(wǎng) 發(fā)起的 Spring 5 的翻譯工作,我主要翻譯了 Spring 5 的 JMS 章節(jié),其內(nèi)容對(duì)于上述 JMS 的基本概念,都有詳細(xì)的展開說明,有興趣的可以看一下,當(dāng)然翻譯水平有限,英文好的建議看原文。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/67190.html

相關(guān)文章

  • SpringBoot ActiveMq JmsTemplate 異步發(fā)送、非持久化

    摘要:異步發(fā)送不會(huì)在受到的確認(rèn)之前一直阻塞方法。方法成功返回意味著所有的持久消息都以被寫到二級(jí)存儲(chǔ)中。總結(jié)默認(rèn)情況,非持久化消息事務(wù)內(nèi)的消息均采用異步發(fā)送對(duì)于持久化消息采用同步發(fā)送。 ActiveMq事務(wù) ActiveMq事務(wù)的作用就是在發(fā)送、接收處理消息過程中,如果出現(xiàn)問題,可以回滾。 ActiveMq異步/同步發(fā)送 以下摘抄自https://blog.csdn.net/songhai.....

    AprilJ 評(píng)論0 收藏0
  • SpringBoot ActiveMQ 整合使用

    摘要:介紹它是出品,最流行的,能力強(qiáng)勁的開源消息總線。是一個(gè)完全支持和規(guī)范的實(shí)現(xiàn),盡管規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是在當(dāng)今的應(yīng)用中間仍然扮演著特殊的地位。相關(guān)文章整合使用整合使用關(guān)注我轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為安裝同之前一樣,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...

    gaara 評(píng)論0 收藏0
  • SpringBoot非官方教程 | 第二十一篇: springboot集成JMS

    摘要:對(duì)提供了很好的支持,對(duì)其做了起步依賴。構(gòu)架工程創(chuàng)建一個(gè)工程,在其文件加入添加配置在中填寫自己的郵箱密碼。啟用設(shè)置附件發(fā)送郵件郵件已發(fā)送測(cè)試已全部通過,沒有坑。 springboot對(duì)JMS提供了很好的支持,對(duì)其做了起步依賴。 構(gòu)架工程 創(chuàng)建一個(gè)springboot工程,在其pom文件加入: org.springframework.boot spring-boot-st...

    roundstones 評(píng)論0 收藏0
  • SpringBoot | 自動(dòng)配置原理

    摘要:下班后閑著無聊看了下中的自動(dòng)配置,把我的理解跟大家說下。上述的每一個(gè)自動(dòng)配置類都有自動(dòng)配置功能,也可在配置文件中自定義配置。 微信公眾號(hào):一個(gè)優(yōu)秀的廢人。如有問題,請(qǐng)后臺(tái)留言,反正我也不會(huì)聽。 前言 這個(gè)月過去兩天了,這篇文章才跟大家見面,最近比較累,大家見諒下。下班后閑著無聊看了下 SpringBoot 中的自動(dòng)配置,把我的理解跟大家說下。 配置文件能寫什么? 相信接觸過 Sprin...

    KitorinZero 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<