iT邦幫忙

2024 iThome 鐵人賽

DAY 29
0
自我挑戰組

Linux Kernel 網路巡禮系列 第 29

igb 網卡與驅動封包接收

  • 分享至 

  • xImage
  •  

昨天我們了解了 igb 網卡驅動建立的 igb_ring 隊列,那今天就要真的來解析 igb 網卡驅動是怎麼接收封包的。這邊我們跳過中斷處理的內容,直接從 igb 網卡驅動開始一直到轉交給 Kernel 的網路子系統。

封包接收

當網卡接收到封包並透過 DMA (Direct Memory Access) 傳輸到記憶體後,會發起中斷。此時,Kernel 會呼叫 igb driver 提供的 igb_clean_rx_irq 函數來處理這些封包的接收。

這裡我們將跳過中斷處理與 NAPI 的細節,未來有時間再補充介紹。

Poll 機制

static int igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget)
{
    ...
    unsigned int total_bytes = 0, total_packets = 0
    ...
    while (likely(total_packets < budget)) {
        /* 處理一個封包 DMA PAGE */
        ...

        if (/* ring 上的封包都處理完了 */)
            break;
        
        /* update budget accounting */
		total_packets++;
    }
    ...
}

如果每次接收一個封包都要發起一次中斷,在遇到大流量時就會造成大量的中斷,降低低 CPU 的處理效率。
因此這邊使用 Poll 的方式去處理封包。每次中斷發生時,igb_clean_rx_irq 函數會盡可能的把所有已經 DMA 到記憶體中的封包都處理掉,所以這邊會跑一個 while loop,每個 iteration 都會處理一個 ring buffer 上的封包,直到 ring buffer 上的封包都處理完了,或著次數超出次數上限 (budget)。

igb_clean_rx_irq 解析

接下來,我們將跳過與 igb 維護 rx_ring 相關的部分,專注於分析 igb 驅動如何處理單個封包。先給大家看完整的 igb_clean_rx_irq 函數,後面會一段一段解析。

