成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

高性能SPSC無鎖隊列設計之路

evin2016 / 3028人閱讀

摘要:當多線程修改互相獨立的變量時,如果這些變量共享同一個緩存行,就會無意中影響彼此的性能,這就是偽共享。

本文整理了Single Producer/Consumer lock free Queue step by step這篇文章里頭關于高性能的SPSC無鎖隊列使用遵循的幾個原則:

單寫原則

使用lazySet替代volatile set

使用位運算替代取模運算

避免偽共享

減少緩存一致性沖突

1.Single Writer Principle(單寫原則)

如果只有一個線程對資源進行寫操作,它實際上是比你想象的更容易,這個方案是可行的,無需CPU浪費管理資源爭奪或上下文切換。當然,如果有多個線程讀取相同的數據。CPU可以通過高速緩存一致性的子系統(tǒng)廣播只讀數據的拷貝到其他核。這雖然有成本的,但它的尺度非常好。
多個線程如果同時寫同一個資源,必有爭奪,就需要用鎖或樂觀鎖等堵塞方法,而非堵塞的單線程寫比多線程寫要快,能獲得高吞吐量和低延遲,特別是多核情況,一個線程一個CPU核,大大增加其他CPU核并行運行其他線程的概率。

Method Time (ms)
One Thread 300
One Thread with Memory Barrier 4,700
One Thread with CAS 5,700
Two Threads with CAS 18,000
One Thread with Lock 10,000
Two Threads with Lock 118,000

Disruptor分離了關注,真正實現單寫原則。(Disruptor的特點是將多線程生產者通過Ringbuffer變成單線程消費者,通過單線程消費者對共享資源進行寫操作)
目前 Node.js, Erlang, Actor 模式, SEDA 都采取了單寫解決方案,但是他們大多數使用基于隊列的下實現的,它打破多帶帶寫原則

2.使用lazySet替代volatile set

lazySet是使用Unsafe.putOrderedObject方法,會前置一個store-store barrier(在當前的硬件體系下或者是no-op或者非常輕),而不是store-load barrier。

store-load barrier較慢,總是用在volatile的寫操作上。在操作序列Store1; StoreStore;Store2中,Store1的數據會在Store2和后續(xù)寫操作之前對其它處理器可見。換句話說,就是保證了對其它數據可見的寫的順序。

如果只有一個線程寫我們就用不著store-load barrier,lazySet和volatile set在單寫原則下面是等價的。

這種性能提升是有代價的,雖然廉價,也就是寫后結果并不會被其他線程看到,甚至是自己的線程,通常是幾納秒后被其他線程看到,lazySet的寫在實踐上來延遲是納秒級,這個時間比較短,所以代價可以忍受。

類似Unsafe.putOrderedObject還有unsafe.putOrderedLong等方法,unsafe.putOrderedLong比使用 volatile long要快3倍左右。

3.使用位運算替代取模運算

比如這段

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail;
        final long wrapPoint = currentTail - buffer.length;
        if (head <= wrapPoint) {
            return false;
        }

        buffer[(int) (currentTail % buffer.length)] = e;
        tail = currentTail + 1;

        return true;
    }

使用位運算之后

mask = capacity - 1;
public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }
性能對比

x % 8 == x & (8 - 1) 但是位運算速度更快

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class ModuloMaskTest {

    private static final int LENGTH = 16;
    int[] ints = new int[LENGTH];
    int mask = LENGTH - 1;
    int someIndex = 5;

    @Benchmark
    public int moduloLengthNoMask() {
        return someIndex % ints.length;
    }

    @Benchmark
    public int moduloLengthMask() {
        return someIndex & (ints.length - 1);
    }

    @Benchmark
    public int moduloConstantLengthNoMask() {
        return someIndex % LENGTH;
    }

    @Benchmark
    public int moduloMask() {
        return someIndex & mask;
    }

    @Benchmark
    public int consume() {
        return someIndex;
    }
    @Benchmark
    public void noop() {
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(".*" +ModuloMaskTest.class.getSimpleName()+ ".*")
                .forks(1)
                .build();
        new Runner(opt).run();
    }
}

結果如下:

# Run complete. Total time: 00:07:34

