如果覺得文章對你有所啟發,可以考慮用 🌟 支持 Gthulhu 專案,短期目標是集齊 300 個 🌟 藉此被 CNCF Landscape 採納 [ref]。
在前一篇文章中,我們探討了 Gthulhu 如何利用 plugin 擴充實作上的靈活性。在這篇文章中,我們會嘗試實作一個符合 plugin 介面的簡易排程器。
Gthulhu 的 eBPF 排程器有兩種 DSQ,分別是:
我們可以利用 SHARED DSQ 實作 FIFO 或是簡單的 weighted deadline 排程器。
type CustomScheduler interface {
// Drain the queued task from eBPF and return the number of tasks drained
DrainQueuedTask(s Sched) int
// Select a task from the queued tasks and return it
SelectQueuedTask(s Sched) *models.QueuedTask
// Select a CPU for the given queued task, After selecting the CPU, the task will be dispatched to that CPU by Scheduler
SelectCPU(s Sched, t *models.QueuedTask) (error, int32)
// Determine the time slice for the given task
DetermineTimeSlice(s Sched, t *models.QueuedTask) uint64
// Get the number of objects in the pool (waiting to be dispatched)
// GetPoolCount will be called by the scheduler to notify the number of tasks waiting to be dispatched (NotifyComplete)
GetPoolCount() uint64
}
讓我們逐一探討該如何實作這些 Hook:
// DrainQueuedTask drains tasks from the scheduler queue into the task pool
func (s *SimplePlugin) DrainQueuedTask(sched plugin.Sched) int {
count := 0
// Keep draining until the pool is full or no more tasks available
for {
var queuedTask models.QueuedTask
sched.DequeueTask(&queuedTask)
// Validate task before processing to prevent corruption
if queuedTask.Pid <= 0 {
// Skip invalid tasks
return count
}
// Create task and enqueue it
task := s.enqueueTask(&queuedTask)
s.insertTaskToPool(task)
count++
s.globalQueueCount++
}
}
將任務從 RingBuffer eBPF Map 取出,直到沒有可被排程的任務(queuedTask.Pid <= 0
)可以取得。
取出來的任務會被按順序插入至 global slice,如此一來,當 Scheduler 呼叫 SelectQueuedTask()
時就能按插入順序取得任務,也就實現了 FIFO 的效果:
// getTaskFromPool retrieves the next task from the pool
func (s *SimplePlugin) getTaskFromPool() *models.QueuedTask {
if len(s.taskPool) == 0 {
return nil
}
// Get the first task
task := &s.taskPool[0]
// Remove the first task from slice
selectedTask := task.QueuedTask
s.taskPool = s.taskPool[1:]
// Update running task vtime (for weighted vtime scheduling)
if !s.fifoMode {
// Ensure task vtime is never 0 before updating global vtime
if selectedTask.Vtime == 0 {
selectedTask.Vtime = 1
}
s.updateRunningTask(selectedTask)
}
return selectedTask
}
再來就是選擇 CPU 的部分,我希望將任務都放入 SHARED DSQ 之中,讓空閑的 CPU 能夠從 SHARED DSQ 取得任務,所以 CPU selection 一率會回應 ANY CPU(1<<20
):
// SelectCPU selects a CPU for the given task
func (s *SimplePlugin) SelectCPU(sched plugin.Sched, task *models.QueuedTask) (error, int32) {
return nil, 1 << 20
}
再來看為任務分配 time slice 的部分,simple scheduler 一率會回應預設的 time slice:
// DetermineTimeSlice determines the time slice for the given task
func (s *SimplePlugin) DetermineTimeSlice(sched plugin.Sched, task *models.QueuedTask) uint64 {
// Always return default slice
return s.sliceDefault
}
透過 plugin 的機制,我們只要使用約 200 行的程式碼即可實作一個排程器,並且不需要撰寫任何一行 eBPF code。如果大家有興趣也歡迎一同參與貢獻!