昨天聊到了 TP 告知 main Thread 任務完成的方法。今天說說 TP 本身在運行甚麼。
原始碼來自以下網址 :
static void init_threads(void) {
unsigned int i;
const char* val;
uv_sem_t sem;
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nthreads = atoi(val);
if (nthreads == 0)
nthreads = 1;
if (nthreads > MAX_THREADPOOL_SIZE)
nthreads = MAX_THREADPOOL_SIZE;
threads = default_threads;
if (nthreads > ARRAY_SIZE(default_threads)) {
threads = uv__malloc(nthreads * sizeof(threads[0]));
if (threads == NULL) {
nthreads = ARRAY_SIZE(default_threads);
threads = default_threads;
}
}
if (uv_cond_init(&cond))
abort();
if (uv_mutex_init(&mutex))
abort();
QUEUE_INIT(&wq);
QUEUE_INIT(&slow_io_pending_wq);
QUEUE_INIT(&run_slow_work_message);
if (uv_sem_init(&sem, 0))
abort();
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem);
uv_sem_destroy(&sem);
}
不用細讀, 原則上就是 TP 多大就創建多少 thread , 並且 thread 運行 worker 方法
查看 worker ( 這段不特別討論, 其牽涉到不同 case IO 處理的機制。)
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
int is_slow_work;
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
// 任務隊列 1 次只能有一個 thread 訪問
uv_mutex_lock(&mutex);
for (;;) {
/* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O
and we're at the threshold for that. */
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
q = QUEUE_HEAD(&wq);
if (q == &exit_message) {
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
break;
}
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
is_slow_work = 0;
if (q == &run_slow_work_message) {
/* If we're at the slow I/O threshold, re-schedule until after all
other work in the queue is done. */
if (slow_io_work_running >= slow_work_thread_threshold()) {
QUEUE_INSERT_TAIL(&wq, q);
continue;
}
/* If we encountered a request to run slow I/O work but there is none
to run, that means it's cancelled => Start over. */
if (QUEUE_EMPTY(&slow_io_pending_wq))
continue;
is_slow_work = 1;
slow_io_work_running++;
q = QUEUE_HEAD(&slow_io_pending_wq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
/* If there is more slow I/O work, schedule it to be run as well. */
if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
if (idle_threads > 0)
uv_cond_signal(&cond);
}
}
uv_mutex_unlock(&mutex);
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
/* Lock `mutex` since that is expected at the start of the next
* iteration. */
uv_mutex_lock(&mutex);
if (is_slow_work) {
/* `slow_io_work_running` is protected by `mutex`. */
slow_io_work_running--;
}
}
}
其任務就是遍歷 TP 中的 task , 一個一個地完成, 當全部都完成再讓 thread 進入休眠。
此外, 該資料結構也有方法給 TP 加入任務。
Node 的 TP 會維護多個 queue , 用來存放不同 case 的任務 , 並且提供方法使 C++ 層可以把非同步 IO 任務加入這些 queue 中, 此外 TP 本身由多個 thread 組成, 會不斷地從這些 queue 中取出任務, 並且執行該任務。當任務完成, 用 uv_async_send 設定 async_sent 為 1 , 表示任務完成, 並且加入到 wq 中。所以此處 wq 是一個完成 IO 的任務的放置區。
回到源頭, 查看在 pending 階段中被調用的 async_cb 是什麼 ?
明天見 !