static int igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget)
{
	struct igb_adapter *adapter = q_vector->adapter;
	struct igb_ring *rx_ring = q_vector->rx.ring;
	struct sk_buff *skb = rx_ring->skb;
	unsigned int total_bytes = 0, total_packets = 0;
	u16 cleaned_count = igb_desc_unused(rx_ring);
	unsigned int xdp_xmit = 0;
	struct xdp_buff xdp;
	u32 frame_sz = 0;
	int rx_buf_pgcnt;

	/* Frame size depend on rx_ring setup when PAGE_SIZE=4K */
#if (PAGE_SIZE < 8192)
	frame_sz = igb_rx_frame_truesize(rx_ring, 0);
#endif
	xdp_init_buff(&xdp, frame_sz, &rx_ring->xdp_rxq);

	while (likely(total_packets < budget)) {
		union e1000_adv_rx_desc *rx_desc;
		struct igb_rx_buffer *rx_buffer;
		ktime_t timestamp = 0;
		int pkt_offset = 0;
		unsigned int size;
		void *pktbuf;

		/* return some buffers to hardware, one at a time is too slow */
		if (cleaned_count >= IGB_RX_BUFFER_WRITE) {
			igb_alloc_rx_buffers(rx_ring, cleaned_count);
			cleaned_count = 0;
		}

		rx_desc = IGB_RX_DESC(rx_ring, rx_ring->next_to_clean);
		size = le16_to_cpu(rx_desc->wb.upper.length);
		if (!size)
			break;

		/* This memory barrier is needed to keep us from reading
		 * any other fields out of the rx_desc until we know the
		 * descriptor has been written back
		 */
		dma_rmb();

		rx_buffer = igb_get_rx_buffer(rx_ring, size, &rx_buf_pgcnt);
		pktbuf = page_address(rx_buffer->page) + rx_buffer->page_offset;

		/* pull rx packet timestamp if available and valid */
		if (igb_test_staterr(rx_desc, E1000_RXDADV_STAT_TSIP)) {
			int ts_hdr_len;

			ts_hdr_len = igb_ptp_rx_pktstamp(rx_ring->q_vector,
							 pktbuf, &timestamp);

			pkt_offset += ts_hdr_len;
			size -= ts_hdr_len;
		}

		/* retrieve a buffer from the ring */
		if (!skb) {
			unsigned char *hard_start = pktbuf - igb_rx_offset(rx_ring);
			unsigned int offset = pkt_offset + igb_rx_offset(rx_ring);

			xdp_prepare_buff(&xdp, hard_start, offset, size, true);
			xdp_buff_clear_frags_flag(&xdp);
#if (PAGE_SIZE > 4096)
			/* At larger PAGE_SIZE, frame_sz depend on len size */
			xdp.frame_sz = igb_rx_frame_truesize(rx_ring, size);
#endif
			skb = igb_run_xdp(adapter, rx_ring, &xdp);
		}

		if (IS_ERR(skb)) {
			unsigned int xdp_res = -PTR_ERR(skb);

			if (xdp_res & (IGB_XDP_TX | IGB_XDP_REDIR)) {
				xdp_xmit |= xdp_res;
				igb_rx_buffer_flip(rx_ring, rx_buffer, size);
			} else {
				rx_buffer->pagecnt_bias++;
			}
			total_packets++;
			total_bytes += size;
		} else if (skb)
			igb_add_rx_frag(rx_ring, rx_buffer, skb, size);
		else if (ring_uses_build_skb(rx_ring))
			skb = igb_build_skb(rx_ring, rx_buffer, &xdp,
					    timestamp);
		else
			skb = igb_construct_skb(rx_ring, rx_buffer,
						&xdp, timestamp);

		/* exit if we failed to retrieve a buffer */
		if (!skb) {
			rx_ring->rx_stats.alloc_failed++;
			rx_buffer->pagecnt_bias++;
			break;
		}

		igb_put_rx_buffer(rx_ring, rx_buffer, rx_buf_pgcnt);
		cleaned_count++;

		/* fetch next buffer in frame if non-eop */
		if (igb_is_non_eop(rx_ring, rx_desc))
			continue;

		/* verify the packet layout is correct */
		if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
			skb = NULL;
			continue;
		}

		/* probably a little skewed due to removing CRC */
		total_bytes += skb->len;

		/* populate checksum, timestamp, VLAN, and protocol */
		igb_process_skb_fields(rx_ring, rx_desc, skb);

		napi_gro_receive(&q_vector->napi, skb);

		/* reset skb pointer */
		skb = NULL;

		/* update budget accounting */
		total_packets++;
	}

	/* place incomplete frames back on ring for completion */
	rx_ring->skb = skb;

	if (xdp_xmit & IGB_XDP_REDIR)
		xdp_do_flush();

	if (xdp_xmit & IGB_XDP_TX) {
		struct igb_ring *tx_ring = igb_xdp_tx_queue_mapping(adapter);

		igb_xdp_ring_update_tail(tx_ring);
	}

	u64_stats_update_begin(&rx_ring->rx_syncp);
	rx_ring->rx_stats.packets += total_packets;
	rx_ring->rx_stats.bytes += total_bytes;
	u64_stats_update_end(&rx_ring->rx_syncp);
	q_vector->rx.total_packets += total_packets;
	q_vector->rx.total_bytes += total_bytes;

	if (cleaned_count)
		igb_alloc_rx_buffers(rx_ring, cleaned_count);

	return total_packets;
}

取得 Descriptor

在接收封包的過程中,驅動首先會取得下一個待處理的 descriptor,並從中讀取封包的長度。

union e1000_adv_rx_desc *rx_desc;
struct igb_rx_buffer *rx_buffer;

rx_desc = IGB_RX_DESC(rx_ring, rx_ring->next_to_clean);
size = le16_to_cpu(rx_desc->wb.upper.length);
if (!size)
    break;

