iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 11
0
自我挑戰組

Linux Inside - Synchronization & Interrupt系列 第 11

Example: Concurrent Linked List

  • 分享至 

  • xImage
  •  

Concurrent Linked List

昨天講了四個範例,都在介紹如何 Lock Linked List,那麼終於要介紹如何實現 Concurrent Linked List,參考以下專案,這是一個 Linked List,節點儲存的數字採取遞增的方式

因為這是參考這篇論文 A Pragmatic Implementation of Non-Blocking Linked-Lists 實作的演算法,所以要先了解他的演算法==,沒錯!!!! Concurrent Linked List 在 2001 年都算是可以發論文的東西 ......

在作 Lock-Free 的程式的時候,最直覺的想法是使用 CAS(Compare and Set),在 add 時,乍看之下是沒有問題的,但如果現在有一個 Thread 要刪除節點,另外一個 Thread 要插入節點在即將被刪除的節點之後,那麼這樣就會出現問題

要刪除節點 10,且把 20 插入在節點 10 之後,若只是使用 CAS,成功執行後 20 會插入失敗

當然如果只是單個變數的修改,使用 CAS 是不會有事的,但很遺憾這是 Linked List ...

這份論文提出的解法是使用兩個 CAS

  • 第一個 CAS 用來標記要被刪除的節點指向什麼 (logically deleted)
  • 第二個 CAS 用來刪除節點 (physically deleted)

程式碼中難以理解的部份

  1. __sync_xxxxx 這系列的涵式,是 gcc built-in function 用來作原子運算
    參考 5.44 Built-in functions for atomic memory access

    以下列出幾個文件中有寫的解釋,以及等一下程式裡面會看到的運算

    // compare and swap
    // if *ptr is old value, then write newval into *ptr
    // 回傳 *ptr 舊的數值
    type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
    
    
    // fetch and operations
    type __sync_fetch_and_add (type *ptr, type value, ...)
    type __sync_fetch_and_sub (type *ptr, type value, ...)
    type __sync_fetch_and_or (type *ptr, type value, ...)
    type __sync_fetch_and_and (type *ptr, type value, ...)
    type __sync_fetch_and_xor (type *ptr, type value, ...)
    type __sync_fetch_and_nand (type *ptr, type value, ...)
    // example
    // 把記憶體的東西先抓出來,然後對記憶體操作,但是是回傳之前還未修改的數值
    { tmp = *ptr; *ptr op= value; return tmp; }
    
    
    // operations and fetch
    type __sync_add_and_fetch (type *ptr, type value, ...)
    type __sync_sub_and_fetch (type *ptr, type value, ...)
    type __sync_or_and_fetch (type *ptr, type value, ...)
    type __sync_and_and_fetch (type *ptr, type value, ...)
    type __sync_xor_and_fetch (type *ptr, type value, ...)
    type __sync_nand_and_fetch (type *ptr, type value, ...)
    // example
    // 對記憶體進行操作,然後回傳最新的數值
    { *ptr op= value; return *ptr; }
    

    不過我們在閱讀的時候不會看到這些程式碼,因為被宣告成 macro 來使用了

    
    // Compare-and-swap
    #define CAS_PTR(a, b, c) __sync_val_compare_and_swap(a, b, c)
    ...
    
    // Fetch-and-increment
    #define FAI_U8(a) __sync_fetch_and_add(a, 1)
    ...
    
    // Increment-and-fetch
    #define IAF_U8(a) __sync_add_and_fetch(a, 1)
    
    ...
    

資料結構

typedef intptr_t val_t;

typedef struct node {
    val_t data;
    struct node *next;
} node_t;

typedef struct llist {
    node_t *head;
    node_t *tail;
    uint32_t size;
} llist_t;
  1. intptr_t 適用於不知道是 32 bit 還是 64 bit 的情況的 int
  2. llist 結構非常簡單,就只有 node_t *head node_t *tail uint32_t size
  3. node 結構更簡單==,只有 int data struct node *next

程式碼閱讀

llist_t *list_new()
{
    // allocate list
    llist_t *the_list = malloc(sizeof(llist_t));

    // now need to create the sentinel node
    the_list->head = new_node(INT_MIN, NULL);
    the_list->tail = new_node(INT_MAX, NULL);
    the_list->head->next = the_list->tail;
    the_list->size = 0;
    return the_list;
}

初始化的時候,直接把 headtail 設定好了,所以初始有兩個節點,但是 size 是零

在看核心邏輯之前,讓我們先看一下五個小涵式,用來表示我們最開始所提到的,使用兩個 CAS 來避免這個問題,就是所謂的 marked ,需要作一些標記來表示這個東西有沒有被刪掉

/*
 * The five following functions handle the low-order mark bit that indicates
 * whether a node is logically deleted (1) or not (0).
 *  - is_marked_ref returns whether it is marked,
 *  - (un)set_marked changes the mark,
 *  - get_(un)marked_ref sets the mark before returning the node.
 */
static inline
int is_marked_ref(long i)
{
    return (int)(i & 0x1L);
}

static inline
long unset_mark(long i)
{
    i &= ~0x1L;
    return i;
}

static inline
long set_mark(long i)
{
    i |= 0x1L;
    return i;
}

static inline
long get_unmarked_ref(long w)
{
    return w & ~0x1L;
}

static inline
long get_marked_ref(long w)
{
    return w | 0x1L;
}

