摘要:默認(rèn)值為,指定為時(shí)代表可以阻塞,若同時(shí)指定,在超時(shí)時(shí)返回。當(dāng)消費(fèi)者線程調(diào)用意味著有消費(fèi)者取得任務(wù)并完成任務(wù),未完成的任務(wù)數(shù)就會(huì)減少。當(dāng)未完成的任務(wù)數(shù)降到,解除阻塞。
學(xué)習(xí)契機(jī)
最近的一個(gè)項(xiàng)目中在使用grpc時(shí)遇到一個(gè)問(wèn)題,由于client端可多達(dá)200,每個(gè)端口每10s向grpc server發(fā)送一次請(qǐng)求,server端接受client的請(qǐng)求后根據(jù)request信息更新數(shù)據(jù)庫(kù),再將數(shù)據(jù)庫(kù)和配置文件的某些數(shù)據(jù)封裝后返回給client。原代碼的性能是0.26s/request,遠(yuǎn)遠(yuǎn)達(dá)不到所需性能,其中數(shù)據(jù)庫(kù)更新操作耗時(shí)達(dá)到80%,其中一個(gè)優(yōu)化點(diǎn)就是將數(shù)據(jù)庫(kù)更新操作放在獨(dú)立的線程中。
在次之前沒(méi)有使用過(guò)線程編碼,學(xué)以致用后本著加深理解的想法,將這個(gè)過(guò)程記錄下來(lái),這里先記下用于線程間通信的隊(duì)列Queue的相關(guān)知識(shí)。
Python2中隊(duì)列庫(kù)名稱(chēng)為Queue,Python3中已改名為queue,項(xiàng)目使用Python2.7.5版本,自然是使用Queue。
Queue模塊中提供了同步的、線程安全的隊(duì)列類(lèi),包括FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都實(shí)現(xiàn)了鎖原語(yǔ),可在多線程通信中直接使用。
Queue模塊定義了以下類(lèi)及異常,在隊(duì)列類(lèi)中,maxsize限制可入隊(duì)列數(shù)據(jù)的數(shù)量,值小于等于0時(shí)代表不限制:
Queue.Queue(maxsize=0) FIFO隊(duì)列
Queue.LifoQueue(maxsize=0) LIFO隊(duì)列
Queue.PriorityQueue(maxsize=0) 優(yōu)先級(jí)隊(duì)列
Queue.Empty TODO
Queue.Full
Queue(Queue、LifoQueue、PriorityQueue)對(duì)象提供以下方法:
Queue.qsize()
返回隊(duì)列大小,但是不保證qsize() > 0時(shí),get()不會(huì)阻塞;也不保證qsize() < maxsize時(shí),put()不會(huì)阻塞。
Queue.empty()
返回True時(shí),不保證put()時(shí)不會(huì)阻塞;返回False時(shí)不保證get()不會(huì)阻塞。
Queue.full()
返回True時(shí),不保證get()時(shí)不會(huì)阻塞;返回False時(shí)不保證put()不會(huì)阻塞。
Queue.put(item[, block[, timeout]])
block默認(rèn)值為False,指定為T(mén)rue時(shí)代表可以阻塞,若同時(shí)指定timeout,在超時(shí)時(shí)返回Full exception。
Queue.put_nowait(item)
等同put(item, False)
Queue.get([block[, timeout]])
Queue.get_nowait()
等同get(item, False)
Queue.task_done()
消費(fèi)者線程調(diào)用。調(diào)用get()后,可調(diào)用task_done()告訴隊(duì)列該任務(wù)已經(jīng)處理完畢。
如果當(dāng)前一個(gè)join()正在阻塞,它將在隊(duì)列中的所有任務(wù)都處理完時(shí)恢復(fù)執(zhí)行(即每一個(gè)由put()調(diào)用入隊(duì)的任務(wù)都有一個(gè)對(duì)應(yīng)的task_done()調(diào)用)。
Queue.join()
阻塞調(diào)用線程,直到隊(duì)列中的所有任務(wù)被處理掉。
只要有數(shù)據(jù)被加入隊(duì)列,未完成的任務(wù)數(shù)就會(huì)增加。當(dāng)消費(fèi)者線程調(diào)用task_done()(意味著有消費(fèi)者取得任務(wù)并完成任務(wù)),未完成的任務(wù)數(shù)就會(huì)減少。當(dāng)未完成的任務(wù)數(shù)降到0,join()解除阻塞。
UpdateThread是單一消費(fèi)者進(jìn)程,獲取FIFO隊(duì)列中的數(shù)據(jù)處理,GrpcThread是multi生產(chǎn)者線程,需要對(duì)往隊(duì)列中丟數(shù)據(jù)這個(gè)操作加鎖保證數(shù)據(jù)先后順序。
import threading import Queue import time q = Queue.Queue() q_lock = threading.Lock() class UpdateThread(threading.Thread): def __init__(self): super(self.__class__, self).__init__() self.setName(self.__class__.__name__) self._setName = self.setName @staticmethod def update_stat(): global q while not q.empty(): stat = q.get() print "Update stat (%s) in db" % stat def run(self): while True: self.update_stat() time.sleep(0.1) class GrpcThread(threading.Thread): def compose_stat(self, stat): global q q_lock.acquire() q.put("%d: %s" % (stat, self.name)) q_lock.release() return def run(self): for i in range(10): self.compose_stat(i) time.sleep(0.1) def launch_update_thread(): UpdateThread().start() if __name__ == "__main__": launch_update_thread() thread1 = GrpcThread() thread2 = GrpcThread() thread1.start() thread2.start()
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/44586.html
摘要:在生產(chǎn)者與消費(fèi)者之間的緩沖區(qū)稱(chēng)之為倉(cāng)庫(kù)。生產(chǎn)者負(fù)責(zé)往倉(cāng)庫(kù)運(yùn)輸商品,而消費(fèi)者負(fù)責(zé)從倉(cāng)庫(kù)里取出商品,這就構(gòu)成了生產(chǎn)者消費(fèi)者模式。中的多線程編程在實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式之前,我們先學(xué)習(xí)下中的多線程編程。 什么是生產(chǎn)者消費(fèi)者模式 在軟件開(kāi)發(fā)的過(guò)程中,經(jīng)常碰到這樣的場(chǎng)景:某些模塊負(fù)責(zé)生產(chǎn)數(shù)據(jù),這些數(shù)據(jù)由其他模塊來(lái)負(fù)責(zé)處理(此處的模塊可能是:函數(shù)、線程、進(jìn)程等)。產(chǎn)生數(shù)據(jù)的模塊稱(chēng)為生產(chǎn)者,而處理數(shù)據(jù)...
摘要:介紹今天花了近乎一天的時(shí)間研究關(guān)于多線程的問(wèn)題,查看了大量源碼自己也實(shí)踐了一個(gè)生產(chǎn)消費(fèi)者模型,所以把一天的收獲總結(jié)一下。提供了兩個(gè)模塊和來(lái)支持的多線程操作。使用來(lái)阻塞線程。 介紹 今天花了近乎一天的時(shí)間研究python關(guān)于多線程的問(wèn)題,查看了大量源碼 自己也實(shí)踐了一個(gè)生產(chǎn)消費(fèi)者模型,所以把一天的收獲總結(jié)一下。 由于GIL(Global Interpreter Lock)鎖的關(guān)系,純的p...
摘要:分布式進(jìn)程在和中,應(yīng)當(dāng)優(yōu)選,因?yàn)楦€(wěn)定,而且,可以分布到多臺(tái)機(jī)器上,而最多只能分布到同一臺(tái)機(jī)器的多個(gè)上。由于模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫(xiě)分布式多進(jìn)程程序。 分布式進(jìn)程 在Thread和Process中,應(yīng)當(dāng)優(yōu)選Process,因?yàn)镻rocess更穩(wěn)定,而且,Process可以分布到多臺(tái)機(jī)器上,而Thread最多只能分布到同一臺(tái)機(jī)器的多個(gè)CPU上。 Pytho...
摘要:批評(píng)的人通常都會(huì)說(shuō)的多線程編程太困難了,眾所周知的全局解釋器鎖,或稱(chēng)使得多個(gè)線程的代碼無(wú)法同時(shí)運(yùn)行。多線程起步首先讓我們來(lái)創(chuàng)建一個(gè)名為的模塊。多進(jìn)程可能比多線程更易使用,但需要消耗更大的內(nèi)存。 批評(píng) Python 的人通常都會(huì)說(shuō) Python 的多線程編程太困難了,眾所周知的全局解釋器鎖(Global Interpreter Lock,或稱(chēng) GIL)使得多個(gè)線程的 Python 代碼無(wú)...
閱讀 3951·2021-09-27 13:56
閱讀 950·2021-09-08 09:36
閱讀 894·2019-08-30 15:54
閱讀 679·2019-08-29 17:29
閱讀 997·2019-08-29 17:21
閱讀 1768·2019-08-29 16:59
閱讀 2850·2019-08-29 13:03
閱讀 3069·2019-08-29 12:47