成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

Java 并發(fā)方案全面學(xué)習(xí)總結(jié)

mengera88 / 1723人閱讀

摘要:進程線程與協(xié)程它們都是并行機制的解決方案。選擇是任意性的,并在對實現(xiàn)做出決定時發(fā)生。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。

并發(fā)與并行的概念

并發(fā)(Concurrency): 問題域中的概念—— 程序需要被設(shè)計成能夠處理多個同時(或者幾乎同時)發(fā)生的事件

并行(Parallelism): 方法域中的概念——通過將問題中的多個部分 并行執(zhí)行,來加速解決問題。

進程、線程與協(xié)程

它們都是并行機制的解決方案。

進程: 進程是什么呢?直白地講,進程就是應(yīng)用程序的啟動實例。比如我們運行一個游戲,打開一個軟件,就是開啟了一個進程。進程擁有代碼和打開的文件資源、數(shù)據(jù)資源、獨立的內(nèi)存空間。啟動一個進程非常消耗資源,一般一臺機器最多啟動數(shù)百個進程。

線程: 線程從屬于進程,是程序的實際執(zhí)行者。一個進程至少包含一個主線程,也可以有更多的子線程。線程擁有自己的棧空間。在進程內(nèi)啟動線程也要消耗一定的資源,一般一個進程最多啟動數(shù)千個線程。操作系統(tǒng)能夠調(diào)度的最小單位就是線程了。

協(xié)程: 協(xié)程又從屬于線程,它不屬于操作系統(tǒng)管轄,完全由程序控制,一個線程內(nèi)可以啟動數(shù)萬甚至數(shù)百萬協(xié)程。但也正是因為它由程序控制,它對編寫代碼的風(fēng)格改變也最多。

Java的并行執(zhí)行實現(xiàn) JVM中的線程

主線程: 獨立生命周期的線程

守護線程: 被主線程創(chuàng)建,隨著創(chuàng)建線程結(jié)束而結(jié)束

線程狀態(tài)

要注意的是,線程不是調(diào)用start之后馬上進入運行中的狀態(tài),而是在"可運行"狀態(tài),由操作系統(tǒng)來決定調(diào)度哪個線程來運行。

Jetty中的線程

Web服務(wù)器都有自己管理的線程池, 比如輕量級的Jetty, 就有以下三種類型的線程:

Acceptor

Selector

Worker

最原始的多線程——Thread類 繼承類 vs 實現(xiàn)接口

繼承Thread類

實現(xiàn)Runnable接口

實際使用中顯然實現(xiàn)接口更好, 避免了單繼承限制。

Runnable vs Callable

Runnable:實現(xiàn)run方法,無法拋出受檢查的異常,運行時異常會中斷主線程,但主線程無法捕獲,所以子線程應(yīng)該自己處理所有異常

Callable:實現(xiàn)call方法,可以拋出受檢查的異常,可以被主線程捕獲,但主線程無法捕獲運行時異常,也不會被打斷。

需要返回值的話,就用Callable接口
一個實現(xiàn)了Callable接口的對象,需要被包裝為RunnableFuture對象, 然后才能被新線程執(zhí)行, 而RunnableFuture其實還是實現(xiàn)了Runnable接口。

Future, Runnable 和FutureTask的關(guān)系如下:

可以看出FutureTask其實是RunnableFuture接口的實現(xiàn)類,下面是使用Future的示例代碼

public class Callee implements Callable {
    AtomicInteger counter = new AtomicInteger(0);

    private Integer seq=null;

    public Callee()
    {
        super();
    }

    public  Callee(int seq)
    {
        this.seq = seq;
    }

    /**
     * call接口可以拋出受檢查的異常
     * @return
     * @throws InterruptedException
     */
    @Override
    public Person call() throws InterruptedException {
        Person p = new Person("person"+ counter.incrementAndGet(), RandomUtil.random(0,150));
        System.out.println("In thread("+seq+"), create a Person: "+p.toString());
        Thread.sleep(1000);
        return  p;
    }
}
Callee callee1 = new Callee();
FutureTask ft= new FutureTask(callee1);
Thread thread = new Thread(ft);
thread.start();

try {
    thread.join();
} catch (InterruptedException e) {
    e.printStackTrace();
    return;
}

System.out.println("ft.isDone: "+ft.isDone());

