成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

J.U.C|condition分析

Sourcelink / 2584人閱讀

摘要:造成當(dāng)前線程在接到信號(hào)被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)。該線程從等待方法返回前必須獲得與相關(guān)的鎖。如果線程已經(jīng)獲取了鎖,則將喚醒條件隊(duì)列的首節(jié)點(diǎn)。

一、寫在前面

在前幾篇我們聊了 AQS、CLH、ReentrantLock、ReentrantReadWriteLock等的原理以及其源碼解讀,具體參見專欄 《非學(xué)無以廣才》

這章我們一起聊聊顯示的Condition 對(duì)象。

二、簡(jiǎn)介

在沒有Lock之前,我們使用synchronized來控制同步,配合Object的wait()、wait(long timeout)、notify()、以及notifyAll 等方法可以實(shí)現(xiàn)等待/通知模式。

Condition接口也提供了類似于Object的監(jiān)聽器方法、與Lock接口配合可以實(shí)現(xiàn)等待/通知模式,但是兩者還是有很大區(qū)別的,下圖是兩者的對(duì)比

參考《Java并發(fā)編程藝術(shù)》

Java API 摘要

Condition 將 Object 監(jiān)視器方法(wait、notify 和 notifyAll)分解成截然不同的對(duì)象,以便通過將這些對(duì)象與任意 Lock 實(shí)現(xiàn)組合使用,為每個(gè)對(duì)象提供多個(gè)等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語(yǔ)句的使用,Condition 替代了 Object 監(jiān)視器方法的使用。

條件(也稱為條件隊(duì)列 或條件變量)為線程提供了一個(gè)含義,以便在某個(gè)狀態(tài)條件現(xiàn)在可能為 true 的另一個(gè)線程通知它之前,一直掛起該線程(即讓其“等待”)。因?yàn)樵L問此共享狀態(tài)信息發(fā)生在不同的線程中,所以它必須受保護(hù),因此要將某種形式的鎖與該條件相關(guān)聯(lián)。等待提供一個(gè)條件的主要屬性是:以原子方式 釋放相關(guān)的鎖,并掛起當(dāng)前線程,就像 Object.wait 做的那樣。

Condition 實(shí)例實(shí)質(zhì)上被綁定到一個(gè)鎖上。要為特定 Lock 實(shí)例獲得 Condition 實(shí)例,請(qǐng)使用其 newCondition() 方法。

三、方法摘要

Condition提供了一系列的方法來對(duì)阻塞和喚醒線程:

await():造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)。

await(long time, TimeUnit unit) :造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。

awaitNanos(long nanosTimeout) :造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。返回值表示剩余時(shí)間,如果在nanosTimesout之前喚醒,那么返回值 = nanosTimeout - 消耗時(shí)間,如果返回值 <= 0 ,則可以認(rèn)定它已經(jīng)超時(shí)了。

awaitUninterruptibly() :造成當(dāng)前線程在接到信號(hào)之前一直處于等待狀態(tài)?!咀⒁猓涸摲椒▽?duì)中斷不敏感】。

awaitUntil(Date deadline) :造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)。如果沒有到指定時(shí)間就被通知,則返回true,否則表示到了指定時(shí)間,返回返回false。

signal() :?jiǎn)拘岩粋€(gè)等待線程。該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖。

signalAll() :?jiǎn)拘阉械却€程。能夠從等待方法返回的線程必須獲得與Condition相關(guān)的鎖。

Condition是一種廣義上的條件隊(duì)列。他為線程提供了一種更為靈活的等待/通知模式,線程在調(diào)用await方法后執(zhí)行掛起操作,直到線程等待的某個(gè)條件為真時(shí)才會(huì)被喚醒。Condition必須要配合鎖一起使用,因?yàn)閷?duì)共享狀態(tài)變量的訪問發(fā)生在多線程環(huán)境下。一個(gè)Condition的實(shí)例必須與一個(gè)Lock綁定,因此Condition一般都是作為L(zhǎng)ock的內(nèi)部實(shí)現(xiàn)。

四、具體實(shí)現(xiàn)

獲取一個(gè)Condition必須要通過Lock的newCondition()方法。該方法定義在接口Lock下,返回的結(jié)果是綁定到此 Lock 實(shí)例的新 Condition 實(shí)例。

Condition為一個(gè)接口,其下僅有一個(gè)實(shí)現(xiàn)類ConditionObject,由于Condition的操作需要獲取相關(guān)的鎖,而AQS則是同步鎖的實(shí)現(xiàn)基礎(chǔ),所以ConditionObject則定義為AQS的內(nèi)部類。

