摘要:分段策略嘗試自旋此,然后調(diào)用次,如果經(jīng)過這兩百次的操作還未獲取到任務(wù),就會(huì)嘗試階段性掛起自身線程。
零 前期準(zhǔn)備 0 FBI WARNING
文章異常啰嗦且繞彎。
1 版本Disruptor 版本 : Disruptor 3.4.2
IDE : idea 2018.3
JDK 版本 : OpenJDK 11.0.1
2 Disruptor 簡(jiǎn)介高性能線程間消息隊(duì)列框架 Disruptor,是金融與游戲領(lǐng)域的常用開發(fā)組件之一,也是 java 日志框架和流處理框架底層的常用依賴。
3 DemoDisruptor 的 github 主頁有非常詳細(xì)的 quick start demo,本文依照此 demo 做追蹤的模板(做了很小的改動(dòng))。
另外,對(duì)于官方提供的 jdk8 lambda 簡(jiǎn)化版 demo 暫不做討論。
import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; import java.nio.ByteBuffer; public class LongEventMain { //main 方法,啟動(dòng)入口 public static void main(String[] args) throws Exception { //在該框架中,所有的 task 的包裝類被稱為 Event,EventFactory 則是 Event 的生產(chǎn)者 LongEventFactory factory = new LongEventFactory(); //RingBuffer 的大小,數(shù)字為字節(jié)數(shù) //RingBuffer 是框架啟動(dòng)器內(nèi)部的緩存區(qū),用來存儲(chǔ) event 內(nèi)的 task 數(shù)據(jù) int bufferSize = 1024; //創(chuàng)建一個(gè) Disruptor 啟動(dòng)器,其中 DaemonThreadFactory 是一個(gè)線程工廠的實(shí)現(xiàn)類 Disruptor一 DaemonThreadFactorydisruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); //該框架本質(zhì)上是 生產(chǎn)-消費(fèi) 設(shè)計(jì)模式的應(yīng)用。所有的消費(fèi)者被冠名為 handler //handleEventsWith(...) 方法會(huì)在啟動(dòng)器中注冊(cè) handler //此處的參數(shù)是不定數(shù)量的,可以有多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都可以獲取 Event disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2")); //啟動(dòng)器開始執(zhí)行,并獲取其內(nèi)部的緩存區(qū) RingBuffer ringBuffer = disruptor.start(); //創(chuàng)建一個(gè)生產(chǎn)者,負(fù)責(zé)往緩存區(qū)內(nèi)寫入數(shù)據(jù) LongEventProducer producer = new LongEventProducer(ringBuffer); //官方 demo 中使用了 ByteBuffer 來方便操作,其實(shí)非必須 ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { //將變量 l 作為一個(gè) long 類型的數(shù)存入 ByteBuffer 中 bb.putLong(0, l); //將 ByteBuffer 傳入生產(chǎn)者的相關(guān)方法中,該方法會(huì)負(fù)責(zé)將 ByteBuffer 中的數(shù)據(jù)寫入 RingBuffer producer.onData(bb); //線程休眠 Thread.sleep(1000); } } } //Event 類,本質(zhì)上是數(shù)據(jù)的封裝,是生產(chǎn)者和消費(fèi)者之間進(jìn)行數(shù)據(jù)傳遞的介質(zhì) class LongEvent { private long value; public void set(long value) { this.value = value; } public long get() { return value; } } //Event 的生產(chǎn)工廠類,必須實(shí)現(xiàn) Disruptor 自帶的 EventFactory 接口 class LongEventFactory implements EventFactory { @Override public LongEvent newInstance() { return new LongEvent(); } } //消費(fèi)者,必須實(shí)現(xiàn) Disruptor 自帶的 EventHandler 接口 class LongEventHandler implements EventHandler { private String handlerName; public LongEventHandler(String handlerName){ this.handlerName = handlerName; } //此方法為最終的消費(fèi) Event 的方法 @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event " + handlerName + " : " + event.get()); } } //生產(chǎn)者,主要負(fù)責(zé)往 RingBuffer 中寫入數(shù)據(jù) //生產(chǎn)者類在框架中并非必須,但是一般情況下都會(huì)做一定程度的封裝 class LongEventProducer { private final RingBuffer ringBuffer; //生產(chǎn)者的構(gòu)造器負(fù)責(zé)獲取并存儲(chǔ)啟動(dòng)器中的 RingBuffer public LongEventProducer(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { //sequence 是 RingBuffer 中的一個(gè)數(shù)據(jù)塊,類似于一個(gè)數(shù)據(jù)地址 long sequence = ringBuffer.next(); try { //用數(shù)據(jù)地址去獲取到一個(gè) Event 事件類實(shí)例 LongEvent event = ringBuffer.get(sequence); //在實(shí)例中存入 ByteBuffer 中的數(shù)據(jù) event.set(bb.getLong(0)); } finally { //發(fā)布該數(shù)據(jù)塊,此時(shí)消費(fèi)者們都可以看到該數(shù)據(jù)塊了,可以進(jìn)行消費(fèi) ringBuffer.publish(sequence); } } }
在開始正式追蹤代碼之前有必要先來理解 DaemonThreadFactory。這是 Disruptor 自身攜帶的線程工廠類:
public enum DaemonThreadFactory implements ThreadFactory{ //線程工廠使用枚舉實(shí)現(xiàn)單例模式 INSTANCE; @Override public Thread newThread(final Runnable r){ Thread t = new Thread(r); //此處創(chuàng)建的線程是守護(hù)線程 t.setDaemon(true); return t; } }二 Disruptor
本 part 主要追蹤 demo 中 Disruptor 相關(guān)的代碼:
Disruptor1 disruptor 的創(chuàng)建disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2")); RingBuffer ringBuffer = disruptor.start();
來看下方代碼:
Disruptordisruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
追蹤 Disruptor 的構(gòu)造器:
//step 1 //Disruptor.class public Disruptor(final EventFactoryeventFactory, final int ringBufferSize, final ThreadFactory threadFactory){ //RingBuffer.createMultiProducer(...) 方法會(huì)返回一個(gè) RingBuffer //BasicExecutor 是線程和線程工廠的封裝類 this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory)); } //step 2 //Disruptor.class private Disruptor(final RingBuffer ringBuffer, final Executor executor){ //存入 RingBuffer 和 Executor this.ringBuffer = ringBuffer; this.executor = executor; }
但是實(shí)際上 Disruptor 提供了很多的構(gòu)造器,其中還有一個(gè)較高配置權(quán)限的:
//Disruptor.class public Disruptor(final EventFactoryeventFactory,final int ringBufferSize, final ThreadFactory threadFactory,final ProducerType producerType, final WaitStrategy waitStrategy){ //解釋傳入的參數(shù): //eventFactory 是 Event 類的創(chuàng)建工廠 //ringBufferSize 是 RingBuffer 的字節(jié)數(shù)大小 //threadFactory 是線程工廠 //ProducerType 是生產(chǎn)者的類型,分為單生產(chǎn)者類型(single)和多生產(chǎn)者類型(multi),默認(rèn)為 multi //waitStrategy 是框架中的一個(gè)接口,表示等待策略,默認(rèn)為 BlockingWaitStrategy(阻塞等待),WaitStrategy 的可講內(nèi)容較多,在后頭開一個(gè)多帶帶 part this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),new BasicExecutor(threadFactory)); }
先來看 ProducerType:
public enum ProducerType{ SINGLE, MULTI }
僅僅只是個(gè)標(biāo)記,不多贅述。
1.1 BasicExecutorBasicExecutor 是 Executor 的實(shí)現(xiàn)類,其內(nèi)部維護(hù)著一個(gè)線程工廠和一個(gè)線程隊(duì)列,核心方法為 execute(...):
//BasicExecutor.class public void execute(Runnable command){ //使用線程工廠創(chuàng)建一個(gè)線程,此處的 factory 即為 DaemonThreadFactory final Thread thread = factory.newThread(command); //有效性驗(yàn)證 if (null == thread){ throw new RuntimeException("Failed to create thread to run: " + command); } //開啟線程 thread.start(); //threads 是一個(gè) ConcurrentLinkedQueue1.2 RingBuffer 的創(chuàng)建類型的變量,用來存儲(chǔ)線程 threads.add(thread); }
再來追蹤一下 RingBuffer 的創(chuàng)建:
//RingBuffer.class public staticRingBuffer create(ProducerType producerType,EventFactory factory, int bufferSize,WaitStrategy waitStrategy){ //此處根據(jù) ProducerType 進(jìn)行分發(fā)操作 switch (producerType){ case SINGLE: //創(chuàng)建單消費(fèi)者的 producer return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: //創(chuàng)建多消費(fèi)者的 producer return createMultiProducer(factory, bufferSize, waitStrategy); default: //拋出錯(cuò)誤 throw new IllegalStateException(producerType.toString()); } }
本質(zhì)上這兩種模式的 RingBuffer 的創(chuàng)建差距并不大,此處追蹤 createMultiProducer(...) 方法:
//step 1 //RingBuffer.class public static2 消費(fèi)者的注冊(cè)RingBuffer createMultiProducer(EventFactory factory,int bufferSize,WaitStrategy waitStrategy){ //MultiProducerSequencer 是 RingBuffer 中用來在生產(chǎn)者和消費(fèi)者之間傳遞數(shù)據(jù)的組件 //sequencer 是 RingBuffer 中的核心組件,是區(qū)別 SINGLE 和 MULTI 的關(guān)鍵,后文會(huì)繼續(xù)理解 MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy); //自身構(gòu)造器 return new RingBuffer (factory, sequencer); } //step 2 //RingBuffer.class RingBuffer(EventFactory eventFactory,Sequencer sequencer){ //調(diào)用父類 RingBufferFields 的構(gòu)造方法 super(eventFactory, sequencer); } //step 3 //RingBufferFields.class RingBufferFields(EventFactory eventFactory,Sequencer sequencer){ //此處為 MultiProducerSequencer this.sequencer = sequencer; //獲取使用者自定義的 bufferSize 并記錄下來 this.bufferSize = sequencer.getBufferSize(); //bufferSize 的有效性驗(yàn)證 if (bufferSize < 1){ throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1){ throw new IllegalArgumentException("bufferSize must be a power of 2"); } //根據(jù) bufferSize 確定序列號(hào)最大值,因?yàn)閺?0 開始所以要減一 this.indexMask = bufferSize - 1; //entries 是一個(gè) Object 數(shù)組,用于存放 Event //BUFFER_PAD 是對(duì)整個(gè)緩沖區(qū)的填充 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //fill(...) 方法會(huì)重新設(shè)置 entries fill(eventFactory); } //step 4 //RingBuffer.class private void fill(EventFactory eventFactory){ for (int i = 0; i < bufferSize; i++){ //遍歷數(shù)組進(jìn)行 Event 的填充 entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
來看下方代碼:
disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));
追蹤 handleEventsWith(...) 方法:
//step 1 //Disruptor.class public final EventHandlerGroup2.1 newBarrierhandleEventsWith(final EventHandler super T>... handlers){ //Sequence 可以看做是 long 型的封裝類 //此處的第一個(gè)參數(shù)是前置關(guān)卡,在處理 handler 之前會(huì)進(jìn)行處理的事件 //handlers 即為消費(fèi)者 return createEventProcessors(new Sequence[0], handlers); } //step 2 //Disruptor.class EventHandlerGroup createEventProcessors(final Sequence[] barrierSequences,final EventHandler super T>[] eventHandlers){ //Disruptor 中有一個(gè) AtomicBoolean 類型的變量 started //checkNotStarted() 會(huì)檢查該變量的值是否為 true,如果是的話就證明已經(jīng)啟動(dòng)了,則拋出異常 checkNotStarted(); //processorSequences 是每個(gè)消費(fèi)者對(duì)應(yīng)的執(zhí)行器的序列號(hào) final Sequence[] processorSequences = new Sequence[eventHandlers.length]; //此處返回的 barrier 可以看做是上文 MultiProducerSequencer 的封裝增強(qiáng) final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0,eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){ final EventHandler super T> eventHandler = eventHandlers[i]; //batchEventProcessor 是存儲(chǔ)了消費(fèi)者和生產(chǎn)者的執(zhí)行器,實(shí)現(xiàn)了 Runnable 接口,內(nèi)部會(huì)不斷循環(huán)去接收并處理事件 final BatchEventProcessor batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); //exceptionHandler 是用于處理錯(cuò)誤的消費(fèi)者組件 if (exceptionHandler != null){ batchEventProcessor.setExceptionHandler(exceptionHandler); } //consumerRepository 可以看做是消費(fèi)者的集合封裝 //consumerRepository 會(huì)將傳入的三個(gè)參數(shù)包裝成 EventProcessorInfo 并儲(chǔ)存在集合和 map 里 consumerRepository.add(batchEventProcessor, eventHandler, barrier); //記錄下消費(fèi)者對(duì)應(yīng)的執(zhí)行器的序列號(hào) processorSequences[i] = batchEventProcessor.getSequence(); } //處理一些前置事件,在本例中沒有前置事件存在 updateGatingSequencesForNextInChain(barrierSequences, processorSequences); return new EventHandlerGroup<>(this, consumerRepository, processorSequences); }
來追蹤一下 ringBuffer.newBarrier(...) 方法:
//step 1 //RingBuffer.class public SequenceBarrier newBarrier(Sequence... sequencesToTrack){ //在本例中,此處的 sequencesToTrack 是 Sequence[0] //此處的 sequencer 即為 MultiProducerSequencer return sequencer.newBarrier(sequencesToTrack); } //step 2 //AbstractSequencer.class public SequenceBarrier newBarrier(Sequence... sequencesToTrack){ //此方法被定義在 MultiProducerSequencer 的父類 AbstractSequencer 中 //cursor 是在 AbstractSequencer 中實(shí)例化的一個(gè) Sequence 類型對(duì)象,是 MultiProducerSequencer 的序列號(hào) return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); } //step 3 //ProcessingSequenceBarrier.class ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy, final Sequence cursorSequence,final Sequence[] dependentSequences){ //即為 Disruptor 啟動(dòng)器中的 MultiProducerSequencer this.sequencer = sequencer; //即為 Disruptor 啟動(dòng)器中的阻塞策略 this.waitStrategy = waitStrategy; //上述方法的 cursor this.cursorSequence = cursorSequence; if (0 == dependentSequences.length){ //此處的 dependentSequences 是長(zhǎng)度是 0,所以此處 dependentSequence = cursorSequence; }else{ dependentSequence = new FixedSequenceGroup(dependentSequences); } }
需要注意的是,此處的 sequencer 已經(jīng)被抽象成了 SingleProducerSequencer 和 MultiProducerSequencer 的共同實(shí)現(xiàn)接口 Sequencer。
所以對(duì)于 SingleProducerSequencer 來說,這個(gè)流程也是沒有區(qū)別的。
2.2 updateGatingSequencesForNextInChain回到上述代碼:
//此處的 barrierSequences 是一個(gè) Sequence[0] 數(shù)組,processorSequences 是所有消費(fèi)者的序列號(hào)集合 updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
追蹤該方法的實(shí)現(xiàn):
//step 1 //Disruptor.class private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){ //processorSequences.length 大于 0 意味著消費(fèi)者數(shù)量大于 0 if (processorSequences.length > 0){ ringBuffer.addGatingSequences(processorSequences); //barrierSequences 是前置事件的集合 //由于此處的 barrierSequences 是長(zhǎng)度為 0 的 Sequence 數(shù)組,即沒有前置事件,所以此處不會(huì)進(jìn)入循環(huán),忽略 for (final Sequence barrierSequence : barrierSequences){ ringBuffer.removeGatingSequence(barrierSequence); } //unMarkEventProcessorsAsEndOfChain(...) 方法也是處理 barrierSequences 的,忽略 consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } }addGatingSequences
追蹤 ringBuffer.addGatingSequences(...) 方法:
//step 1 //RingBuffer.class public void addGatingSequences(Sequence... gatingSequences){ //sequencer 為 MultiProducerSequencer sequencer.addGatingSequences(gatingSequences); } //step 2 //AbstractSequencer.class public final void addGatingSequences(Sequence... gatingSequences){ //此處的 SEQUENCE_UPDATER 是一個(gè) AtomicReferenceFieldUpdaterImpl 類型的變量,用于 CAS 操作 gatingSequences SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences); } //step 3 //SequenceGroups.class static3 Disruptor 的啟動(dòng)void addSequences(final T holder,final AtomicReferenceFieldUpdater updater, final Cursored cursor,final Sequence... sequencesToAdd){ long cursorSequence; Sequence[] updatedSequences; Sequence[] currentSequences; do{ //此處的 holder 即為 MultiProducerSequencer,此處獲取其內(nèi)部的 gatingSequences 變量 currentSequences = updater.get(holder); //此處為 copyOf(...) 方法為 java.util.Arrays.copyOf(...) 方法,用于將 currentSequences 復(fù)制一份 updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length); //此處的 cursor 即為 MultiProducerSequencer,getCursor() 方法會(huì)獲取其的序列號(hào) cursorSequence = cursor.getCursor(); int index = currentSequences.length; //此處的 sequencesToAdd 是之前消費(fèi)者的序列號(hào)集合,更新 sequencesToAdd 中的每個(gè)序列號(hào)封裝 //將 MultiProducerSequencer 的序列號(hào)注冊(cè)進(jìn)去,并填充到新集合的后面一半中 for (Sequence sequence : sequencesToAdd){ sequence.set(cursorSequence); updatedSequences[index++] = sequence; } }while (!updater.compareAndSet(holder, currentSequences, updatedSequences)); //此處的 while 會(huì)死循環(huán) CAS 操作直到更新成功 //在此獲取 MultiProducerSequencer 的序列號(hào),更新到 sequencesToAdd 的每個(gè)序列號(hào)封裝類中 cursorSequence = cursor.getCursor(); for (Sequence sequence : sequencesToAdd){ sequence.set(cursorSequence); } }
來看下方代碼:
disruptor.start();
追蹤 start(...) 方法:
//Disruptor.class public RingBufferstart(){ //確認(rèn)該 Disruptor 沒有啟動(dòng) checkOnlyStartedOnce(); //此處的 consumerInfo 是 EventProcessorInfo 類型的變量 for (final ConsumerInfo consumerInfo : consumerRepository){ consumerInfo.start(executor); } return ringBuffer; }
先來看 checkOnlyStartedOnce() 方法:
//Disruptor.class private void checkOnlyStartedOnce(){ //如果在調(diào)用該 CAS 方法之前已經(jīng)為 true 了,會(huì)拋出錯(cuò)誤 //其實(shí)就是確保在調(diào)用該方法之前還處于未開啟的狀態(tài) if (!started.compareAndSet(false, true)){ throw new IllegalStateException("Disruptor.start() must only be called once."); } }
再來追蹤 EventProcessorInfo 的 start(...) 方法:
//EventProcessorInfo.class public void start(final Executor executor){ //此處的 executor 即為 BasicExecutor executor.execute(eventprocessor); }
所以本質(zhì)上 Disruptor 的啟動(dòng)就是開啟 BasicExecutor,借此啟動(dòng)線程。
3.1 BatchEventProcessor上述代碼中啟動(dòng)線程的時(shí)候會(huì)傳入 eventprocessor 對(duì)象作為 task 去啟動(dòng)消費(fèi)者。eventprocessor 對(duì)象本質(zhì)上是上文中提到過的 BatchEventProcessor。
BatchEventProcessor 能夠被傳入 execute(...) 方法,證明其實(shí)現(xiàn)了 Runnable 接口:
//step 1 //BatchEventProcessor.class @Override public void run(){ //running 是一個(gè)定義在 BatchEventProcessor 中的 AtomicInteger 類型的變量 //CAS 操作,先判斷 running 的值是否等于 IDLE,如果是的話就修改成 RUNNING,并返回 true //IDLE = 1,RUNNING = 2,皆為 int 類型的常量 if (running.compareAndSet(IDLE, RUNNING)){ //此處修改 sequenceBarrier 中 alert 變量的狀態(tài)值,清除掉中斷狀態(tài) sequenceBarrier.clearAlert(); //如果傳入的消費(fèi)者實(shí)現(xiàn)了 LifecycleAware 接口,就會(huì)在 notifyStart() 方法中去執(zhí)行相關(guān)方法 //LifecycleAware 中定義了 onStart() 和 onShutdown() 方法,會(huì)分別在消費(fèi)者真正執(zhí)行之前和關(guān)閉之前執(zhí)行一次 //執(zhí)行 LifecycleAware 的 onStart() 方法 notifyStart(); try{ //如果 running 是 RUNNING 狀態(tài),就會(huì)進(jìn)入死循環(huán) if (running.get() == RUNNING){ //核心方法 processEvents(); } }finally{ //執(zhí)行 LifecycleAware 的 onShutdown() 方法 notifyShutdown(); //切換 running 的狀態(tài)值 running.set(IDLE); } }else{ if (running.get() == RUNNING){ throw new IllegalStateException("Thread is already running"); }else{ earlyExit(); } } } //step 2 //BatchEventProcessor.class private void processEvents(){ T event = null; //此處的 sequence 記錄著當(dāng)前消費(fèi)者已經(jīng)處理過的事件的編號(hào),初始化的時(shí)候?yàn)?-1,所以 nextSequence 初始為 0,每次加一 //nextSequence 是當(dāng)前消費(fèi)者下一項(xiàng)準(zhǔn)備處理的事件的編號(hào) long nextSequence = sequence.get() + 1L; //死循環(huán) while (true){ try{ //當(dāng)沒有事件發(fā)生的時(shí)候,消費(fèi)者所在的線程會(huì)在此等待,具體的實(shí)現(xiàn)依照使用者設(shè)置的等待策略的不同而不同 //本例中使用的是 BlockingWaitStrategy,所以會(huì)在此阻塞直到出現(xiàn)了事件 //返回的 availableSequence 是最新的事件的編號(hào),在任務(wù)量較小的情況下和 nextSequence 數(shù)值相同,在任務(wù)量較大的情況下小于 nextSequence //等待策略留在后頭展開 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null){ batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } //nextSequence 大于 availableSequence 的情況理論上不會(huì)出現(xiàn) while (nextSequence <= availableSequence){ //dataProvider 就是之前初始化的 RingBuffer,RingBuffer 在此處會(huì)去獲取當(dāng)前編號(hào)的 Event event = dataProvider.get(nextSequence); //onEvent(...) 是 EventHandler 接口定義的方法,是消費(fèi)者消費(fèi) Event 的最重要方法,方法體由使用者進(jìn)行定義 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); //編號(hào)自增 nextSequence++; } //在消費(fèi)完當(dāng)前的所有事件之后,記錄下事件編號(hào) sequence.set(availableSequence); }catch (final TimeoutException e){ //如果消費(fèi)者實(shí)現(xiàn)了 TimeoutHandler 接口,就可以在這里處理超時(shí)問題 notifyTimeout(sequence.get()); }catch (final AlertException ex){ //running 的狀態(tài)值非 RUNNING,就會(huì)退出死循環(huán) if (running.get() != RUNNING){ break; } }catch (final Throwable ex){ //如果當(dāng)前的消費(fèi)者實(shí)現(xiàn)了 ExceptionHandler 接口的話,可以在此處進(jìn)行錯(cuò)誤處理 exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } }3.2 WaitStrategy
回到上述代碼的以下這句:
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
追蹤一下 waitFor(...) 方法:
//ProcessingSequenceBarrier.class public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{ //如果變量 alert 為 true 的話會(huì)拋出錯(cuò)誤 checkAlert(); //調(diào)用等待策略的相關(guān)方法 //返回最新的事件的編號(hào) long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); //如果當(dāng)前可用的最新事件編號(hào)小于傳入的 sequence,就直接返回可用編號(hào)即可 if (availableSequence < sequence){ return availableSequence; } //getHighestPublishedSequence(...) 方法會(huì)判斷最大的可用的事件編號(hào) return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
等待策略的所有實(shí)現(xiàn)類都實(shí)現(xiàn)了 WaitStrategy 接口:
public interface WaitStrategy{ //休眠方法 long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException; //喚醒方法 void signalAllWhenBlocking(); }
Disruptor 自帶的策略中,常用的有以下幾種:
阻塞策略 BlockingWaitStrategy:默認(rèn)策略,沒有獲取到任務(wù)的情況下線程會(huì)進(jìn)入等待狀態(tài)。cpu 消耗少,但是延遲高。 阻塞限時(shí)策略 TimeoutBlockingWaitStrategy:相對(duì)于BlockingWaitStrategy來說,設(shè)置了等待時(shí)間,超過后拋異常。 自旋策略 BusySpinWaitStrategy:線程一直自旋等待。cpu 占用高,延遲低. Yield 策略 YieldingWaitStrategy:嘗試自旋 100 次,然后調(diào)用 Thread.yield() 讓出 cpu。cpu 占用高,延遲低。 分段策略 SleepingWaitStrategy:嘗試自旋 100 此,然后調(diào)用 Thread.yield() 100 次,如果經(jīng)過這兩百次的操作還未獲取到任務(wù),就會(huì)嘗試階段性掛起自身線程。此種方式是對(duì) cpu 占用和延遲的一種平衡,性能不太穩(wěn)定。
還有幾種譬如 PhasedBackoffWaitStrategy 和 LiteBlockingWaitStrategy 等,不多介紹。
詳細(xì)看一下 BlockingWaitStrategy 的實(shí)現(xiàn):
public final class BlockingWaitStrategy implements WaitStrategy{ //重入鎖 private final Lock lock = new ReentrantLock(); //Condition 用來控制線程的休眠和喚醒 private final Condition processorNotifyCondition = lock.newCondition(); @Override public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException{ long availableSequence; if (cursorSequence.get() < sequence){ //上鎖 lock.lock(); try{ while (cursorSequence.get() < sequence){ //檢查線程是否中斷了,如果已經(jīng)中斷了就會(huì)拋出異常 barrier.checkAlert(); //休眠線程 processorNotifyCondition.await(); } }finally{ //解鎖 lock.unlock(); } } //生產(chǎn)者進(jìn)度小于消費(fèi)者的消費(fèi)進(jìn)度,此循環(huán)進(jìn)行等待 //正常情況下都會(huì)在上方阻塞,不會(huì)進(jìn)入該循環(huán) while ((availableSequence = dependentSequence.get()) < sequence){ barrier.checkAlert(); ThreadHints.onSpinWait(); } return availableSequence; } @Override public void signalAllWhenBlocking(){ lock.lock(); try{ //用 Condition 喚醒全部的線程 processorNotifyCondition.signalAll(); }finally{ lock.unlock(); } } //toString() 方法,忽略 @Override public String toString(){ return "BlockingWaitStrategy{" + "processorNotifyCondition=" + processorNotifyCondition + "}"; } }3.3 DataProvider
回到上述代碼的以下這句:
event = dataProvider.get(nextSequence);
dataProvider 是一個(gè) DataProvider 類型的變量。DataProvider 本質(zhì)上是一個(gè) Disruptor 內(nèi)的接口:
public interface DataProvider{ T get(long sequence); }
其存在唯一實(shí)現(xiàn)類 RingBuffer。所以 get(...) 方法也在 RingBuffer 中:
//step 1 //RingBuffer.class @Override public E get(long sequence){ //elementAt(...) 方法定義在 RingBuffer 的抽象父類 RingBufferFields 中 return elementAt(sequence); } //step 2 //RingBufferFields.class protected final E elementAt(long sequence){ //調(diào)用 UNSAFE 的相關(guān)方法,通過地址去直接獲取 //entries 在上文代碼中申請(qǐng)了一系列地址連續(xù)的內(nèi)存 //REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT) 是一個(gè)很巧妙的算法,結(jié)果永遠(yuǎn)只會(huì)在申請(qǐng)下來的內(nèi)存中循環(huán) return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); }
由此可知,Disruptor 中的所有的事件都非存儲(chǔ)在虛擬機(jī)中,而是儲(chǔ)存在虛擬機(jī)外,由 Unsafe 類直接調(diào)用。
Unsafe 具有 "調(diào)用內(nèi)存對(duì)象很快,但是申請(qǐng)內(nèi)存塊很慢" 的特性,所以也就可以解釋為什么在初始化的時(shí)候要一次性將儲(chǔ)存 Event 的數(shù)組進(jìn)行逐個(gè)初始化了(代碼在上述 1.2 小節(jié)的 step 4 中)。
有一個(gè)注意點(diǎn),entries 上的元素實(shí)際上是在 jvm 管轄范圍內(nèi)的,并不一定需要使用 Unsafe 去調(diào)用,這里只是為了更高的性能。
三 Event 的產(chǎn)生在開頭的 demo 中,可以看到 LongEventProducer 中有一個(gè)核心方法:
//LongEventProducer.class public void onData(ByteBuffer bb) { //sequence 是 RingBuffer 中的一個(gè)數(shù)據(jù)塊,類似于一個(gè)數(shù)據(jù)地址 long sequence = ringBuffer.next(); try { //用數(shù)據(jù)地址去獲取到一個(gè) Event 事件類實(shí)例 LongEvent event = ringBuffer.get(sequence); //在實(shí)例中存入 ByteBuffer 中的數(shù)據(jù) event.set(bb.getLong(0)); } finally { //發(fā)布該數(shù)據(jù)塊,此時(shí)消費(fèi)者們都可以看到該數(shù)據(jù)塊了,可以進(jìn)行消費(fèi) ringBuffer.publish(sequence); } }
這個(gè)方法內(nèi)通過調(diào)用 ringBuffer.next() 方法獲取數(shù)組內(nèi)對(duì)象的地址,然后通過 ringBuffer.get(...) 方法獲取對(duì)象。
在 finally 代碼塊中調(diào)用 ringBuffer.publish(...) 方法去發(fā)布該信息。
1 next回到上述代碼的以下這句:
long sequence = ringBuffer.next();
追蹤 next() 方法:
//step 1 //RingBuffer.class @Override public long next(){ //調(diào)用 RingBuffer 內(nèi)的 MultiProducerSequencer 的相關(guān)方法 return sequencer.next(); } //step 2 //MultiProducerSequencer.class @Override public long next(){ //調(diào)用自身的相關(guān)方法 return next(1); } //step 3 //MultiProducerSequencer.class @Override public long next(int n){ //參數(shù)有效性驗(yàn)證,此處 n = 1 if (n < 1){ throw new IllegalArgumentException("n must be > 0"); } long current; long next; //死循環(huán) do{ //current 是當(dāng)前最新的事件編號(hào) current = cursor.get(); //此處為 current + 1,用作下一個(gè)事件的編號(hào) next = current + n; //wrapPoint 是事件編號(hào)和數(shù)組大小的差 long wrapPoint = next - bufferSize; //gatingSequenceCache 的設(shè)計(jì)很巧妙,它是一個(gè) Sequence 類型的變量,可以看做是一個(gè) long 整數(shù) //gatingSequenceCache 的存在意義是每隔一段時(shí)間去檢查一次消費(fèi)者的處理進(jìn)度 //gatingSequenceCache 在每次檢查進(jìn)度的時(shí)候都會(huì)更新成 "當(dāng)前處理最慢的消費(fèi)者已經(jīng)處理完成的事件編號(hào)" //處理邏輯在下方 if 判斷中 long cachedGatingSequence = gatingSequenceCache.get(); //cachedGatingSequence > current 的情況就不會(huì)發(fā)生,因?yàn)?cachedGatingSequence 是消費(fèi)者處理進(jìn)度,current 是目前的事件總編號(hào),所以最多相等 //在消費(fèi)者算力充足的情況下,cachedGatingSequence 會(huì)和 current 相等 //wrapPoint > cachedGatingSequence 的情況,在極端情況下可能是因?yàn)樯a(chǎn)者的速度太快了,已經(jīng)遠(yuǎn)超過最慢的那個(gè)消費(fèi)者,超過了 "一圈"(即 bufferSize 的大小) //此處可以這么理解,由于 RingBuffer 內(nèi)數(shù)組的大小是有限的,如果事件生產(chǎn)的多了,就會(huì)覆蓋掉最開始的幾個(gè)事件 //但是如果消費(fèi)者的進(jìn)度沒有跟上,來不及消費(fèi)就被覆蓋了,就造成了 bug,此處即為抑制策略 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){ //getMinimumSequence(...) 方法會(huì)獲取當(dāng)前處理事件最慢的那個(gè)消費(fèi)者的處理位置 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); //wrapPoint - gatingSequence = next - bufferSize - gatingSequence >0 //即 next > bufferSize + gatingSequence,落后了 "一圈" if (wrapPoint > gatingSequence){ //線程掛起 1 納秒,然后跳過本次循環(huán)進(jìn)行下一次循環(huán) //此處會(huì)陷入死循環(huán),阻塞掉生產(chǎn)者,去等待消費(fèi)者的進(jìn)度 LockSupport.parkNanos(1); continue; } //跳出上述循環(huán)之后在這里更新 gatingSequenceCache 的值 gatingSequenceCache.set(gatingSequence); }else if (cursor.compareAndSet(current, next)){ //如果消費(fèi)者的進(jìn)度正常,那么會(huì)在此用 CAS 操作更新 cursor 的值,并且跳出 while 循環(huán) break; } }while (true); //返回 return next; }
在線程池(比如筆者比較了解的 ThreadPoolExecutor)的實(shí)現(xiàn)中,對(duì)于 task 過多,溢出等待隊(duì)列的情況,一般會(huì)有一種策略去應(yīng)對(duì)。在 ThreadPoolExecutor 中,默認(rèn)的策略為拋出錯(cuò)誤,直接終止程序。
在 Disruptor 中,其實(shí) RingBuffer 就類似一個(gè)等待隊(duì)列,溢出策略則是暫停 task 的產(chǎn)生,等待線程池去執(zhí)行。
【此處僅為類比,不能簡(jiǎn)單的把 Disruptor 想成是一個(gè)線程池】
2 publishringBuffer.publish(...) 是事件發(fā)布的核心方法:
//step 1 //RingBuffer.class @Override public void publish(long sequence){ sequencer.publish(sequence); } //step 2 //MultiProducerSequencer.class @Override public void publish(final long sequence){ //此處更新數(shù)據(jù) setAvailable(sequence); //此處調(diào)用等待策略的 signalAllWhenBlocking() 方法喚醒所有等待的線程 //具體實(shí)現(xiàn)依照 waitStrategy 的不同而不同 waitStrategy.signalAllWhenBlocking(); } //step 3 //MultiProducerSequencer.class private void setAvailable(final long sequence){ //calculateAvailabilityFlag(sequence) 可以簡(jiǎn)單理解為是計(jì)算出的圈數(shù),即 (sequence / bufferSize) //calculateIndex(sequence) 會(huì)計(jì)算出新的 sequence 對(duì)應(yīng)組中的哪一個(gè)位置,即 (sequence % bufferSize) setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } //step 4 //MultiProducerSequencer.class private void setAvailableBufferValue(int index, int flag){ //SCALE 是本機(jī) Object[] 引用的大小,一般為 4 long bufferAddress = (index * SCALE) + BASE; //使用 Unsafe 更新元素 //availableBuffer 是一個(gè) int 數(shù)組,大小為 bufferSize,即和 entries 相同 //Unsafe.putOrderedInt(...) 會(huì)將 availableBuffer 的指定位置(bufferAddress)的元素修改成 flag UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); }四 一點(diǎn)嘮叨
· Disruptor 的封裝很薄(比起 Netty、Spring 之類的重量級(jí)框架),調(diào)用鏈路都相對(duì)較短
· Disruptor 的環(huán)裝緩存區(qū)(RingBuffer)的很多概念還有待理解
· 對(duì)于筆者這樣的數(shù)學(xué)苦手來說看底層算法代碼略頭疼
· 僅為個(gè)人的學(xué)習(xí)筆記,可能存在錯(cuò)誤或者表述不清的地方,有緣補(bǔ)充
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/73243.html
摘要:純分享直接上干貨操作系統(tǒng)并發(fā)支持進(jìn)程管理內(nèi)存管理文件系統(tǒng)系統(tǒng)進(jìn)程間通信網(wǎng)絡(luò)通信阻塞隊(duì)列數(shù)組有界隊(duì)列鏈表無界隊(duì)列優(yōu)先級(jí)有限無界隊(duì)列延時(shí)無界隊(duì)列同步隊(duì)列隊(duì)列內(nèi)存模型線程通信機(jī)制內(nèi)存共享消息傳遞內(nèi)存模型順序一致性指令重排序原則內(nèi)存語義線程 純分享 , 直接上干貨! 操作系統(tǒng)并發(fā)支持 進(jìn)程管理內(nèi)存管...
摘要:結(jié)合之前的線程快照,我發(fā)現(xiàn)這個(gè)消費(fèi)線程也是處于狀態(tài),和后面的業(yè)務(wù)線程池一模一樣。本地模擬本地也是創(chuàng)建了一個(gè)單線程的線程池,分別執(zhí)行了兩個(gè)任務(wù)。發(fā)現(xiàn)當(dāng)任務(wù)中拋出一個(gè)沒有捕獲的異常時(shí),線程池中的線程就會(huì)處于狀態(tài),同時(shí)所有的堆棧都和生產(chǎn)相符。 showImg(https://segmentfault.com/img/remote/1460000018482477); 背景 事情(事故)是這樣...
摘要:我們知道是一個(gè)隊(duì)列,生產(chǎn)者往隊(duì)列里發(fā)布一項(xiàng)事件或稱之為消息也可以時(shí),消費(fèi)者能獲得通知如果沒有事件時(shí),消費(fèi)者被堵塞,直到生產(chǎn)者發(fā)布了新的事件。實(shí)戰(zhàn)本文先不具體去闡述的工作具體原理,只是簡(jiǎn)單地將與其整合。 什么是Disruptor 從功能上來看,Disruptor 是實(shí)現(xiàn)了隊(duì)列的功能,而且是一個(gè)有界隊(duì)列。那么它的應(yīng)用場(chǎng)景自然就是生產(chǎn)者-消費(fèi)者模型的應(yīng)用場(chǎng)合了。可以拿 JDK 的 Block...
摘要:發(fā)現(xiàn)這是的一個(gè)堆棧,前段時(shí)間正好解決過一個(gè)由于隊(duì)列引起的一次強(qiáng)如也發(fā)生內(nèi)存溢出沒想到又來一出。因此初步判斷為大量線程執(zhí)行函數(shù)之后互相競(jìng)爭(zhēng)導(dǎo)致使用率增高,而通過對(duì)堆棧發(fā)現(xiàn)是和使用有關(guān)。 showImg(https://segmentfault.com/img/remote/1460000017395756?w=1816&h=1080); 前言 到了年底果然都不太平,最近又收到了運(yùn)維報(bào)警:...
摘要:結(jié)合的日志發(fā)現(xiàn)就算是發(fā)生了老年代也已經(jīng)回收不了,內(nèi)存已經(jīng)到頂。定位由于生產(chǎn)上的內(nèi)存文件非常大,達(dá)到了幾十。也是由于我們的內(nèi)存設(shè)置太大有關(guān)。同時(shí)后臺(tái)也開始打印內(nèi)存溢出了,這樣便復(fù)現(xiàn)出問題。結(jié)果發(fā)現(xiàn)類型的對(duì)象占用了將近的內(nèi)存。 showImg(https://segmentfault.com/img/remote/1460000016186784?w=2048&h=1365); 前言 Ou...
閱讀 4397·2023-04-26 02:40
閱讀 2740·2023-04-26 02:31
閱讀 2822·2021-11-15 18:08
閱讀 639·2021-11-12 10:36
閱讀 1511·2021-09-30 09:57
閱讀 5349·2021-09-22 15:31
閱讀 2696·2019-08-30 14:17
閱讀 1349·2019-08-30 12:58