Benchmark                                  Mode  Cnt  Score   Error  Units
ModuloMaskTest.consume                     avgt   20  3.099 ± 0.152  ns/op
ModuloMaskTest.moduloConstantLengthNoMask  avgt   20  3.430 ± 0.509  ns/op
ModuloMaskTest.moduloLengthMask            avgt   20  3.505 ± 0.058  ns/op
ModuloMaskTest.moduloLengthNoMask          avgt   20  6.490 ± 0.143  ns/op
ModuloMaskTest.moduloMask                  avgt   20  3.304 ± 0.159  ns/op
ModuloMaskTest.noop                        avgt   20  0.404 ± 0.010  ns/op

可以發(fā)現%操作性能最差要6.x納秒,&操作基本在3ns左右

4.避免偽共享 L1 L2 L3 cache

當 CPU 執(zhí)行運算的時候,它先去 L1 查找所需的數據,再去 L2,然后是L3,最后如果這些緩存中都沒有,所需的數據就要去主內存拿。走得越遠,運算耗費的時間就越長。所以如果你在做一些很頻繁的事,你要確保數據在 L1 緩存中。

從CPU到 大約需要的CPU周期 大約需要的時間
主存 約60-80ns
QPI 總線傳輸(between sockets, not drawn) 約20ns
L3 cache 約40-45 cycles 約15ns
L2 cache 約10 cycles 約3ns
L1 cache 約3-4 cycles 約1ns
寄存器 1 cycle

可見CPU讀取主存中的數據會比從L1中讀取慢了近2個數量級。

定義

Cache是由很多個cache line組成的。每個cache line通常是64字節(jié),并且它有效地引用主內存中的一塊兒地址。一個Java的long類型變量是8字節(jié),因此在一個緩存行中可以存8個long類型的變量。
CPU每次從主存中拉取數據時,會把相鄰的數據也存入同一個cache line。
在訪問一個long數組的時候,如果數組中的一個值被加載到緩存中,它會自動加載另外7個。因此你能非常快的遍歷這個數組。事實上,你可以非??焖俚谋闅v在連續(xù)內存塊中分配的任意數據結構。這種無法充分使用緩存行特性的現象,稱為偽共享。

當多線程修改互相獨立的變量時,如果這些變量共享同一個緩存行,就會無意中影響彼此的性能,這就是偽共享。緩存行上的寫競爭是運行在SMP系統(tǒng)中并行線程實現可伸縮性最重要的限制因素。有人將偽共享描述成無聲的性能殺手。

圖1說明了偽共享的問題。在核心1上運行的線程想更新變量X,同時核心2上的線程想要更新變量Y。不幸的是,這兩個變量在同一個緩存行中。每個線程都要去競爭緩存行的所有權來更新變量。如果核心1獲得了所有權,緩存子系統(tǒng)將會使核心2中對應的緩存行失效。當核心2獲得了所有權然后執(zhí)行更新操作,核心1就要使自己對應的緩存行失效。這會來來回回的經過L3緩存,大大影響了性能。如果互相競爭的核心位于不同的插槽,就要額外橫跨插槽連接,問題可能更加嚴重。

解決

對于偽共享,一般的解決方案是,增大數組元素的間隔使得由不同線程存取的元素位于不同的緩存行上,以空間換時間。在jdk1.8中,有專門的注解@Contended來避免偽共享,更優(yōu)雅地解決問題。

@Contended
public class VolatileLong {
    public volatile long value = 0L;
}

public class FalseSharingJdk8 implements Runnable {
    public static int NUM_THREADS = 4; // change
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private final int arrayIndex;
    private static VolatileLong[] longs;

