本文目標:
UPF、AMF、SMF 是整個核心網路最為重要的 NFs,UPF 負責了所有 UE 的 Uplink & Downlink Data Packet,除了 5G 基地台傳輸效能外,UPF 的吞吐量也會影響這個 5G 網路的傳輸效率。
UPF 一共有 N3、N4、N6、N9 四個 Interface,它們的作用分別是:
任何從 UE 送來、或是要送給 UE 的資料流都會經過下面的 Packet Processing flow:
簡單來說,當一個資料流進到 UPF 時,會先使用 PDR 檢查封包到底屬於哪一個 PDU Session,這時:
至於為什麼要有 QoS 這樣的東西出現,道理也非常簡單:就是為了流量控制,像是現在台灣最低的 4G 吃到飽方案已經下探到 200 左右,電信商不可能提供給 299、499 甚至是 999 月租費的用戶一樣的網路品質,這時候他們就可以透過 OAM 去設定像是 AMBR、MBR、GBR 等參數去限制每一個用戶能獲得什麼樣的連線品質囉。
架構圖取自:https://github.com/khirono/aps-upf-doc
本篇文章探討的是 UPF 的 control-plane,它負責處理來自 SMF 的控制訊號,並且轉換為 data-plane 模組看得懂的 flow rule(概念有點像是 SDN)。
透過先前的“5G NF 通用框架”章節的內容,我們可以快速的鎖定 Network Function 中不同功能分別實作在何處,GO-UPF 由 5 個子模組所組成:
用於控制 gtp5g kernel module,運作原理如下:
RouteAdd()
函式)初始化完成後,其他模組可以呼叫 forwarder 提供的 newSdfFilter()
新增 SDF,該函式會呼叫 newFlowDesc()
函式新增 flow description。
除此之外,forwarder 還提供了其他 api 讓相關的 rule 可以傳送至 gtp5g 執行:
CreatePDR()
、UpdatePDR()
、RemovePDR()
CreateFAR()
、UpdateFAR()
、RemoveFAR()
CreateQER()
、UpdateQER()
、RemoveQER()
用於構造 gtpv1 protocol 的封包,當 UPF 收到 PFCP Modification Request 更新 Flow rules 會檢查是否有 buffered data 的狀態從 BUFF 變為 FORW(Forward),如果有,會呼叫 g.WritePacket(far, qer, pkt)
將 buffered packets 送往 gtp5g 轉發。
var (
log *logrus.Logger
MainLog *logrus.Entry
CfgLog *logrus.Entry
PfcpLog *logrus.Entry
BuffLog *logrus.Entry
FwderLog *logrus.Entry
)
// ...
func init() {
log = logrus.New()
log.SetReportCaller(false)
log.Formatter = &formatter.Formatter{
TimestampFormat: time.RFC3339,
TrimMessages: true,
NoFieldsSpace: true,
HideKeys: true,
FieldsOrder: []string{
"component",
"category",
FieldListenAddr,
FieldRemoteNodeID,
FieldSessionID,
FieldTransction,
},
}
MainLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Main"})
CfgLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Cfg"})
PfcpLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Pfcp"})
BuffLog = log.WithFields(logrus.Fields{"component": "UPF", FieldCategory: "Buff"})
FwderLog = log.WithFields(logrus.Fields{"component": "UPF"})
}
為了方便除錯與紀錄系統日誌,UPF 就跟其他的 Network Function 專案一樣,會為 log 進行分類,像是:MainLog(用於 UPF main loop)
、PfcpLog(用於 pfcp server)
、FwderLog(用於 gtp5g,也可以說是 UPF user plane 的部分)
...等等。
pfcp 模組用於處理來自 SMF 的 PFCP 訊息:
func (s *PfcpServer) reqDispacher(msg message.Message, addr net.Addr) error {
switch req := msg.(type) {
case *message.HeartbeatRequest:
s.handleHeartbeatRequest(req, addr)
case *message.AssociationSetupRequest:
s.handleAssociationSetupRequest(req, addr)
case *message.AssociationUpdateRequest:
s.handleAssociationUpdateRequest(req, addr)
case *message.AssociationReleaseRequest:
s.handleAssociationReleaseRequest(req, addr)
case *message.SessionEstablishmentRequest:
s.handleSessionEstablishmentRequest(req, addr)
case *message.SessionModificationRequest:
s.handleSessionModificationRequest(req, addr)
case *message.SessionDeletionRequest:
s.handleSessionDeletionRequest(req, addr)
default:
return errors.Errorf("pfcp reqDispacher unknown msg type: %d", msg.MessageType())
}
return nil
}
以 PFCP Session Establishment 為例,UPF 會進行以下處理:
rnode, ok := s.rnodes[rnodeid]
:檢查發送 PFCP 訊息的 SMF 是否為已經建立 association 的 PFCP node?sess := rnode.NewSess(fseid.SEID)
:建立一個 session。for _, i := range req.CreateFAR {
err = sess.CreateFAR(i)
if err != nil {
sess.log.Errorf("Est CreateFAR error: %+v", err)
}
}
以 FAR 為例,在呼叫 sess.CreateFAR(i)
時會有兩個行為:
以下為一個 session 的資料結構:
type Sess struct {
rnode *RemoteNode
LocalID uint64
RemoteID uint64
PDRIDs map[uint16]struct{}
FARIDs map[uint32]struct{}
QERIDs map[uint32]struct{}
URRIDs map[uint32]struct{}
BARIDs map[uint8]struct{}
q map[uint16]chan []byte // key: PDR_ID
qlen int
log *logrus.Entry
}
每一個 Session 會紀錄:
ServeReport()
收到來自 gtp5g 的 Downlink data report,並且有 buffered packets 需要處理,該函式會找出 report 對應到的 session 並且呼叫 sess.Push(dldr.PDRID, rp.BufPkt)
將 buffered packets 儲存到 queue。除了 local/remote node、session mapping 的設計,UPF 還有一項設計值得我們進行分析:PFCP Transaction 設計。
在介紹之前,先跟大家說明一下為什麼 PFCP server 需要 Transaction:PFCP 是基於 udp 進行傳輸的網路協定,也就是說,它並不像 TCP 或是 SCTP 這類的網路協定一樣保證接收方一定會收到訊息。如果 SMF 與 UPF 之間在溝通時有訊息流失,會產生兩邊 context 不同步的問題。因此,實作 Transaction 的機制來抵抗訊息流失造成的非預期行為是必要的(不管是 UPF 還是 SMF 都有實作,只是實作方式不盡相同)。
type PfcpServer struct {
cfg *factory.Config
listen string
nodeID string
rcvCh chan ReceivePacket
srCh chan report.SessReport
trToCh chan TransactionTimeout
conn *net.UDPConn
recoveryTime time.Time
driver forwarder.Driver
lnode LocalNode
rnodes map[string]*RemoteNode
txTrans map[string]*TxTransaction // key: RemoteAddr-Sequence
rxTrans map[string]*RxTransaction // key: RemoteAddr-Sequence
txSeq uint32
log *logrus.Entry
}
上方程式碼展示了 UPF 專案中的 PfcpServer
資料結構,讓我來稍微說明一下其中幾個結構成員的用途:
1. rcvCh:PfcpServer 在運作時會有兩個 goroutine,一個是 pfcpServer 本身,另一個是 pfcp message receiver,當 receiver 收到來自 conn
(SMF)發來的 PFCP 訊息,它會將訊息包裝成一個 event 並且放到 PfcpServer 的 rcvCh 當中:
type ReceivePacket struct {
RemoteAddr net.Addr
Buf []byte
}
2. srCh:用於發送 session report 給 SMF 的 channel,由於 free5GC 的 UPF 只會主動的傳送一個 PFCP message(session report),所以你也可以把它當成 tx channel,當 forwarder goroutine 從 unix socket 收到來自 gtp5g(UPF user plane)的資料,它會將這些資訊包裝成 SessReport
結構的資料放入 srCh
。
3. trToCh:不管是收到訊息或是傳送訊息,PFCP server 都需要紀錄 transaction,以發送訊息為例,如果 UPF 發送了訊息給 SMF 但遲遲沒有收到回應,那麼 transaction 應該適時的重新傳送該訊息。以目前 UPF 的設計來說,UPF 會重新傳送 3 次 PFCP 訊息(這是在沒有收到 response 的情況下),當重傳已經到達三次,就會觸發 timeout event 經由 trToCh
通知 pfcp server。
4. recoveryTime:recoveryTime 紀錄了 PFCP server 的啟動時間,它會在某些訊息中作為 IE 包在 PFCP 訊息中傳給 SMF,目的是讓 SMF 對比每次收到訊息中的 recoveryTime 是否有變化(如果有變化,代表 UPF 可能出現了一些問題重啟了,這時候 SMF 就必須把所有 Session 的資訊傳遞給 UPF 進行同步)。
5. txTrans 與 rxTrans:用於記錄 transaction 的 map,TxTransaction 與 RxTransaction 資料結構則是用於描述 transaction 的資料。
type TxTransaction struct {
server *PfcpServer
raddr net.Addr // SMF 的位址
seq uint32 // pfcp message 的 sequence number
id string
retransTimeout time.Duration // timeout 的時間
maxRetrans uint8 // 重新傳送次數上限
req message.Message // 訊息的種類
msgBuf []byte // 封包本身
timer *time.Timer // 每個 transaction 都有一個獨立的 timer 進行計時
retransCount uint8 // 重新傳送次數的計數器
log *logrus.Entry
}
type RxTransaction struct {
server *PfcpServer
raddr net.Addr
seq uint32
id string
timeout time.Duration
msgBuf []byte
timer *time.Timer
log *logrus.Entry
}
6. txSeq:用於紀錄 pfcp message 中 sequence number 的計數器,每傳送一則 pfcp 訊息都會加一。
講解完 PfcpServer
資料結構後,我們回到前面提到的 goroutine(server 與 receiver):
func (s *PfcpServer) receiver(wg *sync.WaitGroup) {
defer func() {
if p := recover(); p != nil {
// Print stack for panic to log. Fatalf() will let program exit.
s.log.Fatalf("panic: %v\n%s", p, string(debug.Stack()))
}
s.log.Infoln("pfcp reciver stopped")
wg.Done()
}()
buf := make([]byte, MAX_PFCP_MSG_LEN)
for {
s.log.Tracef("receiver starts to read...")
n, addr, err := s.conn.ReadFrom(buf)
if err != nil {
s.log.Errorf("%+v", err)
s.rcvCh <- ReceivePacket{}
break
}
s.log.Tracef("receiver reads message(len=%d)", n)
msgBuf := make([]byte, n)
copy(msgBuf, buf)
s.rcvCh <- ReceivePacket{
RemoteAddr: addr,
Buf: msgBuf,
}
}
}
Receiver 的工作很簡單,它就是不斷的接收訊息然後使用 rcvCh 通知 server goroutine。
for {
select {
case sr := <-s.srCh:
s.log.Tracef("receive SessReport from srCh")
s.ServeReport(&sr)
case rcvPkt := <-s.rcvCh:
s.log.Tracef("receive buf(len=%d) from rcvCh", len(rcvPkt.Buf))
if len(rcvPkt.Buf) == 0 {
// receiver closed
return
}
msg, err := message.Parse(rcvPkt.Buf)
if err != nil {
s.log.Errorln(err)
s.log.Tracef("ignored undecodable message:\n%+v", hex.Dump(rcvPkt.Buf))
continue
}
trID := fmt.Sprintf("%s-%d", rcvPkt.RemoteAddr, msg.Sequence())
if isRequest(msg) {
s.log.Tracef("receive req pkt from %s", trID)
rx, ok := s.rxTrans[trID]
if !ok {
rx = NewRxTransaction(s, rcvPkt.RemoteAddr, msg.Sequence())
s.rxTrans[trID] = rx
}
needDispatch, err1 := rx.recv(msg, ok)
if err1 != nil {
s.log.Warnf("rcvCh: %v", err1)
continue
} else if !needDispatch {
s.log.Debugf("rcvCh: rxtr[%s] req no need to dispatch", trID)
continue
}
err = s.reqDispacher(msg, rcvPkt.RemoteAddr)
if err != nil {
s.log.Errorln(err)
s.log.Tracef("ignored undecodable message:\n%+v", hex.Dump(rcvPkt.Buf))
}
} else if isResponse(msg) {
s.log.Tracef("receive rsp pkt from %s", trID)
tx, ok := s.txTrans[trID]
if !ok {
s.log.Debugf("rcvCh: No txtr[%s] found for rsp", trID)
continue
}
req := tx.recv(msg)
err = s.rspDispacher(msg, rcvPkt.RemoteAddr, req)
if err != nil {
s.log.Errorln(err)
s.log.Tracef("ignored undecodable message:\n%+v", hex.Dump(rcvPkt.Buf))
}
}
case trTo := <-s.trToCh:
s.log.Tracef("receive tr timeout (%v) from trToCh", trTo)
if trTo.TrType == TX {
tx, ok := s.txTrans[trTo.TrID]
if !ok {
s.log.Warnf("trToCh: txtr[%s] not found", trTo.TrID)
continue
}
tx.handleTimeout()
} else { // RX
rx, ok := s.rxTrans[trTo.TrID]
if !ok {
s.log.Warnf("trToCh: rxtr[%s] not found", trTo.TrID)
continue
}
rx.handleTimeout()
}
}
}
上面是 server goroutine 的主要邏輯,其實它就是一個無窮迴圈,在迴圈中使用 select 來處理多個 channel 的訊息(一次處理一個),這樣的設計屬於 The Producer Consumer Pattern,由於只有 server routine 本身會存取 PfcpServer 結構中的資料,所以每一個操作都不需要使用 lock 保護(不會有 race condition 產生),是一個十分經典的 concurrent design pattern。
report 模組定義了 report 相關的 handler interface:
type Handler interface {
NotifySessReport(SessReport)
PopBufPkt(uint64, uint16) ([]byte, bool)
}
以及 report 相關的資料結構:
const (
DLDR = iota + 1
USAR
ERIR
UPIR
TMIR
SESR
UISR
)
type Report interface {
Type() int
}
type DLDReport struct {
PDRID uint16
}
func (r DLDReport) Type() int {
return DLDR
}
type USAReport struct {
URRID uint32
URSEQN uint32
USARTrigger UsageReportTrigger
}
type UsageReportTrigger struct {
PERIO uint8
VOLTH uint8
TIMTH uint8
QUHTI uint8
START uint8
STOPT uint8
DROTH uint8
IMMER uint8
VOLQU uint8
TIMQU uint8
LIUSA uint8
TERMR uint8
MONIT uint8
ENVCL uint8
MACAR uint8
EVETH uint8
EVEQU uint8
TEBUR uint8
IPMJL uint8
QUVTI uint8
EMRRE uint8
}
func (r USAReport) Type() int {
return USAR
}
const (
DROP = 1 << 0
FORW = 1 << 1
BUFF = 1 << 2
NOCP = 1 << 3
)
type SessReport struct {
SEID uint64
Report Report
Action uint16
BufPkt []byte
}
type BufInfo struct {
SEID uint64
PDRID uint16
}
這部分的實作仍不完善,更多資訊可以參考這個尚未被合併的 pull request。
在了解 TS 29.244 中描述的 UP Function 行為後,我們可以藉由閱讀程式碼將這些抽象的概念具現化,是一個很棒的學習方式。
實際上,UPF 是一個非常大的專案,本篇文章僅介紹到 controll plane(這裡的 SMF 以及 UPF 比較像是 SDN controller),實際在轉發封包、執行 QoS 的是未來幾天會介紹的 gtp5g 專案負責的。
為了能夠追蹤 gtp5g 的原始程式碼,在未來幾篇文章中,筆者將會把重點放在 Linux network internals,讓大家了解 Linux 如何處理網路封包、如何撰寫 kernel space program 後再把重點放回 UPF 身上。