iT邦幫忙

2022 iThome 鐵人賽

0

前言

前面我們看到了整個 Switch Thread 的運作,首先會有 Timer Interrupt 產生 trap,接著 yield() 會將 Process 的狀態設置為 RUNNABLE,接著便會執行到 sched() ,進行一些判斷之後,我們便來到了整個 Switch Thread 的關鍵部分,也就是 swtch() 的部分,在這裡我們通過將 CPU 的暫存器狀態儲存到 Process,並且將要切換到的 Process,也就是 Scheduler Thread 的暫存器狀態寫入到目前的 CPU,接著執行 scheduler(),找到處於 RUNNABLE 的 Process 並且嘗試去執行,這樣不斷的執行下去,Thread 不斷地完成切換。我們發現在 Switch Thread 的時候,我們總是會去嘗試獲得 Process 的 lock,等到 sched() 執行完成之後,我們才會去釋放掉 Process 所持有的 lock。

回顧 Switch Thread

我們整理一下先前看到整個 Switch Thread 的過程

  • 一個 Process 放棄 CPU 的資源,也就是進入到 SLEEPING 的狀態 (SLEEPING 狀態在 process, init process 中提及,表示 Process 需要立即放棄目前持有的 CPU 資源),這邊對應到的為 yield(),在 yield() 的最一開始會嘗試去獲得 Process 的 lock
  • Process 將自身的狀態由 RUNNING 設置為 RUNNABLE
  • 呼叫 sched(),接著 sched() 呼叫 swtch()
  • swtch() 將目前的 Process 切換到 Scheduler Thread (也就是要切換到的那一個 Process)
  • Scheduler Thread 也有呼叫一個 swtch(),因此這時候 Scheduler Thread 會恢復執行並且從自己呼叫的 swtch() 回傳並結束
  • Scheduler Thread 放棄自己所持有的 lock,也就是將 lock 釋放

我們在最一開始去嘗試獲取 Process 的 lock,原因為每一個 CPU (hart) 都有一個 Scheduler Thread 在遍歷 procs 這個資料結構,並且從中找到 RUNNABLE 的 Process 嘗試去獲取他的資源並且使用 swtch() 切換到該 Process,如果我們沒有在 Process 切換的最一開始獲得 lock,可能會發生該 Process 目前還是被 CPU 持有的狀態 (只是狀態被改為 RUNNABLE,實際上為 RUNNING),則其他 CPU 會發現到該 Process 處在 RUNABLE 的狀態,其他 CPU 就會馬上 swtch() 到該 Process,這時候就會發生兩個 CPU 執行同一個 Process 的狀況,xv6 會產生錯誤 (在上一章有稍微提及)。

所以,Process 在 Switch Thread 最一開始去獲得 lock,直到 swtch() 結束之後才將 lock 釋放,因為這時候已經確定 Process 沒有被 CPU 執行了,可以被其他 CPU 執行。對於 Switch Thread 這一段期間,我們會一直持有 Process 的 lock。

我們下面看到 scheduler() 的部分程式碼

void
scheduler(void)
{
  struct proc *p;
  struct cpu *c = mycpu();
  
  c->proc = 0;
  for(;;){
    // Avoid deadlock by ensuring that devices can interrupt.
    intr_on();

    for(p = proc; p < &proc[NPROC]; p++) {
      acquire(&p->lock);
        // ...
        swtch(&c->context, &p->context);
      }
      release(&p->lock);
    }
  }
}

接著我們看到 yield()

void
yield(void)
{
  struct proc *p = myproc();
  acquire(&p->lock);
  p->state = RUNNABLE;
  sched();
  release(&p->lock);
}

可以發現到,在 swtch() 期間,我們都必須持有 Process 的 lock,在 xv6 的設計中,規範了我們必須要 swtch() 期間持有 Process 的 lock,且不能夠持有其他的 lock (如周圍 I/O 設備的 lock,UART 等等),這個條件除了在 swtch() 中需要遵守以外,我們之後也會看到在 sleep() 中我們也需要遵循這一個條件,而下面我們將探討這樣的設計原因。

持有唯一 Process lock 原因

假設我們現在有一個 Process A,現在既持有了 Process A 的 lock,同時還持有周圍 I/O 設備,諸如 UART, CONSOLE 等等 lock。

