成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

100行代碼實現(xiàn)任務(wù)隊列

xorpay / 752人閱讀

摘要:最近剛看完多線程,為了加深印象,按照分鐘實現(xiàn)延遲消息功能的思路,實現(xiàn)了一個簡易版的異步隊列。讀取任務(wù)時,計算當(dāng)前和,取出需要執(zhí)行的任務(wù),使用多線程的形式執(zhí)行。加鎖的主要作用是防止多線程同時操作文件讀寫,影響數(shù)據(jù)一致性。

最近剛看完python多線程,為了加深印象,按照1分鐘實現(xiàn)“延遲消息”功能的思路,實現(xiàn)了一個簡易版的異步隊列。

高效延時消息,包含兩個重要的數(shù)據(jù)結(jié)構(gòu):

1.環(huán)形隊列,例如可以創(chuàng)建一個包含3600個slot的環(huán)形隊列(本質(zhì)是個數(shù)組)

2.任務(wù)集合,環(huán)上每一個slot是一個Set

同時,啟動一個timer,這個timer每隔1s,在上述環(huán)形隊列中移動一格,有一個Current Index指針來標(biāo)識正在檢測的slot。

Task結(jié)構(gòu)中有兩個很重要的屬性:

(1)Cycle-Num:當(dāng)Current Index第幾圈掃描到這個Slot時,執(zhí)行任務(wù)
(2)Task-Function:需要執(zhí)行的任務(wù)指針

下邊是代碼(代碼不止100行,但是在200行內(nèi),也算100行了。)

#! -*- coding: utf-8 -*-

try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import simplejson as json
except ImportError:
    import json

import os
import errno
import Queue
import random
import logging
from functools import wraps
from threading import Timer, RLock, Thread
from time import sleep, time
from base64 import b64encode, b64decode

# json 的數(shù)據(jù)結(jié)構(gòu)
# tasks = {
#     index: {
#         cycle_num: [(func, bargs)]
#     }
# }

logging.basicConfig(level=logging.DEBUG,
                    format="(%(asctime)-15s) %(message)s",)
tasks_file = "tasks.json"
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY

# 為了防止任務(wù)太多需要生成過多的線程,我們使用Queue 來限制生成的線程數(shù)量
WORKER_NUMS = 2
q = Queue.Queue(WORKER_NUMS)

lock = RLock()


def check_file():
    try:
        file_handle = os.open(tasks_file, flags)
    except OSError as e:
        if e.errno == errno.EEXIST:  # Failed as the file already exists.
            pass
        else:
            raise
    else:
        with os.fdopen(file_handle, "w") as file_obj:
            file_obj.write("{}")


def set_delay_task(func_name, *args, **kwargs):
    # 使用鎖來保證每次只要一個線程寫入文件,防止數(shù)據(jù)出錯
    with lock:
        with open(tasks_file, "r+") as json_file:
            count_down = kwargs.pop("count_down", 0)
            tasks = json.load(json_file)
            # 執(zhí)行時間
            exec_time = int(time()) + count_down
            # 循環(huán)索引
            index = str(exec_time % 3600)
            # 圈數(shù)
            cycle_num = str(exec_time / 3600 + 1)
            dargs = pickle.dumps((args, kwargs))
            bargs = b64encode(dargs)
            index_data = tasks.get(index, {})
            index_data.setdefault(cycle_num, []).append((func_name, bargs))
            tasks[index] = index_data
            json_file.seek(0)
            json.dump(tasks, json_file)
            logging.debug("Received task: %s" % func_name)


def get_delay_tasks():
    with open(tasks_file, "r+") as json_file:
        tasks = json.load(json_file)
        # 執(zhí)行時間
        current_time = int(time())
        # 循環(huán)索引
        index = str(current_time % 3600)
        # 圈數(shù)
        cycle_num = str(current_time / 3600 + 1)
        current_tasks = tasks.get(index, {}).get(cycle_num, [])
    tasks = []
    for func, bargs in current_tasks:
        dargs = b64decode(bargs)
        args, kwargs = pickle.loads(dargs)
        tasks.append((func, (args, kwargs)))
    return tasks


def get_method_by_name(method_name):
    possibles = globals().copy()
    possibles.update(locals())
    method = possibles.get(method_name)
    return method


def create_task(task_class, func, task_name=None, **kwargs):

    def execute(self):
        args, kwargs = self.data or ((), {})
        return func(*args, **kwargs)

    attrs = {
        "execute": execute,
        "func_name": func.__name__,
        "__module__": func.__module__,
        "__doc__": func.__doc__
    }
    attrs.update(kwargs)

    klass = type(
        task_name or func.__name__,
        (task_class,),
        attrs
    )

    return klass


class Hu(object):

    def __init__(self, func_name=None):
        self.func_name = func_name
        check_file()

    def task(self):
        def deco(func):
            self.func_name = func.__name__
            klass = create_task(Hu, func, self.func_name)
            func.delay = klass(func_name=klass.func_name).delay
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return deco

    def delay(self, *args, **kwargs):
        _args = [self.func_name]
        _args.extend(args)
        Timer(0, set_delay_task, _args, kwargs).start()
        return True


