iT邦幫忙

2023 iThome 鐵人賽

DAY 23
0
Software Development

前端? 後端? 摻在一起做成全端就好了系列 第 23

23 是websocket,不是socket。使用rust websocket

  • 分享至 

  • xImage
  •  

先簡單說明一下websocket是什麼,Socket就是一堆洞的意思,看下圖有Socket的圖,不過這是CPU Socket,CPU和插槽長像下面這樣,以前IntelAMD還可以換著插,現在都不行了,只能各插各的,形狀要符合才能插:

socket 4插槽 intel P133 CPU

但我們現在要講的不是這個,在軟體開發一般單講Socket是在講網路的洞,什麼是網路的洞,下面這張圖表達的很清楚,想像一條看不見的 手(X) 線(O),透過網路連接2台不同的主機,所以要有洞才能讓這條線插,就像我們每天都在插USB充電一樣,而我們的應用程式也要指到這個對應的洞,才不會傳錯資料,錯把馮京當馬涼,把要傳給女友的訊息傳給男友就糗了(咦?)。

socket示意圖

from: https://www.slideshare.net/PeterREgli/sockets-6225288

所以Socket是屬於比較偏底層的東西,而websocket(ws)呢,他是在高層裡的底層(? 阿鬼 你還是說中文吧),就像我們之前在用WebAssembly不也是在Web裡跑Assembly語言嗎(雖然現在wasm也可以用在非web的環境,而且還慢慢的紅起來),websocket只是在web環境中用起來很像socket,實作上的細節差很多,想了解的話去看邦友的文章,而想知道wasm不在web裡怎麼用的可以去看這系列介紹

在.NET裡有SignalR幫我們包好了,會依不同環境的相容性看情況使用ws,或是在不相容的瀏覽器使用其他協定去達到類似的結果。在Rust中,我們很多東西要自己處理,不過好在有豐富的社群和生態系支持,這些比較底層的東西大多有前輩做好了,我們只要學習如何使用就好(不過如果是要寫IoT要顧及頻寬及電力問題,可能需要知道更底層的東西來做優化)。

hello websocket (後端)

前言到這邊,我們看一下warp的hello_websocket,把它接進我們的router裡:

@@ Cargo.toml @@
+futures-util = { version = "0.3" }

@@ web/Cargo.toml @@
+futures-util = { workspace = true }
// web/src/routers.rs
pub fn all_routers() -> 
    // ... 略

ws_routers()
    .or(hello)
    .or(api_games)
    .recover(error::handle_rejection)
    .with(cors_config())
    .with(warp::trace::request())
}

use futures_util::stream::StreamExt;
use futures_util::FutureExt;

pub fn ws_routers() -> impl Filter<Extract=impl Reply, Error=Rejection> + Clone {
    warp::path("echo")
        .and(warp::ws())
        .map(|ws: warp::ws::Ws| {
            ws.on_upgrade(|websocket| {
                let (tx, rx) = websocket.split();
                rx.forward(tx).map(|result| {
                    if let Err(e) = result {
                        tracing::info!("websocket error: {:?}", e);
                    }
                })
            })
        })
}

這個範例只是很簡單的,從前端收到什麼訊息,就回應什麼訊息:

誰答腔我就說誰啊

欸不是,放錯圖了,跑出來的結果是下面這個:

加ws後把server跑起來的console

看不出來有沒有正確執行(?),我們用前端開 ws client試一下:

hello websocket (前端)

通常前端個人傾向從console中進行測試,因為可以快速得到結果,據以驗證自己的演算法對不對,直譯式語言的好處就不是不用編譯,打什麼出什麼。等試成功後再把code寫到code base裡,在MDN的文件有提到怎麼寫javascript的websocket:

var mySocket = new WebSocket("ws://localhost:3030/echo");

在browser裡的console裡試行ws連線

記得網頁要開localhost:3030,在console才不會被CORS擋掉。

看後端的畫面有收到echo的訊息:

前端連線ws後,後端log顯示的訊息

我們試著從前端發一下訊息

前端從console.log發送websocket訊息

這時候從前端發送資料給websocket,但沒反應,因為他是保持連線,不是request/response模式,所以需要用監聽的方式監聽後端有沒有發送訊息過來。在這裡依MDN說明去註冊一個監聽的事件,再重新傳送剛剛的訊息,就可以得到回應的結果:

