摘要:進(jìn)程線程切換都需要使用一定的時間。子進(jìn)程在中,如果要運(yùn)行系統(tǒng)命令,會使用來運(yùn)行,官方建議使用方法來運(yùn)行系統(tǒng)命令,更高級的用法是直接使用其接口。
多線程 簡單示例
對于CPU計算密集型的任務(wù),python的多線程跟單線程沒什么區(qū)別,甚至有可能會更慢,但是對于IO密集型的任務(wù),比如http請求這類任務(wù),python的多線程還是有用處。在日常的使用中,經(jīng)常會結(jié)合多線程和隊列一起使用,比如,以爬取simpledestops 網(wǎng)站壁紙為例:
import os from datetime import datetime from queue import Queue from threading import Thread import requests requests.packages.urllib3.disable_warnings() from bs4 import BeautifulSoup import re if not os.path.exists("img"): os.mkdir("img") # 聲明一個隊列 Q = Queue() def producer(pages): for page in range(1,pages+1): # 提取每一頁的圖片 url 加入隊列 print("[-] 收集第 {} 頁".format(str(page))) url = "http://simpledesktops.com/browse/"+str(page)+"/" r = requests.get(url,verify=False) html = r.text soup = BeautifulSoup(html,"html.parser") try: imgs = soup.find_all("img") for img in imgs: img_url = img["src"] Q.put(img_url) except: pass def worker(i): # 取出隊列的值,按順序取,下載圖片 while not Q.empty(): img_url = Q.get() text = re.search("(http://static.simpledesktops.com/uploads/desktops/d+/d+/d+/(.*?png)).*?png",img_url) new_img_url = text.group(1) r = requests.get(new_img_url,verify=False) path = "img/"+text.group(2) print("[-] 線程 {} 開始下載 {} 開始時間:{}".format(i,text.group(2),datetime.now())) with open(path,"wb") as f: f.write(r.content) Q.all_tasks_done if __name__ =="__main__": # 一定要將數(shù)據(jù)加入隊列,否則是啟動不了的,因?yàn)殛犃袨榭? producer(50) # 線程的聲明 ts = [Thread(target=worker,args=(i,)) for i in range(50)] for t in ts: t.start() for t in ts: t.join()
我們使用start啟動多線程,使用 join 防止主線程退出的時候結(jié)束所有的線程,使用隊列有序的且并發(fā)的下載壁紙。 仔細(xì)觀察就會發(fā)現(xiàn)代碼其實(shí)有跡可循,更改其中的爬取內(nèi)容的部分代碼后,我們就可以應(yīng)用于爬取別的網(wǎng)站。
ThreadLocal按照道理來說,多線程中,每個線程的處理邏輯應(yīng)該是相同的,但是其處理的數(shù)據(jù),卻不一定是相同的,如果數(shù)據(jù)是全局的,那么我們就需要加鎖,防止數(shù)據(jù)混亂,這樣一來就會麻煩很多,所以線程處理的數(shù)據(jù)最好是局部的、其他線程不能干擾的。
代碼示例:
# coding: utf-8 import threading,time import requests requests.packages.urllib3.disable_warnings() from datetime import datetime local_variable = threading.local() # 邏輯處理函數(shù) def worker(): print("每個線程啟動的時間: ",datetime.now()) time.sleep(10) url = local_variable.url r = requests.get(url,verify=False) print(r.url,datetime.strftime(datetime.now(),"%H:%M:%S"),threading.current_thread().name) # 線程處理函數(shù) def process_thread(url): local_variable.url = url worker() if __name__ == "__main__": ts = [threading.Thread(target=process_thread,args=(url,))for url in ["https://www.baidu.com","https://www.google.com","https://www.bing.com"]] for t in ts: t.start() for t in ts: t.join()
輸出:
線程Thread-1 啟動的時間:2019-01-09 11:25:18.339631 線程Thread-2 啟動的時間:2019-01-09 11:25:18.340646 線程Thread-3 啟動的時間:2019-01-09 11:25:18.342635 https://www.baidu.com/ 11:25:28 Thread-1 https://cn.bing.com/ 11:25:29 Thread-3 https://www.google.com/ 11:25:29 Thread-2多進(jìn)程 進(jìn)程池
python中使用 multiprocessing 來創(chuàng)建多進(jìn)程,如果要創(chuàng)建多個子進(jìn)程,則需要使用 進(jìn)程池 Pool 來創(chuàng)建,一個簡單的例子:
from multiprocessing import Pool import os from datetime import datetime """ @param {type} int @return: None """ def print_num(i): print("進(jìn)程{} 打印 {}".format(os.getpid(),i)) if __name__ == "__main__": p = Pool(4) for i in range(100): p.apply_async(print_num,args=(i,)) # 關(guān)閉進(jìn)程池,不再加入進(jìn)程 p.close() # 防止主進(jìn)程結(jié)束,子進(jìn)程無法繼續(xù)運(yùn)行 p.join()
輸出:
進(jìn)程2624 打印 0 進(jìn)程2625 打印 1 進(jìn)程2626 打印 3 進(jìn)程2627 打印 2 進(jìn)程2624 打印 4 進(jìn)程2625 打印 5 進(jìn)程2626 打印 6 進(jìn)程2627 打印 7 進(jìn)程2624 打印 8 ...
進(jìn)程可以實(shí)現(xiàn)并行運(yùn)行代碼,但是一旦進(jìn)程太多,CPU運(yùn)行不過來也是需要進(jìn)行等待,用了多進(jìn)程以后,就可以不使用隊列了,也可以實(shí)現(xiàn)多線程的效果
除此之外,還可以多進(jìn)程和多線程結(jié)合起來使用,一個簡單的例子
from multiprocessing import Pool import threading import os,time import queue from datetime import datetime def producer(i): Q = queue.Queue() start = 25*(i-1) end = 100 * int(i / 4) for x in range(start,end): Q.put(x) return Q def process_thread(Q,j): while not Q.empty(): item = Q.get() print("進(jìn)程{}: 線程{} 正在消耗:{} 時間:{}".format(os.getpid(),j,item,datetime.now())) Q.all_tasks_done def tasks(i): Q = producer(i) ts = [threading.Thread(target=process_thread,args=(Q,j)) for j in range(10)] for t in ts: t.start() for t in ts: t.join() if __name__ == "__main__": start = datetime.now() p = Pool(4) for i in range(1,5): print(i) p.apply_async(tasks,args=(i,)) p.close() p.join() end = datetime.now() waste = end-start print("一共花費(fèi)了: {}".format(waste))
先將要處理的數(shù)據(jù),填進(jìn)隊列,然后創(chuàng)建4個進(jìn)程,10個線程運(yùn)行。 其輸出為:
""" (venv) C:projectlibraries-python>python bulit-in-libraries hreadingmultithreading.py 進(jìn)程17020: 線程0 正在消耗:1 時間:2019-01-09 12:50:48.701523 進(jìn)程17020: 線程1 正在消耗:2 時間:2019-01-09 12:50:48.703521 進(jìn)程17020: 線程3 正在消耗:4 時間:2019-01-09 12:50:48.704365 進(jìn)程17020: 線程2 正在消耗:3 時間:2019-01-09 12:50:48.704365 進(jìn)程2804: 線程0 正在消耗:5 時間:2019-01-09 12:50:48.706349 進(jìn)程2804: 線程1 正在消耗:6 時間:2019-01-09 12:50:48.707352 進(jìn)程2804: 線程4 正在消耗:9 時間:2019-01-09 12:50:48.708355 進(jìn)程2804: 線程3 正在消耗:8 時間:2019-01-09 12:50:48.708355 進(jìn)程2804: 線程2 正在消耗:7 時間:2019-01-09 12:50:48.708355 進(jìn)程16060: 線程0 正在消耗:10 時間:2019-01-09 12:50:48.728409 進(jìn)程16060: 線程1 正在消耗:11 時間:2019-01-09 12:50:48.730413 進(jìn)程16060: 線程4 正在消耗:14 時間:2019-01-09 12:50:48.732418 進(jìn)程16060: 線程3 正在消耗:13 時間:2019-01-09 12:50:48.732418 進(jìn)程16060: 線程2 正在消耗:12 時間:2019-01-09 12:50:48.732418 進(jìn)程7588: 線程3 正在消耗:18 時間:2019-01-09 12:50:48.761808 進(jìn)程7588: 線程4 正在消耗:19 時間:2019-01-09 12:50:48.761808 進(jìn)程7588: 線程0 正在消耗:15 時間:2019-01-09 12:50:48.761808 進(jìn)程7588: 線程1 正在消耗:16 時間:2019-01-09 12:50:48.761808 進(jìn)程7588: 線程2 正在消耗:17 時間:2019-01-09 12:50:48.761808
后來實(shí)驗(yàn)了打印出10萬個數(shù),4個進(jìn)程,每個進(jìn)程400個線程,花費(fèi)了39秒。而400個線程,只花費(fèi)了17秒。所以有時候,也并不是多就是好。進(jìn)程線程切換都需要使用一定的時間。
子進(jìn)程在python中,如果要運(yùn)行系統(tǒng)命令,會使用 subprocess 來運(yùn)行,官方建議使用run 方法來運(yùn)行系統(tǒng)命令,更高級的用法是直接使用其 Popen 接口。
其函數(shù)格式為:
subprocess.run(args,?*,?stdin=None,?input=None,?stdout=None,?stderr=None,?capture_output=False,?shell=False,?cwd=None,?timeout=None,?check=False,?encoding=None,?errors=None,?text=None,?env=None,?universal_newlines=None)
可以看幾個簡單的例子:
直接使用import subprocess subprocess.run(["ls","-al"])
在python3.7 之前,默認(rèn)系統(tǒng)命令執(zhí)行的結(jié)果(輸出/錯誤)不存在stdout/stderr 里面,需要設(shè)置 capture_output=True,而在python3.6 版本,如果你需要使用執(zhí)行的結(jié)果,你就需要設(shè)置 stdout. 如下所示
# python 3.6 >>> a = subprocess.run(["ls","-al"],stdout=subprocess.PIPE) >>> a.stdout # python3.7 >>> a = subprocess.run(["ls","-al"],capture_output=True) >>> a.stdout
所以可以看出python3.7 又做了一層封裝,為了讓大家使用更上一層的接口。可以看一下幾個參數(shù)的含義為:
args 列表,為shell命令 shell boolean值, 設(shè)置后,args可以直接接受shell命令 capture_output = True , 設(shè)置后,stdout/stderr會存儲值 check=True, 設(shè)置后,如果程序異常退出,會跑出一個CalledProcessError異常 cwd 是工作目錄,可以為str,或者path-like 類高級使用
Popen的構(gòu)造函數(shù):
class?subprocess.Popen(args,?bufsize=-1,?executable=None,?stdin=None,?stdout=None,?stderr=None,?preexec_fn=None,?close_fds=True,?shell=False,?cwd=None,?env=None,?universal_newlines=False,?startupinfo=None,?creationflags=0,?restore_signals=True,?start_new_session=False,?pass_fds=(),?*,?encoding=None,?errors=None)
一個簡單的例子
p = subprocess.Popen(["ls","-al"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
其次,通過Popen.communicate() ,子進(jìn)程可以在啟動了以后,還可以進(jìn)行參數(shù)的輸入
import subprocess print("$ nslookup") p = subprocess.Popen(["nslookup"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b"set q=mx python.org exit ") print(output.decode("utf-8")) print("Exit code:", p.returncode) 其輸出: $ nslookup Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from: mail.python.org internet address = 82.94.164.166 mail.python.org has AAAA address 2001:888:2000:d::a6 Exit code: 0分布式多進(jìn)程
python的分布式接口簡單,使用起來也十分簡單,可以參考廖雪峰的教程,需要的時候,修改代碼,即可完成屬于自己的分布式程序
這里貼出代碼:
# master import random,time,queue from multiprocessing.managers import BaseManager task_queue = queue.Queue() result_queue = queue.Queue() class QueueManager(BaseManager): pass QueueManager.register("get_task_queue",callable=lambda:task_queue) QueueManager.register("get_result_queue",callable=lambda:result_queue) manager = QueueManager(address=("",5000),authkey=b"abc") manager.start() tasks = manager.get_task_queue() results = manager.get_result_queue() for i in range(10): n = random.randint(0,10000) print("put task {}".format(n)) tasks.put(n) print("try get results...") for i in range(10): r = results.get(timeout=100) print("result:{}".format(r)) manager.shutdown() print("master exit") # worker import time,sys,queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register("get_task_queue") QueueManager.register("get_result_queue") # master的主機(jī)地址 server_addr = "127.0.0.1" print("connect to server...") m = QueueManager(address=(server_addr,5000),authkey=b"abc") m.connect() tasks = m.get_task_queue() results = m.get_result_queue() for i in range(10): try: n = tasks.get(timeout=1) print("run task %d * %d..." % (n, n)) r = "{} * {} = {}".format(n,n,n*n) time.sleep(1) results.put(r) except Queue.Empty: print("task queue is empty.") print("worker exit.")參考
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000
https://docs.python.org/3.6/library/subprocess.html
https://docs.python.org/3.7/library/subprocess.html
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/42965.html
摘要:之前中提過,并發(fā)的時候,可能造成死循環(huán),那么在多線程中可以用來避免這一情況。默認(rèn),當(dāng)容量大于時,開始擴(kuò)容并發(fā)數(shù),默認(rèn),直接影響和的值,以及的初始化數(shù)量。初始化的數(shù)量,為最接近且大于的辦等于的次方的值,比如,數(shù)量為,,數(shù)量為。 之前HashMap中提過,并發(fā)的時候,可能造成死循環(huán),那么在多線程中可以用ConcurrentHashMap來避免這一情況。 Segment Concurrent...
摘要:如果兩個線程存取相同的對象,并且每一個線程都調(diào)用一個修改該對象狀態(tài)的方法,根據(jù)線程訪問數(shù)據(jù)的順序,可能會出現(xiàn)錯誤的數(shù)據(jù)結(jié)果,這種現(xiàn)象成為條件競爭。而問題往往就是有多個線程同時在執(zhí)行步驟。內(nèi)部鎖有如下的特點(diǎn)不能中斷正在試圖獲得鎖的線程。 【條件競爭 在多線程的開發(fā)中,兩個及其以上的線程需要共享統(tǒng)一數(shù)據(jù)的存取。如果兩個線程存取相同的對象,并且每一個線程都調(diào)用一個修改該對象狀態(tài)的方法,根據(jù)線...
摘要:無狀態(tài)的是線程安全的,當(dāng)無狀態(tài)變?yōu)橛袪顟B(tài)時就是不安全的破壞了線程的安全性,非原子性操作競態(tài)條件在并發(fā)編程中,由于不恰當(dāng)?shù)膱?zhí)行時序而出現(xiàn)的不正確結(jié)果是一種非常重要的情況,被稱之為競態(tài)條件。重入意味著獲取鎖的操作的粒度是線程,而不是調(diào)用。 這本書的內(nèi)容是什么? 本書提供了各種實(shí)用的設(shè)計規(guī)則,用于幫助開發(fā)人員創(chuàng)建安全的和高性能的并發(fā)類。 什么類是線程安全的? 當(dāng)多個線程訪問某...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術(shù)專家我看過哪些技術(shù)類書籍。 大家好,我是...
摘要:所以這情況下,當(dāng)線程操作變量的時候,變量并不對線程可見。總結(jié),緩存引發(fā)的可見性問題,切換線程帶來的原子性問題,編譯帶來的有序性問題深刻理解這些前因后果,可以診斷大部分并發(fā)的問題 背景介紹 如何解決并發(fā)問題,首先要理解并發(fā)問題的實(shí)際源頭怎么發(fā)生的。 現(xiàn)代計算機(jī)的不同硬件的運(yùn)行速度是差異很大的,這個大家應(yīng)該都是知道的。 計算機(jī)數(shù)據(jù)傳輸運(yùn)行速度上的快慢比較: CPU > 緩存 > I/O ...
摘要:我們以請求網(wǎng)絡(luò)服務(wù)為例,來實(shí)際測試一下加入多線程之后的效果。所以,執(zhí)行密集型操作時,多線程是有用的,對于密集型操作,則每次只能使用一個線程。說到這里,對于密集型,可以使用多線程或者多進(jìn)程來提高效率。 為了提高系統(tǒng)密集型運(yùn)算的效率,我們常常會使用到多個進(jìn)程或者是多個線程,python中的Threading包實(shí)現(xiàn)了線程,multiprocessing 包則實(shí)現(xiàn)了多進(jìn)程。而在3.2版本的py...
閱讀 3700·2023-04-25 16:35
閱讀 808·2021-10-11 11:09
閱讀 6415·2021-09-22 15:11
閱讀 3423·2019-08-30 14:03
閱讀 2661·2019-08-29 16:54
閱讀 3413·2019-08-29 16:34
閱讀 3135·2019-08-29 12:18
閱讀 2232·2019-08-28 18:31