public class ConditionObject implements Condition, java.io.Serializable {
    // 省略方法
}

等待隊(duì)列

每個(gè)Condition對(duì)象都包含著一個(gè)FIFO隊(duì)列,該隊(duì)列是Condition對(duì)象通知/等待功能的關(guān)鍵。

在隊(duì)列中每一個(gè)節(jié)點(diǎn)都包含著一個(gè)線程引用,該線程就是在該Condition對(duì)象上等待的線程。

我們看Condition的定義就明白了:?

 public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        
        /** First node of condition queue. */
        private transient Node firstWaiter;
        
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

        // Internal methods
        // 省略方法
}

從上面代碼可以看出Condition擁有首節(jié)點(diǎn)(firstWaiter),尾節(jié)點(diǎn)(lastWaiter)。

當(dāng)前線程調(diào)用await()方法,將會(huì)以當(dāng)前線程構(gòu)造成一個(gè)節(jié)點(diǎn)(Node),并將節(jié)點(diǎn)加入到該隊(duì)列的尾部。

圖來源《Java 并發(fā)編程藝術(shù)》

Node里面包含了當(dāng)前線程的引用。Node定義與AQS的CLH同步隊(duì)列的節(jié)點(diǎn)使用的都是同一個(gè)類(AbstractQueuedSynchronized.Node靜態(tài)內(nèi)部類)。

Condition的隊(duì)列結(jié)構(gòu)比CLH同步隊(duì)列的結(jié)構(gòu)簡(jiǎn)單些,新增過程較為簡(jiǎn)單只需要將原尾節(jié)點(diǎn)的nextWaiter指向新增節(jié)點(diǎn),然后更新lastWaiter即可。

等待?

調(diào)用Condition的await()方法會(huì)使當(dāng)前線程進(jìn)入等待狀態(tài),同時(shí)會(huì)加入到Condition等待隊(duì)列并釋放鎖。

當(dāng)從await()方法返回時(shí),當(dāng)前線程一定是獲取了Condition相的鎖。