mySocket.onmessage = function (e) {
    console.log(e.data);
};

加上監聽ws的callback function

不過,只有資料,這時可以把範例的 .data拿掉查看完整的訊息物件:

查看ws訊息的物件長相

可以看到data只是回應裡面的一個部分,還有其他meta資訊。

測試ws可以作用後,我們到code base裡去實現吧:

我們在 api 中加 websocket client:

// app/src/api/ws_client.ts
export const wsClient = (): WebSocket => {
  const url = `${import.meta.env.VITE_WS_BASE_URL}/echo`;
  const ws: WebSocket = new WebSocket(url);
  ws.onopen = () => {
    console.log('ws open');
  }
  ws.onclose = (): void => {
    console.log('ws close');
  }
  ws.onerror = (err: Event) => {
    console.log('ws error', err);
  }
  ws.onmessage = (msg: MessageEvent<any>) => {
    console.log('ws message', msg);
  }
  return ws;
}

其中環境變數加在 前端node.js使用的.env裡

# app/.env
VITE_WS_BASE_URL=ws://localhost:3030

使用Greet來進行概念性驗證:

<script lang="ts">
  import { wsClient } from '../api/ws_client';

  let message = '';
  let responseMessage = '';
  let ws = wsClient();
  ws.onmessage = function (event) {
    console.log('Message from server ', event.data);
    responseMessage = event.data;
  };
  async function greet() {
    ws.send(message);
  }
</script>

<div>
  <form class="row" on:submit|preventDefault={greet}>
    <input id="greet-input" placeholder="Enter message" bind:value={message} />
    <button type="submit">Greet</button>
  </form>
  <p>{responseMessage}</p>
</div>

svelte使用ws
看起來沒有問題。

ps 這裡要同時跑 web http, tauri, grpc server 3個terminal。

管理websocket連線 (後端)

剛剛的websocket只是簡單echo,除了測試服務是不是還活著,好像沒什麼用途(還有用來demo XDD)。因為websocket的連線是持續活著(直到一方關閉),我們在這邊借用tokio-stream的幫忙,想像連線一直開始其實就是一種串流,一邊丟資料另一邊接到一包資料就可以進行處理。另外也需要一個state來存放,跟先前在第14篇幫tauri加state很像:

@@ Cargo.toml @@
 [workspace.dependencies]
+tokio-stream = { version = "0.1" }

@@ web/Cargo.toml @@
 [dependencies]
+tokio-stream = { workspace = true }

@@ web/src/lib.rs @@
+pub mod web_socket;
+pub mod app_context;

我們開一個context,存放作為整個App的State狀態管理使用

// web/src/app_context.rs
use std::{sync::Arc, collections::HashMap};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use warp::ws::Message;

#[derive(Clone)]
pub struct AppContext {
    pub ws_connections: 
    Arc<
        RwLock<
            HashMap<
                usize,
                UnboundedSender<Message>
            >
        >
    >,
}

impl Default for AppContext {
    fn default() -> Self {
        Self {
            ws_connections:
            Arc::new(
                RwLock::new(
                    HashMap::new()
                )
            ),
        }
    }
}

其中HaspMap存放的就是websocket的連線,使用key/value,key為整數usize,用以辨識連線用戶端,如果有需要也可以使用其他key值,或綁定用戶資訊等。RwLock之前在第7篇有順帶提到,這裡我們只有在「用戶端建立新連線」的時候才會取號並寫入HashMap中,其他時候調用現有的websocket連線都只是對HashMap進行讀取,是故在這裡用Mutex相對效率不彰。

實作 web socket 訊息處理 (後端)

// web/src/main.rs
use web::{config, routers, app_context::AppContext};

#[tokio::main]
async fn main() {
    config::init();
    let _logger = Logger::builder().use_env().build();
    let app_context = AppContext::default();    // 加入App狀態機
    let routers = routers::all_routers(app_context.clone()); // 注入
    warp::serve(routers).run(([0, 0, 0, 0], config::http_port())).await;
}

每回合的poc都是隨意寫的code,先求有再求好,確認想法ok要開始實作時,記得先整理一下程式碼,我們直接把web_socket移出為一個獨立的mod檔案。

