如果一個應用程式想要使用多個 CPU 的核心 (thread),並且使用了 System call,將會觸發 trap,而多個核心使用 System call,並行的存取一些位於 kernel space 中的資料結構,而當並行的存取一個資料結構時,例如一個核心在讀取某個資料結構,而另外一個核心在寫入某一個資料結構,為了確保資料的一致性,我們需要鎖 (lock) 來進行確保。lock 的作用是確保在並行情況下的資料正確性。
我們使用並行的方式,讓多個核心同時執行一個 process 是為了提高效能,但假設我們使用了 System call 或是共享了一些資料,我們為了確保資料的正確性,我們會需要使用 lock 來確保資料的正確性,沒有使用鎖可能會產生出 race condition 的問題,但是使用 lock 卻會降低效率,而這就和我們使用並行處理的初衷產生出了矛盾。
基於效率,我們需要並行處理,但是基於正確性,我們需要 lock。
race condition 表示當一個系統或是某個 process 的輸出不按照某個事件或是出現的順序決定,例如如果有兩個 process 嘗試對同一塊記憶體區塊進行修改,在沒有適當的並行控制,如 lock 控制的條件下,最終的結果就會變成看哪一個 process 先執行,從而導致可能每一次的結果都不盡相同。我們試著在 xv6 中創造出 race condition,查看 race condition 可能發生得情況。
我們看到下方的 kfree()
,參數為 pa,表示物理記憶體。kfree()
的作用為釋放掉 pa 指向的記憶體分頁。
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
acquire(&kmem.lock);
r->next = kmem.freelist;
kmem.freelist = r;
release(&kmem.lock);
}
我們在kfree()
中看到了兩個涉及 lock 的操作,一個為acquire()
,另外一個則為release()
。
在kfree()
中可以看到一個名為 freelist 的資料結構,所有可用的記憶體分頁會儲存在 freelist 上,當 kalloc()
需要分配一個記憶體分頁時,可以從 freelist 上獲取可用的記憶體分頁。
可以看到對 freelist 的一些操作有受到 lock 的控制,使用acquire()
後才對 freelist 以及 kmem 進行一些操作,之後release()
釋放了 lock,保證了操作的原子性,而為了破壞原子性,以及產生出 race conditon,我們將acquire()
和release()
註解掉,觀察發生 race condition 可能會發生的輸出。
我們可以使用 usertests 這個指令來進行測試,位於/user/usertests
可以發現出現了一些問題,可以猜測 race condition 在 kfree 的情況下可能會發生以下:
如果兩個核心同時呼叫kfree()
對 freelist 進行操作,可能已經釋放的記憶體分頁被再度釋放,而其他的函式可能也會產生出類似的情況。
以圖表式如下,freelist 位於記憶體中,而有兩個 CPU 核心共享這一塊記憶體,假設 freelist 上面目前可以使用的記憶體分頁共有兩個,而這兩個核心同時呼叫了 kfree。
freelist 如圖所示,為單向鏈結串列,kfree()
會將傳入的pa
加入到 freelist 上,pa
作為 freelist 新的 head 節點,而原本的 freelist 指向到新 head 節點的下一個節點,完成將pa
加入 freelist 的操作。pa
指向的記憶體分頁被釋放,並加入到 freelist 上。
假設兩個核心同時執行kfree()
,如果 hart 0 執行r->next = kmem.freelist;
也就是將釋放的記憶體分頁的下一個節點指向到目前的 freelist,如果再 hart 0 執行kmem.freelist = r;
之前,hart 1 執行了r->next = kmem.freelist;
,那麼 hart 0 和 hart 1 便指向到同一個 freelist 上,如下圖所示
接下來,hart 0 或是 hart 1 或執行 kmem.freelist = r;
,如果 hart 0 先執行了kmem.freelist = r;
,那麼現在 freelist 為以下
r (hart 0) -> free list
接著 hart 1 執行 kmem.freelist = r;
,便會產生出以下狀況
r (hart 1) -> free list
這時候我們便丟掉了 hart 0 所對應到的 page,hart 0 傳入 pa 對應到的記憶體分頁最終沒有加入到 freelist 中。而目前這是兩個 hart 的情況,如果在 xv6 支援的最大 hart 數,也就是 8 個 hart,可以想像到我們可能會遺失更多的記憶體分頁。這就是 race condition 所造成的問題。
而在這個情況下,程式會依據哪一個 process 或是 thread 先開始執行而決定其產生的結果,這會導致程式的執行結果是不確定的 (indeterminate)。
而在上面這個問題中,我們也可以發現到一件事情
r->next = kmem.freelist;
kmem.freelist = r;
上面這個操作不具有原子性,也就是某一個操作只做到了一半便結束,而不是一次完成所有操作或是乾脆不完成任何操作,因此如果我們能夠確保一個操作的原子性,我們就能夠避免掉上面發生的 race condition 的問題。
而要解決這樣的問題,我們可以使用 lock。
下面我們將引入 Critical Section 的概念,這邊使用到共享變數的概念,而這個變數可以實現互斥控制的效果。
do{
entry section
critical section
exit section
remainder section
}while(true);
任何一個時間點,只有一個 Process 能夠對 cirtical section 進行讀寫,因為受到 lock 保護
以下為 xv6 中的 lock
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};
void acquire(struct spinlock *lk);
void release(struct spinlock *lk);
在 spinlock 結構中,有三個成員
而 spinlock 有兩個 API 可以使用
acquire()
: 參數為指向 lock 的指標,acquire()
確保在任何時間內,只有一個 process 可以持有 lock。release()
: 參數為指向 lock 的指標,釋放掉目前 process 所持有的 lock。在acquire()
和release()
之間的程式碼片段,稱為 critical section,這一段程式碼為存取共享資源的程式碼。在這裡的操作都具有原子性,要就是全部完成執行,要就是完全不執行。lock 的機制可以讓程式碼序列化執行。如果有兩個 hart 同時要執行 critical section 中的程式碼,只有一個 hart 會成功執行,另外一個 hart 會在先前 hart 退出 critical section 後再進入執行。
而對於多個 hart 的 System call,經過 lock 的序列化變成 System call A 獲得了某個 lock,之後將其釋放,接著下一個 System call 獲得 lock,完成後釋放。如果程式碼操作需要有一定的原子性操作,則可以使用 lock。
整理而言,lock 總共有三個作用
kfree()
的例子中可以看到因為 race condition 導致我們遺失了某個被釋放的記憶體分頁,但是在加上 lock 後,我們便不會發生遺失記憶體分頁的情況發生。acquire()
會先暫時性的破壞掉共享資料的不變性,而當變更完成時,便會使用release()
將 lock 釋放,恢復共享資料的不變性。最簡單的 Dead lock,為以下情況
acquire(lk);
acquire(lk);
release(lk);
release(lk);
首先 acquire()
獲得了一個 lock,接著進入到 critical section,然後在 critiacal section 中再度 acquire()
嘗試獲得 lock,但是這裡的 acquire()
需要等待第一個 acquire()
release 後才能夠 acquire 另一個 lock,但是第二個 acquire()
如果不繼續執行的話,又無法走到第一個 acquire()
的 release,這樣便形成了 Dead lock。
而這個 Dead lock 的成因為一個 process 多次的 acquire 同一個 lock 所發生的,在 xv6 中,如果偵測到一個 process acquire 同一個 lock,便會產生 panic。因此 xv6 實際上避免了上面的 dead lock 情形。
前面說到,每一次一個 lock 只能夠被一個 process 所持有,不能夠一個 lock 有多個擁有者,而在 xv6 中,我們可以通過 spinlock 來了解 lock 的相關實作
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};
void acquire(struct spinlock *lk);
void release(struct spinlock *lk);
void
acquire(struct spinlock *lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();
}
我們可以看到有一個 while 無限迴圈,他會不斷的判斷是否已經完成了上鎖,如果尚未上鎖,也就是 locked != 1
,則會將 locked
設置為 1,也就是通過了 acquire()
獲得了 lock。
如果傳入的 lock 物件成員 locked 已經為 1,則會不斷的在 acqruire()
中 while 迴圈進行無限迴圈,也就是不斷的旋轉 (spin),直到 lock 的持有者通過 release()
將 lock 釋放,使得 lock 物件成員 locked 為 0。
而這裡我們可以看到在 while 迴圈的條件中,我們使用到了 __sync_lock_test_and_set
這個特殊的指令,我們想下,如果我們是使用正常直觀的想法去實作 acquire,可能會得到以下
do
{
if(lock->locked == 0)
lock->locked = 1;
}
while(lock->locked != 1);
這裡我們可以想像可能會有 race condition 的問題,如果同時有兩個 process 或是 hart 同時執行了這一段程式碼,同時讀取到 lock 物件的 locked 成員數值為 0,則這兩個 process 或是 hart 就會同時將 lock 的 locked 成員數值設置為 1,而這樣就違反了 lock 的特性,也就是 lock 只能夠被一個 process 或是 hart 所持有。
但是其實在這裡並不會發生上面所說的問題,因為使用了 C 標準函式庫中 __sync_lock_test_and_set
,拆解如下
a5 = 1
s1 = &lk->locked
amoswap.w.aq a5, a5, (s1)
__sync_lock_test_and_set(&lk->locked, 1)
作用為將1複製到 lk->locked
所在的記憶體地址,接著回傳再覆蓋 lk->locked
之前的數值。我們在 while 中不斷驗證回傳的值是否為 0,如果為 0,表示先前 lock 沒有被任何 process 或是 hart 所持有。
我們可以看到 amoswap.w.aq a5, a5, (s1)
,這是由 RISC-V 提供的具有原子性的交換函式,這個函式保證了交換的操作無法被中斷,必須所有操作全部完成或是不完成。
而我們可以將 amoswap.w.aq a5, a5, (s1)
返組譯後,查看其組合語言
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
80006184: 87ba mv a5,a4
80006186: 0cf4a7af amoswap.w.aq a5,a5,(s1)
8000618a: 2781 sext.w a5,a5
8000618c: ffe5 bnez a5,80006184 <acquire+0x22>
__sync_lock_test_and_set
將其設置為 1,接著回傳 0,迴圈結束__sync_lock_test_and_set
將其設置為 1,接著回傳 1,也就是數值沒有任何變化,迴圈繼續。void
release(struct spinlock *lk)
{
if(!holding(lk))
panic("release");
lk->cpu = 0;
// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);
pop_off();
}
可以看到在release()
中同樣的使用了原子操作,__sync_lock_release(&lk->locked)
,這裡之所以沒有把0作為參數傳入,是因為 RISC-V 中有暫存器的值永遠為 0,可以直接供我們使用。
而這裡可以發現,我們就是將 0 寫入而已,為什麼不使用 store 之類的 CPU 指令去完成操作? 原因為 store 這個操作可能不是原子的,不保證是原子的,有可能會涉及到一些 CPU cache 的操作,因此我們使用確定是原子的操作來完成。
SiFive FU540-C000 Manual v1p0
xv6-riscv
Operating System Concepts, 9/e
RISC-V xv6 Book
Linux 核心設計: 多核處理器和 spinlock