Person result1;
try {
    result1 = ((Future) ft).get();
} catch (InterruptedException e) {
    e.printStackTrace();
    result1 = null;
} catch (ExecutionException e) {
    e.printStackTrace();
    result1 = null;
}
Person result = result1;
System.out.println("main thread get result: "+result.toString());
線程調(diào)度

Thread.yield() 方法:調(diào)用這個方法,會讓當(dāng)前線程退回到可運行狀態(tài),而不是阻塞狀態(tài),這樣就留給其他同級線程一些運行機會

Thread.sleep(long millis):調(diào)用這個方法,真的會讓當(dāng)前線程進入阻塞狀態(tài),直到時間結(jié)束

線程對象的join():這個方法讓當(dāng)前線程進入阻塞狀態(tài),直到要等待的線程結(jié)束。

線程對象的interrupt():不要以為它是中斷某個線程!它只是線線程發(fā)送一個中斷信號,讓線程在無限等待時(如死鎖時)能拋出異常,從而結(jié)束線程,但是如果你吃掉了這個異常,那么這個線程還是不會中斷的!

Object類中的wait():線程進入等待狀態(tài),直到其他線程調(diào)用此對象的 notify() 方法或 notifyAll() 喚醒方法。這個狀態(tài)跟加鎖有關(guān),所以是Object的方法。

Object類中的notify():喚醒在此對象監(jiān)視器上等待的單個線程。如果所有線程都在此對象上等待,則會選擇喚醒其中一個線程。選擇是任意性的,并在對實現(xiàn)做出決定時發(fā)生。線程通過調(diào)用其中一個 wait 方法,在對象的監(jiān)視器上等待。 直到當(dāng)前的線程放棄此對象上的鎖定,才能繼續(xù)執(zhí)行被喚醒的線程。被喚醒的線程將以常規(guī)方式與在該對象上主動同步的其他所有線程進行競爭;類似的方法還有一個notifyAll(),喚醒在此對象監(jiān)視器上等待的所有線程。

同步與鎖 內(nèi)存一致性錯誤

由于線程在并行時,可能會"同時"訪問一個變量, 所以共享變量的時候,會出現(xiàn)值處于一個不確定的狀況, 例如下面的代碼, c是一個實例變量, 多個線程同時訪問increment或decrement方法時,就可能出現(xiàn)一致性錯誤,最終讓c變成"奇怪"的值。

public class Counter {
    private int c = 0;

    public void increment() {
        c++;
    }

    public void decrement() {
        c--;
    }

    public int value() {
        return c;
    }
}
volatile
public class Foo {
    private int x = -1;
    private volatile boolean v = false;
    public void setX(int x) {
        this.x = x;
        v = true;
    }
    public int getX() {
        if (v == true) {
            return x;
        }
        return 0;
    }
}

volatile關(guān)鍵字實際上指定了變量不使用寄存器, 并且對變量的訪問不會亂序執(zhí)行,從而避免了并行訪問的不一致問題。但這個方案僅僅對原始類型變量本身生效,如果是++或者--這種“非原子”操作,則不能保證多線程操作的正確性了

原子類型

JDK提供了一系列對基本類型的封裝,形成原子類型(Atomic Variables),特別適合用來做計數(shù)器

import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private AtomicInteger c = new AtomicInteger(0);

    public void increment() {
        c.incrementAndGet();
    }

    public void decrement() {
        c.decrementAndGet();
    }

    public int value() {
        return c.get();
    }
}

原子操作的實現(xiàn)原理,在Java8之前和之后不同

Java7

public final int getAndIncrement() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}

Java8

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

至于Compare-and-Swap,以及Fetch-and-Add兩種算法,是依賴機器底層機制實現(xiàn)的。

線程安全的集合類

BlockingQueue: 定義了一個先進先出的數(shù)據(jù)結(jié)構(gòu),當(dāng)你嘗試往滿隊列中添加元素,或者從空隊列中獲取元素時,將會阻塞或者超時

ConcurrentMap: 是 java.util.Map 的子接口,定義了一些有用的原子操作。移除或者替換鍵值對的操作只有當(dāng) key 存在時才能進行,而新增操作只有當(dāng) key 不存在時。使這些操作原子化,可以避免同步。ConcurrentMap 的標(biāo)準(zhǔn)實現(xiàn)是 ConcurrentHashMap,它是 HashMap 的并發(fā)模式。

ConcurrentNavigableMap: 是 ConcurrentMap 的子接口,支持近似匹配。ConcurrentNavigableMap 的標(biāo)準(zhǔn)實現(xiàn)是 ConcurrentSkipListMap,它是 TreeMap 的并發(fā)模式。

