摘要:關(guān)閉套接字和上下文備注說明如何利用使用首先下載所需的包,解壓以后將和文件放到自己電腦中的安裝路徑中的文件夾下,最后需要將之前解壓后的包放在項目的中或者資源下載鏈接密碼項目源碼下載鏈接鏈接密碼
在講ZeroMQ前先給大家講一下什么是消息隊列。
消息隊列簡介:消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問題。實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。目前在生產(chǎn)環(huán)境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。其實(shí)簡單點(diǎn)說,消息隊列就是如何使各分載器如何實(shí)現(xiàn)負(fù)載均衡使得完成分布式目標(biāo)。
ZeroMQ簡介:ZeroMQ是一種基于消息隊列的多線程網(wǎng)絡(luò)庫,其對套接字類型、連接處理、幀、甚至路由的底層細(xì)節(jié)進(jìn)行抽象,提供跨越多種傳輸協(xié)議的套接字。ZeroMQ是網(wǎng)絡(luò)通信中新的一層,介于應(yīng)用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可并行運(yùn)行,分散在分布式系統(tǒng)間。ZeroMQ幾乎所有的I/O操作都是異步的,主線程不會被阻塞。ZeroMQ會根據(jù)用戶調(diào)用zmq_init函數(shù)時傳入的接口參數(shù),創(chuàng)建對應(yīng)數(shù)量的I/O Thread。每個I/O Thread都有與之綁定的Poller,Poller采用經(jīng)典的Reactor模式實(shí)現(xiàn),Poller根據(jù)不同操作系統(tǒng)平臺使用不同的網(wǎng)絡(luò)I/O模型(select、poll、epoll、devpoll、kequeue等)。主線程與I/O線程通過Mail Box傳遞消息來進(jìn)行通信。Server開始監(jiān)聽或者Client發(fā)起連接時,在主線程中創(chuàng)建zmq_connecter或zmq_listener,通過Mail Box發(fā)消息的形式將其綁定到I/O線程,I/O線程會把zmq_connecter或zmq_listener添加到Poller中用以偵聽讀/寫事件。Server與Client在第一次通信時,會創(chuàng)建zmq_init來發(fā)送identity,用以進(jìn)行認(rèn)證。認(rèn)證結(jié)束后,雙方會為此次連接創(chuàng)建Session,以后雙方就通過Session進(jìn)行通信。每個Session都會關(guān)聯(lián)到相應(yīng)的讀/寫管道, 主線程收發(fā)消息只是分別從管道中讀/寫數(shù)據(jù)。Session并不實(shí)際跟kernel交換I/O數(shù)據(jù),而是通過plugin到Session中的Engine來與kernel交換I/O數(shù)據(jù)。
ZeroMQ三種模型講解及實(shí)例【1】Request-Response
由請求端發(fā)起請求,然后等待回應(yīng)端應(yīng)答。一個請求必須對應(yīng)一個回應(yīng),從請求端的角度來看是發(fā)-收配對,從回應(yīng)端的角度是收-發(fā)對。跟一對一結(jié)對模型的區(qū)別在于請求端可以是1~N個。該模型主要用于遠(yuǎn)程調(diào)用及任務(wù)分配等。Echo服務(wù)就是這種經(jīng)典模型的應(yīng)用。
下面通過Java實(shí)現(xiàn)這一模型:
server port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Server {
public static void main(String[] args) throws InterruptedException { //實(shí)現(xiàn)服務(wù)器端的上下文及套接字 Context context = ZMQ.context(1); Socket responder = context.socket(ZMQ.REP); //使服務(wù)器端通過tcp協(xié)議通信,監(jiān)聽5555端口 responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received Hello"); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(), 0); } //關(guān)閉服務(wù)器端的上下文及套接字 responder.close(); context.close(); }
}
client port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Client {
public static void main(String[] args) { //創(chuàng)立客戶端的上下文捷套接字 Context context = ZMQ.context(1); System.out.println("Connecting to hello world server…"); Socket requester = context.socket(ZMQ.REQ); //講客戶端綁定在5555端口 requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 100; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello " + requestNbr); requester.send(request.getBytes(), 0); byte[] reply = requester.recv(0); System.out.println("Received " + new String(reply) + " " + requestNbr); } //關(guān)閉客戶端的上下文套接字 requester.close(); context.term(); }
}
【2】Publisher/Subscriber model
發(fā)布端單向分發(fā)數(shù)據(jù),且不關(guān)心是否把全部信息發(fā)送給訂閱端。如果發(fā)布端開始發(fā)布信息時,訂閱端尚未連接上來,則這些信息會被直接丟棄。訂閱端未連接導(dǎo)致信息丟失的問題,可以通過與請求回應(yīng)模型組合來解決。訂閱端只負(fù)責(zé)接收,而不能反饋,且在訂閱端消費(fèi)速度慢于發(fā)布端的情況下,會在訂閱端堆積數(shù)據(jù)。該模型主要用于數(shù)據(jù)分發(fā)。天氣預(yù)報、微博明星粉絲可以應(yīng)用這種經(jīng)典模型。
Server Port
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class ZMQ_PUB {
public static void main(String[] args) throws InterruptedException { Context context = ZMQ.context(1); Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5555"); Thread.sleep(3000); for(int i=0;i<100;i++){ publisher.send(("admin " + i).getBytes(), ZMQ.NOBLOCK); System.out.println("pub msg " + i); Thread.sleep(1000); } context.close(); publisher.close(); }
}
Client Port
import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class ZMQ_SUB { public static void main(String[] args) { Context context = ZMQ.context(1); Socket subscriber = context.socket(ZMQ.SUB); subscriber.connect("tcp://localhost:5555"); subscriber.subscribe("".getBytes()); for (int i=0;i<100;i++) { //Receive a message. String string = new String(subscriber.recv(0)); System.out.println("recv 1" + string); } //關(guān)閉套接字和上下文 subscriber.close(); context.term(); } }
【3】push/pull
push port import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; public class Push { public static void main(String[] args) { Context context = ZMQ.context(1); Socket push = context.socket(ZMQ.PUSH); push.bind("ipc://fjs"); for (int i = 0; i < 10000000; i++) { push.send("hello".getBytes(), i); } push.close(); context.term(); } }
pull port
import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZMQ;
public class Pull {
public static void main(String args[]) { final AtomicInteger number = new AtomicInteger(0); for (int i = 0; i < 5; i++) { new Thread(new Runnable(){ private int here = 0; public void run() { // TODO Auto-generated method stub ZMQ.Context context = ZMQ.context(1); ZMQ.Socket pull = context.socket(ZMQ.PULL); pull.connect("ipc://fjs"); //pull.connect("ipc://fjs"); while (true) { String message = new String(pull.recv()); int now = number.incrementAndGet(); here++; if (now % 1000000 == 0) { System.out.println(now + " here is : " + here); } } } }).start(); } }
}
備注說明:
【1】如何利用Java使用ZeroMQ
首先下載zmq所需的zip包,解壓以后將libzmq.dll和jzmq.dll文件放到自己電腦中的jdk安裝路徑中的bin文件夾下,最后需要將之前解壓后的zmq.jar包放在項目的lib中或者
zeromq資源下載:
鏈接:http://pan.baidu.com/s/1miuvSfQ 密碼:ttss
項目源碼下載鏈接:
鏈接:http://pan.baidu.com/s/1dE5Plr7 密碼:vqze
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/68009.html
摘要:用于控制活動人數(shù),將超過此一定閥值的訂單直接丟棄。緩解短時間的高流量壓垮應(yīng)用。目前比較推薦的就是我們手動然后將消費(fèi)錯誤的消息轉(zhuǎn)移到其它的消息隊列中,做補(bǔ)償處理消費(fèi)者該方案是默認(rèn)的方式不太推薦。 SpringBoot 是為了簡化 Spring 應(yīng)用的創(chuàng)建、運(yùn)行、調(diào)試、部署等一系列問題而誕生的產(chǎn)物,自動裝配的特性讓我們可以更好的關(guān)注業(yè)務(wù)本身而不是外部的XML配置,我們只需遵循規(guī)范,引入相...
摘要:設(shè)備改造上傳結(jié)果數(shù)據(jù)的技術(shù)實(shí)現(xiàn)一項目需求及分析按照領(lǐng)導(dǎo)的要求,要改造一臺儀器,添加點(diǎn)功能,將測量數(shù)據(jù)上傳到服務(wù)器。所以選擇用提交,的通信可以多線程調(diào)度??紤]到新增的上傳功能不能影響之前的測量節(jié)拍,所以要多線程實(shí)現(xiàn)。 **設(shè)備改造——上傳結(jié)果數(shù)據(jù)的技術(shù)實(shí)現(xiàn) 一、項目需求及分析 按照領(lǐng)導(dǎo)的要求,要改造一臺儀器,添加點(diǎn)功能,將測量數(shù)據(jù)上傳到服務(wù)器。儀器測量節(jié)拍大概是20s,數(shù)據(jù)量目前不大,...
摘要:的明確目標(biāo)是成為標(biāo)準(zhǔn)網(wǎng)絡(luò)協(xié)議棧的一部分,之后進(jìn)入內(nèi)核。實(shí)現(xiàn)端測試消息已發(fā)送端正在轉(zhuǎn)發(fā)端輸出結(jié)果已發(fā)送已發(fā)送已發(fā)送正在轉(zhuǎn)發(fā)正在轉(zhuǎn)發(fā)正在轉(zhuǎn)發(fā)測試消息測試消息測試消息 簡介 ZMQ (以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。是一個消息處理隊列庫,可在多個線程、內(nèi)核和主機(jī)盒之間...
摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時沒有提供日志查看機(jī)制。以如下的形式啟動容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...
閱讀 1846·2023-04-26 00:20
閱讀 1900·2021-11-08 13:21
閱讀 2115·2021-09-10 10:51
閱讀 1682·2021-09-10 10:50
閱讀 3371·2019-08-30 15:54
閱讀 2202·2019-08-30 14:22
閱讀 1487·2019-08-29 16:10
閱讀 3157·2019-08-26 11:50