假設我們現在在單核心的 CPU 機器上,而現在有另外一個 Process B,Process A 通過 swtch() 切換到 Process B,雖然 Process A 現在不在執行,從 RUNNING 切換到 RUNNABLE,但這時候 Process A 卻持有 UART 的 lock,而假設 Process B 需要存取 UART,Process B 需要 acquire() UART 的 lock,而我們知道 UART 中 lock 是使用 spinlock 進行實作,而由於現在 UART 的 lock 是由 Process A 所持有,因此 Process B 進入到 spin,直到 Process A 將 UART 的 lock 給 release(),但 Process A 需要處於 RUNNING 才能夠將 UART 的 lock 給 release(),而 Process A 要處於 RUNNING 需要通過 Process B 呼叫 swtch() 進行切換,而這種彼此等待資源的情況,便形成了 deadlock。

而為了避免發生以上 deadlock 的情況,在 xv6 的設計中要求無論是在 swtch() 或是 sleep(),需要唯一持有 Process 自身的 lock (不能夠持有其他的 lock)。而持有唯一 Process 的 lock 我們可以在 sched() 中的檢查機制中看見。

void
sched(void)
{
  int intena;
  struct proc *p = myproc();

  if(!holding(&p->lock))
    panic("sched p->lock");
  if(mycpu()->noff != 1)
    panic("sched locks");
  if(p->state == RUNNING)
    panic("sched running");
  if(intr_get())
    panic("sched interruptible");

  intena = mycpu()->intena;
  swtch(&p->context, &mycpu()->context);
  mycpu()->intena = intena;
}

在上面的 deadlock 情境中我們有一個有趣的發現,Process B 因為 spinlock 機制的緣故,會陷入到無限迴圈中,導致無法切換到 Process A,而前面我們看到,當一個 Process 長時間持有資源,也就是前面提及的,這是一個 compute bound thread,我們可以通過 Timer Interrupt 強制讓 Thread 讓出 CPU 的資源,而在這邊,我們是否能夠使用 Timer Interrupt 進行處理?

我們可以嘗試回顧 lock 的機制,在 lock 中有兩個主要部分,分別為 acquire()release(),我們先看到 acquire() 的部分。

