成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

celery動(dòng)態(tài)添加任務(wù)

yanbingyun1990 / 3530人閱讀

摘要:是一個(gè)基于的分布式調(diào)度系統(tǒng),文檔在這最近有個(gè)需求想要?jiǎng)討B(tài)的添加任務(wù)而不用重啟服務(wù)找了一圈沒找到什么好辦法也有可能是文檔沒看仔細(xì),所以只能自己實(shí)現(xiàn)囉為動(dòng)態(tài)添加任務(wù),首先我想到的是傳遞一個(gè)函數(shù)進(jìn)去,讓某個(gè)特定任務(wù)去執(zhí)行這個(gè)傳遞過去的函數(shù),就像這

celery是一個(gè)基于Python的分布式調(diào)度系統(tǒng),文檔在這 ,最近有個(gè)需求,想要?jiǎng)討B(tài)的添加任務(wù)而不用重啟celery服務(wù),找了一圈沒找到什么好辦法(也有可能是文檔沒看仔細(xì)),所以只能自己實(shí)現(xiàn)囉

為celery動(dòng)態(tài)添加任務(wù),首先我想到的是傳遞一個(gè)函數(shù)進(jìn)去,讓某個(gè)特定任務(wù)去執(zhí)行這個(gè)傳遞過去的函數(shù),就像這樣

@app.task
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

很可惜,會(huì)出現(xiàn)這樣的錯(cuò)誤

kombu.exceptions.EncodeError: Object of type "function" is not JSON serializable

換一種序列化方式

@app.task(serializer="pickle")
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

結(jié)果又出現(xiàn)一大串錯(cuò)誤信息

ERROR/MainProcess] Pool callback raised exception: ContentDisallowed("Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)",)
Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: "chord"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: "_payload"

換一種思路

func = import_string(func)

不知道這樣是否可以,結(jié)果測試: No

哎,流年不利.

最后一直測試,一直測試,終于找到了一種辦法,直接上代碼

from importlib import import_module, reload

app.conf.CELERY_IMPORTS = ["task", "task.all_task"]

def import_string(import_name):
    import_name = str(import_name).replace(":", ".")
    modules = import_name.split(".")
    mod = import_module(modules[0])
    for comp in modules[1:]:
        if not hasattr(mod, comp):
            reload(mod)
        mod = getattr(mod, comp)
    return mod

@app.task
def execute(func, *args, **kwargs):
    func = import_string(func)
    return func(*args, **kwargs)

項(xiàng)目結(jié)構(gòu)是這樣的

├── celery_app.py
├── config.py
├── task
│?? ├── all_task.py
│?? ├── __init__.py

注意: 任務(wù)必須大于等于兩層目錄

以后每次添加任務(wù)都可以先添加到all_task.py里,調(diào)用時(shí)不用再重啟celery服務(wù)

# task/all_task.py

def ee(c, d):
    return c, d, "你好"

# example
from celery_app import execute

execute.delay("task.all_task.ee", 2, 444)

ok,另外發(fā)現(xiàn)celery也支持任務(wù)定時(shí)調(diào)用,就像這樣

execute.apply_async(args=["task.all_task.aa"], eta=datetime(2017, 7, 9, 8, 12, 0))

簡單實(shí)現(xiàn)一個(gè)任務(wù)重復(fù)調(diào)用的功能

@app.task
def interval(func, seconds, args=(), task_id=None):
    next_run_time = current_time() + timedelta(seconds=seconds)
    kwargs = dict(args=(func, seconds, args), eta=next_run_time)
    if task_id is not None:
        kwargs.update(task_id=task_id)
    interval.apply_async(**kwargs)
    func = import_string(func)
    return func(*args)

大概意思就是先計(jì)算下次運(yùn)行的時(shí)間,然后把任務(wù)添加到celery隊(duì)列里,這里有個(gè)task_id有些問題,因?yàn)榧僭O(shè)添加了每隔3s執(zhí)行一個(gè)任務(wù),
它的task_id默認(rèn)會(huì)使用uuid生成,如果想要再移除這個(gè)任務(wù)就不太方便,自定task_id可能會(huì)好一些,另外也許需要判斷task_id是否存在

AsyncResult(task_id).state

ok,再獻(xiàn)上一個(gè)好用的函數(shù)

from inspect import getmembers, isfunction

def get_tasks(module="task"):
    return [{
        "name": "task:{}".format(f[1].__name__),
        "doc": f[1].__doc__,
    } for f in getmembers(import_module(module), isfunction)]

就這樣.

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/38697.html

相關(guān)文章

  • Celery中使用Flask的上下文

    摘要:所以這就現(xiàn)實(shí)了在中使用的應(yīng)用上下文。要引入請求上下文,需要考慮這兩個(gè)問題如何在中產(chǎn)生請求上下文。中有和可以產(chǎn)生請求上下文。具體的思路還是在中重載類,通過,在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數(shù)的使用情況。 其實(shí)我只是想把郵件發(fā)送這個(gè)動(dòng)作移到Celery中執(zhí)行。既然用到了Celery,那么每次發(fā)郵件都單獨(dú)開一個(gè)線程似乎有點(diǎn)多余,異步任務(wù)還是交給Celery吧...

    Sourcelink 評論0 收藏0
  • Flask+Celery+Redis實(shí)現(xiàn)隊(duì)列化異步任務(wù)

    摘要:使用異步框架,例如等等,裝飾異步任務(wù)。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。不存儲(chǔ)任務(wù)狀態(tài)。標(biāo)識(shí)要使用的默認(rèn)序列化方法的字符串。指定該任務(wù)的結(jié)果存儲(chǔ)后端用于此任務(wù)。 概述: ????????我們考慮一個(gè)場景,公司有一個(gè)需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...

    Ali_ 評論0 收藏0
  • Python之celery的簡介與使用

    摘要:的簡介是一個(gè)基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專注于實(shí)時(shí)處理,同時(shí)也支持任務(wù)調(diào)度。目前支持等作為消息代理,但適用于生產(chǎn)環(huán)境的只有和官方推薦。任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢。 celery的簡介 ??celery是一個(gè)基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊(duì)列,它專注于實(shí)時(shí)處理,同時(shí)也支持任務(wù)調(diào)度。它的執(zhí)行單元為任務(wù)(task),利用多線程,如Eventlet,gevent等,它們能被...

    LeexMuller 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<