iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 10
0
自我挑戰組

Linux Inside - Synchronization & Interrupt系列 第 10

[LAB 背景知識] Blocking Linked List

  • 分享至 

  • xImage
  •  

讓我們從 Linked List 的資料結構下手,了解如何在 Multi-Thread 中維護一個 Linked List

經典範例

參考 The Art of Multiprocessor Programming, Revised Reprint (English) 1st 版本 第九章的範例,以 C 語言進行改寫,但請注意程式碼是不可執行,或者說含有錯誤,比如說:我並沒有檢查指標是否為 NULL,只是示意邏輯長什麼樣子,所以也不探討要用 mutex、semaphore、spinlock 會有啥區別

PS:我覺得 C 語言的程式碼,我現在這種寫法的可讀性很低== Sorry

從資料結構的定義以及一個 add function,就能了解這個要怎麼去作 Lock,以下為幾種可能使用的方法

  • 一般的 linked list
typedef struct __NODE {
    int val;
    struct __NODE *next;
} NODE;
  • Coarse-Grained Synchronization
typedef struct __NODE {
    int val;
    struct __NODE *next;
} NODE;

typedef struct {
    NODE *head;
    pthread_mutex_t mutex;
} LIST;

每次進行操作,都把整個 LIST 鎖起來,以下範例是假設 Linked List 中的數字由小到大排序

void add(NODE *head, int newVal) {
    NODE *curr, *pred;
    pthread_mutex_lock(mutex);// 在最外面鎖起來
    
    pred = head;
    curr = pred->next;
    while(curr->val < newVal) {
        pred = curr;
        curr = curr->next;
    }
    NODE *new = malloc(sizeof(NODE));
    new->val = newVal;
    new->next = curr;
    pred->next = new;

    pthread_mutex_unlock(mutex);
}
  • Fine-Grained Synchronization
typedef struct __NODE {
    int val;
    struct __NODE *next;
    pthread_mutex_t mutex;
} NODE;

比起 Coarse-Grained 演算,Fine-Grained 明顯粒度變細,只需要在修改的節點上鎖就好,以下的程式碼就是同時有兩個 Lock ,這樣一直鎖過去

void add(NODE *head, int newVal) {
    NODE *pred;
    pred = head;
    pthread_mutex_lock(head->mutex);// 鎖起來
    
    Node *curr = pred->next;
    pthread_mutex_lock(curr->mutex);// 連續鎖了兩個
    while(curr->val < newVal) {
        pthread_mutex_unlock(pred->mutex);// 馬上解放上一層
        pred = curr;
        curr = curr->next;
        pthread_mutex_lock(curr->mutex);// 鎖住下一層
    }
    NODE *new = malloc(sizeof(NODE));
    new->val = newVal;
    new->next = curr;
    pred->next = new;
    
    pthread_mutex_unlock(pred->mutex);
}
  • Optimistic Synchronization

雖然 Fine-Grained 對效能來說有所改善,但他實在有太多的 Lock 了,可能會產生很多問題,比如:有個 Thread 正在修改第二個 NODE,同時有另外一個 Thread 想要作搜尋,但因為第二個 NODE 已經被鎖住了,所以就無法繼續進行

所以就有了 Optimisic Synchronization 這樣方式產生,不需要每次都 Lock,因為你並沒有要更改他,等到要更改的時候,再一口氣 Lock 就好

// 確認是否還連在一起
int validate(NODE *head, NODE *pred, NODE *curr) {
    NODE node = head;
    while(node->val <= pred->val) {
        if(node == pred)
            return pred->next == curr;
        node = node->next;
    }
    
    return 0;
}

void add(NODE *head, int newVal) {
    while(true) {
        NODE *pred = head;
        NODE *curr = pred->next;
        
        while(curr->val <= newVal) {
            pred = curr; curr = curr->next;
        }
        // 先執行 while 搜尋後,才開始
        pthread_mutex_lock(pred->mutex);
        pthread_mutex_lock(curr->mutex);

        if(validate(head, pred, curr)) {
            NODE *new = malloc(sizeof(NODE));
            new->val = newVal;
            new->next = curr;
            pred->next = new;
        }

        pthread_mutex_lock(pred->mutex);
        pthread_mutex_lock(curr->mutex);
    }
}
  • Lazy Synchronization

