iT邦幫忙

2022 iThome 鐵人賽

DAY 18
0

本文目標:

  • 介紹 UPF 如何處理 UL/DL packets
  • 追蹤 UPF 專案的原始程式碼(本篇注重在 control plane 上)

Recap:什麼是 UPF?

UPF、AMF、SMF 是整個核心網路最為重要的 NFs,UPF 負責了所有 UE 的 Uplink & Downlink Data Packet,除了 5G 基地台傳輸效能外,UPF 的吞吐量也會影響這個 5G 網路的傳輸效率。

image

UPF 一共有 N3、N4、N6、N9 四個 Interface,它們的作用分別是:

  • N3 用於建立與 RAN 之間的聯繫,任何上下行封包都會經過 N3 (Based on GTP protocol)。
  • N4 用來負責與 SMF 溝通,任何與 PDU Session、QoS 有關的東西都由 SMF 建立 PFCP Procedure 告訴 UPF(N4 is Based on PFCP protocol)。
  • N6 用來將封包傳送給 Public Data Network(外部網路):
    • 當 UPF 收到來自 N3/N9 的 GTP 封包時,它會將封包進行解封裝與 SNAT 再轉發到外部網路。
    • 當 UPF 收到來自外部網路的 IP 封包,它需要將封包的目標 IP 轉換回正確的 UE IP。
  • N9 用於連結其他 UPF,供 ULCL 的場景使用。

進入正題

Packet processing flow

任何從 UE 送來、或是要送給 UE 的資料流都會經過下面的 Packet Processing flow:

簡單來說,當一個資料流進到 UPF 時,會先使用 PDR 檢查封包到底屬於哪一個 PDU Session,這時:

  • 如果檢測不到,UPF 會丟棄該封包
  • 如果 PDR matching 成功,這個封包會繼續被套用到 MAR(Multi-Access Rule)、FAR(Forwarding Action Rule,要轉發到哪裡?或是要先 Buffer 起來?)、QER(QoS Enforcement Rule,這個 Session 的 QoS 如何?)...

至於為什麼要有 QoS 這樣的東西出現,道理也非常簡單:就是為了流量控制,像是現在台灣最低的 4G 吃到飽方案已經下探到 200 左右,電信商不可能提供給 299、499 甚至是 999 月租費的用戶一樣的網路品質,這時候他們就可以透過 OAM 去設定像是 AMBR、MBR、GBR 等參數去限制每一個用戶能獲得什麼樣的連線品質囉。

軟體架構

架構圖取自:https://github.com/khirono/aps-upf-doc

本篇文章探討的是 UPF 的 control-plane,它負責處理來自 SMF 的控制訊號,並且轉換為 data-plane 模組看得懂的 flow rule(概念有點像是 SDN)。

透過先前的“5G NF 通用框架”章節的內容,我們可以快速的鎖定 Network Function 中不同功能分別實作在何處,GO-UPF 由 5 個子模組所組成:

1. forwarder

用於控制 gtp5g kernel module,運作原理如下:

  • 在初始化時,透過 netlink 新增一個 socket。
  • 透過建立好的 netlink client 發送 rtnetlink 訊息:
    • 建立 gtp5g device
    • 新增 gtp5g device 的路由(呼叫 RouteAdd() 函式)

初始化完成後,其他模組可以呼叫 forwarder 提供的 newSdfFilter() 新增 SDF,該函式會呼叫 newFlowDesc() 函式新增 flow description。
除此之外,forwarder 還提供了其他 api 讓相關的 rule 可以傳送至 gtp5g 執行:

  • CreatePDR()UpdatePDR()RemovePDR()
  • CreateFAR()UpdateFAR()RemoveFAR()
  • CreateQER()UpdateQER()RemoveQER()
  • ...

2. gtpv1

用於構造 gtpv1 protocol 的封包,當 UPF 收到 PFCP Modification Request 更新 Flow rules 會檢查是否有 buffered data 的狀態從 BUFF 變為 FORW(Forward),如果有,會呼叫 g.WritePacket(far, qer, pkt) 將 buffered packets 送往 gtp5g 轉發。

