摘要:之后,會重復(fù)上一步,新喚醒的線程可能取代成為新的線程。這其實(shí)是一種名為的多線程設(shè)計(jì)模式。我們之前說了,線程作用之一就是用來喚醒其它無限等待的線程,所以必須要有這個(gè)判斷。線程池框架中的就是一種延時(shí)阻塞隊(duì)列。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、DelayQueue簡介
DelayQueue是JDK1.5時(shí),隨著J.U.C包一起引入的一種阻塞隊(duì)列,它實(shí)現(xiàn)了BlockingQueue接口,底層基于已有的PriorityBlockingQueue實(shí)現(xiàn):
DelayQueue也是一種比較特殊的阻塞隊(duì)列,從類聲明也可以看出,DelayQueue中的所有元素必須實(shí)現(xiàn)Delayed接口:
/** * 一種混合風(fēng)格的接口,用來標(biāo)記那些應(yīng)該在給定延遲時(shí)間之后執(zhí)行的對象。 ** 此接口的實(shí)現(xiàn)必須定義一個(gè) compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。 */ public interface Delayed extends Comparable
{ /** * 返回與此對象相關(guān)的剩余有效時(shí)間,以給定的時(shí)間單位表示. */ long getDelay(TimeUnit unit); }
可以看到,Delayed接口除了自身的getDelay方法外,還實(shí)現(xiàn)了Comparable接口。getDelay方法用于返回對象的剩余有效時(shí)間,實(shí)現(xiàn)Comparable接口則是為了能夠比較兩個(gè)對象,以便排序。
也就是說,如果一個(gè)類實(shí)現(xiàn)了Delayed接口,當(dāng)創(chuàng)建該類的對象并添加到DelayQueue中后,只有當(dāng)該對象的getDalay方法返回的剩余時(shí)間≤0時(shí)才會出隊(duì)。
另外,由于DelayQueue內(nèi)部委托了PriorityBlockingQueue對象來實(shí)現(xiàn)所有方法,所以能以堆的結(jié)構(gòu)維護(hù)元素順序,這樣剩余時(shí)間最小的元素就在堆頂,每次出隊(duì)其實(shí)就是刪除剩余時(shí)間≤0的最小元素。
DelayQueue的特點(diǎn)簡要概括如下:
DelayQueue是無界阻塞隊(duì)列;
隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,元素過期后才會從隊(duì)列中取走;
二、DelayQueue示例為了便于理解DelayQueue的功能,我們先來看一個(gè)使用DelayQueue的示例。
隊(duì)列元素第一節(jié)說了,隊(duì)列元素必須實(shí)現(xiàn)Delayed接口,我們先來定義一個(gè)Data類,作為隊(duì)列元素:
public class Data implements Delayed { private static final AtomicLong atomic = new AtomicLong(0); private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss-n"); // 數(shù)據(jù)的失效時(shí)間點(diǎn) private final long time; // 序號 private final long seqno; /** * @param deadline 數(shù)據(jù)失效時(shí)間點(diǎn) */ public Data(long deadline) { this.time = deadline; this.seqno = atomic.getAndIncrement(); } /** * 返回剩余有效時(shí)間 * * @param unit 時(shí)間單位 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } /** * 比較兩個(gè)Delayed對象的大小, 比較順序如下: * 1. 如果是對象本身, 返回0; * 2. 比較失效時(shí)間點(diǎn), 先失效的返回-1,后失效的返回1; * 3. 比較元素序號, 序號小的返回-1, 否則返回1. * 4. 非Data類型元素, 比較剩余有效時(shí)間, 剩余有效時(shí)間小的返回-1,大的返回1,相同返回0 */ @Override public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof Data) { Data x = (Data) other; // 優(yōu)先比較失效時(shí)間 long diff = this.time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (this.seqno < x.seqno) // 剩余時(shí)間相同則比較序號 return -1; else return 1; } // 一般不會執(zhí)行到此處,除非元素不是Data類型 long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } @Override public String toString() { return "Data{" + "time=" + time + ", seqno=" + seqno + "}, isValid=" + isValid(); } private boolean isValid() { return this.getDelay(TimeUnit.NANOSECONDS) > 0; } }
關(guān)于隊(duì)列元素Data類,需要注意以下幾點(diǎn):
每個(gè)元素的time字段保存失效時(shí)間點(diǎn))的納秒形式(構(gòu)造時(shí)指定,比如當(dāng)前時(shí)間+60s);
seqno字段表示元素序號,每個(gè)元素唯一,僅用于失效時(shí)間點(diǎn)一致的元素之間的比較。
getDelay方法返回元素的剩余有效時(shí)間,可以根據(jù)入?yún)⒌?strong>TimeUnit選擇時(shí)間的表示形式(秒、微妙、納秒等),一般選擇納秒以提高精度;
compareTo方法用于比較兩個(gè)元素的大小,以便在隊(duì)列中排序。由于DelayQueue基于優(yōu)先級隊(duì)列實(shí)現(xiàn),所以內(nèi)部是“堆”的形式,我們定義的規(guī)則是先失效的元素將先出隊(duì),所以先失效元素應(yīng)該在堆頂,即compareTo方法返回結(jié)果<0的元素優(yōu)先出隊(duì);
生產(chǎn)者-消費(fèi)者還是以“生產(chǎn)者-消費(fèi)者”模式來作為DelayQueued的示例:
生產(chǎn)者
public class Producer implements Runnable { private final DelayQueue queue; public Producer(DelayQueue queue) { this.queue = queue; } @Override public void run() { while (true) { long currentTime = System.nanoTime(); long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L); Data data = new Data(currentTime + validTime); queue.put(data); System.out.println(Thread.currentThread().getName() + ": put " + data); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費(fèi)者
public class Consumer implements Runnable { private final DelayQueue queue; public Consumer(DelayQueue queue) { this.queue = queue; } @Override public void run() { while (true) { try { Data data = queue.take(); System.out.println(Thread.currentThread().getName() + ": take " + data); Thread.yield(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
調(diào)用
public class Main { public static void main(String[] args) { DelayQueue queue = new DelayQueue<>(); Thread c1 = new Thread(new Consumer(queue), "consumer-1"); Thread p1 = new Thread(new Producer(queue), "producer-1"); c1.start(); p1.start(); } }
執(zhí)行結(jié)果:
producer-1: put Data{time=73262562161592, seqno=0}, isValid=true
producer-1: put Data{time=73262787192726, seqno=1}, isValid=true
producer-1: put Data{time=73265591291171, seqno=2}, isValid=true
producer-1: put Data{time=73266850330909, seqno=3}, isValid=true
consumer-1: take Data{time=73262562161592, seqno=0}, isValid=false
consumer-1: take Data{time=73262787192726, seqno=1}, isValid=false
producer-1: put Data{time=73267928737184, seqno=4}, isValid=true
producer-1: put Data{time=73265083111776, seqno=5}, isValid=true
producer-1: put Data{time=73268729942809, seqno=6}, isValid=true
consumer-1: take Data{time=73265083111776, seqno=5}, isValid=false
上面示例中,我們創(chuàng)建了一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者,生產(chǎn)者不斷得入隊(duì)元素,每個(gè)元素都會有個(gè)截止有效期;消費(fèi)者不斷得從隊(duì)列者獲取元素。從輸出可以看出,消費(fèi)者每次獲取到的元素都是有效期最小的,且都是已經(jīng)失效了的。(因?yàn)镈elayQueue每次出隊(duì)只會刪除有效期最小且已經(jīng)過期的元素)
三、DelayQueue原理介紹完了DelayQueued的基本使用,讀者應(yīng)該對該阻塞隊(duì)列的功能有了基本了解,接下來我們看下Doug Lea是如何實(shí)現(xiàn)DelayQueued的。
構(gòu)造DelayQueued提供了兩種構(gòu)造器,都非常簡單:
/** * 默認(rèn)構(gòu)造器. */ public DelayQueue() { }
/** * 從已有集合構(gòu)造隊(duì)列. */ public DelayQueue(Collection extends E> c) { this.addAll(c); }
可以看到,內(nèi)部的PriorityQueue并非在構(gòu)造時(shí)創(chuàng)建,而是對象創(chuàng)建時(shí)生成:
public class DelayQueueextends AbstractQueue implements BlockingQueue { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue q = new PriorityQueue (); /** * leader線程是首個(gè)嘗試出隊(duì)元素(隊(duì)列不為空)但被阻塞的線程. * 該線程會限時(shí)等待(隊(duì)首元素的剩余有效時(shí)間),用于喚醒其它等待線程 */ private Thread leader = null; /** * 出隊(duì)線程條件隊(duì)列, 當(dāng)有多個(gè)線程, 會在此條件隊(duì)列上等待. */ private final Condition available = lock.newCondition(); //... }
上述比較特殊的是leader字段,我們之前已經(jīng)說過,DelayQueue每次只會出隊(duì)一個(gè)過期的元素,如果隊(duì)首元素沒有過期,就會阻塞出隊(duì)線程,讓線程在available這個(gè)條件隊(duì)列上無限等待。
為了提升性能,DelayQueue并不會讓所有出隊(duì)線程都無限等待,而是用leader保存了第一個(gè)嘗試出隊(duì)的線程,該線程的等待時(shí)間是隊(duì)首元素的剩余有效期。這樣,一旦leader線程被喚醒(此時(shí)隊(duì)首元素也失效了),就可以出隊(duì)成功,然后喚醒一個(gè)其它在available條件隊(duì)列上等待的線程。之后,會重復(fù)上一步,新喚醒的線程可能取代成為新的leader線程。這樣,就避免了無效的等待,提升了性能。這其實(shí)是一種名為“Leader-Follower pattern”的多線程設(shè)計(jì)模式。
入隊(duì)——putput方法沒有什么特別,由于是無界隊(duì)列,所以也不會阻塞線程。
/** * 入隊(duì)一個(gè)指定元素e. * 由于是無界隊(duì)列, 所以該方法并不會阻塞線程. */ public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 調(diào)用PriorityQueue的offer方法 if (q.peek() == e) { // 如果入隊(duì)元素在隊(duì)首, 則喚醒一個(gè)出隊(duì)線程 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
需要注意的是當(dāng)首次入隊(duì)元素時(shí),需要喚醒一個(gè)出隊(duì)線程,因?yàn)榇藭r(shí)可能已有出隊(duì)線程在空隊(duì)列上等待了,如果不喚醒,會導(dǎo)致出隊(duì)線程永遠(yuǎn)無法執(zhí)行。
if (q.peek() == e) { // 如果入隊(duì)元素在隊(duì)首, 則喚醒一個(gè)出隊(duì)線程 leader = null; available.signal(); }出隊(duì)——take
整個(gè)take方法在一個(gè)自旋中完成,其實(shí)就分為兩種情況:
1.隊(duì)列為空
這種情況直接阻塞出隊(duì)線程。(在available條件隊(duì)列等待)
2.隊(duì)列非空
隊(duì)列非空時(shí),還要看隊(duì)首元素的狀態(tài)(有效期),如果隊(duì)首元素過期了,那直接出隊(duì)就行了;如果隊(duì)首元素未過期,就要看當(dāng)前線程是否是第一個(gè)到達(dá)的出隊(duì)線程(即判斷leader是否為空),如果不是,就無限等待,如果是,則限時(shí)等待。
/** * 隊(duì)首出隊(duì)元素. * 如果隊(duì)首元素(堆頂)未到期或隊(duì)列為空, 則阻塞線程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); // 讀取隊(duì)首元素 if (first == null) // CASE1: 隊(duì)列為空, 直接阻塞 available.await(); else { // CASE2: 隊(duì)列非空 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // CASE2.0: 隊(duì)首元素已過期 return q.poll(); // 執(zhí)行到此處說明隊(duì)列非空, 且隊(duì)首元素未過期 first = null; if (leader != null) // CASE2.1: 已存在leader線程 available.await(); // 無限期阻塞當(dāng)前線程 else { // CASE2.2: 不存在leader線程 Thread thisThread = Thread.currentThread(); leader = thisThread; // 將當(dāng)前線程置為leader線程 try { available.awaitNanos(delay); // 阻塞當(dāng)前線程(限時(shí)等待剩余有效時(shí)間) } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 不存在leader線程, 則喚醒一個(gè)其它出隊(duì)線程 available.signal(); lock.unlock(); } }
需要注意,自旋結(jié)束后如果leader == null && q.peek() != null,需要喚醒一個(gè)等待中的出隊(duì)線程。四、總結(jié)
leader == null && q.peek() != null的含義就是——沒有leader線程但隊(duì)列中存在元素。我們之前說了,leader線程作用之一就是用來喚醒其它無限等待的線程,所以必須要有這個(gè)判斷。
DelayQueue是阻塞隊(duì)列中非常有用的一種隊(duì)列,經(jīng)常被用于緩存或定時(shí)任務(wù)等的設(shè)計(jì)。
考慮一種使用場景:
異步通知的重試,在很多系統(tǒng)中,當(dāng)用戶完成服務(wù)調(diào)用后,系統(tǒng)有時(shí)需要將結(jié)果異步通知到用戶的某個(gè)URI。由于網(wǎng)絡(luò)等原因,很多時(shí)候會通知失敗,這個(gè)時(shí)候就需要一種重試機(jī)制。
這時(shí)可以用DelayQueue保存通知失敗的請求,失效時(shí)間可以根據(jù)已通知的次數(shù)來設(shè)定(比如:2s、5s、10s、20s),這樣每次從隊(duì)列中take獲取的就是剩余時(shí)間最短的請求,如果已重復(fù)通知次數(shù)超過一定閾值,則可以把消息拋棄。
后面,我們在講J.U.C之executors框架的時(shí)候,還會再次看到DelayQueue的身影。JUC線程池框架中的ScheduledThreadPoolExecutor.DelayedWorkQueue就是一種延時(shí)阻塞隊(duì)列。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/77112.html
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫操作時(shí),才會進(jìn)行同步??梢钥吹?,上述方法返回一個(gè)迭代器對象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對象,以組合方式,委托對象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進(jìn)行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊(duì)列都是實(shí)現(xiàn)了接口。該類在構(gòu)造時(shí)一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實(shí)現(xiàn)時(shí)比較簡單的。另外,雙端隊(duì)列相比普通隊(duì)列,主要是多了隊(duì)尾出隊(duì)元素隊(duì)首入隊(duì)元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 1950·2023-04-26 02:32
閱讀 636·2021-11-18 13:12
閱讀 2520·2021-10-20 13:48
閱讀 2613·2021-10-14 09:43
閱讀 3917·2021-10-11 10:58
閱讀 3728·2021-09-30 10:00
閱讀 2999·2019-08-30 15:53
閱讀 3551·2019-08-30 15:53