成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專(zhuān)欄INFORMATION COLUMN

分布式隊(duì)列神器 Celery

趙春朋 / 1314人閱讀

摘要:是什么是一個(gè)由編寫(xiě)的簡(jiǎn)單靈活可靠的用來(lái)處理大量信息的分布式系統(tǒng)它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具。專(zhuān)注于實(shí)時(shí)任務(wù)處理,支持任務(wù)調(diào)度。說(shuō)白了,它是一個(gè)分布式隊(duì)列的管理工具,我們可以用提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列。

Celery 是什么?

Celery 是一個(gè)由 Python 編寫(xiě)的簡(jiǎn)單、靈活、可靠的用來(lái)處理大量信息的分布式系統(tǒng),它同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具。

Celery 專(zhuān)注于實(shí)時(shí)任務(wù)處理,支持任務(wù)調(diào)度。

說(shuō)白了,它是一個(gè)分布式隊(duì)列的管理工具,我們可以用 Celery 提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列。

1.快速入門(mén)

(本文以 Celery4.0 為基礎(chǔ)進(jìn)行書(shū)寫(xiě))

首先,我們要理解 Celery 本身不是任務(wù)隊(duì)列,它是管理分布式任務(wù)隊(duì)列的工具,或者換一種說(shuō)法,它封裝好了操作常見(jiàn)任務(wù)隊(duì)列的各種操作,我們用它可以快速進(jìn)行任務(wù)隊(duì)列的使用與管理,當(dāng)然你也可以自己看 rabbitmq 等隊(duì)列的文檔然后自己實(shí)現(xiàn)相關(guān)操作都是沒(méi)有問(wèn)題的。

Celery 是語(yǔ)言無(wú)關(guān)的,雖然它是用 Python 實(shí)現(xiàn)的,但他提供了其他常見(jiàn)語(yǔ)言的接口支持。只是如果你恰好使用 Python 進(jìn)行開(kāi)發(fā)那么使用 Celery 就自然而然了。

想讓 Celery 運(yùn)行起來(lái)我們要明白幾個(gè)概念:

1.1 Brokers

brokers 中文意思為中間人,在這里就是指任務(wù)隊(duì)列本身,Celery 扮演生產(chǎn)者和消費(fèi)者的角色,brokers 就是生產(chǎn)者和消費(fèi)者存放/拿取產(chǎn)品的地方(隊(duì)列)

常見(jiàn)的 brokers 有 rabbitmq、redis、Zookeeper 等

1.2 Result Stores / backend

顧名思義就是結(jié)果儲(chǔ)存的地方,隊(duì)列中的任務(wù)運(yùn)行完后的結(jié)果或者狀態(tài)需要被任務(wù)發(fā)送者知道,那么就需要一個(gè)地方儲(chǔ)存這些結(jié)果,就是 Result Stores 了

常見(jiàn)的 backend 有 redis、Memcached 甚至常用的數(shù)據(jù)都可以。

1.3 Workers

就是 Celery 中的工作者,類(lèi)似與生產(chǎn)/消費(fèi)模型中的消費(fèi)者,其從隊(duì)列中取出任務(wù)并執(zhí)行

1.4 Tasks

就是我們想在隊(duì)列中進(jìn)行的任務(wù)咯,一般由用戶(hù)、觸發(fā)器或其他操作將任務(wù)入隊(duì),然后交由 workers 進(jìn)行處理。

理解以上概念后我們就可以快速實(shí)現(xiàn)一個(gè)隊(duì)列的操作:

這里我們用 redis 當(dāng)做 celery 的 broker 和 backend。

(其他 brokers 與 backend 支持看這里)

安裝 Celery 和 redis 以及 python 的 redis 支持:

apt-get install redis-server
pip install redis
pip install celery

這里需要注意如果你的 celery 是 4.0 及以上版本請(qǐng)確保 python 的 redis 庫(kù)版本在 2.10.4 及以上,否則會(huì)出現(xiàn) redis 連接 timeout 的錯(cuò)誤,具體參考