3. logger

var (
	log      *logrus.Logger
	MainLog  *logrus.Entry
	CfgLog   *logrus.Entry
	PfcpLog  *logrus.Entry
	BuffLog  *logrus.Entry
	FwderLog *logrus.Entry
)

// ...


func init() {
	log = logrus.New()
	log.SetReportCaller(false)

	log.Formatter = &formatter.Formatter{
		TimestampFormat: time.RFC3339,
		TrimMessages:    true,
		NoFieldsSpace:   true,
		HideKeys:        true,
		FieldsOrder: []string{
			"component",
			"category",
			FieldListenAddr,
			FieldRemoteNodeID,
			FieldSessionID,
			FieldTransction,
		},
	}

	MainLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Main"})
	CfgLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Cfg"})
	PfcpLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Pfcp"})
	BuffLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Buff"})
	FwderLog = log.WithFields(logrus.Fields{"component": "UPF"})
}

為了方便除錯與紀錄系統日誌,UPF 就跟其他的 Network Function 專案一樣,會為 log 進行分類,像是:MainLog(用於 UPF main loop)PfcpLog(用於 pfcp server)FwderLog(用於 gtp5g,也可以說是 UPF user plane 的部分)...等等。

4. pfcp

pfcp 模組用於處理來自 SMF 的 PFCP 訊息:

func (s *PfcpServer) reqDispacher(msg message.Message, addr net.Addr) error {
	switch req := msg.(type) {
	case *message.HeartbeatRequest:
		s.handleHeartbeatRequest(req, addr)
	case *message.AssociationSetupRequest:
		s.handleAssociationSetupRequest(req, addr)
	case *message.AssociationUpdateRequest:
		s.handleAssociationUpdateRequest(req, addr)
	case *message.AssociationReleaseRequest:
		s.handleAssociationReleaseRequest(req, addr)
	case *message.SessionEstablishmentRequest:
		s.handleSessionEstablishmentRequest(req, addr)
	case *message.SessionModificationRequest:
		s.handleSessionModificationRequest(req, addr)
	case *message.SessionDeletionRequest:
		s.handleSessionDeletionRequest(req, addr)
	default:
		return errors.Errorf("pfcp reqDispacher unknown msg type: %d", msg.MessageType())
	}
	return nil
}
  • HeartBeat 是 SMF 可以用來檢查 UPF 是否保持運作狀態的方式。
  • 根據 spec 定義,UPF 和 SMF 都只會受理有向自己建立 association 的 node(UPF -> SMF 或是 SMF -> UPF)。
  • 當 SMF 收到 PDU Session 相關請求需要為 Session 建立通道時,會向 UPF 發送 PFCP Session Establishment。
    • 相對的,如果要變更或刪除 Flow rule,SMF 會向 UPF 發送 Session Modification。

以 PFCP Session Establishment 為例,UPF 會進行以下處理:

  • rnode, ok := s.rnodes[rnodeid]:檢查發送 PFCP 訊息的 SMF 是否為已經建立 association 的 PFCP node?
  • sess := rnode.NewSess(fseid.SEID):建立一個 session。
  • 建立 PDR、FAR、URR、QER、BAR 等 rules:
for _, i := range req.CreateFAR {
		err = sess.CreateFAR(i)
		if err != nil {
			sess.log.Errorf("Est CreateFAR error: %+v", err)
		}
	}

以 FAR 為例,在呼叫 sess.CreateFAR(i) 時會有兩個行為:

  1. 告訴 forwarder 要建立 FAR
  2. 將 FAR ID 存入 Session 紀錄 FAR 的 MAP

以下為一個 session 的資料結構:

