摘要:前言這篇主要來講解多線程中一個(gè)非常經(jīng)典的設(shè)計(jì)模式包括它的基礎(chǔ)到拓展希望大家能夠有所收獲生產(chǎn)者消費(fèi)者模式簡述此設(shè)計(jì)模式中主要分兩類線程生產(chǎn)者線程和消費(fèi)者線程生產(chǎn)者提供數(shù)據(jù)和任務(wù)消費(fèi)者處理數(shù)據(jù)和任務(wù)該模式的核心就是數(shù)據(jù)和任務(wù)的交互點(diǎn)共享內(nèi)存緩
前言
這篇主要來講解多線程中一個(gè)非常經(jīng)典的設(shè)計(jì)模式
包括它的基礎(chǔ)到拓展
希望大家能夠有所收獲
此設(shè)計(jì)模式中主要分兩類線程:生產(chǎn)者線程和消費(fèi)者線程
生產(chǎn)者提供數(shù)據(jù)和任務(wù)
消費(fèi)者處理數(shù)據(jù)和任務(wù)
該模式的核心就是數(shù)據(jù)和任務(wù)的交互點(diǎn):共享內(nèi)存緩存區(qū)
下面給出簡單易懂的一張圖:
使用BlockingQueue來做緩沖區(qū)是非常合適的
通過BlockingQueue來理解生產(chǎn)者消費(fèi)者模式
首先我們要知道BlockingQueue是什么?
它是一個(gè)實(shí)現(xiàn)接口,有很多實(shí)現(xiàn)類,比如:
ArrayBlockingQueue:前面講過,這個(gè)隊(duì)列適合做有界隊(duì)列,固定線程數(shù)
LinkedBlockingQueue:它適合做無界隊(duì)列
......
以ArrayBlockingQueue為例
它在內(nèi)部放置了一個(gè)對(duì)象數(shù)組:
final Object[] items;
通過items數(shù)組來進(jìn)行元素的存取
1(存).向隊(duì)列中壓入一個(gè)元素:
.offer():如果隊(duì)列滿了,返回false
.put():將元素壓入隊(duì)列末尾,如果隊(duì)列滿了,它就會(huì)一直等待
2(取).向隊(duì)列中彈出元素(從頭部彈出):
.poll():如果隊(duì)列為空,返回null
.take():如果隊(duì)列為空,繼續(xù)等待,知道隊(duì)列中有元素
了解了上面這些基礎(chǔ)后,我們來看下實(shí)際操作是怎樣的
在開始之前我們要有一個(gè)Entity類,只存一個(gè)long類型的value值進(jìn)去:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
有了這個(gè)數(shù)據(jù)模型,看下最后的執(zhí)行main方法:
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); //建立線程池 BlockingQueueblockingQueue = new ArrayBlockingQueue (10); //建立緩存隊(duì)列 for (int i=0;i<3;i++){ Producer i = new Producer(queue); executor.execute(i); } //制造三個(gè)生產(chǎn)線程 for (int j=0;j<3;j++){ Consumer j = new Consumer(queue); executor.execute(j); } //制造三個(gè)消費(fèi)線程 Thread.sleep(10000); for (int i=0;i<3;i++){ i.stop(); } //停止生產(chǎn) Thread.sleep(5000); executor.shutdown(); }
這里只給出Main,大家可以通過代碼簡單理解使用BlockingQueue做緩沖區(qū)的過程
沒有給出生產(chǎn)者和消費(fèi)者的具體線程實(shí)現(xiàn)類,除了博主比較懶之外,還有是因?yàn)槭褂肂lockingQueue做緩沖區(qū)并不推薦使用
雖然BlockingQueue是個(gè)不錯(cuò)的選擇,但它使用了鎖和阻塞來保證線程間的同步,并不具備良好的并發(fā)性能
下面講解一種具有高性能的共享緩沖區(qū)
我們知道BlockingQueue隊(duì)列的性能不是特別優(yōu)越
而之前講到過ConcurrentLinkedQueue是一個(gè)高性能隊(duì)列,因?yàn)樗褂昧舜罅康腃AS操作
同理,如果我們利用CAS操作實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式,性能就可以得到客觀的提升
但是大量的CAS操作自己實(shí)現(xiàn)起來非常困難
所以推薦使用Disruptor框架
實(shí)際工作還是得使用成熟的框架,Disruptor是一款高效的無鎖內(nèi)存隊(duì)列
它不像傳統(tǒng)隊(duì)列有head和tail指針來操控入列和出列
而是實(shí)現(xiàn)了一個(gè)固定大小的環(huán)形隊(duì)列(RingBuffer),來看下實(shí)際模型圖:
生產(chǎn)者向緩沖區(qū)寫入數(shù)據(jù),消費(fèi)者從緩沖區(qū)讀取數(shù)據(jù),大家都使用了CAS操作
而且由于是環(huán)形隊(duì)列的原因,可以做到完全的內(nèi)存復(fù)用
從而大大減少系統(tǒng)分配空間以及回收空間的額外開銷
那么這個(gè)框架怎么使用呢?
1.導(dǎo)入包(博主使用了Maven依賴,不同版本大同小異):
com.lmax disruptor 3.3.2
2.依舊創(chuàng)建一個(gè)entity類:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
3.還要寫一個(gè)Factory類,細(xì)心的同學(xué)會(huì)看到環(huán)形隊(duì)列是固定大小的
這個(gè)Factory會(huì)在Disruptor實(shí)例對(duì)象構(gòu)造時(shí),構(gòu)造所有緩沖區(qū)中的對(duì)象實(shí)例
public class DataFactory implements EventFactory{ @Override public Object newInstance() { return new MyData(); } }
4.生產(chǎn)者(具體每行代碼的作用都已經(jīng)注釋):
public class Producers { private final RingBufferringBuffer; //創(chuàng)建環(huán)形隊(duì)列(環(huán)形緩沖區(qū)) public Producers(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; //將ringBuffer與Producers綁定 } public void putData(ByteBuffer byteBuffer){ //此方法將產(chǎn)生的數(shù)據(jù)推入緩沖區(qū) long sequeue = ringBuffer.next(); //通過.next()方法得到ringBuffer的下一個(gè)節(jié)點(diǎn),并且賦值給sequeue MyData event = ringBuffer.get(sequeue); //將mydata數(shù)據(jù)存入到下一個(gè)節(jié)點(diǎn) event.setValue(byteBuffer.getLong(0)); //mydata的值有ByteBuffer參數(shù)帶入 ringBuffer.publish(sequeue); //將sequeue節(jié)點(diǎn)內(nèi)的數(shù)據(jù)發(fā)布 } }
5.消費(fèi)者:
public class Consumers implements WorkHandler{ @Override public void onEvent(MyData myData) throws Exception { System.out.println("當(dāng)前線程為:"+Thread.currentThread().getId()+"線程,它處理的數(shù)據(jù)是:"+myData.getValue()); } }
6.執(zhí)行函數(shù):
public class RunTest { public static void main(String[] args) throws InterruptedException { Executor executor = Executors.newCachedThreadPool(); //創(chuàng)建線程池 DataFactory dataFactory = new DataFactory(); //創(chuàng)建Factory實(shí)例 int bufferSize = 1024; //設(shè)置緩存區(qū)大小為1024(必須是2的整數(shù)次冪) Disruptordisruptor = new Disruptor ( dataFactory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy() ); disruptor.handleEventsWithWorkerPool( new Consumers(), new Consumers(), new Consumers(), new Consumers() ); disruptor.start(); //Disruptor啟動(dòng) RingBuffer ringBuffer = disruptor.getRingBuffer(); //實(shí)例化環(huán)形隊(duì)列并與Disruptor綁定 Producers producers = new Producers(ringBuffer); //實(shí)例化生產(chǎn)者并綁定ringBuffer ByteBuffer byteBuffe = ByteBuffer.allocate(8); //創(chuàng)建一個(gè)容量為256字節(jié)的ByteBuffer for (long n = 0;true;n++){ byteBuffe.putLong(0,n); producers.putData(byteBuffe); Thread.sleep(100); System.out.println("add data "+n); } } }
我們來看下執(zhí)行結(jié)果:
當(dāng)前線程為:13線程,它處理的數(shù)據(jù)是:1059 add data 1059 當(dāng)前線程為:11線程,它處理的數(shù)據(jù)是:1060 add data 1060 當(dāng)前線程為:10線程,它處理的數(shù)據(jù)是:1061 add data 1061 當(dāng)前線程為:12線程,它處理的數(shù)據(jù)是:1062 add data 1062 當(dāng)前線程為:13線程,它處理的數(shù)據(jù)是:1063 add data 1063 當(dāng)前線程為:11線程,它處理的數(shù)據(jù)是:1064 add data 1064 當(dāng)前線程為:10線程,它處理的數(shù)據(jù)是:1065
可以看出,因?yàn)槲覠o限的讓生產(chǎn)線程生產(chǎn)數(shù)據(jù),而RingBuffer中那十幾條消費(fèi)線程不停的消費(fèi)數(shù)據(jù)
此外Disruptor不止CAS操作,還提供了四種等待策略讓消費(fèi)者監(jiān)控緩沖區(qū)的信息:
1.BlockingWaitStrategy:默認(rèn)策略,最節(jié)省CPU,但在高并發(fā)下性能表現(xiàn)最糟糕
2.SleepingWaitStrategy:等待數(shù)據(jù)時(shí)自旋等待,不成功會(huì)使用LockSupport方法阻塞自己,通常用于異步日志
3.YieldWaitStrategy:用于低延時(shí)場合,在內(nèi)部執(zhí)行Thread.yield()死循環(huán)
4.BusySpinWaitStrategy:消費(fèi)線程進(jìn)行死循環(huán)監(jiān)控緩沖區(qū),吃掉所有CPU資源
除了CAS操作,消費(fèi)者等待策略,Disruptor還使用CPU Cache的優(yōu)化來進(jìn)行優(yōu)化
根據(jù)Disruptor官方報(bào)道:Disruptor的性能比BlockingQueuez至少高一倍以上!
以上便是生產(chǎn)者消費(fèi)者模式的應(yīng)用
謝謝閱讀,記得點(diǎn)關(guān)注看更新
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/70719.html
摘要:今天就先到這里,大家可以看看這些內(nèi)容的拓展記得點(diǎn)關(guān)注看更新,謝謝閱讀 前言 這是一個(gè)長篇博客,希望大家關(guān)注我并且一起學(xué)習(xí)java高并發(fā)廢話不多說,直接開始 并行和并發(fā) 并行:多個(gè)線程同時(shí)處理多個(gè)任務(wù)并發(fā):多個(gè)線程處理同個(gè)任務(wù),不一定要同時(shí) 下面用圖來描述并行和并發(fā)的區(qū)別:(實(shí)現(xiàn)和虛線表示兩個(gè)不同的線程) showImg(https://segmentfault.com/img/bVYT...
摘要:可以用代替可以用代替定義的對(duì)象的值是不可變的今天就先到這里,大家可以看看這些內(nèi)容的拓展記得點(diǎn)關(guān)注看更新,謝謝閱讀 前言 java高并發(fā)第二篇講的是java線程的基礎(chǔ)依舊不多說廢話 線程和進(jìn)程 進(jìn)程是操作系統(tǒng)運(yùn)行的基礎(chǔ),是一個(gè)程序運(yùn)行的實(shí)體,windows上打開任務(wù)管理器就能看到進(jìn)程線程是輕量級(jí)的進(jìn)程,是程序執(zhí)行的最小單位,是在進(jìn)程這個(gè)容器下進(jìn)行的 線程基本操作 新建一個(gè)線程類有兩種方式...
摘要:前言今天講的多線程的同步控制直接進(jìn)入正題重入鎖重入鎖可以完全代替,它需要類來實(shí)現(xiàn)下面用一個(gè)簡單的例子來實(shí)現(xiàn)重入鎖以上代碼打印出來的是,可以說明也實(shí)現(xiàn)了線程同步它相比更加靈活,因?yàn)橹厝腈i實(shí)現(xiàn)了用戶自己加鎖,自己釋放鎖記得一定要釋放,不然其他線 前言 今天講的多線程的同步控制直接進(jìn)入正題 ReentrantLock重入鎖 重入鎖可以完全代替synchronized,它需要java.util...
摘要:前言本篇主要講解如何去優(yōu)化鎖機(jī)制或者克服多線程因?yàn)殒i可導(dǎo)致性能下降的問題線程變量有這樣一個(gè)場景,前面是一大桶水,個(gè)人去喝水,為了保證線程安全,我們要在杯子上加鎖導(dǎo)致大家輪著排隊(duì)喝水,因?yàn)榧恿随i的杯子是同步的,只能有一個(gè)人拿著這個(gè)唯一的杯子喝 前言 本篇主要講解如何去優(yōu)化鎖機(jī)制或者克服多線程因?yàn)殒i可導(dǎo)致性能下降的問題 ThreadLocal線程變量 有這樣一個(gè)場景,前面是一大桶水,10個(gè)...
摘要:只有動(dòng)手,你才能真的理解作者的構(gòu)思的巧妙只有動(dòng)手,你才能真正掌握一門技術(shù)持續(xù)更新中項(xiàng)目地址求求求源碼系列跟一起學(xué)如何寫函數(shù)庫中高級(jí)前端面試手寫代碼無敵秘籍如何用不到行代碼寫一款屬于自己的類庫原理講解實(shí)現(xiàn)一個(gè)對(duì)象遵循規(guī)范實(shí)戰(zhàn)手摸手,帶你用擼 Do it yourself!!! 只有動(dòng)手,你才能真的理解作者的構(gòu)思的巧妙 只有動(dòng)手,你才能真正掌握一門技術(shù) 持續(xù)更新中…… 項(xiàng)目地址 https...
閱讀 1786·2021-11-12 10:35
閱讀 1705·2021-08-03 14:02
閱讀 2772·2019-08-30 15:55
閱讀 2103·2019-08-30 15:54
閱讀 844·2019-08-30 14:01
閱讀 2493·2019-08-29 17:07
閱讀 2317·2019-08-26 18:37
閱讀 3108·2019-08-26 16:51