計算機已進入多 CPU 或多核時代,我們所使用的作業系統都是支持'多任務'的作業系統,可以同時運行多個程序,也可以將一個程序分解為若干個相對獨立的子任務,讓多個子任務併發執行,從而縮短程序的執行時間。
行程 (process) 是作業系統中執行的一個程序,作業系統以行程 (process) 為單位分配存儲空間,每個行程 (process) 都有自己的地址空間、數據棧以及其他用於跟踪進程執行的輔助數據,作業系統管理所有行程 (process) 的執行,為它們合理的分配資源。
行程 (process) 可以通過 fork 或 spawn 的方式來創建新的行程 (process) 來執行其他的任務,不過新的行程 (process) 也有自己獨立的內存空間,因此必須通過行程間通信機制(IPC,Inter-Process Communication)來實現數據共享,具體的方式包括管道、訊號、套接字、共享內存區等。
一個行程 (process) 還可以擁有多個併發的執行線索,簡單的說就是擁有多個可以獲得 CPU調度的執行單元,就是所謂的線程 (thread)。由於線程 (thread) 在同一個行程 (process) 下,它們可以共享相同的上下文,因此相對於行程 (process) 而言,線程 (thread) 間的訊息共享和通信更加容易。
用多線程 (multithreading)實現併發編程在升程序的性能,多線程 (multithreading)的程序對其他程序並不友好,因為它佔用了更多的 CPU 執行時間,導致其他程序無法獲得足夠的 CPU 執行時間。
Python 既支持多行程 (Multi-Process) 又支持多線程 (Multithreading),因此使用 Python 實現併發編程主要有3種方式:多進程 (Multi-Process)、多線程 (Multithreading)、多進程 (Multi-Process) + 多線程 (Multithreading)。
Unix 和 Linux 作業系統上提供了 fork() 系統調用來創建行程 (process),調用 fork() 函數的是父行程 (process),創建出的是子行程 (process),子行程 (process)是父行程 (process) 的一個拷貝,但是子行程 (process) 擁有自己的 PID。
fork() 函數非常特殊它會返回兩次,父行程 (process) 中可以通過 fork() 函數的返回值得到子行程 (process) 的 PID,而子行程 (process) 中的返回值永遠都是 0。
Python 的 os 模組提供了 fork() 函數。由於 Windows 系統沒有 fork() 調用,因此要實現跨平台的多行程 (Multi-Process) 編程,可以使用 multiprocessing 模組的 Process 類別來創建子行程 (process),而且該模組還提供了更高級的封裝,例如批量啟動行程的行程池 (Pool)、用於行程間通信的佇列 (Queue) 和管道 (Pipe) 等。
下面用一個下載文件的例子來說明使用多行程 (Multi-Process) 和不使用到底有什麼差別。
from random import randint
from time import time, sleep
def download_task(filename):
print('開始下載 %s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s 下載完成! 耗費了 %d 秒' % (filename, time_to_download))
def main():
start = time()
download_task('Python.pdf')
download_task('Hot.avi')
end = time()
print('總共耗費了 %.2f 秒.' % (end - start))
if __name__ == '__main__':
main()
從上面的例子可以看出,程式運行只能按順序一行一行往下執行,即使執行兩個毫不相關的下載任務,也需要先等待一個文件下載完成後才能開始下一個下載任務,顯然很沒有效率。
使用多行程 (Multi-Process) 的方式將兩個下載任務放到不同的行程 (process) 中。
from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep
def download_task(filename):
print('啟動下載行程,行程號[ %d ].' % getpid())
print('開始下載 %s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s 下載完成! 耗費了 %d 秒' % (filename, time_to_download))
def main():
start = time()
# 利用 Process 類別創建了行程物件
# 透過 target 參數傳入一個函數來表示進程啟動後要執行的程式碼
# 後面的 args 代表傳遞給函數的參數
# Process 物件的 start 方法 (method) 用來啟動行程 (process)
# 而 join 方法 (method) 表示等待行程 (process) 執行結束。
p1 = Process (target = download_task, args = ('Python.pdf', ))
p1.start()
p2 = Process (target = download_task, args = ('Hot.avi', ))
p2.start()
p1.join()
p2.join()
end = time()
print('總共耗費了 %.2f 秒.' % (end - start))
if __name__ == '__main__':
main()
運行上面的程式碼可以明顯發現兩個下載任務'同時'啟動了,而且程序的執行時間將大大縮短,不再是兩個任務的時間總和。
文中還有提到行程間的通訊,但是並非講得很清楚,所以我就不多寫這一部份了。
如果有興趣的人,可以自己上網查。
目前的多線程 (Multithreading) 開發使用 threading 模組,該模組對多線程 (Multithreading) 編程提供物件導向的封裝。
利用剛才下載文件的例子用多線程 (Multithreading) 的方式實現。
from random import randint
from threading import Thread
from time import time, sleep
def download(filename):
print('開始下載 %s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s 下載完成! 耗費了 %d 秒' % (filename, time_to_download))
def main():
start = time()
# 使用 threading 模組的 Thread 類別來創建線程 (thread)
t1 = Thread (target = download, args =('Python.pdf',))
t1.start()
t2 = Thread (target = download, args =('Hot.avi',))
t2.start()
t1.join()
t2.join()
end = time()
print('總共耗費了 %.3f 秒.' % (end - start))
if __name__ == '__main__':
main()
可以透過繼承 Thread 類的方式來創建自定義的線程類別,然後再創建線程物件並啟動線程 (thread)。
from random import randint
from threading import Thread
from time import time, sleep
class DownloadTask(Thread): # 繼承 Tread
def __init__(self, filename):
super().__init__()
self._filename = filename
def run(self):
print('開始下載 %s...' % self._filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s 下載完成! 耗費了 %d 秒' % (self._filename, time_to_download))
def main():
start = time()
t1 = DownloadTask('Python.pdf')
t1.start()
t2 = DownloadTask('Hot.avi')
t2.start()
t1.join()
t2.join()
end = time()
print('總共耗費了 %.3f 秒.' % (end - start))
if __name__ == '__main__':
main()
因為多個線程 (thread) 可以共享行程 (process) 的內存空間,因此要實現多個線程間的通訊相對簡單,最直接的辦法就是設置一個全域變數,多個線程共享這個全域變數即可。
但當多個線程共享同一個變數(通常稱之為'資源')時,很有可能產生不可控的結果從而導致程序失效甚至崩潰。
如果一個資源被多個線程競爭使用,通常稱之為'臨界資源',對臨界資源的訪問需要加上保護,否則資源會處於'混亂'的狀態。
下面的例子展示 100 個線程向同一個銀行帳戶轉賬(轉入 1 元錢)的場景。
銀行帳戶是一個臨界資源,在沒有保護的情況下很有可能會得到錯誤的結果。
from time import sleep
from threading import Thread
class Account(object):
def __init__(self):
self._balance = 0
def deposit (self, money):
new_balance = self._balance + money # 存款後餘額
sleep(0.01) # 模擬受理存款需要 0.01 秒的時間
self._balance = new_balance # 修改帳戶餘額
@property
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__ (self, account, money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
# 創建 100 個存款的線程 (thread) 向同一個帳戶中存錢
for _ in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
# 等所有存款的線程 (thread) 都執行完畢
for t in threads:
t.join()
print('帳戶餘額為 : %d 元' % account.balance)
if __name__ == '__main__':
main()
運行上面的程式碼,100 個線程 (thread) 分別向帳戶中轉入 1 元錢,結果居然小於 100元。之所以出現這種情況是因為沒有對銀行帳戶這'臨界資源'加以保護,多個線程 (thread) 同時向帳戶中存錢時,會一起執行到 new_balance = self._balance + money
這行,多個線程 (thread) 得到的帳戶餘額都是初始狀態下的 0,所以都是 0 做了 +1 ,因此得到了錯誤的結果。
在這種情況下,可以透過'鎖'來保護'臨界資源',只有獲得'鎖'的線程 (thread) 才能訪問'臨界資源',而其他沒有得到'鎖'的線程 (thread) 只能被阻塞起來,直到獲得鎖的線程 (thread) 釋放了'鎖',其他線程才有機會獲得'鎖',進而訪問被保護的'臨界資源'。
下面的程式碼展示如何使用'鎖'來保護對銀行帳戶的操作,從而獲得正確的結果。
from time import sleep
from threading import Thread, Lock
class Account(object):
def __init__(self):
self._balance = 0
self._lock = Lock()
def deposit (self, money):
# 先獲取所才能執行後續後續的程式碼
self._lock.acquire()
try:
new_balance = self._balance + money
sleep(0.01)
self._balance = new_balance
finally:
# 在 finally 中執行釋放鎖保證正常異常鎖都能釋放
self._lock.release()
@property
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__ (self, account, money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
for _ in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
for t in threads:
t.join()
print('帳戶餘額為 : %d 元' % account.balance)
if __name__ == '__main__':
main()
因為今天時間不太夠,所以我將應用範例移到明天,再繼續加油!!!