type Sess struct {
	rnode    *RemoteNode
	LocalID  uint64
	RemoteID uint64
	PDRIDs   map[uint16]struct{}
	FARIDs   map[uint32]struct{}
	QERIDs   map[uint32]struct{}
	URRIDs   map[uint32]struct{}
	BARIDs   map[uint8]struct{}
	q        map[uint16]chan []byte // key: PDR_ID
	qlen     int
	log      *logrus.Entry
}

每一個 Session 會紀錄:

  • 對應的 SMF node 的資訊
  • Local 以及 Remote 的 session id
  • Rule map
  • buffered packets queue,用 PDR ID 當成鍵值
    • ServeReport() 收到來自 gtp5g 的 Downlink data report,並且有 buffered packets 需要處理,該函式會找出 report 對應到的 session 並且呼叫 sess.Push(dldr.PDRID, rp.BufPkt) 將 buffered packets 儲存到 queue。

除了 local/remote node、session mapping 的設計,UPF 還有一項設計值得我們進行分析:PFCP Transaction 設計。
在介紹之前,先跟大家說明一下為什麼 PFCP server 需要 Transaction:PFCP 是基於 udp 進行傳輸的網路協定,也就是說,它並不像 TCP 或是 SCTP 這類的網路協定一樣保證接收方一定會收到訊息。如果 SMF 與 UPF 之間在溝通時有訊息流失,會產生兩邊 context 不同步的問題。因此,實作 Transaction 的機制來抵抗訊息流失造成的非預期行為是必要的(不管是 UPF 還是 SMF 都有實作,只是實作方式不盡相同)。

type PfcpServer struct {
	cfg          *factory.Config
	listen       string
	nodeID       string
	rcvCh        chan ReceivePacket
	srCh         chan report.SessReport
	trToCh       chan TransactionTimeout
	conn         *net.UDPConn
	recoveryTime time.Time
	driver       forwarder.Driver
	lnode        LocalNode
	rnodes       map[string]*RemoteNode
	txTrans      map[string]*TxTransaction // key: RemoteAddr-Sequence
	rxTrans      map[string]*RxTransaction // key: RemoteAddr-Sequence
	txSeq        uint32
	log          *logrus.Entry
}

上方程式碼展示了 UPF 專案中的 PfcpServer 資料結構,讓我來稍微說明一下其中幾個結構成員的用途:

1. rcvCh:PfcpServer 在運作時會有兩個 goroutine,一個是 pfcpServer 本身,另一個是 pfcp message receiver,當 receiver 收到來自 conn(SMF)發來的 PFCP 訊息,它會將訊息包裝成一個 event 並且放到 PfcpServer 的 rcvCh 當中:

type ReceivePacket struct {
	RemoteAddr net.Addr
	Buf        []byte
}

2. srCh:用於發送 session report 給 SMF 的 channel,由於 free5GC 的 UPF 只會主動的傳送一個 PFCP message(session report),所以你也可以把它當成 tx channel,當 forwarder goroutine 從 unix socket 收到來自 gtp5g(UPF user plane)的資料,它會將這些資訊包裝成 SessReport 結構的資料放入 srCh

3. trToCh:不管是收到訊息或是傳送訊息,PFCP server 都需要紀錄 transaction,以發送訊息為例,如果 UPF 發送了訊息給 SMF 但遲遲沒有收到回應,那麼 transaction 應該適時的重新傳送該訊息。以目前 UPF 的設計來說,UPF 會重新傳送 3 次 PFCP 訊息(這是在沒有收到 response 的情況下),當重傳已經到達三次,就會觸發 timeout event 經由 trToCh 通知 pfcp server。

4. recoveryTime:recoveryTime 紀錄了 PFCP server 的啟動時間,它會在某些訊息中作為 IE 包在 PFCP 訊息中傳給 SMF,目的是讓 SMF 對比每次收到訊息中的 recoveryTime 是否有變化(如果有變化,代表 UPF 可能出現了一些問題重啟了,這時候 SMF 就必須把所有 Session 的資訊傳遞給 UPF 進行同步)。