這邊用了一個我覺得很神奇的技巧,也沒有需要再宣告另外的資料結構,直接使用記憶體位置來當作參數==,因為記憶體位置不可能不是 2 的幾次方的倍數吧 ...

除了 static inline int is_marked_ref(long i) 是回傳整數之外,其他都是對記憶體位置進行操作,然後回傳,使用的手法不外乎以下兩種位元運算

  1. i &= ~0x1L 把最後一個 bit 的 1 拿掉
  2. i |= 0x1L 把最後一個 bit 設成 1

最後一個 bit 被設成 1 表示被標記要刪除

但上面我有一個地方看不懂,就是不知道為什麼有些地方直接在回傳的時候進行計算,有些是先計算完再回傳!!!???程式碼中是沒有用到 set_mark 以及 unset_mark 但就是覺得問號

  • list_add
int list_add(llist_t *the_list, val_t val)
{
    node_t *right, *left;
    right = left = NULL;
    node_t *new_elem = new_node(val, NULL);
    while (1) {
        right = list_search(the_list, val, &left);
        if (right != the_list->tail && right->data == val) {
            return 0;
        }
        new_elem->next = right;
        if (CAS_PTR(&(left->next), right, new_elem) == right) {
            FAI_U32(&(the_list->size));
            return 1;
        }
    }
}
  • list_remove
int list_remove(llist_t *the_list, val_t val)
{
    node_t *right, *left, *right_succ;
    right = left = right_succ = NULL;
    while (1) {
        right = list_search(the_list, val, &left);
        // check if we found our node
        if (right == the_list->tail || right->data != val) {
            return 0;
        }
        right_succ = right->next;
        if (!is_marked_ref(right_succ)) {
            // 如果 right->next 還不是 logically deleted,就把他標記成 logically deleted
            // 這就是第一層 CAS ,標記物件
            
            if (CAS_PTR(&(right->next), right_succ,
                        get_marked_ref(right_succ)) == right_succ) {
                FAD_U32(&(the_list->size));
                return 1;
            }
        }
    }
    // we just logically delete it, someone else will invoke search and delete
    // it
}

上面提到了,要透過呼叫 list_search 來完成刪除的動作,所以刪除究竟在哪裡呢?

  • list_search
node_t *list_search(llist_t *set, val_t val, node_t **left_node)
{
    node_t *left_node_next, *right_node;
    left_node_next = right_node = NULL;
    while (1) {
        node_t *t = set->head;
        node_t *t_next = set->head->next;
        // 移動到正確的 right 
        // 如果 t_next 被標記刪除,進去迴圈
        // 如果現在位置的數值還不夠大,進去迴圈
        while (is_marked_ref(t_next) || (t->data < val)) {
            if (!is_marked_ref(t_next)) {         // t_next 沒被標記刪除
                (*left_node) = t;                         
                left_node_next = t_next;                  
            }
            t = get_unmarked_ref(t_next);                 
            if (t == set->tail)                           
                break;
            t_next = t->next;
        }
        right_node = t;

        if (left_node_next == right_node) {       // 如果沒進來,代表 left ~ right 之間有節點被刪除
            if (!is_marked_ref(right_node->next)) // right_node->next 沒被標記刪除
                return right_node;
        } else { // left ~ right 之間有節點被刪除的情況
            if (CAS_PTR(&((*left_node)->next), left_node_next, right_node) ==
                left_node_next) {
                // 一個一個把節點刪除,在這一輪迴圈沒把東西刪完,就等下一輪
                
                if (!is_marked_ref(right_node->next)) // right_node->next 沒被標記刪除
                    return right_node;
            }
        }
    }
}

第二個 CAS 是在 search 的時候,順便刪除的,透過判斷 left ~ right 之間是否存在被刪除的節點來處理

然後,有一個例外狀況,如果 right 的下一個也是要被刪除的節點,那就不能直接回傳,不然 right 可能會不知道指向哪裡

直接看程式碼有點難以理解,假設幾個場景來思考

  • 插入一筆資料,目前沒有任何資料

    1. search 的時候,會取得兩個位置,分別是 left_node = head, right_node = tail, left_node_next = tail,然後就直接回傳了,因為沒有節點被刪除
    2. 使用 CAS 操作
  • 插入一筆資料,left right 之間存在一個被標記刪除的節點

    1. search 中,下一輪才能新增成功
  • 插入一筆資料,left right 之間存在 N 個被標記刪除的節點

    1. searchN 輪後,可以新增成功
  • 插入一筆資料,right 之後存在一個被標記刪除的節點

    1. 只能在迴圈中一直等待,直到其他 Thread 操作資料,把他刪除,或者是新的資料變成新的 right

Others

以上就是 Concurrent Linked List 的一個實作參考,原本應該要直接介紹 Concurrent Thread Pool,但我發現他的專案其實有點複雜,我以為 Concurrent Linked List 會比較簡單好懂==,結果這是另一篇論文 ......,但時間快到了

Concurrent Thread Pool 那一篇的 Concurrency 方法是使用 Linux Kernel 中 kfifo 所使用的設計,但明天是否要介紹這個,我再考慮一下,因為 kfifo 也是有好幾個涵式要看 ......


上一篇
[LAB 背景知識] Blocking Linked List
下一篇
kfifo 部份原始碼閱讀 & Concurrent Thread Pool
系列文
Linux Inside - Synchronization & Interrupt18
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言