摘要:源碼之分析的協(xié)程原理分析版本為支持異步,實現(xiàn)了一個協(xié)程庫。提供了回調函數(shù)注冊當異步事件完成后,調用注冊的回調中間結果保存結束結果返回等功能注冊回調函數(shù),當被解決時,改回調函數(shù)被調用。相當于喚醒已經(jīng)處于狀態(tài)的父協(xié)程,通過回調函數(shù),再執(zhí)行。
tornado 源碼之 coroutine 分析
tornado 的協(xié)程原理分析
版本:4.3.0
為支持異步,tornado 實現(xiàn)了一個協(xié)程庫。
tornado 實現(xiàn)的協(xié)程框架有下面幾個特點:
支持 python 2.7,沒有使用 yield from
特性,純粹使用 yield 實現(xiàn)
使用拋出異常的方式從協(xié)程返回值
采用 Future 類代理協(xié)程(保存協(xié)程的執(zhí)行結果,當攜程執(zhí)行結束時,調用注冊的回調函數(shù))
使用 IOLoop 事件循環(huán),當事件發(fā)生時在循環(huán)中調用注冊的回調,驅動協(xié)程向前執(zhí)行
由此可見,這是 python 協(xié)程的一個經(jīng)典的實現(xiàn)。
本文將實現(xiàn)一個類似 tornado 實現(xiàn)的基礎協(xié)程框架,并闡述相應的原理。
外部庫使用 time 來實現(xiàn)定時器回調的時間計算。
bisect 的 insort 方法維護一個時間有限的定時器隊列。
functools 的 partial 方法綁定函數(shù)部分參數(shù)。
使用 backports_abc 導入 Generator 來判斷函數(shù)是否是生成器。
import time import bisect import functools from backports_abc import Generator as GeneratorTypeFuture
是一個穿梭于協(xié)程和調度器之間的信使。
提供了回調函數(shù)注冊(當異步事件完成后,調用注冊的回調)、中間結果保存、結束結果返回等功能
add_done_callback 注冊回調函數(shù),當 Future 被解決時,改回調函數(shù)被調用。
set_result 設置最終的狀態(tài),并且調用已注冊的回調函數(shù)
協(xié)程中的每一個 yield 對應一個協(xié)程,相應的對應一個 Future 對象,譬如:
@coroutine def routine_main(): yield routine_simple() yield sleep(1)
這里的 routine_simple() 和 sleep(1) 分別對應一個協(xié)程,同時有一個 Future 對應。
class Future(object): def __init__(self): self._done = False self._callbacks = [] self._result = None def _set_done(self): self._done = True for cb in self._callbacks: cb(self) self._callbacks = None def done(self): return self._done def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def result(self): return self._resultIOLoop
這里的 IOLoop 去掉了 tornado 源代碼中 IO 相關部分,只保留了基本需要的功能,如果命名為 CoroutineLoop 更貼切。
這里的 IOLoop 提供基本的回調功能。它是一個線程循環(huán),在循環(huán)中完成兩件事:
檢測有沒有注冊的回調并執(zhí)行
檢測有沒有到期的定時器回調并執(zhí)行
程序中注冊的回調事件,最終都會在此處執(zhí)行。
可以認為,協(xié)程程序本身、協(xié)程的驅動程序 都會在此處執(zhí)行。
協(xié)程本身使用 wrapper 包裝,并最后注冊到 IOLoop 的事件回調,所以它的從預激到結束的代碼全部在 IOLoop 回調中執(zhí)行。
而協(xié)程預激后,會把 Runner.run() 函數(shù)注冊到 IOLoop 的事件回調,以驅動協(xié)程向前運行。
理解這一點對于理解協(xié)程的運行原理至關重要。
這就是單線程異步的基本原理。因為都在一個線程循環(huán)中執(zhí)行,我們可以不用處理多線程需要面對的各種繁瑣的事情。
IOLoop.start事件循環(huán),回調事件和定時器事件在循環(huán)中調用。
IOLoop.run_sync執(zhí)行一個協(xié)程。
將 run 注冊進全局回調,在 run 中調用 func()啟動協(xié)程。
注冊協(xié)程結束回調 stop, 退出 run_sync 的 start 循環(huán),事件循環(huán)隨之結束。
class IOLoop(object):, def __init__(self): self._callbacks = [] self._timers = [] self._running = False @classmethod def instance(cls): if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance def add_future(self, future, callback): future.add_done_callback( lambda future: self.add_callback(functools.partial(callback, future))) def add_timeout(self, when, callback): bisect.insort(self._timers, (when, callback)) def call_later(self, delay, callback): return self.add_timeout(time.time() + delay, callback) def add_callback(self, call_back): self._callbacks.append(call_back) def start(self): self._running = True while self._running: # 回調任務 callbacks = self._callbacks self._callbacks = [] for call_back in callbacks: call_back() # 定時器任務 while self._timers and self._timers[0][0] < time.time(): task = self._timers[0][1] del self._timers[0] task() def stop(self): self._running = False def run_sync(self, func): future_cell = [None] def run(): try: future_cell[0] = func() except Exception: pass self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) self.start() return future_cell[0].result()coroutine
協(xié)程裝飾器。
協(xié)程由 coroutine 裝飾,分為兩類:
含 yield 的生成器函數(shù)
無 yield 語句的普通函數(shù)
裝飾協(xié)程,并通過注冊回調驅動協(xié)程運行。
程序中通過 yield coroutine_func() 方式調用協(xié)程。
此時,wrapper 函數(shù)被調用:
獲取協(xié)程生成器
如果是生成器,則
調用 next() 預激協(xié)程
實例化 Runner(),驅動協(xié)程
如果是普通函數(shù),則
調用 set_result() 結束協(xié)程
協(xié)程返回 Future 對象,供外層的協(xié)程處理。外部通過操作該 Future 控制協(xié)程的運行。
每個 yield 對應一個協(xié)程,每個協(xié)程擁有一個 Future 對象。
外部協(xié)程獲取到內部協(xié)程的 Future 對象,如果內部協(xié)程尚未結束,將 Runner.run() 方法注冊到 內部協(xié)程的 Future 的結束回調。
這樣,在內部協(xié)程結束時,會調用注冊的 run() 方法,從而驅動外部協(xié)程向前執(zhí)行。
各個協(xié)程通過 Future 形成一個鏈式回調關系。
Runner 類在下面多帶帶小節(jié)描述。
def coroutine(func): return _make_coroutine_wrapper(func) # 每個協(xié)程都有一個 future, 代表當前協(xié)程的運行狀態(tài) def _make_coroutine_wrapper(func): @functools.wraps(func) def wrapper(*args, **kwargs): future = Future() try: result = func(*args, **kwargs) except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: return future else: if isinstance(result, GeneratorType): try: yielded = next(result) except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e)) except Exception: pass else: Runner(result, future, yielded) try: return future finally: future = None future.set_result(result) return future return wrapper協(xié)程返回值
因為沒有使用 yield from,協(xié)程無法直接返回值,所以使用拋出異常的方式返回。
python 2 無法在生成器中使用 return 語句。但是生成器中拋出的異??梢栽谕獠?send() 語句中捕獲。
所以,使用拋出異常的方式,將返回值存儲在異常的 value 屬性中,拋出。外部使用諸如:
try: yielded = gen.send(value) except Return as e:
這樣的方式獲取協(xié)程的返回值。
class Return(Exception): def __init__(self, value=None): super(Return, self).__init__() self.value = value self.args = (value,)Runner
Runner 是協(xié)程的驅動器類。
self.result_future 保存當前協(xié)程的狀態(tài)。
self.future 保存 yield 子協(xié)程傳遞回來的協(xié)程狀態(tài)。
從子協(xié)程的 future 獲取協(xié)程運行結果 send 給當前協(xié)程,以驅動協(xié)程向前執(zhí)行。
注意,會判斷子協(xié)程返回的 future
如果 future 已經(jīng) set_result,代表子協(xié)程運行結束,回到 while Ture 循環(huán),繼續(xù)往下執(zhí)行下一個 send;
如果 future 未 set_result,代表子協(xié)程運行未結束,將 self.run 注冊到子協(xié)程結束的回調,這樣,子協(xié)程結束時會調用 self.run,重新驅動協(xié)程執(zhí)行。
如果本協(xié)程 send() 執(zhí)行過程中,捕獲到 StopIteration 或者 Return 異常,說明本協(xié)程執(zhí)行結束,設置 result_future 的協(xié)程返回值,此時,注冊的回調函數(shù)被執(zhí)行。這里的回調函數(shù)為本協(xié)程的父協(xié)程所注冊的 run()。
相當于喚醒已經(jīng)處于 yiled 狀態(tài)的父協(xié)程,通過 IOLoop 回調 run 函數(shù),再執(zhí)行 send()。
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.io_loop = IOLoop.instance() self.running = False self.future = None if self.handle_yield(first_yielded): self.run() def run(self): try: self.running = True while True: try: # 每一個 yield 處看做一個協(xié)程,對應一個 Future # 將該協(xié)程的結果 send 出去 # 這樣外層形如 ret = yiled coroutine_func() 能夠獲取到協(xié)程的返回數(shù)據(jù) value = self.future.result() yielded = self.gen.send(value) except (StopIteration, Return) as e: # 協(xié)程執(zhí)行完成,不再注冊回調 self.result_future.set_result(_value_from_stopiteration(e)) self.result_future = None return except Exception: return # 協(xié)程未執(zhí)行結束,繼續(xù)使用 self.run() 進行驅動 if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): self.future = yielded if not self.future.done(): # 給 future 增加執(zhí)行結束回調函數(shù),這樣,外部使用 future.set_result 時會調用該回調 # 而該回調是把 self.run() 注冊到 IOLoop 的事件循環(huán) # 所以,future.set_result 會把 self.run() 注冊到 IOLoop 的事件循環(huán),從而在下一個事件循環(huán)中調用 self.io_loop.add_future( self.future, lambda f: self.run()) return False return Truesleep
sleep 是一個延時協(xié)程,充分展示了協(xié)程的標準實現(xiàn)。
創(chuàng)建一個 Future,并返回給外部協(xié)程;
外部協(xié)程發(fā)現(xiàn)是一個未完的狀態(tài),將 run()注冊到 Future 的完成回調,同時外部協(xié)程被掛起;
在設置的延時后,IOLoop 會回調 set_result 結束協(xié)程;
IOLoop 調用 run() 函數(shù);
IOLoop 調用 send(),喚醒掛起的外部協(xié)程。
流程如下圖:
def sleep(duration): f = Future() IOLoop.instance().call_later(duration, lambda: f.set_result(None)) return f運行
@coroutine def routine_ur(url, wait): yield sleep(wait) print("routine_ur {} took {}s to get!".format(url, wait)) @coroutine def routine_url_with_return(url, wait): yield sleep(wait) print("routine_url_with_return {} took {}s to get!".format(url, wait)) raise Return((url, wait)) # 非生成器協(xié)程,不會為之生成多帶帶的 Runner() # coroutine 運行結束后,直接返回一個已經(jīng)執(zhí)行結束的 future @coroutine def routine_simple(): print("it is simple routine") @coroutine def routine_simple_return(): print("it is simple routine with return") raise Return("value from routine_simple_return") @coroutine def routine_main(): yield routine_simple() yield routine_ur("url0", 1) ret = yield routine_simple_return() print(ret) ret = yield routine_url_with_return("url1", 1) print(ret) ret = yield routine_url_with_return("url2", 2) print(ret) if __name__ == "__main__": IOLoop.instance().run_sync(routine_main)
運行輸出為:
it is simple routine routine_ur url0 took 1s to get! it is simple routine with return value from routine_simple_return routine_url_with_return url1 took 1s to get! ("url1", 1) routine_url_with_return url2 took 2s to get! ("url2", 2)
可以觀察到協(xié)程 sleep 已經(jīng)生效。
源碼simple_coroutine.py
copyrightauthor:bigfish
copyright: 許可協(xié)議 知識共享署名-非商業(yè)性使用 4.0 國際許可協(xié)議
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://m.hztianpu.com/yun/45036.html
摘要:清楚了以上流程,我們直接來看函數(shù)主要用作初始化應用監(jiān)聽端口以及啟動。其中就是保存聊天室所有聊天消息的結構。關于的解讀我會放到閱讀源碼時講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。 tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應用: chatdemo 首先看 chatdemo 的目錄結構: ├── chatdemo.py ├── ...
摘要:序言最近閑暇無事閱讀了一下的源碼對整體的結構有了初步認識與大家分享不知道為什么右邊的目錄一直出不來非常不舒服不如移步到吧是的核心模塊也是個調度模塊各種異步事件都是由他調度的所以必須弄清他的執(zhí)行邏輯源碼分析而的核心部分則是這個循環(huán)內部的邏輯貼 序言 最近閑暇無事,閱讀了一下tornado的源碼,對整體的結構有了初步認識,與大家分享 不知道為什么右邊的目錄一直出不來,非常不舒服. 不如移...
摘要:前言本文將嘗試詳細的帶大家一步步走完一個異步操作從而了解是如何實現(xiàn)異步的其實本文是對上一篇文的實踐和復習主旨在于關注異步的實現(xiàn)所以會忽略掉代碼中的一些異常處理文字較多湊合下吧接下來只會貼出部分源碼幫助理解希望有耐心的同學打開源碼一起跟蹤一遍 前言 本文將嘗試詳細的帶大家一步步走完一個異步操作,從而了解tornado是如何實現(xiàn)異步io的. 其實本文是對[上一篇文][1]的實踐和復習 主...
摘要:前言俗話說光說不練假把式上一篇文里都只是光看著別人的源碼說貌似有點紙上談兵的意思所以這次寫一個簡單的自己定義協(xié)議的既可以熟悉和的用法又可以在去除了復雜的協(xié)議后了解的工作原理代碼不多加上空行和也就行不到在上的源碼點這里目標定義一個簡單的協(xié)議達 前言 俗話說光說不練假把式,上一篇文里都只是光看著別人的源碼說,貌似有點紙上談兵的意思. 所以這次寫一個簡單的,自己定義協(xié)議的server. 既...
這篇文章摘自我的博客, 歡迎大家沒事去逛逛~ 背景 這幾個月我開發(fā)了公司里的一個restful webservice,起初技術選型的時候是采用了flask框架。雖然flask是一個同步的框架,但是可以配合gevent或者其它方式運行在異步的容器中(測試鏈接),效果看上去也還可以,因此就采用了這種方式。 后面閱讀了tornado的源碼,也去了解了各種協(xié)程框架以及運行的原理。總感覺flask的這種同步...
閱讀 5134·2023-04-25 18:47
閱讀 2747·2021-11-19 11:33
閱讀 3497·2021-11-11 16:54
閱讀 3157·2021-10-26 09:50
閱讀 2629·2021-10-14 09:43
閱讀 740·2021-09-03 10:47
閱讀 739·2019-08-30 15:54
閱讀 1568·2019-08-30 15:44