摘要:此時,會新建一個新的工作者線程用于對這個入隊列失敗的任務(wù)進(jìn)行處理假設(shè)此時線程池的大小還未達(dá)到其最大線程池大小。但此時需要限定線程池的最大大小為一個合理的有限值,而不是,否則可能導(dǎo)致線程池中的工作者線程的數(shù)量一直增加到系統(tǒng)資源所無法承受為止。
序
本文主要講一下SynchronousQueue。
定義SynchronousQueue,實際上它不是一個真正的隊列,因為它不會為隊列中元素維護(hù)存儲空間。與其他隊列不同的是,它維護(hù)一組線程,這些線程在等待著把元素加入或移出隊列。
如果以洗盤子的比喻為例,那么這就相當(dāng)于沒有盤架,而是將洗好的盤子直接放入下一個空閑的烘干機(jī)中。這種實現(xiàn)隊列的方式看似很奇怪,但由于可以直接交付工作,從而降低了將數(shù)據(jù)從生產(chǎn)者移動到消費者的延遲。(在傳統(tǒng)的隊列中,在一個工作單元可以交付之前,必須通過串行方式首先完成入列[Enqueue]或者出列[Dequeue]等操作。)
直接交付方式還會將更多關(guān)于任務(wù)狀態(tài)的信息反饋給生產(chǎn)者。當(dāng)交付被接受時,它就知道消費者已經(jīng)得到了任務(wù),而不是簡單地把任務(wù)放入一個隊列——這種區(qū)別就好比將文件直接交給同事,還是將文件放到她的郵箱中并希望她能盡快拿到文件。
因為SynchronousQueue沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經(jīng)準(zhǔn)備好參與到交付過程中。僅當(dāng)有足夠多的消費者,并且總是有一個消費者準(zhǔn)備好獲取交付的工作時,才適合使用同步隊列。
實例public class SynchronousQueueExample { static class SynchronousQueueProducer implements Runnable { protected BlockingQueueblockingQueue; final Random random = new Random(); public SynchronousQueueProducer(BlockingQueue queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = UUID.randomUUID().toString(); System.out.println("Put: " + data); blockingQueue.put(data); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class SynchronousQueueConsumer implements Runnable { protected BlockingQueue blockingQueue; public SynchronousQueueConsumer(BlockingQueue queue) { this.blockingQueue = queue; } @Override public void run() { while (true) { try { String data = blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " take(): " + data); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { final BlockingQueue synchronousQueue = new SynchronousQueue (); SynchronousQueueProducer queueProducer = new SynchronousQueueProducer( synchronousQueue); new Thread(queueProducer).start(); SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer( synchronousQueue); new Thread(queueConsumer1).start(); SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer( synchronousQueue); new Thread(queueConsumer2).start(); } }
應(yīng)用場景插入數(shù)據(jù)的線程和獲取數(shù)據(jù)的線程,交替執(zhí)行
Executors.newCachedThreadPool()
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); }
由于ThreadPoolExecutor內(nèi)部實現(xiàn)任務(wù)提交的時候調(diào)用的是工作隊列(BlockingQueue接口的實現(xiàn)類)的非阻塞式入隊列方法(offer方法),因此,在使用SynchronousQueue作為工作隊列的前提下,客戶端代碼向線程池提交任務(wù)時,而線程池中又沒有空閑的線程能夠從SynchronousQueue隊列實例中取一個任務(wù),那么相應(yīng)的offer方法調(diào)用就會失?。慈蝿?wù)沒有被存入工作隊列)。此時,ThreadPoolExecutor會新建一個新的工作者線程用于對這個入隊列失敗的任務(wù)進(jìn)行處理(假設(shè)此時線程池的大小還未達(dá)到其最大線程池大?。?/p>
所以,使用SynchronousQueue作為工作隊列,工作隊列本身并不限制待執(zhí)行的任務(wù)的數(shù)量。但此時需要限定線程池的最大大小為一個合理的有限值,而不是Integer.MAX_VALUE,否則可能導(dǎo)致線程池中的工作者線程的數(shù)量一直增加到系統(tǒng)資源所無法承受為止。
doc如果應(yīng)用程序確實需要比較大的工作隊列容量,而又想避免無界工作隊列可能導(dǎo)致的問題,不妨考慮SynchronousQueue。SynchronousQueue實現(xiàn)上并不使用緩存空間。
使用SynchronousQueue的目的就是保證“對于提交的任務(wù),如果有空閑線程,則使用空閑線程來處理;否則新建一個線程來處理任務(wù)”。
A Guide to Java SynchronousQueue
SynchronousQueue Example in Java - Produer Consumer Solution
Java SynchronousQueue
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/70453.html
摘要:如果節(jié)點不為說明已經(jīng)有其他線程進(jìn)行操作將節(jié)點替換為節(jié)點等待有消費者消費線程。如果頭節(jié)點下一個節(jié)點是當(dāng)前節(jié)點以防止其他線程已經(jīng)修改了節(jié)點則運算,否則直接返回。 一、介紹 SynchronousQueue是一個雙棧雙隊列算法,無空間的隊列或棧,任何一個對SynchronousQueue寫需要等到一個對SynchronousQueue的讀操作,反之亦然。一個讀操作需要等待一個寫操作,相當(dāng)于是...
摘要:實際上是公平模式和的超集。而使用操作實現(xiàn)一個非阻塞的方法,這是避免序列化處理任務(wù)的關(guān)鍵。在這樣的設(shè)計中,消費者的消費能力將決定生產(chǎn)者產(chǎn)生消息的速度。實例輸出中的模式手記之似懂非懂的和長度為的 序 本文主要簡介一下TransferQueue。 TransferQueue TransferQueue(java7引入)繼承了BlockingQueue(BlockingQueue又繼承了Que...
摘要:這些工具包括名稱主要作用顯示指定系統(tǒng)內(nèi)所有的虛擬機(jī)進(jìn)程。虛擬機(jī)堆轉(zhuǎn)存快照分析工具命令用于與搭配使用,用來分析生成的文件。命令格式命令樣例線程堆棧跟蹤工具用于生成虛擬機(jī)當(dāng)前時刻的線程快照。 概述 給系統(tǒng)定位問題的時候,知識、經(jīng)驗是關(guān)鍵基礎(chǔ),數(shù)據(jù)是依據(jù),工具是運用知識處理數(shù)據(jù)的手段。 java開發(fā)人員可以在jdk安裝的bin目錄下找到除了java,javac以外的其他命令。這些命令主要是一...
摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問題。同時,也用于自帶線程池的緩沖隊列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。同時,BlockingQueue也用于...
摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無鎖算法,其性能一般比單純的其它阻塞隊列要高。它的最大特點時不存儲實際元素,而是在內(nèi)部通過?;蜿犃薪Y(jié)構(gòu)保存阻塞線程。 showImg(https://segmentfault.com/img/bVbgOsh?w=900&h=900); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、Synchro...
閱讀 4400·2023-04-26 02:40
閱讀 2740·2023-04-26 02:31
閱讀 2823·2021-11-15 18:08
閱讀 639·2021-11-12 10:36
閱讀 1511·2021-09-30 09:57
閱讀 5349·2021-09-22 15:31
閱讀 2697·2019-08-30 14:17
閱讀 1349·2019-08-30 12:58