    public FalseSharingJdk8(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    /**
     * -XX:-RestrictContended
     * –XX:+PrintFieldLayout  --- 只是在調試版jdk有效
     * @param args
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception {
        Thread.sleep(10000);
        System.out.println("starting....");
        if (args.length == 1) {
            NUM_THREADS = Integer.parseInt(args[0]);
        }

        longs = new VolatileLong[NUM_THREADS];
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharingJdk8(i));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }
}

沒有使用注解的話,需要自己去填充

public final static class ValuePadding {
        protected long p1, p2, p3, p4, p5, p6, p7;
        protected volatile long value = 0L;
        protected long p9, p10, p11, p12, p13, p14;
        protected long p15;
    }
5.減少緩存一致性沖突

只要系統(tǒng)只有一個CPU核在工作,一切都沒問題。如果有多個核,每個核又都有自己的緩存,那么我們就遇到問題了:如果某個CPU緩存段中對應的內存內容被另外一個CPU偷偷改了,會發(fā)生什么?
緩存一致性協(xié)議就是為了解決這個問題而設計的,使多組緩存的內容保持一致,即使用多組緩存,但使它們的行為看起來就像只有一組緩存那樣。

    private final AtomicLong tail = new AtomicLong(0);
    private final AtomicLong head = new AtomicLong(0);

    public static class PaddedLong {
        public long value = 0, p1, p2, p3, p4, p5, p6;
    }

    private final PaddedLong tailCache = new PaddedLong();
    private final PaddedLong headCache = new PaddedLong();

    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - capacity;
        if (headCache.value <= wrapPoint) {
            headCache.value = head.get();
            if (headCache.value <= wrapPoint) {
                return false;
            }
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tailCache.value) {
            tailCache.value = tail.get();
            if (currentHead >= tailCache.value) {
                return null;
            }
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

對比沒有cache的版本

private final AtomicLong tail = new AtomicLong(0);
private final AtomicLong head = new AtomicLong(0);

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tail.get()) {
            return null;
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

對比數據

0 - ops/sec=56,689,539 - OneToOneConcurrentArrayQueue2 result=777
1 - ops/sec=33,578,974 - OneToOneConcurrentArrayQueue2 result=777
2 - ops/sec=54,105,692 - OneToOneConcurrentArrayQueue2 result=777
3 - ops/sec=84,290,815 - OneToOneConcurrentArrayQueue2 result=777
4 - ops/sec=79,851,727 - OneToOneConcurrentArrayQueue2 result=777
-----
0 - ops/sec=110,506,679 - OneToOneConcurrentArrayQueue3 result=777
1 - ops/sec=117,252,276 - OneToOneConcurrentArrayQueue3 result=777
2 - ops/sec=115,639,936 - OneToOneConcurrentArrayQueue3 result=777
3 - ops/sec=116,555,884 - OneToOneConcurrentArrayQueue3 result=777
4 - ops/sec=115,712,336 - OneToOneConcurrentArrayQueue3 result=777

整體上有一定的提升。

doc

Single Producer/Consumer lock free Queue step by step

Single Writer Principle

Atomic*.lazySet is a performance win for single writers

多帶帶寫原則Single Writer Principle

Java性能優(yōu)化要點之五: 隊列與lazySet

學習一下Disruptor

The Mythical Modulo Mask

偽共享和緩存行填充,從Java 6, Java 7 到Java 8

Java8的偽共享和緩存行填充--@Contended注釋

Diving Deeper into Cache Coherency

緩存一致性(Cache Coherency)入門

偽共享(False Sharing)

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.hztianpu.com/yun/70363.html

相關文章

  • JCTools簡介

    摘要:序是一款對并發(fā)數據結構進行增強的并發(fā)工具,主要提供了以及的增強數據結構。只有幾個方法對比主要是操作之后沒有立即返回是對的增強,對多的支持以及高并發(fā)更新提供更好的性能。是對的簡單包裝以支持的接口。 序 JCTools是一款對jdk并發(fā)數據結構進行增強的并發(fā)工具,主要提供了map以及queue的增強數據結構。原來netty還是自己寫的MpscLinkedQueueNode,后來新版本就換成...

    YJNldm 評論0 收藏0
  • 十.Go并發(fā)編程--channel使用

    摘要:比如主協(xié)程啟動個子協(xié)程,主協(xié)程等待所有子協(xié)程退出后再繼續(xù)后續(xù)流程,這種場景下也可輕易實現。這個例子中,父協(xié)程僅僅是等待子協(xié)程結束,其實父協(xié)程也可以向管道中寫入數據通知子協(xié)程結束,這時子協(xié)程需要定期地探測管道中是否有消息出現。一.設計原理Go 語言中最常見的、也是經常被人提及的設計模式就是:不要通過共享內存來通信,我們應該使用通信來共享內存通過共享內存來通信是直接讀取內存的數據,而通過通信來共...

    supernavy 評論0 收藏0
  • 【推薦】最新200篇:技術文章整理

    摘要:作為面試官,我是如何甄別應聘者的包裝程度語言和等其他語言的對比分析和主從復制的原理詳解和持久化的原理是什么面試中經常被問到的持久化與恢復實現故障恢復自動化詳解哨兵技術查漏補缺最易錯過的技術要點大掃盲意外宕機不難解決,但你真的懂數據恢復嗎每秒 作為面試官,我是如何甄別應聘者的包裝程度Go語言和Java、python等其他語言的對比分析 Redis和MySQL Redis:主從復制的原理詳...

    BicycleWarrior 評論0 收藏0

發(fā)表評論

0條評論

evin2016

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<