def boss():
    while True:
        current_tasks = get_delay_tasks()
        for func, params in current_tasks:
            # Task accepted: auth.tasks.send_msg
            logging.debug("Task accepted: %s" % func)
            q.put((func, params))
        sleep(1)


def worker():
    while True:
        func, params = q.get()
        print "get task: %s
" % func
        method = get_method_by_name(func)
        args, kwargs = params
        # Task auth.tasks.send_msgsucceeded in
        start_time = time()
        method(*args, **kwargs)
        end_time = time()
        logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time))
        q.task_done()


def main():
    check_file()
    print("starting at:", time())
    for target in (boss, worker):
        t = Thread(target=target)
        t.start()
    print("all DONE at:", time())

hu = Hu()

# 使用方式如下:

@hu.task()
def test(num):
    sleep(2)
    print "test: %s" % num


if __name__ == "__main__":
    for i in range(10):
        test.delay(i, count_down=random.randint(1, 10))
    main()

# output

(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test

(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2
get task: test

(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796
(2017-03-21 15:59:24,404) Task accepted: test
test: 1
get task: test

按照1分鐘實現(xiàn)“延遲消息”功能的思路。隊列的數(shù)據(jù)結(jié)構(gòu)為

{
    index: {
        cycle_num: [(func, bargs)]
    }
}

index的值為 1-3600。每小時一個循環(huán)。
cycle_num 則是 由 (時間戳 / 3600 + 1) 計算得到的值,是圈數(shù)。

每當(dāng)有任務(wù)加入,我們計算出index和cycle_num 將參數(shù)和方法名寫入json文件。
讀取任務(wù)時,計算當(dāng)前 index和cycle_num, 取出需要執(zhí)行的任務(wù),使用多線程的形式執(zhí)行。

為了防止任務(wù)太多需要生成過多的線程,我們使用Queue 來限制生成的線程數(shù)量。

加鎖的主要作用是防止多線程同時操作文件讀寫,影響數(shù)據(jù)一致性。

當(dāng)然,也可以使用redis 存儲隊列,因為 redis 是單線程操作,可以防止多線程操作影響數(shù)據(jù)一致性的問題。
這一部分有需要的可以自己實現(xiàn)。

參考:

python線程筆記

1分鐘實現(xiàn)“延遲消息”功能

>歡迎關(guān)注 >請我喝芬達(dá)

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/38539.html

相關(guān)文章

  • Laravel Telescope:優(yōu)雅的應(yīng)用調(diào)試工具

    摘要:文章轉(zhuǎn)自視頻教程優(yōu)雅的應(yīng)用調(diào)試工具新擴(kuò)展是由和開源的應(yīng)用的調(diào)試工具。計劃任務(wù)列出已運(yùn)行的計劃任務(wù)。該封閉函數(shù)會被序列化為一個長字符串,加上他的哈希與簽名如出一轍該功能將記錄所有異常,并可查看具體異常情況。事件顯示所有事件的列表。 文章轉(zhuǎn)自:https://laravel-china.org/topics/19013視頻教程:047. 優(yōu)雅的應(yīng)用調(diào)試工具--laravel/telesco...

    MasonEast 評論0 收藏0
  • 時間切片(Time Slicing)

    摘要:所以回來后就想著補(bǔ)一篇文章針對時間切片展開詳細(xì)的討論。所以時間切片的目的是不阻塞主線程,而實現(xiàn)目的的技術(shù)手段是將一個長任務(wù)拆分成很多個不超過的小任務(wù)分散在宏任務(wù)隊列中執(zhí)行。上周我在FDConf的分享《讓你的網(wǎng)頁更絲滑》中提到了時間切片,由于時間關(guān)系當(dāng)時并沒有對時間切片展開更細(xì)致的討論。所以回來后就想著補(bǔ)一篇文章針對時間切片展開詳細(xì)的討論。 從用戶的輸入,再到顯示器在視覺上給用戶的輸出,這一過...

    Freeman 評論0 收藏0
  • 300ABAP代碼實現(xiàn)一個最簡單的區(qū)塊鏈原型

    摘要:我的這篇文章沒有任何高大上的術(shù)語,就是行代碼,實現(xiàn)一個最簡單的區(qū)塊鏈原型。檢查該區(qū)塊鏈?zhǔn)欠裼行?。而通過在循環(huán)里不斷嘗試最終得到一個合法的哈希值的這一過程,就是區(qū)塊鏈圈內(nèi)俗稱的挖礦。 不知從什么時候起,區(qū)塊鏈在網(wǎng)上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門弄斧了,網(wǎng)上...

    cikenerd 評論0 收藏0
  • 300ABAP代碼實現(xiàn)一個最簡單的區(qū)塊鏈原型

    摘要:我的這篇文章沒有任何高大上的術(shù)語,就是行代碼,實現(xiàn)一個最簡單的區(qū)塊鏈原型。檢查該區(qū)塊鏈?zhǔn)欠裼行?。而通過在循環(huán)里不斷嘗試最終得到一個合法的哈希值的這一過程,就是區(qū)塊鏈圈內(nèi)俗稱的挖礦。 不知從什么時候起,區(qū)塊鏈在網(wǎng)上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門弄斧了,網(wǎng)上...

    DangoSky 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<