ThreadLocal-只有本線程才能訪問的變量

ThreadLoal 變量,它的基本原理是,同一個 ThreadLocal 所包含的對象(對ThreadLocal< String >而言即為 String 類型變量),在不同的 Thread 中有不同的副本(實際是不同的實例,后文會詳細闡述)。這里有幾點需要注意

因為每個 Thread 內(nèi)有自己的實例副本,且該副本只能由當(dāng)前 Thread 使用。這是也是 ThreadLocal 命名的由來
既然每個 Thread 有自己的實例副本,且其它 Thread 不可訪問,那就不存在多線程間共享的問題。

它與普通變量的區(qū)別在于,每個使用該變量的線程都會初始化一個完全獨立的實例副本。ThreadLocal 變量通常被private static修飾。當(dāng)一個線程結(jié)束時,它所使用的所有 ThreadLocal 相對的實例副本都可被回收。

總的來說,ThreadLocal 適用于每個線程需要自己獨立的實例且該實例需要在多個方法中被使用,也即變量在線程間隔離而在方法或類間共享的場景。后文會通過實例詳細闡述該觀點。另外,該場景下,并非必須使用 ThreadLocal ,其它方式完全可以實現(xiàn)同樣的效果,只是 ThreadLocal 使得實現(xiàn)更簡潔。

synchronized關(guān)鍵字

方法加鎖:其實不是加在指定的方法上,而是在指定的對象上,只不過在方法開始前會檢查這個鎖

靜態(tài)方法鎖:加在類上,它和加在對象上的鎖互補干擾

代碼區(qū)塊鎖:其實不是加在指定的代碼塊上,而是加在指定的對象上,只不過在代碼塊開始前會檢查這個鎖。一個對象只會有一個鎖,所以代碼塊鎖和實例方法鎖是會互相影響的

需要注意的是:無論synchronized關(guān)鍵字加在方法上還是對象上,它取得的鎖都是對象,而不是把一段代碼或函數(shù)當(dāng)作鎖――而且同步方法很可能還會被其他線程的對象訪問,每個對象只有一個鎖(lock)與之相關(guān)聯(lián)

加鎖不慎可能會造成死鎖

線程池(Java 5) 用途

真正的多線程使用,是從線程池開始的,Callable接口,基本上也是被線程池調(diào)用的。

線程池全景圖