void
acquire(struct spinlock *lk)
{
  push_off(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");
...

可以看到我們在 acquire() 最一開始會呼叫 push_off() 禁用中斷,因此我們無法使用 Timer Interrupt 進行處理。也就是在獲得 lock 的過程中,我們不能夠使用 Timer Interrupt。

這邊我們可以做一個小節,在 xv6 中要求 swtch() 或是 sleep(),需要遵守以下兩點

  • 必須持有 Process 的 lock
  • 不能夠持有除了 Process 的 lock 以外的 lock

sleep 與 wakeup 概念

下面我們將通過 sleep()wakeup() 引入 coordination 的概念,當我們在做一些需要使用到 Thread 的程式碼,我們可能會希望 Thread 之間能夠進行互動或是交換資料,諸如一個 Thread 在進行寫入,另外一個 Thread 在進行讀取 (如 socket 等等)。

  • 如果有一個 socket,一端進行寫入,一端進行讀取,在讀取那一端,需要等待一個 socket 的非空事件發生
  • 在前面 fork() 中我們可以看到,親代 Process 通過 wait() 去等待子代 Process 的退出

上面這兩個例子我們可以看到 Thread 在等待某一些事件的發生,而這一些事件可能是 I/O 事件的發生 (如 socket),或是另外一個 Thread 的某一事件發生 (如 wait())。而具體來說我們要等待並獲得事件,最簡單的方法就是使用一個迴圈,等待直到資料傳入,以下以 socket 聊天室中 server 的片段程式碼舉例

while ((readSize = read(csock, (Package *)&pkg, sizeof(Package))) != 0)
{
    // do something
}

上面這個迴圈會一直等待,直到其他 Thread 向 csock 寫入資料,被 server 給讀取,至於上面 server 的等待時間,取決於 client 傳送資料的頻率,有可能每 0.1 秒傳送資料,也有可能 10 分鐘才傳送資料。如果是 10 分鐘後才收到資料,我們並不希望一直在這個迴圈中等待浪費 CPU 資源,我們希望可以通過類似前面所提及的 swtch() 的機制強迫 CPU 讓出資源給其他 Thread 所使用,而所謂的 coordination 的概念,就是通過某一些機制,讓出 CPU 的資源,而等到某一個事件發生時,再去獲得 CPU 的資源並且恢復執行,在 xv6 中,使用了 sleep()wakeup() 來實現 coordination。以下我們將通過 UART 來了解 sleep()wakeup()

前面我們知道我們通過 console 來和 UART 進行互動,由 consolewrite() 會呼叫 uartputc(),以下為 uartputc() 的程式碼片段

void
uartputc(int c)
{
  acquire(&uart_tx_lock);

  if(panicked){
    for(;;)
      ;
  }
  while(uart_tx_w == uart_tx_r + UART_TX_BUF_SIZE){
    // buffer is full.
    // wait for uartstart() to open up space in the buffer.
    sleep(&uart_tx_r, &uart_tx_lock);
  }
  uart_tx_buf[uart_tx_w % UART_TX_BUF_SIZE] = c;
  uart_tx_w += 1;
  uartstart();
  release(&uart_tx_lock);
}

可以看到如果 panicked 不為 0,則我們會陷入到無限迴圈當中,這個想法就如同 spinlock 中如果無法成功獲取 lock 則我們會進入到無限迴圈一直到我們成功獲取 lock 是相同的概念。

這邊我們希望避免掉上面無限迴圈造成 CPU 資源浪費的部分,因此我們將以上程式碼進行以下改寫

void uartwrite(char buf[], int n)
{
    acquire(&uart_tx_lock)

    int i = 0;
    while(i < n)
    {
        while(tx_done == 0)
        {
            sleep(&tx_chan, &uart_tx_lock);
        }
        WriteReg(THR, buf[i]);
        i++;
        tx_done = 0;
    }
    release(&art_tx_lock);
}

概念上為我們將 buf 的內容寫入到 THR 中,THR 為 transmit holding register (for output bytes),在前面的 uartstart() 我們也有看到類似的操作,回憶下面在 UART 中看到的圖

每一次寫入一個字元,我們會將 tx_done 設置為 0,表示這時候 UART 正在忙碌中,而在忙碌中時,我們會讓 UART 進入到 sleep()

同時我們可以看到,UART 一次只能接收一個字元,而這也正是串列通訊中一個一個字元傳送的特徵。而當我們向 UART 寫入字元之後,我們就必須等待 UART 硬體處理完成,而當 UART 硬體處理完成時,接下來會發生中斷 (UART 產生的中斷)

void
uartintr(void)
{
  // read and process incoming characters.
  while(1){
    int c = uartgetc();
    if(c == -1)
      break;
    consoleintr(c);
  }

  // send buffered characters.
  acquire(&uart_tx_lock);
  uartstart();
  release(&uart_tx_lock);
}

上方為原始 UART 的中斷處理部分,而在我們修改後的 uartwrite() 中我們通過 tx_done 來判斷 UART 是否完成了字元的處理,而在中斷中,我們也需要在 uartintr() (在 tarp.c 中的 devintr() 進入到 uartintr()) 中對 tx_done 進行處理,除此之外,由於我們在 uartwrite() 中呼叫了 sleep(),接下來我們也需要加入一些諸如 wakeup()。因此我們對 uartintr() 進行以下修改

void uartintr(void)
{
    acquire(&uart_tx_lock);
    if(ReadReg(LSR) && LSR_TX_IDEL)
    {
        tx_done = 1;
        wakeup(&tx_chan);
    }
    release(&uart_tx_lock);
    while(1)
...
}

我們會通過檢查 LSR,可以通過檢查 LSR 來判斷我們是否已經完成傳輸,如果我們已經完成傳輸,則我們需要將先前的 sleep() 通過 wakeup() 進行恢復,所以這邊整個流程如下

  • uartwrite() 接收一個字元,並且寫入到 uart
  • 寫入到 uart 時,將 tx_done 設為 0
  • 到了下一次迭代,我們會進入到 sleep(),並等待某一個事件的發生 (如中斷)
  • 當 uart 完成字元的處理後,中斷發生,經由 devintr() 進入 uartintr(),會將 tx_done 設置為 1,表示完成處理,接著通過 wakeup() 恢復狀態
  • 當呼叫了 wakeup(),這時候我們已經成功通過 swtch() 讓出 CPU 資源,接著 UART 完成該字元的處理,而這時候我們在 uartwrite() 就可以讀取下一個字元了

這裡 wakeup() 的作用就像是喚醒正在等待某事件發生的 Thread,而這個 Thread 由 sleep() 進入到睡眠,意義是在等待 I/O,這邊的 I/O 就是 uart。可以看到無論是 wakeup() 或是 sleep() 都會接收一個參數,tx_cahn,意義為 sleep channel,通過 sleep channel wakeup() 能夠得知是要喚醒哪一個 Thread。

在這邊我們簡單的看到了 sleep()wakeup(),在之後我們將談及其中詳細的細節 (像是為什麼 sleep() 還需要傳入 lock),目前我們比較了 spinlock 和 sleep(),主要的差別為 spinlock 會進入到 spin 導致 CPU 資源的浪費,而 sleep() 是通過 swtch() 等機制讓出 CPU 資源,我們將在之後細談 sleep() 的細節。

reference

xv6-riscv
Operating System Concepts, 9/e
RISC-V xv6 Book
How is sleep implemented at the OS level?


上一篇
Day-29 階段性小節,未完待續
系列文
與作業系統的第一類接觸 : 探索 xv631
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言