在 Optimistic Synchronization 中,其實會浪費很多時間在查找比較,可以用空間換取時間,也大幅減少 Lock ,比如說只是想要查找 Linked List 中是否有存在這個節點,也不需要特別 Lock ,可以透過 marked 參數來來作判斷

typedef struct __NODE {
    int val;
    int marked;
    struct __NODE *next;
    pthread_mutex_t mutex;
} NODE;
int validate(NODE *pred, NODE *curr) {
    return !pred->marked && !curr->marked && pred->next==curr;
}

void add(NODE *head, int newVal) {
    while(true) {
        NODE *pred = head;
        NODE *curr = pred->next;
        
        while(curr->val <= newVal) {
            pred = curr; curr = curr->next;
        }
        // 先執行 while 搜尋後,才開始
        pthread_mutex_lock(pred->mutex);
        pthread_mutex_lock(curr->mutex);

        if(validate(pred, curr)) {
            NODE *new = malloc(sizeof(NODE));
            new->marked = 0;
            new->val = newVal;
            new->next = curr;
            pred->next = new;
        }

        pthread_mutex_lock(pred->mutex);
        pthread_mutex_lock(curr->mutex);
    }
}

Thread Pool

專案參考 sysprog21/mergesort-concurrent

看看一般實做上會如何管理 Thread,通常會實作一個 Thread Pool 來管理,整體的架構如下圖所示,有一個 Thread Pool 物件在管理每個 Worker Thread,他們之間透過 Queue 來作溝通

typedef struct _task {
    void (*func)(void *);   /**< Pointer to task function */
    void *arg;              /**< Pointer to the arguments passed to _func_ */
    struct _task *next;     /**< Pointer to the next task */
} task_t;

typedef struct {
    task_t *head;           /**< Pointer to the fist _task\_t_ in queue */
    task_t *tail;           /**< Pointer to the last _task\_t_ in queue */
    pthread_mutex_t mutex;  /**< The mutex lock of this queue */
    pthread_cond_t cond;    /**< The conitional variable of this queue */
    uint32_t size;          /**< The size of the queue */
    uint32_t num_of_consumed;   /**< The number of consumed tasks */
} tqueue_t;

typedef struct {
    pthread_t *threads; /**< Pointer to the array of threads */
    uint32_t count;     /**< Number of working threads */
    tqueue_t *queue;    /**< Pointer to the task queue */
} tpool_t;

定義了三種資料結構

  • task_t
    function pointer 用來指示要呼叫什麼 function
    void* 用來存取任意型態的參數
    next 指向下一個 task 形成一個 Queue

  • tqueue_t
    與一般的 Queue 相比,多了 mutex lock 以及 conditional variable

    conditional variable 比較特別一點,是用來通知其他 thread 要幹麻之類的,但這個專案目前還沒有使用到

  • tpool_t
    threads 記錄全部的 Worker
    queue 為任務列表

這邊使用到的 count ,都是使用 uint32_t 進行宣告,是為了要使用 <stdint.h> 裡透過 typedef 重新定義的 int,因為不同的機器架構中,WORD 的長度不太一樣,使用透過 typedef 的 int,程式碼會比較容易維護

在 Queue 方面,只有進行修改時要 Lock,所以不在此多作琢磨,比較值得一提的點在於「如何把 Queue 中的任務分派出去,又是如何收回」,基本上都是透過 Queue 來作溝通,如果有收回結果的需求,就需要在另外宣告一個 Queue 用作 Output 使用,至於要如何對應到哪一個任務完成了,這要在資料結構中新增個 Label 或是 TaskType 之類的,來加以辨識

以目前這個專案來說,因為是要實作 mergesort,他們是直接去對原始的資料進行操作,就沒有另外宣告

Others

明天就來看以上那份專案,是該如何實做成 Concurrent 的形式,如果我改不出來的話,題目就改成單純介紹 Concurrent Linked List 如何實作@@


上一篇
Lock Performance & Priority Inversion
下一篇
Example: Concurrent Linked List
系列文
Linux Inside - Synchronization & Interrupt18
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言