5. txTrans 與 rxTrans:用於記錄 transaction 的 map,TxTransaction 與 RxTransaction 資料結構則是用於描述 transaction 的資料。

type TxTransaction struct {
	server         *PfcpServer
	raddr          net.Addr // SMF 的位址
	seq            uint32 // pfcp message 的 sequence number
	id             string
	retransTimeout time.Duration // timeout 的時間
	maxRetrans     uint8 // 重新傳送次數上限
	req            message.Message // 訊息的種類
	msgBuf         []byte // 封包本身
	timer          *time.Timer // 每個 transaction 都有一個獨立的 timer 進行計時
	retransCount   uint8 // 重新傳送次數的計數器
	log            *logrus.Entry
}

type RxTransaction struct {
	server  *PfcpServer
	raddr   net.Addr
	seq     uint32
	id      string
	timeout time.Duration
	msgBuf  []byte
	timer   *time.Timer
	log     *logrus.Entry
}

6. txSeq:用於紀錄 pfcp message 中 sequence number 的計數器,每傳送一則 pfcp 訊息都會加一。

講解完 PfcpServer 資料結構後,我們回到前面提到的 goroutine(server 與 receiver):

func (s *PfcpServer) receiver(wg *sync.WaitGroup) {
	defer func() {
		if p := recover(); p != nil {
			// Print stack for panic to log. Fatalf() will let program exit.
			s.log.Fatalf("panic: %v\n%s", p, string(debug.Stack()))
		}

		s.log.Infoln("pfcp reciver stopped")
		wg.Done()
	}()

	buf := make([]byte, MAX_PFCP_MSG_LEN)
	for {
		s.log.Tracef("receiver starts to read...")
		n, addr, err := s.conn.ReadFrom(buf)
		if err != nil {
			s.log.Errorf("%+v", err)
			s.rcvCh <- ReceivePacket{}
			break
		}

		s.log.Tracef("receiver reads message(len=%d)", n)
		msgBuf := make([]byte, n)
		copy(msgBuf, buf)
		s.rcvCh <- ReceivePacket{
			RemoteAddr: addr,
			Buf:        msgBuf,
		}
	}
}

Receiver 的工作很簡單,它就是不斷的接收訊息然後使用 rcvCh 通知 server goroutine。

for {
		select {
		case sr := <-s.srCh:
			s.log.Tracef("receive SessReport from srCh")
			s.ServeReport(&sr)
		case rcvPkt := <-s.rcvCh:
			s.log.Tracef("receive buf(len=%d) from rcvCh", len(rcvPkt.Buf))
			if len(rcvPkt.Buf) == 0 {
				// receiver closed
				return
			}
			msg, err := message.Parse(rcvPkt.Buf)
			if err != nil {
				s.log.Errorln(err)
				s.log.Tracef("ignored undecodable message:\n%+v", hex.Dump(rcvPkt.Buf))
				continue
			}

			trID := fmt.Sprintf("%s-%d", rcvPkt.RemoteAddr, msg.Sequence())
			if isRequest(msg) {
				s.log.Tracef("receive req pkt from %s", trID)
				rx, ok := s.rxTrans[trID]
				if !ok {
					rx = NewRxTransaction(s, rcvPkt.RemoteAddr, msg.Sequence())
					s.rxTrans[trID] = rx
				}
				needDispatch, err1 := rx.recv(msg, ok)
				if err1 != nil {
					s.log.Warnf("rcvCh: %v", err1)
					continue
				} else if !needDispatch {
					s.log.Debugf("rcvCh: rxtr[%s] req no need to dispatch", trID)
					continue
				}
				err = s.reqDispacher(msg, rcvPkt.RemoteAddr)
				if err != nil {
					s.log.Errorln(err)
					s.log.Tracef("ignored undecodable message:\n%+v", hex.Dump(rcvPkt.Buf))
				}
			} else if isResponse(msg) {
				s.log.Tracef("receive rsp pkt from %s", trID)
				tx, ok := s.txTrans[trID]
				if !ok {
					s.log.Debugf("rcvCh: No txtr[%s] found for rsp", trID)
					continue
				}
				req := tx.recv(msg)
				err = s.rspDispacher(msg, rcvPkt.RemoteAddr, req)
				if err != nil {
					s.log.Errorln(err)
					s.log.Tracef("ignored undecodable message:\n%+v", hex.Dump(rcvPkt.Buf))
				}
			}
		case trTo := <-s.trToCh:
			s.log.Tracef("receive tr timeout (%v) from trToCh", trTo)
			if trTo.TrType == TX {
				tx, ok := s.txTrans[trTo.TrID]
				if !ok {
					s.log.Warnf("trToCh: txtr[%s] not found", trTo.TrID)
					continue
				}
				tx.handleTimeout()
			} else { // RX
				rx, ok := s.rxTrans[trTo.TrID]
				if !ok {
					s.log.Warnf("trToCh: rxtr[%s] not found", trTo.TrID)
					continue
				}
				rx.handleTimeout()
			}
		}
    }