這裡使用 rx_ring->next_to_clean 來從 desc ring buffer 中取得下一個 descriptor,然後使用 rx_desc->wb.upper.length 來獲取網卡寫入的封包長度。由於封包長度是由網卡這種外部設備寫入的,因此會使用 le16_to_cpu 來進行大小端轉換。如果此時封包長度為 0,則表示該 descriptor 對應的DMA PAGE還沒沒有封包寫入,整個 ring 上沒有需要處理的封包了,就會直接 break 離開處理函數。

取得封包地址

由於 descriptor 保存的 DMA Page 地址已經被網卡寫入覆蓋成封包的資訊了,所以需要從igb_rx_buffer 中取得對應的資料。

void *pktbuf;
struct igb_rx_buffer *rx_buffer;

rx_buffer = igb_get_rx_buffer(rx_ring, size, &rx_buf_pgcnt);
pktbuf = page_address(rx_buffer->page) + rx_buffer->page_offset;

驅動會透過 igb_rx_buffer 取得對應的 DMA PAGE 物理地址,然後使用 page_address 函數將頁的物理地址轉換為虛擬地址,並加上 page_offset,取得網卡寫入封包的虛擬地址,最終將結果保存在 pktbuf 中,這樣驅動便可以直接存取封包內容,進行後續的處理。

PTP 時間戳處理

如果網卡支援 PTP 時間同步,則網卡寫入封包時,會在封包前寫入時間戳 (timestamp)。那麼,實際的 packet 內容的所在地址就要在往後移時間戳的長度 (ts_hdr_len)。

int pkt_offset = 0;

if (igb_test_staterr(rx_desc, E1000_RXDADV_STAT_TSIP)) {
			int ts_hdr_len;
			ts_hdr_len = igb_ptp_rx_pktstamp(rx_ring->q_vector,
							 pktbuf, &timestamp);
			pkt_offset += ts_hdr_len;
			size -= ts_hdr_len;
		}

這邊會先把這個數值會保存在 pkt_offset 中,然後實際的封包大小 (size) 就要扣掉 ts_hdr_len

XDP (eBPF)

struct xdp_buff xdp;

/* retrieve a buffer from the ring */
if (!skb) {
    unsigned char *hard_start = pktbuf - igb_rx_offset(rx_ring);
    unsigned int offset = pkt_offset + igb_rx_offset(rx_ring);

    xdp_prepare_buff(&xdp, hard_start, offset, size, true);
    xdp_buff_clear_frags_flag(&xdp);
#if (PAGE_SIZE > 4096)
    /* At larger PAGE_SIZE, frame_sz depend on len size */
    xdp.frame_sz = igb_rx_frame_truesize(rx_ring, size);
#endif
    skb = igb_run_xdp(adapter, rx_ring, &xdp);
}

這邊會先判斷 skb 是不是空,一般情況下 skb 指標會是空的,所以進入該區塊。

因為 while 迴圈的一個 iteraction 只會處取一個 Page,所以當封包內容大小超過一個 Page 的大小時就會需要跑多次回圈,讀多個 Page 來拼接。當發生這個情況或著或上一次處理中斷時,封包存在不完整的狀況,skb 會保留前一次 iteraction 申請的 sk_buff 指標,那就不會執行到該區塊。

接著要計算兩個數值 hard_startoffset

  • hard_start 指向該封包 data buffer 的起始位置,pktbuf 指向封包內容的開頭,所以這邊會用 pktbuf 扣掉 igb_rx_offset 找到最起始的位置。前面說道這個如果是 IGB_RING_FLAG_RX_BUILD_SKB_ENABLED,那驅動會在buffer開頭額外預留出 IGB_SKB_PAD,那這邊就要把扣掉。
  • offset 是從 buffer 的頭到真實封包內容開頭的距離,所以就會是IGB_SKB_PAD 加上前面的 pkt_offset。

整理起來長這樣:
https://ithelp.ithome.com.tw/upload/images/20241013/20152703V6ux627EYO.png

接著看到一連串 XDP 開頭的函數就知道,這裡就是網卡驅動接收封包時的 eBPF XDP 進入點了,所以可知 XDP 真的需要驅動去寫才能支援的,而且是在 socket buffer allocation 之前。