然后,我們需要寫(xiě)一個(gè)task:

#tasks.py
from celery import Celery

app = Celery("tasks",  backend="redis://localhost:6379/0", broker="redis://localhost:6379/0") #配置好celery的backend和broker

@app.task  #普通函數(shù)裝飾為 celery task
def add(x, y):
    return x + y

OK,到這里,broker 我們有了,backend 我們有了,task 我們也有了,現(xiàn)在就該運(yùn)行 worker 進(jìn)行工作了,在 tasks.py 所在目錄下運(yùn)行:

celery -A tasks worker --loglevel=info

意思就是運(yùn)行 tasks 這個(gè)任務(wù)集合的 worker 進(jìn)行工作(當(dāng)然此時(shí)broker中還沒(méi)有任務(wù),worker此時(shí)相當(dāng)于待命狀態(tài))

最后一步,就是觸發(fā)任務(wù)啦,最簡(jiǎn)單方式就是再寫(xiě)一個(gè)腳本然后調(diào)用那個(gè)被裝飾成 task 的函數(shù):

#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),這里需要用 celery 提供的接口 delay 進(jìn)行調(diào)用
while not result.ready():
    time.sleep(1)
print "task done: {0}".format(result.get())

運(yùn)行此腳本

delay 返回的是一個(gè) AsyncResult 對(duì)象,里面存的就是一個(gè)異步的結(jié)果,當(dāng)任務(wù)完成時(shí)result.ready() 為 true,然后用 result.get() 取結(jié)果即可。

到此,一個(gè)簡(jiǎn)單的 celery 應(yīng)用就完成啦。

2. 進(jìn)階用法

經(jīng)過(guò)快速入門(mén)的學(xué)習(xí)后,我們已經(jīng)能夠使用 Celery 管理普通任務(wù),但對(duì)于實(shí)際使用場(chǎng)景來(lái)說(shuō)這是遠(yuǎn)遠(yuǎn)不夠的,所以我們需要更深入的去了解 Celery 更多的使用方式。

首先來(lái)看之前的task:

@app.task  #普通函數(shù)裝飾為 celery task
def add(x, y):
    return x + y

這里的裝飾器app.task實(shí)際上是將一個(gè)正常的函數(shù)修飾成了一個(gè) celery task 對(duì)象,所以這里我們可以給修飾器加上參數(shù)來(lái)決定修飾后的 task 對(duì)象的一些屬性。

首先,我們可以讓被修飾的函數(shù)成為 task 對(duì)象的綁定方法,這樣就相當(dāng)于被修飾的函數(shù) add 成了 task 的實(shí)例方法,可以調(diào)用 self 獲取當(dāng)前 task 實(shí)例的很多狀態(tài)及屬性。

其次,我們也可以自己復(fù)寫(xiě) task 類(lèi)然后讓這個(gè)自定義 task 修飾函數(shù) add ,來(lái)做一些自定義操作。

2.1 根據(jù)任務(wù)狀態(tài)執(zhí)行不同操作

任務(wù)執(zhí)行后,根據(jù)任務(wù)狀態(tài)執(zhí)行不同操作需要我們復(fù)寫(xiě) task 的 on_failure、on_success 等方法:

# tasks.py
class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print "task done: {0}".format(retval)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print "task fail, reason: {0}".format(exc)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=MyTask)
def add(x, y):
    return x + y

嗯, 然后繼續(xù)運(yùn)行 worker:

celery -A tasks worker --loglevel=info

運(yùn)行腳本,得到:


再修改下tasks:

@app.task  #普通函數(shù)裝飾為 celery task
def add(x, y):
    raise KeyError
    return x + y

重新運(yùn)行 worker,再運(yùn)行 trigger.py:

可以看到,任務(wù)執(zhí)行成功或失敗后分別執(zhí)行了我們自定義的 on_failure、on_success