上面是 server goroutine 的主要邏輯,其實它就是一個無窮迴圈,在迴圈中使用 select 來處理多個 channel 的訊息(一次處理一個),這樣的設計屬於 The Producer Consumer Pattern,由於只有 server routine 本身會存取 PfcpServer 結構中的資料,所以每一個操作都不需要使用 lock 保護(不會有 race condition 產生),是一個十分經典的 concurrent design pattern。

5. report

report 模組定義了 report 相關的 handler interface:

type Handler interface {
	NotifySessReport(SessReport)
	PopBufPkt(uint64, uint16) ([]byte, bool)
}

以及 report 相關的資料結構:

const (
	DLDR = iota + 1
	USAR
	ERIR
	UPIR
	TMIR
	SESR
	UISR
)

type Report interface {
	Type() int
}

type DLDReport struct {
	PDRID uint16
}

func (r DLDReport) Type() int {
	return DLDR
}

type USAReport struct {
	URRID       uint32
	URSEQN      uint32
	USARTrigger UsageReportTrigger
}

type UsageReportTrigger struct {
	PERIO uint8
	VOLTH uint8
	TIMTH uint8
	QUHTI uint8
	START uint8
	STOPT uint8
	DROTH uint8
	IMMER uint8
	VOLQU uint8
	TIMQU uint8
	LIUSA uint8
	TERMR uint8
	MONIT uint8
	ENVCL uint8
	MACAR uint8
	EVETH uint8
	EVEQU uint8
	TEBUR uint8
	IPMJL uint8
	QUVTI uint8
	EMRRE uint8
}

func (r USAReport) Type() int {
	return USAR
}

const (
	DROP = 1 << 0
	FORW = 1 << 1
	BUFF = 1 << 2
	NOCP = 1 << 3
)

type SessReport struct {
	SEID   uint64
	Report Report
	Action uint16
	BufPkt []byte
}

type BufInfo struct {
	SEID  uint64
	PDRID uint16
}

這部分的實作仍不完善,更多資訊可以參考這個尚未被合併的 pull request

總結

在了解 TS 29.244 中描述的 UP Function 行為後,我們可以藉由閱讀程式碼將這些抽象的概念具現化,是一個很棒的學習方式。
實際上,UPF 是一個非常大的專案,本篇文章僅介紹到 controll plane(這裡的 SMF 以及 UPF 比較像是 SDN controller),實際在轉發封包、執行 QoS 的是未來幾天會介紹的 gtp5g 專案負責的。
為了能夠追蹤 gtp5g 的原始程式碼,在未來幾篇文章中,筆者將會把重點放在 Linux network internals,讓大家了解 Linux 如何處理網路封包、如何撰寫 kernel space program 後再把重點放回 UPF 身上。

References


上一篇
Network Function 通用軟體架構
下一篇
Linux 網路系統概觀
系列文
5G 核心網路與雲原生開發之亂彈阿翔36
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言