public final void await() throws InterruptedException {
            // 當(dāng)前線程中斷、直接異常
            if (Thread.interrupted())
                throw new InterruptedException();
                
            加入等待 隊(duì)列
            Node node = addConditionWaiter();
            
            //釋放鎖
            int savedState = fullyRelease(node);
            
            int interruptMode = 0;
            
            //檢測(cè)當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列上、如果不在則說明該節(jié)點(diǎn)沒有資格競(jìng)爭(zhēng)鎖,繼續(xù)等待。
            while (!isOnSyncQueue(node)) {
            
                // 掛起線程
                LockSupport.park(this);
                
                // 線程釋是否被中斷,中斷直接退出
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            
            // 獲取同步狀態(tài)
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            
            // 清理?xiàng)l件隊(duì)列中,不實(shí)在等待狀態(tài)的節(jié)點(diǎn)
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

此段代碼的邏輯是:
首先將當(dāng)前線程新建一個(gè)節(jié)點(diǎn)同時(shí)加入到等待隊(duì)列中,然后釋放當(dāng)前線程持有的同步狀態(tài)。

然后則是不斷檢測(cè)該節(jié)點(diǎn)代表的線程是否出現(xiàn)在CLH同步隊(duì)列中(收到signal信號(hào)之后就會(huì)在AQS隊(duì)列中檢測(cè)到),如果不存在則一直掛起,否則參與競(jìng)爭(zhēng)同步狀態(tài)。

加入條件隊(duì)列(addConditionWaiter())源碼如下

private Node addConditionWaiter() {
            
            //隊(duì)列的尾節(jié)點(diǎn)
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            
            // 如果該節(jié)點(diǎn)的狀態(tài)的不是CONDITION,則說明該節(jié)點(diǎn)不在等待隊(duì)列上,需要 清除
            if (t != null && t.waitStatus != Node.CONDITION) {
                
                // 清除等待隊(duì)列中狀態(tài)部位 CONDITION 的節(jié)點(diǎn)
                unlinkCancelledWaiters();
                
                //清除后從新獲取尾節(jié)點(diǎn)
                t = lastWaiter;
            }
            
            // 將當(dāng)前線程構(gòu)造成等待節(jié)點(diǎn)
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            
            // 將node 節(jié)點(diǎn)添加到等待隊(duì)列的尾部
            
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

該方法主要是將當(dāng)前線程加入到Condition條件隊(duì)列中。當(dāng)然在加入到尾節(jié)點(diǎn)之前會(huì)清除所有狀態(tài)不為Condition的節(jié)點(diǎn)。

fullyRelease(Node node),負(fù)責(zé)釋放該線程持有的鎖

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
        
            // 獲取節(jié)點(diǎn)持有鎖的數(shù)量
            int savedState = getState();
            
            // 釋放鎖也就是釋放共享狀態(tài)
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

isOnSyncQueue(Node node):如果一個(gè)節(jié)點(diǎn)剛開始在條件隊(duì)列上,現(xiàn)在在同步隊(duì)列上獲取鎖則返回true。

final boolean isOnSyncQueue(Node node) {
        
        // 狀態(tài)為CONDITION 、前驅(qū)節(jié)點(diǎn)為空,返回false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
            
         // 如果后繼節(jié)點(diǎn)不為空,則說明節(jié)點(diǎn)肯定在同步隊(duì)列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

unlinkCancelledWaiters():負(fù)責(zé)將條件隊(duì)列中狀態(tài)不為Condition的節(jié)點(diǎn)刪除。

private void unlinkCancelledWaiters() {
            
            // 首節(jié)點(diǎn)
            Node t = firstWaiter;
            
            Node trail = null;
            // 從頭開始清除狀態(tài)不為CONDITION的節(jié)點(diǎn)
            while (t != null) {
                Node next = t.nextWaiter;
                
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

通知

調(diào)用Condition的signal()方法,將會(huì)喚醒在等待隊(duì)列中等待最長(zhǎng)時(shí)間的節(jié)點(diǎn)(條件隊(duì)列里的首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)前,會(huì)將節(jié)點(diǎn)移到CLH同步隊(duì)列中。

public final void signal() {

            // 如果同步是以獨(dú)占方式進(jìn)行的,則返回 true;其他情況則返回 false  
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            
            // 喚醒首節(jié)點(diǎn)
            Node first = firstWaiter;
            
            if (first != null)
                doSignal(first);
        }
        

該方法首先會(huì)判斷當(dāng)前線程是否已經(jīng)獲得了鎖,這是前置條件。然后喚醒條件隊(duì)列中的頭節(jié)點(diǎn)。

doSignal(Node first):?jiǎn)拘杨^節(jié)點(diǎn)

private void doSignal(Node first) {
            do {
                // 修改頭節(jié)點(diǎn)、方便移除
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
                // 將該節(jié)點(diǎn)移到同步隊(duì)列
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
        

doSignal(Node first)主要是做兩件事:

修改頭節(jié)點(diǎn);

調(diào)用transferForSignal(Node first) 方法將節(jié)點(diǎn)移動(dòng)到CLH同步隊(duì)列中。

transferForSignal(Node first)源碼如下

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         // 將該節(jié)點(diǎn)的狀態(tài)改為初始狀態(tài)0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
         
         // 將該節(jié)點(diǎn)添加到同步隊(duì)列的尾部、返回的是舊的尾部節(jié)點(diǎn),也就是 node.prev節(jié)點(diǎn)
        Node p = enq(node);
        
        //如果結(jié)點(diǎn)p的狀態(tài)為cancel 或者修改waitStatus失敗,則直接喚醒
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
小結(jié)

整個(gè)通知的流程如下:

判斷當(dāng)前線程是否已經(jīng)獲取了鎖,如果沒有獲取則直接拋出異常,因?yàn)楂@取鎖為通知的前置條件。

如果線程已經(jīng)獲取了鎖,則將喚醒條件隊(duì)列的首節(jié)點(diǎn)。

喚醒首節(jié)點(diǎn)是先將條件隊(duì)列中的頭節(jié)點(diǎn)移出,然后調(diào)用AQS的enq(Node node)方法將其安全地移到CLH同步隊(duì)列中 。

最后判斷如果該節(jié)點(diǎn)的同步狀態(tài)是否為Cancel,或者修改狀態(tài)為Signal失敗時(shí),則直接調(diào)用LockSupport喚醒該節(jié)點(diǎn)的線程。

五、總結(jié)

一個(gè)線程獲取鎖后,通過調(diào)用Condition的await()方法,會(huì)將當(dāng)前線程先加入到條件隊(duì)列中,然后釋放鎖,最后通過isOnSyncQueue(Node node)方法不斷自檢看節(jié)點(diǎn)是否已經(jīng)在CLH同步隊(duì)列了,如果是則嘗試獲取鎖,否則一直掛起。

當(dāng)線程調(diào)用signal()方法后,程序首先檢查當(dāng)前線程是否獲取了鎖,然后通過doSignal(Node first)方法喚醒CLH同步隊(duì)列的首節(jié)點(diǎn)。被喚醒的線程,將從await()方法中的while循環(huán)中退出來,然后調(diào)用acquireQueued()方法競(jìng)爭(zhēng)同步狀態(tài)。

注:本章參考《Java 并發(fā)編程藝術(shù)》、《Java 并發(fā)編程實(shí)戰(zhàn)》

本人技術(shù)有限,如果錯(cuò)誤,歡迎拍磚

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/74424.html

相關(guān)文章

  • J.U.C|同步隊(duì)列(CLH)

    摘要:二什么是同步隊(duì)列同步隊(duì)列一個(gè)雙向隊(duì)列,隊(duì)列中每個(gè)節(jié)點(diǎn)等待前驅(qū)節(jié)點(diǎn)釋放共享狀態(tài)鎖被喚醒就可以了。三入列操作如上圖了解了同步隊(duì)列的結(jié)構(gòu),我們?cè)诜治銎淙肓胁僮髟诤?jiǎn)單不過。 一、寫在前面 在上篇我們聊到AQS的原理,具體參見《J.U.C|AQS原理》。 這篇我們來給大家聊聊AQS中核心同步隊(duì)列(CLH)。 二、什么是同步隊(duì)列(CLH) 同步隊(duì)列 一個(gè)FIFO雙向隊(duì)列,隊(duì)列中每個(gè)節(jié)點(diǎn)等待前驅(qū)...

    Nosee 評(píng)論0 收藏0
  • J.U.C|可重入鎖ReentrantLock

    摘要:二什么是重入鎖可重入鎖,顧名思義,支持重新進(jìn)入的鎖,其表示該鎖能支持一個(gè)線程對(duì)資源的重復(fù)加鎖。將由最近成功獲得鎖,并且還沒有釋放該鎖的線程所擁有??梢允褂煤头椒▉頇z查此情況是否發(fā)生。 一、寫在前面 前幾篇我們具體的聊了AQS原理以及底層源碼的實(shí)現(xiàn),具體參見 《J.U.C|一文搞懂AQS》《J.U.C|同步隊(duì)列(CLH)》《J.U.C|AQS獨(dú)占式源碼分析》《J.U.C|AQS共享式源...

    wangdai 評(píng)論0 收藏0
  • J.U.C|AQS獨(dú)占式源碼分析

    摘要:本章我們主要聊獨(dú)占式即同一時(shí)刻只能有一個(gè)線程獲取同步狀態(tài),其它獲取同步狀態(tài)失敗的線程則會(huì)加入到同步隊(duì)列中進(jìn)行等待。到這獨(dú)占式獲取同步和釋放同步狀態(tài)的源碼已經(jīng)分析完了。 一、寫在前面 上篇文章通過ReentrantLock 的加鎖和釋放鎖過程給大家聊了聊AQS架構(gòu)以及實(shí)現(xiàn)原理,具體參見《J.U.C|AQS的原理》。 理解了原理,我們?cè)趤砜纯丛賮硪徊揭徊降牧牧钠湓创a是如何實(shí)現(xiàn)的。 本章給...

    why_rookie 評(píng)論0 收藏0
  • J.U.C|AQS共享式源碼分析

    摘要:主要講解方法共享式獲取同步狀態(tài),返回值表示獲取成功,反之則失敗。源碼分析同步器的和方法請(qǐng)求共享鎖的入口當(dāng)并且時(shí)才去才獲取資源獲取鎖以共享不可中斷模式獲取鎖將當(dāng)前線程一共享方式構(gòu)建成節(jié)點(diǎn)并將其加入到同步隊(duì)列的尾部。 一、寫在前面 上篇給大家聊了獨(dú)占式的源碼,具體參見《J.U.C|AQS獨(dú)占式源碼分析》 這一章我們繼續(xù)在AQS的源碼世界中遨游,解讀共享式同步狀態(tài)的獲取和釋放。 二、什么是...

    ghnor 評(píng)論0 收藏0
  • J.U.C|讀-寫鎖ReentrantReadWriteLock

    摘要:所以就有了讀寫鎖。只要沒有,讀取鎖可以由多個(gè)線程同時(shí)保持。其讀寫鎖為兩個(gè)內(nèi)部類都實(shí)現(xiàn)了接口。讀寫鎖同樣依賴自定義同步器來實(shí)現(xiàn)同步狀態(tài)的,而讀寫狀態(tài)就是其自定義同步器的狀態(tài)。判斷申請(qǐng)寫鎖數(shù)量是否超標(biāo)超標(biāo)則直接異常,反之則設(shè)置共享狀態(tài)。 一、寫在前面 在上篇我們聊到了可重入鎖(排它鎖)ReentrantLcok ,具體參見《J.U.C|可重入鎖ReentrantLock》 Reentra...

    Tonny 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<