線程池的使用
        ExecutorService pool = Executors.newFixedThreadPool(3);

        Callable worker1 = new Callee();
        Future ft1 = pool.submit(worker1);

        Callable worker2 = new Callee();
        Future ft2 = pool.submit(worker2);

        Callable worker3 = new Callee();
        Future ft3 = pool.submit(worker3);

        System.out.println("準(zhǔn)備通知線程池shutdown...");
        pool.shutdown();
        System.out.println("已通知線程池shutdown");
        try {
            pool.awaitTermination(2L, TimeUnit.SECONDS);
            System.out.println("線程池完全結(jié)束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
線程池要解決的問題

任務(wù)排隊:當(dāng)前能并發(fā)執(zhí)行的線程數(shù)總是有限的,但任務(wù)數(shù)可以很大

線程調(diào)度:線程的創(chuàng)建是比較消耗資源的,需要一個池來維持活躍線程

結(jié)果收集:每個任務(wù)完成以后,其結(jié)果需要統(tǒng)一采集

線程池類型

newSingleThreadExecutor:創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。

newFixedThreadPool:創(chuàng)建固定大小的線程池。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。

newCachedThreadPool:創(chuàng)建一個可緩存的線程池。如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時,此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。

newScheduledThreadPool:創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。

newSingleThreadScheduledExecutor:創(chuàng)建一個單線程的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。

線程池狀態(tài)

線程池在構(gòu)造前(new操作)是初始狀態(tài),一旦構(gòu)造完成線程池就進入了執(zhí)行狀態(tài)RUNNING。嚴(yán)格意義上講線程池構(gòu)造完成后并沒有線程被立即啟動,只有進行“預(yù)啟動”或者接收到任務(wù)的時候才會啟動線程。這個會后面線程池的原理會詳細分析。但是線程池是出于運行狀態(tài),隨時準(zhǔn)備接受任務(wù)來執(zhí)行。

線程池運行中可以通過shutdown()和shutdownNow()來改變運行狀態(tài)。shutdown()是一個平緩的關(guān)閉過程,線程池停止接受新的任務(wù),同時等待已經(jīng)提交的任務(wù)執(zhí)行完畢,包括那些進入隊列還沒有開始的任務(wù),這時候線程池處于SHUTDOWN狀態(tài);shutdownNow()是一個立即關(guān)閉過程,線程池停止接受新的任務(wù),同時線程池取消所有執(zhí)行的任務(wù)和已經(jīng)進入隊列但是還沒有執(zhí)行的任務(wù),這時候線程池處于STOP狀態(tài)。

一旦shutdown()或者shutdownNow()執(zhí)行完畢,線程池就進入TERMINATED狀態(tài),此時線程池就結(jié)束了。

isTerminating()描述的是SHUTDOWN和STOP兩種狀態(tài)。

isShutdown()描述的是非RUNNING狀態(tài),也就是SHUTDOWN/STOP/TERMINATED三種狀態(tài)。

任務(wù)拒絕策略

Fork/Join模型(Java7) 用途

計算密集型的任務(wù),最好很少有IO等待,也沒有Sleep之類的,最好是本身就適合遞歸處理的算法

分析

在給定的線程數(shù)內(nèi),盡可能地最大化利用CPU資源,但又不會導(dǎo)致其他資源過載(比如內(nèi)存),或者大量空線程等待。

ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應(yīng)用比如快速排序算法。

這里的要點在于,F(xiàn)orkJoinPool需要使用相對少的線程來處理大量的任務(wù)。

比如要對1000萬個數(shù)據(jù)進行排序,那么會將這個任務(wù)分割成兩個500萬的排序任務(wù)和一個針對這兩組500萬數(shù)據(jù)的合并任務(wù)。以此類推,對于500萬的數(shù)據(jù)也會做出同樣的分割處理,到最后會設(shè)置一個閾值來規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時,停止這樣的分割處理。比如,當(dāng)元素的數(shù)量小于10時,會停止分割,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M行排序。

那么到最后,所有的任務(wù)加起來會有大概2000000+個。問題的關(guān)鍵在于,對于一個任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后,它才能夠被執(zhí)行。

所以當(dāng)使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的線程無法像任務(wù)隊列中再添加一個任務(wù)并且在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool時,就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時線程就能夠從隊列中選擇子任務(wù)執(zhí)行。

以上程序的關(guān)鍵是fork()和join()方法。在ForkJoinPool使用的線程中,會使用一個內(nèi)部隊列來對需要執(zhí)行的任務(wù)以及子任務(wù)進行操作來保證它們的執(zhí)行順序。

那么使用ThreadPoolExecutor或者ForkJoinPool,會有什么性能的差異呢?

首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來完成非常多的具有父子關(guān)系的任務(wù),比如使用4個線程來完成超過200萬個任務(wù)。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬個具有父子關(guān)系的任務(wù)時,也需要200萬個線程,顯然這是不可行的。

ps:ForkJoinPool在執(zhí)行過程中,會創(chuàng)建大量的子任務(wù),導(dǎo)致GC進行垃圾回收,這些是需要注意的。

原理與使用

ForkJoinPool首先是ExecutorService的實現(xiàn)類,因此是特殊的線程池。

創(chuàng)建了ForkJoinPool實例之后,就可以調(diào)用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法來執(zhí)行指定任務(wù)了。

其中ForkJoinTask代表一個可以并行、合并的任務(wù)。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任務(wù),而RecusiveAction代表沒有返回值的任務(wù)。

個人認(rèn)為ForkJoinPool設(shè)計不太好的地方在于,F(xiàn)orkJoinTask不是個接口,而是抽象類,實際使用時基本上不是繼承RecursiveAction就是繼承RecursiveTask,對業(yè)務(wù)類有限制。

示例

典型的一個例子,就是一串?dāng)?shù)組求和

public interface Calculator {
    long sumUp(long[] numbers);
}
public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 當(dāng)需要計算的數(shù)字小于6時,直接計算結(jié)果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否則,把任務(wù)一分為二,遞歸計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

這個例子展示了當(dāng)數(shù)組被拆分得足夠?。?6)之后,就不需要并行處理了,而更大的數(shù)組就拆為兩半,分別處理。

Stream(Java 8) 概念

別搞混了,跟IO的Stream完全不是一回事,可以把它看做是集合處理的聲明式語法,類似數(shù)據(jù)庫操作語言SQL。當(dāng)然也有跟IO類似的地方,就是Stream只能消費一次,不能重復(fù)使用。

