摘要:生產(chǎn)者消費者問題是一個典型的多進程同步問題。生產(chǎn)者線程開始產(chǎn)生新的元素并將它們存儲在緩沖區(qū)。否則,生產(chǎn)者線程將會在緩沖區(qū)創(chuàng)建一個新元素然后通知消費者。我們建立一個線程池,它將收到兩個任務,生產(chǎn)者和消費者的任務。
原文鏈接:https://dzone.com/articles/th...
作者:Ioan Tinca
譯者:liumapp
想要了解更多關于Java生產(chǎn)者消費者問題的演變嗎?那就看看這篇文章吧,我們分別用舊方法和新方法來處理這個問題。
生產(chǎn)者消費者問題是一個典型的多進程同步問題。
對于大多數(shù)人來說,這個問題可能是我們在學校,執(zhí)行第一次并行算法所遇到的第一個同步問題。
雖然它很簡單,但一直是并行計算中的最大挑戰(zhàn) - 多個進程共享一個資源。
問題陳述生產(chǎn)者和消費者兩個程序,共享一個大小有限的公共緩沖區(qū)。
假設一個生產(chǎn)者"生產(chǎn)"一份數(shù)據(jù)并將其存儲在緩沖區(qū)中,而一個消費者"消費"這份數(shù)據(jù),并將這份數(shù)據(jù)從緩沖區(qū)中刪除。
再假設現(xiàn)在這兩個程序在并發(fā)地運行,我們需要確保當緩沖區(qū)的數(shù)據(jù)已滿時,生產(chǎn)者不會放置新數(shù)據(jù)進來,也要確保當緩沖區(qū)的數(shù)據(jù)為空時,消費者不會試圖刪除數(shù)據(jù)緩沖區(qū)的數(shù)據(jù)。
解決方案為了解決上述的并發(fā)問題,生產(chǎn)者和消費者將不得不相互通信。
如果緩沖區(qū)已滿,生產(chǎn)者將處于睡眠狀態(tài),直到有通知信息喚醒。
在消費者將一些數(shù)據(jù)從緩沖區(qū)刪除后,消費者將通知生產(chǎn)者,隨后生產(chǎn)者將重新開始填充數(shù)據(jù)到緩沖區(qū)中。
如果緩沖區(qū)內(nèi)容為空的化,那么情況是一樣的,只不過,消費者會先等待生產(chǎn)者的通知。
但如果這種溝通做得不恰當,在進程彼此等待的位置可能導致程序死鎖。
經(jīng)典的方法首先來看一個典型的Java方案來解決這個問題。
package ProducerConsumer; import java.util.LinkedList; import java.util.Queue; public class ClassicProducerConsumerExample { public static void main(String[] args) throws InterruptedException { Buffer buffer = new Buffer(2); Thread producerThread = new Thread(new Runnable() { @Override public void run() { try { buffer.produce(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumerThread = new Thread(new Runnable() { @Override public void run() { try { buffer.consume(); } catch (InterruptedException e) { e.printStackTrace(); } } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } static class Buffer { private Queue list; private int size; public Buffer(int size) { this.list = new LinkedList<>(); this.size = size; } public void produce() throws InterruptedException { int value = 0; while (true) { synchronized (this) { while (list.size() >= size) { // wait for the consumer wait(); } list.add(value); System.out.println("Produced " + value); value++; // notify the consumer notify(); Thread.sleep(1000); } } } public void consume() throws InterruptedException { while (true) { synchronized (this) { while (list.size() == 0) { // wait for the producer wait(); } int value = list.poll(); System.out.println("Consume " + value); // notify the producer notify(); Thread.sleep(1000); } } } }}
這里我們有生產(chǎn)者和消費者兩個線程,它們共享一個公共緩沖區(qū)。生產(chǎn)者線程開始產(chǎn)生新的元素并將它們存儲在緩沖區(qū)。如果緩沖區(qū)已滿,那么生產(chǎn)者線程進入睡眠狀態(tài),直到有通知喚醒。否則,生產(chǎn)者線程將會在緩沖區(qū)創(chuàng)建一個新元素然后通知消費者。就像我之前說的,這個過程也適用于消費者。如果緩沖區(qū)為空,那么消費者將等待生產(chǎn)者的通知。否則,消費者將從緩沖區(qū)刪除一個元素并通知生產(chǎn)者。
正如你所看到的,在之前的例子中,生產(chǎn)者和消費者的工作都是管理緩沖區(qū)的對象。這些線程僅僅調用了buffer.produce()和buffer.consume()兩個方法就搞定了一切。
對于緩沖區(qū)是否應該負責創(chuàng)建或者刪除元素,一直都是一個有爭議的話題,但在我看來,緩沖區(qū)不應該做這種事情。當然,這取決于你想要達到的目的,但在這種情況下,緩沖區(qū)應該只是負責以線程安全的形式存儲合并元素,而不是生產(chǎn)新的元素。
所以,讓我們把生產(chǎn)和消費的邏輯從緩沖對象中進行解耦。
package ProducerConsumer; import java.util.LinkedList; import java.util.Queue; public class ProducerConsumerExample2 { public static void main(String[] args) throws InterruptedException { Buffer buffer = new Buffer(2); Thread producerThread = new Thread(() -> { try { int value = 0; while (true) { buffer.add(value); System.out.println("Produced " + value); value ++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { while (true) { int value = buffer.poll(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); } static class Buffer { private Queue list; private int size; public Buffer(int size) { this.list = new LinkedList<>(); this.size = size; } public void add(int value) throws InterruptedException { synchronized (this) { while (list.size() >= size) { wait(); } list.add(value); notify(); } } public int poll() throws InterruptedException { synchronized (this) { while (list.size() == 0) { wait(); } int value = list.poll(); notify(); return value; } } }}
這樣好多了,至少現(xiàn)在緩沖區(qū)僅僅負責以線程安全的形式來存儲和刪除元素。
隊列阻塞(BlockingQueue)不過,我們還可以進一步改善。
在前面的例子中,我們已經(jīng)創(chuàng)建了一個緩沖區(qū),每當存儲一個元素之前,緩沖區(qū)將等待是否有可用的一個槽以防止沒有足夠的存儲空間,并且,在合并之前,緩沖區(qū)也會等待一個新的元素出現(xiàn),以確保存儲和刪除的操作是線程安全的。
但是,Java本身的庫已經(jīng)整合了這些操作。它被稱之為BlockingQueue,在這里可以查看它的詳細文檔。
BlockingQueue是一個以線程安全的形式存入和取出實例的隊列。而這就是我們所需要的。
所以,如果我們在示例中使用BlockingQueue,我們就不需要再去實現(xiàn)等待和通知的機制。
接下來,我們來看看具體的代碼。
package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; public class ProducerConsumerWithBlockingQueue { public static void main(String[] args) throws InterruptedException { BlockingQueue blockingQueue = new LinkedBlockingDeque<>(2); Thread producerThread = new Thread(() -> { try { int value = 0; while (true) { blockingQueue.put(value); System.out.println("Produced " + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); Thread consumerThread = new Thread(() -> { try { while (true) { int value = blockingQueue.take(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }); producerThread.start(); consumerThread.start(); producerThread.join(); consumerThread.join(); }}
雖然runnables看起來跟之前一樣,他們按照之前的方式生產(chǎn)和消費元素。
唯一的區(qū)別在于,這里我們使用blockingQueue代替緩沖區(qū)對象。
關于Blocking Queue的更多細節(jié)這兒有很多種類型的BlockingQueue:
無界隊列
有界隊列
一個無界隊列幾乎可以無限地增加元素,任何添加操作將不會被阻止。
你可以以這種方式去創(chuàng)建一個無界隊列:
BlockingQueue blockingQueue = new LinkedBlockingDeque<>();
在這種情況下,由于添加操作不會被阻塞,生產(chǎn)者添加新元素時可以不用等待。每次當生產(chǎn)者想要添加一個新元素時,會有一個隊列先存儲它。但是,這里面也存在一個異常需要捕獲。如果消費者刪除元素的速度比生產(chǎn)者添加新的元素要慢,那么內(nèi)存將被填滿,我們將可能得到一個OutOfMemory異常。
與之相反的則是有界隊列,存在一個固定大小。你可以這樣去創(chuàng)建它:
BlockingQueue blockingQueue = new LinkedBlockingDeque<>(10);
兩者最主要的區(qū)別在于,使用有界隊列的情況下,如果隊列內(nèi)存已滿,而生產(chǎn)者仍然試圖往里面塞元素,那么隊列將會被阻塞(具體阻塞方式取決于添加元素的方法)直到有足夠的空間騰出來。
往blocking queue里面添加元素一共有以下四種方式:
add() - 如果插入成功返回true,否則拋出IllegalStateException
put() - 往隊列中插入元素,并在有必要的情況下等待一個可用的槽(slot)
offer() - 如果插入元素成功返回true,否則返回false
offer(E e, long timeout, TimeUnit unit) – 在隊列沒有滿的情況下,或者為了一個可用的slot而等待指定的時間后,往隊列中插入一個元素。
所以,如果你使用put()方法插入元素,而隊列內(nèi)存已滿的情況下,我們的生產(chǎn)者就必須等待,直到有可用的slot出現(xiàn)。
以上就是我們上一個案例的全部,這跟ProducerConsumerExample2的工作原理是一樣的。
使用線程池還有什么地方我們可以優(yōu)化的?那首先來分析一下我們干了什么,我們實例化了兩個線程,一個被叫做生產(chǎn)者,專門往隊列里面塞元素,另一個被叫做消費者,負責從隊列里面刪元素。
然而,好的軟件技術表明,手動地去創(chuàng)建和銷毀線程是不好的做法。首先創(chuàng)建線程是一項昂貴的任務,每創(chuàng)建一個線程,意味著要經(jīng)歷一遍下面的步驟:
首先要分配內(nèi)存給一個線程堆棧
操作系統(tǒng)要創(chuàng)建一個原生線程對應于Java的線程
跟這個線程相關的描述符被添加到JVM內(nèi)部的數(shù)據(jù)結構中
首先別誤會我,我們的案例中用了幾個線程是沒有問題的,而那也是并發(fā)工作的方式之一。這里的問題是,我們是手動地去創(chuàng)建線程,這可以說是一次糟糕的實踐。如果我們手動地創(chuàng)建線程,除了創(chuàng)建過程中的消耗外,還有另一個問題,就是我們無法控制同時有多少個線程在運行。舉個例子,如果同時有一百萬次請求線上服務,那么每一次請求都會相應的創(chuàng)建一個線程,那么同時會有一百萬個線程在后臺運行,這將會導致thread starvation)
所以,我們需要一種全局管理線程的方式,這就用到了線程池。
線程池將基于我們選擇的策略來處理線程的生命周期。它擁有有限數(shù)量的空閑線程,并在需要解決任務時啟用它們。通過這種方式,我們不需要為每一個新的請求創(chuàng)建一個新線程,因此,我們可以避免出現(xiàn)線程饑餓的問題。
Java線程池的實現(xiàn)包括:
一個任務隊列
一個工作線程的集合
一個線程工廠
管理線程池狀態(tài)的元數(shù)據(jù)
為了同時運行一些任務,你必須把他們先放到任務隊列里。然后,當一個線程可用的時候,它將接收一個任務并運行它。可用的線程越多,并行執(zhí)行的任務就越多。
除了管理線程生命周期,使用線程池還有另一個好處,當你計劃如何分割任務,以便同時執(zhí)行時,你能想到更多種方式。并行性的單位不再是線程了,而是任務。你設計一些任務來并發(fā)執(zhí)行,而不是讓一些線程通過共享公共的內(nèi)存塊來并發(fā)運行。按照功能需求來思考的方式可以幫助我們避免一些常見的多線程問題,如死鎖或數(shù)據(jù)競爭等。沒有什么可以阻止我們再次深入這些問題,但是,由于使用了功能范式,我們沒辦法命令式地同步并行計算(鎖)。這比直接使用線程和共享內(nèi)存所能碰到的幾率要少的多。在我們的例子中,共享一個阻塞隊列不是想要的情況,但我就是想強調這個優(yōu)勢。
在這里和這里你可以找到更多有關線程池的內(nèi)容。
說了那么多,接下來我們看看在案例中如何使用線程池。
package ProducerConsumer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class ProducerConsumerExecutorService { public static void main(String[] args) { BlockingQueue blockingQueue = new LinkedBlockingDeque<>(2); ExecutorService executor = Executors.newFixedThreadPool(2); Runnable producerTask = () -> { try { int value = 0; while (true) { blockingQueue.put(value); System.out.println("Produced " + value); value++; Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }; Runnable consumerTask = () -> { try { while (true) { int value = blockingQueue.take(); System.out.println("Consume " + value); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }; executor.execute(producerTask); executor.execute(consumerTask); executor.shutdown(); }}
這里的區(qū)別在于,我們不在手動創(chuàng)建或運行消費者和生產(chǎn)者線程。我們建立一個線程池,它將收到兩個任務,生產(chǎn)者和消費者的任務。生產(chǎn)者和消費者的任務,實際上跟之前例子里面使用的runnable是相同的?,F(xiàn)在,執(zhí)行程序(線程池實現(xiàn))將接收任務,并安排它的工作線程去執(zhí)行他們。
在我們簡單的案例下,一切都跟之前一樣運行。就像之前的例子,我們?nèi)匀挥袃蓚€線程,他們?nèi)匀灰酝瑯拥姆绞缴a(chǎn)和消費元素。雖然我們并沒有讓性能得到提升,但是代碼看起來干凈多了。我們不再手動創(chuàng)建線程,而只是具體說明我們想要什么:我們想要并發(fā)執(zhí)行某些任務。
所以,當你使用一個線程池時。你不需要考慮線程是并發(fā)執(zhí)行的單位,相反的,你把一些任務看作并發(fā)執(zhí)行的就好。以上就是你需要知道的,剩下的由執(zhí)行程序去處理。執(zhí)行程序會收到一些任務,然后,它會分配工作線程去處理它們。
總結首先,我們看到了一個"傳統(tǒng)"的消費者-生產(chǎn)者問題的解決方案。我們盡量避免了重復造沒有必要的車輪,恰恰相反,我們重用了已經(jīng)測試過的解決方案,因此,我們不是寫一個通知等待系統(tǒng),而是嘗試使用Java已經(jīng)提供的blocking queue,因為Java為我們提供了一個非常有效的線程池來管理線程生命周期,讓我們可以擺脫手動創(chuàng)建線程。通過這些改進,消費者-生產(chǎn)者問題的解決方案看起來更可靠和更好理解。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.hztianpu.com/yun/75813.html
摘要:傳的最后一次參數(shù)是一個回調函數(shù),當命令成功或失敗之后會立即被調用。回調函數(shù)中,我們明確地處理連接錯誤的情況,設置狀態(tài)為,并再次調用重連。如果沒有發(fā)生錯誤,調用回調函數(shù)結束當前工作項目。嘗試連接的時候,使用增加每次重連的時間間隔。 Node.js 中的隊列 本文轉載自:眾成翻譯譯者:文藺鏈接:http://www.zcfy.cc/article/662原文:http://blog.yld...
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實我們是用到了默認的,用空字符串來標識??兆址砹藳]有名字的被路由到了由指定名字的。和這種關系的建立我們稱之為從現(xiàn)在開始這個就會將推向我們的隊列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個task都會派發(fā)給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:允許接收和轉發(fā)消息。一個等待接收消息的程序是一個消費者。發(fā)送者會先連接到發(fā)送一條消息,然后退出。注意這里的是要和之前的名稱一致。翻譯日期另因為想入門第一次想著翻譯,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個特殊的隊列是否要追加到許多的隊列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點很關鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒有隊列綁定交換器,消息會丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊列,之前的工作隊列的假設是每個任務只被分發(fā)到一個worker。在這一節(jié)中,我們會做一些完全不一...
閱讀 1235·2021-11-23 10:04
閱讀 2477·2021-11-22 15:29
閱讀 3144·2021-11-19 09:40
閱讀 792·2021-09-22 15:26
閱讀 2193·2019-08-29 16:27
閱讀 2564·2019-08-29 16:10
閱讀 1979·2019-08-29 15:43
閱讀 3386·2019-08-29 12:43