這裡有一篇文章:我的寫法我的 Team 都看得懂,針對寫出清晰可讀的代碼有滿獨到的見解,必看。

// web/src/routers.rs
use crate::app_context::AppContext;
use crate::web_socket::ws_routers;
pub fn all_routers(ctx: AppContext)    // 把 state 作為參數注入
    -> impl Filter<Extract=impl Reply, Error=Rejection> + Clone {
    // ...略
    hello
        .or(ws_routers(ctx.clone()))    // 傳遞state
    // 略
}
// 移除舊的ws_routes

上面把router裡的ws_routes移除,並使用引用的方式,從另一個mod web_scoket引入,以下是mod web_socket路由,內容有點長,這裡分段講述:

// web/src/web_socket.rs
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_util::{SinkExt, TryFutureExt, stream::StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::{Filter, Rejection, Reply, ws::{Message, WebSocket}};
use crate::app_context::AppContext;

static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1); // 取號用

pub fn ws_routers(context: AppContext)
    -> impl Filter<Extract=impl Reply, Error=Rejection> + Clone {
    warp::path("echo")        // web socket 路由
        .and(warp::ws())
        .and(warp::any().map(move || context.clone()))   //注入 State
        .map(|ws: warp::ws::Ws, ctx: AppContext| {      // 注入 State
            ws.on_upgrade(move |socket| ws_connected(socket, ctx))
        })                   // ↑↑ 接到連線使用我們定義的ws_connected fn處理
}

上面的map 是extrator,抽取整串pipeline裡面有的東西,可能是reqeust傳進來的,可能是我們在中間加入的(middleware),整個管線如同第13篇的鐵道開發法。

而裡面的ws.on_upgrade ,需要傳入一個Function,用來處理當有websocket連線時,而且這個Function必需回傳一個Future物件,只要我們在fn前面加一個async,rust就會把我們把它傳為Future,以下我們寫一個ws_connected,傳入連線物件及App狀態物件,而這個ws_connected存活期間是跟著每一條連線走的,有幾條連線就有幾個ws_connected,裡面有一個無窮迴圈,也因此這裡強迫我們使用Future,如此可以透過async的runtime進行資源調派,(一個連線就開一個分身處理的意思)。

websocket 的 on_upgrade函數

實作連線ws_connected內容架構如下:

// web/src/web_socket.rs
async fn ws_connected(ws: WebSocket, ctx: AppContext) {
    let conn_id = NEXT_CONNECTION_ID
        .fetch_add(1, Ordering::Relaxed);          // 取號
    tracing::info!("new websocket connection: {}", conn_id);

    let (mut ws_tx, mut ws_rx) = ws.split();       // 取得 ws 的 tx/rx
    let (tx, rx) = mpsc::unbounded_channel();      // 建立一個 mpsc 通道
    let mut rx = UnboundedReceiverStream::new(rx); // mpsc 收受端

    // 等待程式通知要發送訊息時的處理,透過mpsc接受訊息
    tokio::task::spawn(async move {                 // 建立工作(分身)監聽
        while let Some(message) = rx.next().await { // 當 mpsc 收到訊息時
            ws_tx
                .send(message)                      // 從 ws 發送該訊息
                .unwrap_or_else(|e| {
                    tracing::info!("websocket send error: {}", e);
                })
                .await;
        }
    });

    // 把連線id和 mpsc 的發送端,存到AppState裡
    // 後續我們在程式任意地方都可以呼叫這個tx
    let _ = &ctx.ws_connections.write().await.insert(conn_id, tx);

    // 處理當 websocket 接收到資料的時候
    while let Some(result) = ws_rx.next().await {
        let msg = match result {                    // 解析收到的訊息
            Ok(msg) => msg,
            Err(e) => {
                tracing::info!("websocket error(uid={}): {}", conn_id, e);
                break;
            }
        };
        // 處理訊息,本例是發送給websocket全連線端
        send_all_message(conn_id, msg, &ctx).await;
    }
    
    // 處理斷線處理
    disconnected(conn_id, &ctx).await;
}

以下是上面幾個處理訊息fn的實作:

// web/src/web_socket.rs
/// 發送訊息給特定id用戶端
async fn send_one_message(my_id: usize, msg: Message, ctx: &AppContext) {
    let msg = if let Ok(s) = msg.to_str() {
        s
    } else {
        return;
    };

    let new_msg = format!("<User#{}>: {}", my_id, msg);  // 訊息帶用戶id

    // 找出特定 id 的 tx,並傳送文字訊息
    for (&uid, tx) in ctx.ws_connections.read().await.iter()
        .filter(|(&uid, _)| uid == my_id) {             // 選出特定id
        if let Err(_disconnected) = tx
            .send(Message::text(new_msg.clone())) {}    // 發送訊息
    }
}

/// 發送訊息給所有連線用戶端
async fn send_all_message(my_id: usize, msg: Message, ctx: &AppContext) {
    let msg = if let Ok(s) = msg.to_str() {
        s
    } else {
        return;
    };

    let new_msg = format!("<User#{}>: {}", my_id, msg);  // 訊息帶用戶id

    // 遍歷所有的連線端,傳送文字訊息
    for (&uid, tx) in ctx.ws_connections.read().await.iter() {
        if let Err(_disconnected) = tx
            .send(Message::text(new_msg.clone())) {}
    }
}

/// 處理websocket連線中斷
async fn disconnected(conn_id: usize, ctx: &AppContext) {
    tracing::info!("disconnected conn_id: {}", conn_id);

    // 從AppState移除該連線tx通道
    ctx.ws_connections.write().await.remove(&conn_id);   
}

這裡舉例只是簡單用一對一,一對多的傳遞,依實際境情可以另外做一群組,然後選擇哪些用戶端是同一個群組的,就可以依不同群組發送不同的訊息。

自動派送罐頭訊息

到這裡把基礎架設好了,我們來演示一個實際的應用,一般我們常見的功能都是由客戶端發動,傳遞request給後端,再由後端回應,我們這次反過來,由server端主動派送訊息給前端,以下實作:

@@ core/src/lib.rs @@
+pub mod game_message;  

@@ web/Cargo.toml @@
 [dependencies]
+rand = { workspace = true }

在core裡寫一個罐頭訊息工廠,會隨機挑選一則預先寫好的訊息。

// core/src/game_message.rs
use rand::Rng;

pub fn message_factory() -> String {
    let messages = vec![
        "我得好好策略一下。",
        "等等,我差點就贏了!",
        "看來我需要改變策略。",
        // ...略
    ];
    let mut rng = rand::thread_rng();
    let index = rng.gen_range(0..messages.len());
    messages[index].to_string()
}

在web_socket裡,新增一支程式,會隨機等待一段時間,接著發送訊息給所有現存連線的ws用戶端。

// web/src/web_socket.rs
use tokio::{time::Duration, time};
use rand::random;

pub async fn polling_message(ctx: &AppContext) {
    let ctx = ctx.clone();
    // 因為要一直存活才能處理,故開分身:
    tokio::task::spawn(async move {
        loop {
            // 隨機等待1000豪秒 至10000豪秒
            let secs = random::<u64>() % 9_000 + 1_000; 
            time::sleep(Duration::from_millis(secs)).await;
            
            // 從訊息庫隨機取訊息並發送至用戶端
            let message = my_core::game_message::message_factory();
            send_all_message(0,Message::text(message), &ctx).await;
        }
    });
}

最後到main裡註冊我們剛剛寫的polling_message

use web::web_socket::polling_message;                    // 加這行

#[tokio::main]
async fn main() {
    config::init();
    let _logger = Logger::builder().use_env().build();
    let app_context = AppContext::default();
    polling_message(&app_context).await;                // 加這行
    let routers = routers::all_routers(app_context.clone());
    warp::serve(routers).run(([0, 0, 0, 0], config::http_port())).await;
}

最後在前端實測一下:

實測前端websocket是否有接受後端發送訊息

可以看到在前端有正確的接受後端不定期派送的訊息,到這邊為止我們完成了後端的websocket服務,並作實作一個隨機時間自動派送訊息的功能。

其他參考資料

本系列專案源始碼放置於 https://github.com/kenstt/demo-app


上一篇
22 是 await 我加了await:rust async runtime ー tokio
下一篇
24 Websocket 前端:使用 Svelte
系列文
前端? 後端? 摻在一起做成全端就好了30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言