看個例子:

int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
 .mapToInt(w -> w.getWeight())
 .sum();

流提供了一個能力,任何一個流,只要獲取一次并行流,后面的操作就都可以并行了。
例如:

Stream stream = Stream.of("a", "b", "c","d","e","f","g");
String str = stream.parallel().reduce((a, b) -> a + "," + b).get();
System.out.println(str);
流操作

生成流

Collection.stream()

Collection.parallelStream()

Arrays.stream(T array) or Stream.of()

java.io.BufferedReader.lines()

java.util.stream.IntStream.range()

java.nio.file.Files.walk()

java.util.Spliterator

Random.ints()

BitSet.stream()

Pattern.splitAsStream(java.lang.CharSequence)

JarFile.stream()

示例

// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List list = Arrays.asList(strArray);
stream = list.stream();

需要注意的是,對于基本數(shù)值型,目前有三種對應(yīng)的包裝類型 Stream:

IntStream、LongStream、DoubleStream。當(dāng)然我們也可以用 Stream、Stream >、Stream,但是 boxing 和 unboxing 會很耗時,所以特別為這三種基本數(shù)值型提供了對應(yīng)的 Stream。

Intermediate

一個流可以后面跟隨零個或多個 intermediate 操作。其目的主要是打開流,做出某種程度的數(shù)據(jù)映射/過濾,然后返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅調(diào)用到這類方法,并沒有真正開始流的遍歷。

已知的Intermediate操作包括:map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered。

Terminal

一個流只能有一個 terminal操作,當(dāng)這個操作執(zhí)行后,流就被使用“光”了,無法再被操作。所以這必定是流的最后一個操作。Terminal 操作的執(zhí)行,才會真正開始流的遍歷,并且會生成一個結(jié)果,或者一個 side effect。

已知的Terminal操作包括:forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

reduce解析: reduce本質(zhì)上是個聚合方法,它的作用是用流里面的元素生成一個結(jié)果,所以用來做累加,字符串拼接之類的都非常合適。它有三個參數(shù)

初始值:最終結(jié)果的初始化值,可以是一個空的對象

聚合函數(shù):一個二元函數(shù)(有兩個參數(shù)),第一個參數(shù)是上一次聚合的結(jié)果,第二個參數(shù)是某個元素

多個部分結(jié)果的合并函數(shù):如果流并發(fā)了,那么聚合操作會分為多段進行,這里顯示了多段之間如何配合

collect: collect比reduce更強大:reduce最終只能得到一個跟流里數(shù)據(jù)類型相同的值, 但collect的結(jié)果可以是任何對象。簡單的collect也有三個參數(shù):

最終要返回的數(shù)據(jù)容器

把元素并入返回值的方法

多個部分結(jié)果的合并

兩個collect示例

//和reduce相同的合并字符操作
String concat = stringStream.collect(StringBuilder::new, StringBuilder::append,StringBuilder::append).toString();
//等價于上面,這樣看起來應(yīng)該更加清晰
String concat = stringStream.collect(() -> new StringBuilder(),(l, x) -> l.append(x), (r1, r2) -> r1.append(r2)).toString();
//把stream轉(zhuǎn)成map
Stream stream = Stream.of(1, 2, 3, 4).filter(p -> p > 2);

List result = stream.collect(() -> new ArrayList<>(), (list, item) -> list.add(item), (one, two) -> one.addAll(two));
/* 或者使用方法引用 */
result = stream.collect(ArrayList::new, List::add, List::addAll);
協(xié)程

協(xié)程,英文Coroutines,也叫纖程(Fiber)是一種比線程更加輕量級的存在。正如一個進程可以擁有多個線程一樣,一個線程也可以擁有多個協(xié)程。

協(xié)程實際上是在語言底層(或者框架)對需要等待的程序進行調(diào)度,從而充分利用CPU的方法, 其實這完全可以通過回調(diào)來實現(xiàn), 但是深層回調(diào)的代碼太{{BANNED}}了,所以發(fā)明了協(xié)程的寫法。理論上多個協(xié)程不會真的"同時"執(zhí)行,也就不會引起共享變量操作的不確定性,不需要加鎖(待確認(rèn))。

pythone協(xié)程示例

Pythone, Golang和C#都內(nèi)置了協(xié)程的語法,但Java沒有,只能通過框架實現(xiàn),常見的框架包括:Quasar,kilim和ea-async。

Java ea-async 協(xié)程示例

