摘要:在本文中,我們將介紹如何更改此行為并以適當(dāng)?shù)姆绞教幚矶鄠€(gè)訂閱者。第一個(gè)訂閱者將獲得此示例中發(fā)出的所有元素,而第二個(gè)訂閱者將只接收一些元素。我們可以取消訂閱所有真正的訂閱者,但人工訂閱者仍將處理事件。
多個(gè)訂閱者的默認(rèn)行為并不總是可取的。在本文中,我們將介紹如何更改此行為并以適當(dāng)?shù)姆绞教幚矶鄠€(gè)訂閱者。
但首先,讓我們來(lái)看看多個(gè)訂閱者的默認(rèn)行為。
默認(rèn)行為
假設(shè)我們有以下Observable:
private static Observable getObservable() { ????return Observable.create(subscriber -> { ????????subscriber.onNext(gettingValue(1)); ????????subscriber.onNext(gettingValue(2)); ? ????????subscriber.add(Subscriptions.create(() -> { ????????????LOGGER.info("Clear resources"); ????????})); ????}); }
訂閱者訂閱后會(huì)立即發(fā)出兩個(gè)元素。
在我們的示例中,我們有兩個(gè)訂閱者:
LOGGER.info("Subscribing"); ? Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i)); Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i)); ? s1.unsubscribe(); s2.unsubscribe();
想象一下,獲取每個(gè)元素是一項(xiàng)代價(jià)高昂的操作 - 例如,它可能包括密集計(jì)算或打開(kāi)URL連接。
為了簡(jiǎn)單起見(jiàn),我們只返回一個(gè)數(shù)字:
private static Integer gettingValue(int i) { ????LOGGER.info("Getting " + i); ????return i; }
這是輸出:
Subscribing Getting 1 subscriber#1 is printing 1 Getting 2 subscriber#1 is printing 2 Getting 1 subscriber#2 is printing 1 Getting 2 subscriber#2 is printing 2 Clear resources Clear resources
我們可以看到,在默認(rèn)情況下,獲取每個(gè)元素和清除資源都要執(zhí)行兩次-對(duì)于每個(gè)訂閱服務(wù)器一次。這不是我們想要的。ConnectableObservable類有助于解決這個(gè)問(wèn)題。
ConnectableObservableConnectableObservable類允許與多個(gè)訂閱者共享訂閱,而不允許多次執(zhí)行底層操作。
但首先,讓我們創(chuàng)建一個(gè)ConnectableObservable。
publish()
publish()方法是從Observable創(chuàng)建一個(gè)ConnectableObservable:
ConnectableObservable obs = Observable.create(subscriber -> { ????subscriber.onNext(gettingValue(1)); ????subscriber.onNext(gettingValue(2)); ????subscriber.add(Subscriptions.create(() -> { ????????LOGGER.info("Clear resources"); ????})); }).publish();
但就目前而言,它什么都不做。它的工作原理是connect()方法。
connect()
在調(diào)用ConnectableObservable的connect()方法之前,即使有一些訂閱者,也不會(huì)觸發(fā)Observable的onSubcribe()回調(diào)。
讓我們來(lái)證明一下:
LOGGER.info("Subscribing"); obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i)); obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i)); Thread.sleep(1000); LOGGER.info("Connecting"); Subscription s = obs.connect(); s.unsubscribe();
我們訂閱,然后等待一秒鐘再連接輸出是:
Subscribing Connecting Getting 1 subscriber #1 is printing 1 subscriber #2 is printing 1 Getting 2 subscriber #1 is printing 2 subscriber #2 is printing 2 Clear resources
我們可以看到:
獲取元素只出現(xiàn)一次我們想要的
清算資源也只出現(xiàn)一次
訂閱后獲取元素開(kāi)始一秒鐘
訂閱不再觸發(fā)元素的發(fā)射。只有connect()才能這樣做
這種延遲可能是有益的 - 有時(shí)我們需要為所有訂閱者提供相同的元素序列,即使其中一個(gè)訂閱者比另一個(gè)訂閱者更早。
可觀察的一致視圖 - 在subscribe()之后的connect()
這個(gè)用例無(wú)法在我們之前的Observable上進(jìn)行演示,因?yàn)樗\(yùn)行很冷,而且兩個(gè)訂閱者都可以獲得整個(gè)元素序列。
相反,想象一下,元素發(fā)射不依賴于訂閱的時(shí)刻,例如,鼠標(biāo)點(diǎn)擊發(fā)出的事件?,F(xiàn)在還想象第二個(gè)訂閱者在第一個(gè)訂閱者之后訂閱第二個(gè)訂閱者。
第一個(gè)訂閱者將獲得此示例中發(fā)出的所有元素,而第二個(gè)訂閱者將只接收一些元素。
另一方面,在正確的位置使用connect()方法可以為兩個(gè)訂閱者提供Observable序列上的相同視圖。
讓我們創(chuàng)建一個(gè)Observable。它將在JFrame上點(diǎn)擊鼠標(biāo)時(shí)發(fā)出元素。
每個(gè)元素都是點(diǎn)擊的x坐標(biāo):
private static Observable getObservable() { ????return Observable.create(subscriber -> { ????????frame.addMouseListener(new MouseAdapter() { ????????????@Override ????????????public void mouseClicked(MouseEvent e) { ????????????????subscriber.onNext(e.getX()); ????????????} ????????}); ????????subscriber.add(Subscriptions.create(() { ????????????LOGGER.info("Clear resources"); ????????????for (MouseListener listener : frame.getListeners(MouseListener.class)) { ????????????????frame.removeMouseListener(listener); ????????????} ????????})); ????}); }
現(xiàn)在,如果我們以第二個(gè)間隔一個(gè)接一個(gè)地訂閱兩個(gè)訂閱者,運(yùn)行程序并開(kāi)始單擊,我們將看到第一個(gè)訂閱者將獲得更多元素:
public static void defaultBehaviour() throws InterruptedException { ????Observable obs = getObservable(); ? ????LOGGER.info("subscribing #1"); ????Subscription subscription1 = obs.subscribe((i) -> ????????LOGGER.info("subscriber#1 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("subscribing #2"); ????Subscription subscription2 = obs.subscribe((i) -> ????????LOGGER.info("subscriber#2 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("unsubscribe#1"); ????subscription1.unsubscribe(); ????Thread.sleep(1000); ????LOGGER.info("unsubscribe#2"); ????subscription2.unsubscribe(); }
subscribing #1 subscriber#1 is printing x-coordinate 280 subscriber#1 is printing x-coordinate 242 subscribing #2 subscriber#1 is printing x-coordinate 343 subscriber#2 is printing x-coordinate 343 unsubscribe#1 clearing resources unsubscribe#2 clearing resources
connect() After subscribe()
為了使兩個(gè)訂閱者獲得相同的序列,我們將Observable轉(zhuǎn)換為ConnectableObservable并在訂閱者之后調(diào)用connect():
public static void subscribeBeforeConnect() throws InterruptedException { ? ????ConnectableObservable obs = getObservable().publish(); ? ????LOGGER.info("subscribing #1"); ????Subscription subscription1 = obs.subscribe( ??????i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("subscribing #2"); ????Subscription subscription2 = obs.subscribe( ??????i ->? LOGGER.info("subscriber#2 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("connecting:"); ????Subscription s = obs.connect(); ????Thread.sleep(1000); ????LOGGER.info("unsubscribe connected"); ????s.unsubscribe(); }
現(xiàn)在他們將得到相同的序列:
subscribing #1 subscribing #2 connecting: subscriber#1 is printing x-coordinate 317 subscriber#2 is printing x-coordinate 317 subscriber#1 is printing x-coordinate 364 subscriber#2 is printing x-coordinate 364 unsubscribe connected clearing resources
所以重點(diǎn)是等待所有用戶準(zhǔn)備就緒然后調(diào)用connect()。
在Spring應(yīng)用程序中,我們可以在應(yīng)用程序啟動(dòng)期間訂閱所有組件,例如在onApplicationEvent()中調(diào)用connect()。
讓我們回到我們的例子;注意,connect()方法之前的所有單擊操作都失敗了。如果我們不想遺漏元素,但相反,我們可以在代碼中更早地放置connect(),并強(qiáng)制可觀察到的元素在沒(méi)有任何訂閱服務(wù)器的情況下生成事件。
在沒(méi)有任何訂閱者的情況下強(qiáng)制訂閱 - connect()在subscribe()之前
為了證明這一點(diǎn),讓我們更正我們的例子:
public static void connectBeforeSubscribe() throws InterruptedException { ????ConnectableObservable obs = getObservable() ??????.doOnNext(x -> LOGGER.info("saving " + x)).publish(); ????LOGGER.info("connecting:"); ????Subscription s = obs.connect(); ????Thread.sleep(1000); ????LOGGER.info("subscribing #1"); ????obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("subscribing #2"); ????obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????s.unsubscribe(); }
步驟相對(duì)簡(jiǎn)單:
首先,我們連接
然后我們等待一秒鐘并訂閱第一個(gè)訂閱者
最后,我們等待另一秒鐘并訂閱第二個(gè)訂閱者
請(qǐng)注意,我們添加了doOnNext()運(yùn)算符。這里我們可以在數(shù)據(jù)庫(kù)中存儲(chǔ)元素,例如在我們的代碼中,我們只打印“save...”。
如果我們啟動(dòng)代碼并開(kāi)始點(diǎn)擊,我們將看到在connect()調(diào)用之后立即發(fā)出和處理元素:
connecting: saving 306 saving 248 subscribing #1 saving 377 subscriber#1 is printing x-coordinate 377 saving 295 subscriber#1 is printing x-coordinate 295 saving 206 subscriber#1 is printing x-coordinate 206 subscribing #2 saving 347 subscriber#1 is printing x-coordinate 347 subscriber#2 is printing x-coordinate 347 clearing resources
如果沒(méi)有訂閱者,則仍會(huì)處理這些元素。
因此,不管是否有人訂閱,connect()方法都會(huì)開(kāi)始發(fā)出和處理元素,就好像有一個(gè)使用了元素的空操作的人工訂閱器一樣。
如果有一些真正的訂閱者訂閱,這個(gè)人工中介只向他們傳播元素。
若要取消訂閱,我們會(huì)執(zhí)行以下步驟:
s.unsubscribe();
然后:
Subscription s = obs.connect();
autoConnect()
此方法意味著在訂閱之前或之后不會(huì)調(diào)用connect(),而是在第一個(gè)訂閱者訂閱時(shí)自動(dòng)調(diào)用。
使用此方法,我們不能自己調(diào)用connect(),因?yàn)榉祷氐膶?duì)象是通常的Observable,它沒(méi)有此方法但使用底層的ConnectableObservable:
public static void autoConnectAndSubscribe() throws InterruptedException { ????Observable obs = getObservable() ????.doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect(); ? ????LOGGER.info("autoconnect()"); ????Thread.sleep(1000); ????LOGGER.info("subscribing #1"); ????Subscription s1 = obs.subscribe((i) -> ????????LOGGER.info("subscriber#1 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("subscribing #2"); ????Subscription s2 = obs.subscribe((i) -> ????????LOGGER.info("subscriber#2 is printing x-coordinate " + i)); ? ????Thread.sleep(1000); ????LOGGER.info("unsubscribe 1"); ????s1.unsubscribe(); ????Thread.sleep(1000); ????LOGGER.info("unsubscribe 2"); ????s2.unsubscribe(); }
請(qǐng)注意,我們也不能取消訂閱人工訂閱者。我們可以取消訂閱所有真正的訂閱者,但人工訂閱者仍將處理事件。
為了理解這一點(diǎn),讓我們看一下最后一個(gè)訂閱者取消訂閱后最后發(fā)生的事情:
subscribing #1 saving 296 subscriber#1 is printing x-coordinate 296 saving 329 subscriber#1 is printing x-coordinate 329 subscribing #2 saving 226 subscriber#1 is printing x-coordinate 226 subscriber#2 is printing x-coordinate 226 unsubscribe 1 saving 268 subscriber#2 is printing x-coordinate 268 saving 234 subscriber#2 is printing x-coordinate 234 unsubscribe 2 saving 278 saving 268
正如我們所看到的,在第二次取消訂閱后,不會(huì)出現(xiàn)清除資源的情況,并繼續(xù)使用doOnNext()保存元素。這意味著人工訂閱服務(wù)器不會(huì)取消訂閱,而是繼續(xù)使用元素。
refCount()
refCount()類似于autoConnect(),因?yàn)橹灰谝粋€(gè)訂閱者訂閱,連接也會(huì)自動(dòng)發(fā)生。
與autoconnect()不同,當(dāng)最后一個(gè)訂閱者取消訂閱時(shí),也會(huì)自動(dòng)斷開(kāi)連接:
public static void refCountAndSubscribe() throws InterruptedException { ????Observable obs = getObservable() ??????.doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount(); ? ????LOGGER.info("refcount()"); ????Thread.sleep(1000); ????LOGGER.info("subscribing #1"); ????Subscription subscription1 = obs.subscribe( ??????i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); ????Thread.sleep(1000); ????LOGGER.info("subscribing #2"); ????Subscription subscription2 = obs.subscribe( ??????i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); ? ????Thread.sleep(1000); ????LOGGER.info("unsubscribe#1"); ????subscription1.unsubscribe(); ????Thread.sleep(1000); ????LOGGER.info("unsubscribe#2"); ????subscription2.unsubscribe(); }
refcount() subscribing #1 saving 265 subscriber#1 is printing x-coordinate 265 saving 338 subscriber#1 is printing x-coordinate 338 subscribing #2 saving 203 subscriber#1 is printing x-coordinate 203 subscriber#2 is printing x-coordinate 203 unsubscribe#1 saving 294 subscriber#2 is printing x-coordinate 294 unsubscribe#2 clearing resources
結(jié)論
ConnectableObservable類可以輕松地處理多個(gè)訂閱者。
它的方法看起來(lái)很相似,但由于實(shí)現(xiàn)上的細(xì)微差別(甚至方法的順序也很重要),用戶的行為發(fā)生了很大的變化。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/72981.html
摘要:作用默認(rèn)的,直接在當(dāng)前線程運(yùn)行總是開(kāi)啟一個(gè)新線程用于密集型任務(wù),如異步阻塞操作,這個(gè)調(diào)度器的線程池會(huì)根據(jù)需要增長(zhǎng)對(duì)于普通的計(jì)算任務(wù),請(qǐng)使用默認(rèn)是一個(gè),很像一個(gè)有線程緩存的新線程調(diào)度器計(jì)算所使用的。這個(gè)使用的固定的線程池,大小為核數(shù)。 轉(zhuǎn)載請(qǐng)注明出處:https://zhuanlan.zhihu.com/p/20687307 RxJava系列1(簡(jiǎn)介) RxJava系列2(基本概念及使...
摘要:回顧在上一節(jié)的使用一基本用法中,介紹了的基本用法。它同樣只有一個(gè)方法,這個(gè)方法也無(wú)返回值,但有一個(gè)參數(shù)與同理,由于和也是單參數(shù)無(wú)返回值的,因此可以將和打包起來(lái)傳入以實(shí)現(xiàn)不完整定義的回調(diào)的使用定義三個(gè)對(duì)象,分別打包。 回顧 在上一節(jié)Android RxJava的使用(一)基本用法中,介紹了RxJava的基本用法。下面來(lái)回顧下實(shí)現(xiàn)一次RxJava的基本使用。例:分別打印Hello、 Wor...
閱讀 2000·2021-11-24 09:39
閱讀 2802·2021-10-14 09:43
閱讀 3413·2021-10-08 10:10
閱讀 2423·2021-09-22 15:54
閱讀 2414·2019-08-29 17:20
閱讀 1632·2019-08-28 18:14
閱讀 2434·2019-08-26 13:28
閱讀 1184·2019-08-26 12:16