首先會根據剛剛算出來的位移和封包大小資料去準備 xdp 的上下文,接著呼叫 igb_run_xdp

static struct sk_buff *igb_run_xdp(struct igb_adapter *adapter,
				   struct igb_ring *rx_ring,
				   struct xdp_buff *xdp)
{
	int err, result = IGB_XDP_PASS;
	struct bpf_prog *xdp_prog;
	u32 act;

	xdp_prog = READ_ONCE(rx_ring->xdp_prog); /* 讀取 XDP 程式 */

	if (!xdp_prog)
		goto xdp_out;

	prefetchw(xdp->data_hard_start); /* xdp_frame write */

	act = bpf_prog_run_xdp(xdp_prog, xdp); /* eBPF 進入點 */
	switch (act) {
	case XDP_PASS:
		break;
	case XDP_TX:
		result = igb_xdp_xmit_back(adapter, xdp);
		if (result == IGB_XDP_CONSUMED)
			goto out_failure;
		break;
	case XDP_REDIRECT:
		err = xdp_do_redirect(adapter->netdev, xdp, xdp_prog);
		if (err)
			goto out_failure;
		result = IGB_XDP_REDIR;
		break;
	default:
		bpf_warn_invalid_xdp_action(adapter->netdev, xdp_prog, act);
		fallthrough;
	case XDP_ABORTED:
out_failure:
		trace_xdp_exception(rx_ring->netdev, xdp_prog, act);
		fallthrough;
	case XDP_DROP:
		result = IGB_XDP_CONSUMED;
		break;
	}
xdp_out:
	return ERR_PTR(-result);
}

可以看到 igb 驅動會讀取綁定在 rx_ring 上的 xdp program,然後呼叫 bpf_prog_run_xdp 執行 XDP 程式。然後根據 XDP 結果去處理,如果是 XDP_PASS 就繼續執行一般封包的接收流程。

sk_buff 的建立 (Socket Allocation)

#define IGB_XDP_PASS		0

skb = igb_run_xdp(adapter, rx_ring, &xdp);

// igb_clean_rx_irq
if (IS_ERR(skb)) {
    unsigned int xdp_res = -PTR_ERR(skb);

    if (xdp_res & (IGB_XDP_TX | IGB_XDP_REDIR)) {
        xdp_xmit |= xdp_res;
        igb_rx_buffer_flip(rx_ring, rx_buffer, size);
    } else {
        rx_buffer->pagecnt_bias++;
    }
    total_packets++;
    total_bytes += size;
} else if (skb)
    igb_add_rx_frag(rx_ring, rx_buffer, skb, size);



else if (ring_uses_build_skb(rx_ring))
    skb = igb_build_skb(rx_ring, rx_buffer, &xdp,
                timestamp);
else
    skb = igb_construct_skb(rx_ring, rx_buffer,
                &xdp, timestamp);

回到 igb_clean_rx_irq ,接著是對 XDP 的執行結果做一些處理,如果沒有錯誤然後是 XDP_PASS 的話,skb 會是 0,所以跳到最後一個if判斷。

這邊要做的事情是執行 socket allocation,建立 sk_buff 結構實體。前面介紹到的,如果啟用IGB_RING_FLAG_RX_BUILD_SKB_ENABLED,那就會執行 igb_build_skb,直接使用前面的 DMA Page 當作 packet data buffer,不然就會呼叫到 igb_construct_skb,同時建立 sk_buff 和 packet data buffer。兩個函數都會建立 sk_buff 結構實體,然後配置指標指向 packet data buffer,這邊就不跟進去看了。

sk_buff 初始化

/* exit if we failed to retrieve a buffer */
if (!skb) {
    rx_ring->rx_stats.alloc_failed++;
    rx_buffer->pagecnt_bias++;
    break;
}

/* 更新 rx_ring 跳過不說明 */
igb_put_rx_buffer(rx_ring, rx_buffer, rx_buf_pgcnt);
cleaned_count++;

/* fetch next buffer in frame if non-eop */
if (igb_is_non_eop(rx_ring, rx_desc))
    continue;

