摘要:理由是如果到了上,而這個(gè)對(duì)應(yīng)的操作遲遲不能就緒被出來。但我認(rèn)為這其實(shí)是一個(gè)超時(shí)處理問題。問題是,原生的是沒有超時(shí)支持的。如果是回調(diào)性質(zhì)的,一般的做法是正常就緒給一個(gè),超時(shí)給另外一個(gè)。只要時(shí)間合理,作者之前所說的會(huì)引發(fā)的問題并不會(huì)出現(xiàn)。
grizzly框架的作者曾經(jīng)提到NIO框架不應(yīng)該使用selection key的attach功能(鏈接)。理由是如果attach到了selection key上,而這個(gè)selection key對(duì)應(yīng)的操作遲遲不能就緒(被select出來)。那么這些selection key所attach的附件都是被強(qiáng)引用的,從而無法被gc。如果有大量這樣的selection key累積,程序就好像發(fā)生了內(nèi)存泄漏了一樣。
但我認(rèn)為這其實(shí)是一個(gè)超時(shí)處理問題??蚣軕?yīng)該支持設(shè)置超時(shí),并且可以在超時(shí)之后調(diào)用框架用戶預(yù)先設(shè)置的處理邏輯,并且釋放掉對(duì)應(yīng)的資源。問題是,原生的NIO 1是沒有超時(shí)支持的。它提供的是selector,可以注冊(cè),可以select,可以cancel。但是超時(shí)需要自己做記錄,程序自己判斷超時(shí)了,也就是select了老半天了仍然沒有就緒,那么就需要去調(diào)用cancel把selection key注銷掉。如果使用netty這樣的封裝庫(kù),它是把selector的api轉(zhuǎn)成回調(diào)的形式,同時(shí)也添加了超時(shí)的支持。NIO 2除了windows的proactor(OICP)部分之外,對(duì)于selector基本上就是一個(gè)官方版的netty,也是回調(diào)的形式,也支持了超時(shí)。
基于協(xié)程來封裝selector的話,支持超時(shí)處理自然也不在話下(代碼在此)。如果是回調(diào)性質(zhì)的api,一般的做法是正常就緒給一個(gè)callback,超時(shí)給另外一個(gè)callback??蚣芨鶕?jù)實(shí)際情況決定調(diào)用哪個(gè)callback。如果是協(xié)程的api,最自然的方式自然是拋異常了。
private SocketChannel tryAccept(ServerSocketChannel serverSocketChannel) throws IOException, Pausable { while(true) { try { return scheduler.accept(serverSocketChannel); } catch (TimeoutException e) { System.out.println("time out, try again"); continue; } } }
scheduler.accept會(huì)有兩個(gè)路徑的返回。一個(gè)路徑是正常的return一個(gè)socket channel,這表明accept阻塞等待成功,拿到了一個(gè)socket channel。另外一個(gè)返回是拋出了TimeoutException異常,這表明等待超時(shí)了。框架要做的就是要在超時(shí)的時(shí)候拋出這個(gè)異常,同時(shí)要確保相關(guān)的資源這個(gè)時(shí)候已經(jīng)釋放掉了,不會(huì)引起內(nèi)存泄漏。
首先,需要在做阻塞調(diào)用之前說明超時(shí)時(shí)間的長(zhǎng)度。
scheduler.timeout = 5000; SocketChannel socketChannel = tryAccept(serverSocketChannel);
這里設(shè)置的是5秒之后超時(shí)。根據(jù)這個(gè)超時(shí)時(shí)間可以計(jì)算一個(gè)dead line:
booking.acceptBlocked(getCurrentTimeMillis() + timeout);
然后拿一個(gè)小本子記著這個(gè)dead line:
public void acceptBlocked(long deadline) throws Pausable, TimeoutException { if (null != acceptTask) { throw new RuntimeException("multiple accept blocked on same channel"); } acceptDeadline = deadline; updateDeadline(); acceptTask = Task.getCurrentTask(); Task.pause(this); if (acceptDeadline == -1) { acceptUnblocked(); throw new TimeoutException(); } }
這個(gè)dead line會(huì)用來計(jì)算整個(gè)selector booking四個(gè)操作的earliest dead line:
public void updateDeadline() { earliestDeadline = Long.MAX_VALUE; if (readDeadline > 0 && readDeadline < earliestDeadline) { earliestDeadline = readDeadline; } if (writeDeadline > 0 && writeDeadline < earliestDeadline) { earliestDeadline = writeDeadline; } if (acceptDeadline > 0 && acceptDeadline < earliestDeadline) { earliestDeadline = acceptDeadline; } if (connectDeadline > 0 && connectDeadline < earliestDeadline) { earliestDeadline = connectDeadline; } bookings.remove(this); // when timed out, the booking might be removed already if (earliestDeadline != Long.MAX_VALUE) { // add back in case read timed out, but write is still blocking if (!bookings.offer(this)) { throw new RuntimeException("update booking failed"); } } }
也就是說每個(gè)selector booking通過這樣的設(shè)置都會(huì)有一個(gè)自己的時(shí)間戳(earliestDeadline)。用這個(gè)時(shí)間戳可以對(duì)booking進(jìn)行一個(gè)時(shí)間上的排序:
@Override public int compareTo(SelectorBooking that) { if (that.earliestDeadline > this.earliestDeadline) { return -1; } else if (that.earliestDeadline < this.earliestDeadline) { return 1; } return 0; }
因?yàn)榭梢耘判?,所以也就可以用一個(gè)PriorityQueue來維護(hù)一個(gè)鏈表以記錄哪個(gè)booking是最近會(huì)到期的booking。因?yàn)镻riorityQueue的排序是發(fā)生在插入時(shí)的,所以在這個(gè)booking的時(shí)間戳發(fā)生變更的時(shí)候,需要從鏈表中刪除然后二次插入已達(dá)到更新排序的目的。有了這個(gè)排序的鏈表之后,就可以用來做兩個(gè)事情:決定selector的select等待時(shí)間,以及哪些booking的哪些task是超時(shí)了的:
protected int doSelect() throws IOException { SelectorBooking booking = selectorBookings.peek(); if (null == booking) { return selector.select(); } else { long delta = booking.getEarliestDeadline() - getCurrentTimeMillis(); if (delta > 0) { return selector.select(delta); } else { return selector.selectNow(); } } } boolean loopOnce() { try { executeReadyTasks(); doSelect(); Iteratoriterator = selector.selectedKeys().iterator(); ioUnblocked(iterator); while (hasDeadSelectorBooking()) { SelectorBooking booking = selectorBookings.poll(); booking.cancelDeadTasks(getCurrentTimeMillis()); } return true; } catch (Exception e) { LOGGER.error("loop died", e); return false; } }
最后就是一件事情了,如果協(xié)程所阻塞的io操作確實(shí)超時(shí)了,如何在超時(shí)的調(diào)用處拋出異常,以達(dá)到走不通業(yè)務(wù)邏輯路徑的目的:
public void cancelDeadTasks(long currentTimeMillis) { // ... if (null != acceptTask && currentTimeMillis > acceptDeadline) { selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_ACCEPT); acceptDeadline = -1; updateDeadline(); acceptTask.resume(); if (-1 == acceptDeadline) { throw new RuntimeException("accept deadline unhandled"); } } // ... if (0 == selectionKey.interestOps()) { selectionKey.cancel(); } } public void acceptBlocked(long deadline) throws Pausable, TimeoutException { if (null != acceptTask) { throw new RuntimeException("multiple accept blocked on same channel"); } acceptDeadline = deadline; updateDeadline(); acceptTask = Task.getCurrentTask(); Task.pause(this); if (acceptDeadline == -1) { acceptUnblocked(); throw new TimeoutException(); } }
這里是兩方面的配合。一方面是在io循環(huán)的地方設(shè)置一個(gè)-1為標(biāo)志位。然后去喚醒協(xié)程。協(xié)程喚醒了之后立即去檢查-1這個(gè)標(biāo)志位有沒有設(shè)置,如果設(shè)置了,則認(rèn)為自己被喚醒是因?yàn)槌瑫r(shí),而不是io操作就緒了。于是TimeoutException被拋出了。特別注意這行:
if (0 == selectionKey.interestOps()) { selectionKey.cancel(); }
通過在超時(shí)之后取消了interestOps,然后在所有interestOps都沒有之后自動(dòng)cancel對(duì)應(yīng)的selection key。這個(gè)時(shí)候?qū)?yīng)的附件也會(huì)被垃圾回收給干掉了。只要time out時(shí)間合理,grizzly作者之前所說的attach會(huì)引發(fā)的問題并不會(huì)出現(xiàn)。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/64152.html
摘要:接下來,就看怎么用協(xié)程來實(shí)現(xiàn)異步了。直接拿的原始寫代碼會(huì)死人的。引入?yún)f(xié)程就是為了把上下連續(xù)的業(yè)務(wù)邏輯放在一個(gè)協(xié)程里,把與業(yè)務(wù)關(guān)系不大的的處理部分放到框架的里。第三部分是放棄掉執(zhí)行權(quán)。這樣一個(gè)只能接收打印一行的異步應(yīng)用就寫好了。 前面已經(jīng)準(zhǔn)備好了greenlet對(duì)應(yīng)的Java版本了,一個(gè)刪減后的kilim(http://segmentfault.com/blog/taowen/11900...
摘要:基本上所有的網(wǎng)絡(luò)應(yīng)用都會(huì)示范一個(gè)的寫法。除了這些操作的主體是而不是,操作的是,而不是。以為例其過程是這樣的這段代碼就是創(chuàng)建一個(gè),并注冊(cè)一個(gè),并把附著到上。關(guān)鍵之一顯然是利用了協(xié)程的和,把回調(diào)轉(zhuǎn)換成順序的邏輯執(zhí)行。 基本上所有的網(wǎng)絡(luò)應(yīng)用都會(huì)示范一個(gè)tcp的echo寫法。前面我們已經(jīng)看到了如何使用協(xié)程和異步io來做tcp服務(wù)器的第一步,accept。下面是一個(gè)完整的echo server的...
摘要:抽象類有一個(gè)方法用于使通道處于阻塞模式或非阻塞模式。注意抽象類的方法是由抽象類實(shí)現(xiàn)的,都是直接繼承了抽象類。大家有興趣可以看看的源碼,各種抽象類和抽象類上層的抽象類。 歷史回顧: Java NIO 概覽 Java NIO 之 Buffer(緩沖區(qū)) Java NIO 之 Channel(通道) 其他高贊文章: 面試中關(guān)于Redis的問題看這篇就夠了 一文輕松搞懂redis集群原理及搭建...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請(qǐng)求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會(huì)被注冊(cè)在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個(gè)提供異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡(jiǎn)化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請(qǐng)求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會(huì)被注冊(cè)在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個(gè)提供異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡(jiǎn)化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
閱讀 1649·2021-10-18 13:35
閱讀 2425·2021-10-09 09:44
閱讀 883·2021-10-08 10:05
閱讀 2841·2021-09-26 09:47
閱讀 3678·2021-09-22 15:22
閱讀 493·2019-08-29 12:24
閱讀 2071·2019-08-29 11:06
閱讀 2911·2019-08-26 12:23