2.2 綁定任務(wù)為實(shí)例方法
# tasks.py
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
    logger.info(self.request.__dict__)
    return x + y

然后重新運(yùn)行:


執(zhí)行中的任務(wù)獲取到了自己執(zhí)行任務(wù)的各種信息,可以根據(jù)這些信息做很多其他操作,例如判斷鏈?zhǔn)饺蝿?wù)是否到結(jié)尾等等。

關(guān)于 celery.task.request 對(duì)象的詳細(xì)數(shù)據(jù)可以看這里

2.3 任務(wù)狀態(tài)回調(diào)

實(shí)際場(chǎng)景中得知任務(wù)狀態(tài)是很常見(jiàn)的需求,對(duì)于 Celery 其內(nèi)建任務(wù)狀態(tài)有如下幾種:

參數(shù) 說(shuō)明
PENDING 任務(wù)等待中
STARTED 任務(wù)已開(kāi)始
SUCCESS 任務(wù)執(zhí)行成功
FAILURE 任務(wù)執(zhí)行失敗
RETRY 任務(wù)將被重試
REVOKED 任務(wù)取消

當(dāng)我們有個(gè)耗時(shí)時(shí)間較長(zhǎng)的任務(wù)進(jìn)行時(shí)一般我們想得知它的實(shí)時(shí)進(jìn)度,這里就需要我們自定義一個(gè)任務(wù)狀態(tài)用來(lái)說(shuō)明進(jìn)度并手動(dòng)更新?tīng)顟B(tài),從而告訴回調(diào)當(dāng)前任務(wù)的進(jìn)度,具體實(shí)現(xiàn):

# tasks.py
from celery import Celery
import time

@app.task(bind=True)
def test_mes(self):
    for i in xrange(1, 11):
        time.sleep(0.1)
        self.update_state(state="PROGRESS", meta={"p": i*10})
    return "finish"

然后在 trigger.py 中增加:

# trigger.py
from task import add,test_mes
import sys

def pm(body):
    res = body.get("result")
    if body.get("status") == "PROGRESS":
        sys.stdout.write("
任務(wù)進(jìn)度: {0}%".format(res.get("p")))
        sys.stdout.flush()
    else:
        print "
"
        print res
r = test_mes.delay()
print r.get(on_message=pm, propagate=False)

然后運(yùn)行任務(wù):

2.4 定時(shí)/周期任務(wù)

Celery 進(jìn)行周期任務(wù)也很簡(jiǎn)單,只需要在配置中配置好周期任務(wù),然后在運(yùn)行一個(gè)周期任務(wù)觸發(fā)器( beat )即可:

新建 Celery 配置文件 celery_config.py:

# celery_config.py
from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "ptask": {
        "task": "tasks.period_task",
        "schedule": timedelta(seconds=5),
    },
}

CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

配置中 schedule 就是間隔執(zhí)行的時(shí)間,這里可以用 datetime.timedelta 或者 crontab 甚至太陽(yáng)系經(jīng)緯度坐標(biāo)進(jìn)行間隔時(shí)間配置,具體可以參考這里

如果定時(shí)任務(wù)涉及到 datetime 需要在配置中加入時(shí)區(qū)信息,否則默認(rèn)是以 utc 為準(zhǔn)。例如中國(guó)可以加上:

CELERY_TIMEZONE = "Asia/Shanghai"

然后在 tasks.py 中增加要被周期執(zhí)行的任務(wù):

# tasks.py
app = Celery("tasks", backend="redis://localhost:6379/0", broker="redis://localhost:6379/0")
app.config_from_object("celery_config")

@app.task(bind=True)
def period_task(self):
    print "period task done: {0}".format(self.request.id)

然后重新運(yùn)行 worker,接著再運(yùn)行 beat:

celery -A task beat

可以看到周期任務(wù)運(yùn)行正?!?/p> 2.5 鏈?zhǔn)饺蝿?wù)

