昨天講了四個範例,都在介紹如何 Lock Linked List,那麼終於要介紹如何實現 Concurrent Linked List,參考以下專案,這是一個 Linked List,節點儲存的數字採取遞增的方式
因為這是參考這篇論文 A Pragmatic Implementation of Non-Blocking Linked-Lists 實作的演算法,所以要先了解他的演算法==,沒錯!!!! Concurrent Linked List 在 2001 年都算是可以發論文的東西 ......
在作 Lock-Free 的程式的時候,最直覺的想法是使用 CAS(Compare and Set),在 add 時,乍看之下是沒有問題的,但如果現在有一個 Thread 要刪除節點,另外一個 Thread 要插入節點在即將被刪除的節點之後,那麼這樣就會出現問題
要刪除節點 10,且把 20 插入在節點 10 之後,若只是使用 CAS,成功執行後 20 會插入失敗
當然如果只是單個變數的修改,使用 CAS 是不會有事的,但很遺憾這是 Linked List ...
這份論文提出的解法是使用兩個 CAS
__sync_xxxxx
這系列的涵式,是 gcc built-in function 用來作原子運算
參考 5.44 Built-in functions for atomic memory access
以下列出幾個文件中有寫的解釋,以及等一下程式裡面會看到的運算
// compare and swap
// if *ptr is old value, then write newval into *ptr
// 回傳 *ptr 舊的數值
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)
// fetch and operations
type __sync_fetch_and_add (type *ptr, type value, ...)
type __sync_fetch_and_sub (type *ptr, type value, ...)
type __sync_fetch_and_or (type *ptr, type value, ...)
type __sync_fetch_and_and (type *ptr, type value, ...)
type __sync_fetch_and_xor (type *ptr, type value, ...)
type __sync_fetch_and_nand (type *ptr, type value, ...)
// example
// 把記憶體的東西先抓出來,然後對記憶體操作,但是是回傳之前還未修改的數值
{ tmp = *ptr; *ptr op= value; return tmp; }
// operations and fetch
type __sync_add_and_fetch (type *ptr, type value, ...)
type __sync_sub_and_fetch (type *ptr, type value, ...)
type __sync_or_and_fetch (type *ptr, type value, ...)
type __sync_and_and_fetch (type *ptr, type value, ...)
type __sync_xor_and_fetch (type *ptr, type value, ...)
type __sync_nand_and_fetch (type *ptr, type value, ...)
// example
// 對記憶體進行操作,然後回傳最新的數值
{ *ptr op= value; return *ptr; }
不過我們在閱讀的時候不會看到這些程式碼,因為被宣告成 macro 來使用了
// Compare-and-swap
#define CAS_PTR(a, b, c) __sync_val_compare_and_swap(a, b, c)
...
// Fetch-and-increment
#define FAI_U8(a) __sync_fetch_and_add(a, 1)
...
// Increment-and-fetch
#define IAF_U8(a) __sync_add_and_fetch(a, 1)
...
typedef intptr_t val_t;
typedef struct node {
val_t data;
struct node *next;
} node_t;
typedef struct llist {
node_t *head;
node_t *tail;
uint32_t size;
} llist_t;
intptr_t
適用於不知道是 32 bit 還是 64 bit 的情況的 int
llist
結構非常簡單,就只有 node_t *head
node_t *tail
uint32_t size
node
結構更簡單==,只有 int data
struct node *next
llist_t *list_new()
{
// allocate list
llist_t *the_list = malloc(sizeof(llist_t));
// now need to create the sentinel node
the_list->head = new_node(INT_MIN, NULL);
the_list->tail = new_node(INT_MAX, NULL);
the_list->head->next = the_list->tail;
the_list->size = 0;
return the_list;
}
初始化的時候,直接把 head
跟 tail
設定好了,所以初始有兩個節點,但是 size
是零
在看核心邏輯之前,讓我們先看一下五個小涵式,用來表示我們最開始所提到的,使用兩個 CAS 來避免這個問題,就是所謂的 marked
,需要作一些標記來表示這個東西有沒有被刪掉
/*
* The five following functions handle the low-order mark bit that indicates
* whether a node is logically deleted (1) or not (0).
* - is_marked_ref returns whether it is marked,
* - (un)set_marked changes the mark,
* - get_(un)marked_ref sets the mark before returning the node.
*/
static inline
int is_marked_ref(long i)
{
return (int)(i & 0x1L);
}
static inline
long unset_mark(long i)
{
i &= ~0x1L;
return i;
}
static inline
long set_mark(long i)
{
i |= 0x1L;
return i;
}
static inline
long get_unmarked_ref(long w)
{
return w & ~0x1L;
}
static inline
long get_marked_ref(long w)
{
return w | 0x1L;
}
這邊用了一個我覺得很神奇的技巧,也沒有需要再宣告另外的資料結構,直接使用記憶體位置來當作參數==,因為記憶體位置不可能不是 2 的幾次方的倍數吧 ...
除了 static inline int is_marked_ref(long i)
是回傳整數之外,其他都是對記憶體位置進行操作,然後回傳,使用的手法不外乎以下兩種位元運算
i &= ~0x1L
把最後一個 bit 的 1 拿掉i |= 0x1L
把最後一個 bit 設成 1最後一個 bit 被設成 1 表示被標記要刪除
但上面我有一個地方看不懂,就是不知道為什麼有些地方直接在回傳的時候進行計算,有些是先計算完再回傳!!!???程式碼中是沒有用到 set_mark
以及 unset_mark
但就是覺得問號
list_add
int list_add(llist_t *the_list, val_t val)
{
node_t *right, *left;
right = left = NULL;
node_t *new_elem = new_node(val, NULL);
while (1) {
right = list_search(the_list, val, &left);
if (right != the_list->tail && right->data == val) {
return 0;
}
new_elem->next = right;
if (CAS_PTR(&(left->next), right, new_elem) == right) {
FAI_U32(&(the_list->size));
return 1;
}
}
}
list_remove
int list_remove(llist_t *the_list, val_t val)
{
node_t *right, *left, *right_succ;
right = left = right_succ = NULL;
while (1) {
right = list_search(the_list, val, &left);
// check if we found our node
if (right == the_list->tail || right->data != val) {
return 0;
}
right_succ = right->next;
if (!is_marked_ref(right_succ)) {
// 如果 right->next 還不是 logically deleted,就把他標記成 logically deleted
// 這就是第一層 CAS ,標記物件
if (CAS_PTR(&(right->next), right_succ,
get_marked_ref(right_succ)) == right_succ) {
FAD_U32(&(the_list->size));
return 1;
}
}
}
// we just logically delete it, someone else will invoke search and delete
// it
}
上面提到了,要透過呼叫 list_search
來完成刪除的動作,所以刪除究竟在哪裡呢?
list_search
node_t *list_search(llist_t *set, val_t val, node_t **left_node)
{
node_t *left_node_next, *right_node;
left_node_next = right_node = NULL;
while (1) {
node_t *t = set->head;
node_t *t_next = set->head->next;
// 移動到正確的 right
// 如果 t_next 被標記刪除,進去迴圈
// 如果現在位置的數值還不夠大,進去迴圈
while (is_marked_ref(t_next) || (t->data < val)) {
if (!is_marked_ref(t_next)) { // t_next 沒被標記刪除
(*left_node) = t;
left_node_next = t_next;
}
t = get_unmarked_ref(t_next);
if (t == set->tail)
break;
t_next = t->next;
}
right_node = t;
if (left_node_next == right_node) { // 如果沒進來,代表 left ~ right 之間有節點被刪除
if (!is_marked_ref(right_node->next)) // right_node->next 沒被標記刪除
return right_node;
} else { // left ~ right 之間有節點被刪除的情況
if (CAS_PTR(&((*left_node)->next), left_node_next, right_node) ==
left_node_next) {
// 一個一個把節點刪除,在這一輪迴圈沒把東西刪完,就等下一輪
if (!is_marked_ref(right_node->next)) // right_node->next 沒被標記刪除
return right_node;
}
}
}
}
第二個 CAS 是在 search 的時候,順便刪除的,透過判斷 left
~ right
之間是否存在被刪除的節點來處理
然後,有一個例外狀況,如果 right
的下一個也是要被刪除的節點,那就不能直接回傳,不然 right 可能會不知道指向哪裡
直接看程式碼有點難以理解,假設幾個場景來思考
插入一筆資料,目前沒有任何資料
search
的時候,會取得兩個位置,分別是 left_node = head, right_node = tail, left_node_next = tail
,然後就直接回傳了,因為沒有節點被刪除插入一筆資料,left
right
之間存在一個被標記刪除的節點
search
中,下一輪才能新增成功插入一筆資料,left
right
之間存在 N
個被標記刪除的節點
search
中 N
輪後,可以新增成功插入一筆資料,right
之後存在一個被標記刪除的節點
right
以上就是 Concurrent Linked List 的一個實作參考,原本應該要直接介紹 Concurrent Thread Pool,但我發現他的專案其實有點複雜,我以為 Concurrent Linked List 會比較簡單好懂==,結果這是另一篇論文 ......,但時間快到了
Concurrent Thread Pool 那一篇的 Concurrency 方法是使用 Linux Kernel 中 kfifo 所使用的設計,但明天是否要介紹這個,我再考慮一下,因為 kfifo 也是有好幾個涵式要看 ......