/* verify the packet layout is correct */
if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
    skb = NULL;
    continue;
}

/* probably a little skewed due to removing CRC */
total_bytes += skb->len;

/* populate checksum, timestamp, VLAN, and protocol */
/* 提取封包內容,寫入到 sk_buff */
igb_process_skb_fields(rx_ring, rx_desc, skb);

接著會有一系列的處理來維護 rx_ring 結構還有計算 sk_buff 裡面的欄位數值,igb_is_non_eop 會判斷封包是否結尾,非結尾表示一個封包太大,超過一個 page 可以保存的內容需要讀取下一個 page,所以會 continue 再走一遍流程,這時候 skb 指標已經指向建立好的 sk_buff 實例。

Generic Receive Offload

napi_gro_receive(&adapter->napi, skb);

因為沒有介紹中斷機制,所以沒有特別介紹 NAPI (New API),並默認採用 NAPI,有興趣可以的讀者可以在查查。

接著就會呼叫 kernel API 將處理好的 sk_buff 轉交給 kernel 還有協定層來處理拉!

這邊會呼叫 napi_gro_receive 先做 Generic Receive Offload (GRO) 處理。GRO 簡單來說就是把同一個TCP/UDP flow 的多個封包 (sk_buff) 合併成一個 sk_buff 再交由上層的協定層處理,減少協定層處理的封包數量。所以呼叫napi_gro_receive 後封包會被快取在GRO子系統中,然後要等 GRO 的快取存超過一定數量的封包才會 flush 轉交給網路子系統。

雖然沒有想要介紹 GRO,但是找 GRO 怎麼送到網路子系統的時候花了不少時間,所以這邊補充說明流程。

// net/core/gro.c
static gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	gro_result_t ret;

	skb_mark_napi_id(skb, napi);
	trace_napi_gro_receive_entry(skb);

	skb_gro_reset_offset(skb, 0);

	ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
	trace_napi_gro_receive_exit(ret);

	return ret;
}

static gro_result_t napi_frags_finish(struct napi_struct *napi,
				      struct sk_buff *skb,
				      gro_result_t ret)
{
	switch (ret) {
	case GRO_NORMAL:
	case GRO_HELD:
		__skb_push(skb, ETH_HLEN);
		skb->protocol = eth_type_trans(skb, skb->dev);
		if (ret == GRO_NORMAL)
			gro_normal_one(napi, skb, 1);
		break;

	case GRO_MERGED_FREE:
		if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
			napi_skb_free_stolen_head(skb);
		else
			napi_reuse_skb(napi, skb);
		break;

	case GRO_MERGED:
	case GRO_CONSUMED:
		break;
	}

	return ret;
}

napi_gro_receive 會呼叫 dev_gro_receive 處理,然後根據結果判斷需要合併還是直接送到協定層,但都會是呼叫 gro_normal_one 來把放到一個 GRO 內部的快取列表。

// include/net/gro.h
static inline void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb, int segs)
{
	list_add_tail(&skb->list, &napi->rx_list);
	napi->rx_count += segs;
	if (napi->rx_count >= READ_ONCE(gro_normal_batch))
		gro_normal_list(napi);
}

static inline void gro_normal_list(struct napi_struct *napi)
{
	if (!napi->rx_count)
		return;
	netif_receive_skb_list_internal(&napi->rx_list);
	INIT_LIST_HEAD(&napi->rx_list);
	napi->rx_count = 0;
}

接著會檢查快取的數量是不是超過 gro_normal_batch,如果數量到了就會呼叫 gro_normal_list,藉由 netif_receive_skb_list_internal 將封包送到網路子系統。gro_normal_batch/proc/sys/net/core/gro_normal_batch 控制,在我的電腦上是 8,

到這邊封包就成功被送到網路子系統了,明天我們再接著來看網路子系統的部分。


上一篇
Intel igb 網卡驅動與接收隊列
下一篇
網路子系統的封包接收處理與系列總結兼感言
系列文
Linux Kernel 網路巡禮30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言