import static com.ea.async.Async.await;
import static java.util.concurrent.CompletableFuture.completedFuture;

public class Store
{
    //購物操作, 傳一個商品id和一個價格
    public CompletableFuture buyItem(String itemTypeId, int cost)
    {
        //銀行扣款(長時間操作)
        if(!await(bank.decrement(cost))) {
            return completedFuture(false);
        }
        try {
            //商品出庫(長時間操作)
            await(inventory.giveItem(itemTypeId));
            return completedFuture(true);
        } catch (Exception ex) {
            await(bank.refund(cost));
            throw new AppException(ex);
        }
    }
}
參考資料

《七周七并發(fā)模型》電子書

深入淺出Java Concurrency——線程池

Java多線程學(xué)習(xí)(吐血超詳細總結(jié))

Jetty基礎(chǔ)之線程模型

Jetty-server高性能,多線程特性的源碼分析

Java 編程要點之并發(fā)(Concurrency)詳解

Java Concurrency in Depth (Part 1)

Java進階(七)正確理解Thread Local的原理與適用場景

Java 并發(fā)編程筆記:如何使用 ForkJoinPool 以及原理

ForkJoinPool簡介

多線程 ForkJoinPool

Java 8 中的 Streams API 詳解

Java中的協(xié)程實現(xiàn)

漫畫:什么是協(xié)程

學(xué)習(xí)源碼

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/72823.html

相關(guān)文章

  • Java學(xué)習(xí)必備書籍推薦終極版!

    摘要:實戰(zhàn)高并發(fā)程序設(shè)計推薦豆瓣評分書的質(zhì)量沒的說,推薦大家好好看一下。推薦,豆瓣評分,人評價本書介紹了在編程中條極具實用價值的經(jīng)驗規(guī)則,這些經(jīng)驗規(guī)則涵蓋了大多數(shù)開發(fā)人員每天所面臨的問題的解決方案。 很早就想把JavaGuide的書單更新一下了,昨晚加今天早上花了幾個時間對之前的書單進行了分類和補充完善。雖是終極版,但一定還有很多不錯的 Java 書籍我沒有添加進去,會繼續(xù)完善下去。希望這篇...

    Steve_Wang_ 評論0 收藏0
  • 出場率比較高的一道多線程安全面試題

    摘要:程序正常運行,輸出了預(yù)期容量的大小這是正常運行結(jié)果,未發(fā)生多線程安全問題,但這是不確定性的,不是每次都會達到正常預(yù)期的。另外,像等都有類似多線程安全問題,在多線程并發(fā)環(huán)境下避免使用這種集合。 這個問題是 Java 程序員面試經(jīng)常會遇到的吧。 工作一兩年的應(yīng)該都知道 ArrayList 是線程不安全的,要使用線程安全的就使用 Vector,這也是各種 Java 面試寶典里面所提及的,可能...

    xiyang 評論0 收藏0
  • 【備戰(zhàn)春招/秋招系列】Java程序員必備書單

    摘要:相關(guān)推薦,豆瓣評分,人評價本書介紹了在編程中條極具實用價值的經(jīng)驗規(guī)則,這些經(jīng)驗規(guī)則涵蓋了大多數(shù)開發(fā)人員每天所面臨的問題的解決方案。實戰(zhàn)高并發(fā)程序設(shè)計推薦豆瓣評分,書的質(zhì)量沒的說,推薦大家好好看一下。 該文已加入開源文檔:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識)。地址:https://github.com/Snailclimb... 【強烈推薦!非廣告!】...

    saucxs 評論0 收藏0
  • Android工程師轉(zhuǎn)型Java后端開發(fā)之路,自己選的路,跪著也要走下去!

    本文是公眾號讀者jianfeng投稿的面試經(jīng)驗恭喜該同學(xué)成功轉(zhuǎn)型目錄:毅然轉(zhuǎn)型,沒頭蒼蠅制定目標(biāo),系統(tǒng)學(xué)習(xí)面試經(jīng)歷毅然轉(zhuǎn)崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標(biāo)廣州,2016年畢業(yè)于一個普通二本大學(xué),曾經(jīng)在某機構(gòu)培訓(xùn)過Android。2018年初的時候已經(jīng)在兩家小公司工作干了兩年的android開發(fā),然后會一些Tomcat、Servlet之類的技術(shù),當(dāng)時的年薪大概也就15萬這樣子。由于個人發(fā)展...

    番茄西紅柿 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<