有些任務(wù)可能需由幾個(gè)子任務(wù)組成,此時(shí)調(diào)用各個(gè)子任務(wù)的方式就變的很重要,盡量不要以同步阻塞的方式調(diào)用子任務(wù),而是用異步回調(diào)的方式進(jìn)行鏈?zhǔn)饺蝿?wù)的調(diào)用:

錯(cuò)誤示范
@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)
正確示范1
def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)
正確示范2
fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])

鏈?zhǔn)饺蝿?wù)中前一個(gè)任務(wù)的返回值默認(rèn)是下一個(gè)任務(wù)的輸入值之一 ( 不想讓返回值做默認(rèn)參數(shù)可以用 si() 或者 s(immutable=True) 的方式調(diào)用 )。

這里的 s() 是方法 celery.signature() 的快捷調(diào)用方式,signature 具體作用就是生成一個(gè)包含調(diào)用任務(wù)及其調(diào)用參數(shù)與其他信息的對(duì)象,個(gè)人感覺(jué)有點(diǎn)類(lèi)似偏函數(shù)的概念:先不執(zhí)行任務(wù),而是把任務(wù)與任務(wù)參數(shù)存起來(lái)以供其他地方調(diào)用。

2.6 調(diào)用任務(wù)

前面講了調(diào)用任務(wù)不能直接使用普通的調(diào)用方式,而是要用類(lèi)似 add.delay(2, 2) 的方式調(diào)用,而鏈?zhǔn)饺蝿?wù)中又用到了 apply_async 方法進(jìn)行調(diào)用,實(shí)際上 delay 只是 apply_async 的快捷方式,二者作用相同,只是 apply_async 可以進(jìn)行更多的任務(wù)屬性設(shè)置,比如 callbacks/errbacks 正?;卣{(diào)與錯(cuò)誤回調(diào)、執(zhí)行超時(shí)、重試、重試時(shí)間等等,具體參數(shù)可以參考這里

2.7 關(guān)于 AsyncResult

AsyncResult 主要用來(lái)儲(chǔ)存任務(wù)執(zhí)行信息與執(zhí)行結(jié)果,有點(diǎn)類(lèi)似 tornado 中的 Future 對(duì)象,都有儲(chǔ)存異步結(jié)果與任務(wù)執(zhí)行狀態(tài)的功能,對(duì)于寫(xiě) js 的朋友,它有點(diǎn)類(lèi)似 Promise 對(duì)象,當(dāng)然在 Celery 4.0 中已經(jīng)支持了 promise 協(xié)議,只需要配合 gevent 一起使用就可以像寫(xiě) js promise 一樣寫(xiě)回調(diào):

import gevent.monkey
monkey.patch_all()

import time
from celery import Celery

app = Celery(broker="amqp://", backend="rpc")

@app.task
def add(x, y):
    return x + y

def on_result_ready(result):
    print("Received result for id %r: %r" % (result.id, result.result,))

add.delay(2, 2).then(on_result_ready)

要注意的是這種 promise 寫(xiě)法現(xiàn)在只能用在 backend 是 RPC (amqp) 或 Redis 時(shí)。 并且獨(dú)立使用時(shí)需要引入 gevent 的猴子補(bǔ)丁,可能會(huì)影響其他代碼。 官方文檔給的建議是這個(gè)特性結(jié)合異步框架使用更合適,例如 tornado、 twisted 等。

delayapply_async 生成的都是 AsyncResult 對(duì)象,此外我們還可以根據(jù) task id 直接獲取相關(guān) task 的 AsyncResult: AsyncResult(task_id=xxx)

關(guān)于 AsyncResult 更詳細(xì)的內(nèi)容,可以參考這里

利用 Celery 進(jìn)行分布式隊(duì)列管理、開(kāi)發(fā)將會(huì)大幅提升開(kāi)發(fā)效率,關(guān)于 Celery 更詳細(xì)的使用大家可以去參考詳細(xì)的官方文檔

作者:rapospectre

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/38333.html

相關(guān)文章

  • 異步任務(wù)神器 Celery 簡(jiǎn)明筆記

    摘要:我們將窗口切換到的啟動(dòng)窗口,會(huì)看到多了兩條日志這說(shuō)明任務(wù)已經(jīng)被調(diào)度并執(zhí)行成功。本文標(biāo)題為異步任務(wù)神器簡(jiǎn)明筆記本文鏈接為參考資料使用之美分布式任務(wù)隊(duì)列的介紹思誠(chéng)之道異步任務(wù)神器簡(jiǎn)明筆記 Celery 在程序的運(yùn)行過(guò)程中,我們經(jīng)常會(huì)碰到一些耗時(shí)耗資源的操作,為了避免它們阻塞主程序的運(yùn)行,我們經(jīng)常會(huì)采用多線程或異步任務(wù)。比如,在 Web 開(kāi)發(fā)中,對(duì)新用戶(hù)的注冊(cè),我們通常會(huì)給他發(fā)一封激活郵件,...

    Ryan_Li 評(píng)論0 收藏0
  • Python之celery的簡(jiǎn)介與使用

    摘要:的簡(jiǎn)介是一個(gè)基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專(zhuān)注于實(shí)時(shí)處理,同時(shí)也支持任務(wù)調(diào)度。目前支持等作為消息代理,但適用于生產(chǎn)環(huán)境的只有和官方推薦。任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢(xún)。 celery的簡(jiǎn)介 ??celery是一個(gè)基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專(zhuān)注于實(shí)時(shí)處理,同時(shí)也支持任務(wù)調(diào)度。它的執(zhí)行單元為任務(wù)(task),利用多線程,如Eventlet,gevent等,它們能被...

    LeexMuller 評(píng)論0 收藏0
  • django開(kāi)發(fā)-使用celery搭建布式(多節(jié)點(diǎn))任務(wù)隊(duì)列

    摘要:今天介紹一下如何在項(xiàng)目中使用搭建一個(gè)有兩個(gè)節(jié)點(diǎn)的任務(wù)隊(duì)列一個(gè)主節(jié)點(diǎn)一個(gè)子節(jié)點(diǎn)主節(jié)點(diǎn)發(fā)布任務(wù),子節(jié)點(diǎn)收到任務(wù)并執(zhí)行。 今天介紹一下如何在django項(xiàng)目中使用celery搭建一個(gè)有兩個(gè)節(jié)點(diǎn)的任務(wù)隊(duì)列(一個(gè)主節(jié)點(diǎn)一個(gè)子節(jié)點(diǎn);主節(jié)點(diǎn)發(fā)布任務(wù),子節(jié)點(diǎn)收到任務(wù)并執(zhí)行。搭建3個(gè)或者以上的節(jié)點(diǎn)就類(lèi)似了),使用到了celery,rabbitmq。這里不會(huì)單獨(dú)介紹celery和rabbitmq中的知識(shí)了...

    ConardLi 評(píng)論0 收藏0
  • tornado配合celery及rabbitmq實(shí)現(xiàn)web request異步非阻塞

    摘要:主要是為了實(shí)現(xiàn)系統(tǒng)之間的雙向解耦而實(shí)現(xiàn)的。問(wèn)題及優(yōu)化隊(duì)列過(guò)長(zhǎng)問(wèn)題使用上述方案的異步非阻塞可能會(huì)依賴(lài)于的任務(wù)隊(duì)列長(zhǎng)度,若隊(duì)列中的任務(wù)過(guò)多,則可能導(dǎo)致長(zhǎng)時(shí)間等待,降低效率。 Tornado和Celery介紹 1.Tornado Tornado是一個(gè)用python編寫(xiě)的一個(gè)強(qiáng)大的、可擴(kuò)展的異步HTTP服務(wù)器,同時(shí)也是一個(gè)web開(kāi)發(fā)框架。tornado是一個(gè)非阻塞式web服務(wù)器,其速度相當(dāng)快。...

    番茄西紅柿 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<