iT邦幫忙

2023 iThome 鐵人賽

DAY 12
0
Cloud Native

關於 WebAssembly 也能變成 Container 的這檔事系列 第 12 篇

Wasm+containerd-shim-wasm+sandbox - part 3

  • 分享至 

  • xImage
  •  

Wasm+containerd-shim-wasm+sandbox - part 3

sandbox 資料夾結構

sandbox
├── cli.rs (part 2)
├── error.rs (part 1)
├── instance.rs
├── instance_utils.rs
├── manager.rs (*)
├── mod.rs (part 1)
├── oci.rs
├── shim.rs
└── stdio.rs

sandbox/manager.rs

//! This experimental module implements a manager service which can be used to
//! manage multiple instances of a sandbox in-process.
//! The idea behind this module is to only need a single shim process for the entire node rather than one per pod/container.
//! 這個實驗性模組實作了一個管理服務,能夠在同一個處理程序內(in-process)管理多個沙盒實體。
//! 這個模組的想法是整個節點只需要一個 shim 處理程序,而不是每個 pod/container 都需要各自的 shim 處理程序。

// 導入所需的模組和函式庫
use std::collections::HashMap;
use std::env::current_dir;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::thread;

use anyhow::Context;
use containerd_shim::error::Error as ShimError;
use containerd_shim::protos::shim::shim_ttrpc::{create_task, Task};
use containerd_shim::protos::ttrpc::{Client, Server};
use containerd_shim::protos::TaskClient;
use containerd_shim::publisher::RemotePublisher;
use containerd_shim::{self as shim, api, TtrpcContext, TtrpcResult};
use oci_spec::runtime::{self, Spec};
use shim::Flags;
use ttrpc::context;

use super::error::Error;
use super::instance::Instance;
use super::sandbox;
use crate::services::sandbox_ttrpc::{Manager, ManagerClient};
use crate::sys::networking::setup_namespaces;

/// Sandbox wraps an Instance and is used with the `Service` to manage multiple instances.
///
/// Implementors must also implement `Task` and be `Send + Sync`.
pub trait Sandbox: Task + Send + Sync {
    /// The instance type wrapped by this sandbox.
    type Instance: Instance;

    /// Builds a sandbox from a namespace, the containerd address, the sandbox id,
    /// the engine of the wrapped instance type, and a publisher.
    fn new(
        namespace: String,
        containerd_address: String,
        id: String,
        engine: <Self::Instance as Instance>::Engine,
        publisher: RemotePublisher,
    ) -> Self;
}

/// Service is a manager service which can be used to manage multiple instances of a sandbox in-process.
pub struct Service<T: Sandbox> {
    /// Maps a sandbox id to the address of the ttrpc socket serving that sandbox.
    sandboxes: RwLock<HashMap<String, String>>,
    /// Engine value of the sandbox's instance type, stored at construction time.
    engine: <T::Instance as Instance>::Engine,
    /// Ties the service to its sandbox type `T` without storing a value of it.
    phantom: std::marker::PhantomData<T>,
}

/// Constructor for `Service`.
impl<T: Sandbox> Service<T> {
    /// Creates a service that starts with no registered sandboxes and stores `engine`.
    pub fn new(engine: <T::Instance as Instance>::Engine) -> Self {
        let sandboxes = RwLock::new(HashMap::new());
        let phantom = std::marker::PhantomData;
        Self {
            sandboxes,
            engine,
            phantom,
        }
    }
}

/// `Default` for `Service`, available whenever the instance's engine type is itself `Default`.
impl<T: Sandbox> Default for Service<T>
where
    <T::Instance as Instance>::Engine: Default,
{
    /// Builds a service around the engine type's default value.
    fn default() -> Self {
        let engine: <T::Instance as Instance>::Engine = Default::default();
        Self::new(engine)
    }
}

/// 實作 Manager trait,來提供沙盒管理功能
impl<T: Sandbox + 'static> Manager for Service<T> {
    /// 建立沙盒,啟動 ttrpc 服務
    fn create(
        &self,
        _ctx: &TtrpcContext,
        req: sandbox::CreateRequest,
    ) -> TtrpcResult<sandbox::CreateResponse> {
        let mut sandboxes = self.sandboxes.write().unwrap();

        /// 檢查 id 是否已存在,若已存在則回傳錯誤
        if sandboxes.contains_key(&req.id) {
            return Err(Error::AlreadyExists(req.id).into());
        }

        /// 建立 ttrpc 服務的 socket 位置
        let sock = format!("unix://{}/shim.sock", &req.working_directory);

        /// 建立 ttrpc 服務的 publisher
        let publisher = RemotePublisher::new(req.ttrpc_address)?;

        /// 建立沙盒
        let sb = T::new(
            req.namespace.clone(),
            req.containerd_address.clone(),
            req.id.clone(),
            self.engine.clone(),
            publisher,
        );
        let task_service = create_task(Arc::new(Box::new(sb)));
        /// 建立 ttrpc 服務
        let mut server = Server::new().bind(&sock)?.register_service(task_service);

        sandboxes.insert(req.id.clone(), sock.clone());

        /// 讀取 runtime spec
        let cfg = Spec::load(
            Path::new(&req.working_directory)
                .join("config.json")
                .to_str()
                .unwrap(),
        )
        .map_err(|err| Error::InvalidArgument(format!("could not load runtime spec: {}", err)))?;

        /// 建立一個 channel,用於接收 sandbox 啟動結果
        let (tx, rx) = std::sync::mpsc::channel::<Result<(), Error>>();

        let id = &req.id;

        /// 啟動 sandbox
        let _ = thread::Builder::new()
            .name(format!("{}-sandbox-create", id))
            .spawn(move || {
                let r = start_sandbox(cfg, &mut server);
                tx.send(r).context("could not send sandbox result").unwrap();
            })
            .context("failed to spawn sandbox thread")
            .map_err(Error::from)?;

        /// 等待 sandbox 啟動結果
        rx.recv()
            .context("could not receive sandbox result")
            .map_err(Error::from)??;
        Ok(sandbox::CreateResponse {
            socket_path: sock,
            ..Default::default()
        })
    }

    /// 刪除沙盒
    fn delete(
        &self,
        _ctx: &TtrpcContext,
        req: sandbox::DeleteRequest,
    ) -> TtrpcResult<sandbox::DeleteResponse> {
        let mut sandboxes = self.sandboxes.write().unwrap();
        /// 檢查 id 是否存在,若不存在則回傳錯誤
        if !sandboxes.contains_key(&req.id) {
            return Err(Error::NotFound(req.id).into());
        }
        /// 從 sandboxes 中移除 id
        let sock = sandboxes.remove(&req.id).unwrap();
        let c = Client::connect(&sock)?;
        let tc = TaskClient::new(c);

        /// 關閉 ttrpc 服務
        tc.shutdown(
            context::Context::default(),
            &api::ShutdownRequest {
                id: req.id,
                now: true,
                ..Default::default()
            },
        )?;
        Ok(sandbox::DeleteResponse::default())
    }
}

/// Note that this changes the current thread's state.
/// You probably want to run this in a new thread.
/// 特別留意,這個函式會改變當前執行緒的狀態。
/// 你或許想要在一個新的執行緒中呼叫這個函式。
fn start_sandbox(cfg: runtime::Spec, server: &mut Server) -> Result<(), Error> {
    setup_namespaces(&cfg)?;

    server.start_listen().context("could not start listener")?;
    Ok(())
}

/// Shim implements the containerd-shim CLI for connecting to a Manager service.
pub struct Shim {
    /// Shim id, copied from the `id` field of the `Flags` passed to `new`.
    id: String,
    /// Namespace, copied from the `namespace` field of the `Flags` passed to `new`.
    namespace: String,
}
/// Empty `Task` implementation for `Shim`: no trait method is overridden here.
impl Task for Shim {}

/// 實作 shim::Shim trait
impl shim::Shim for Shim {
    type T = Self;

    /// 建立一個新的 Shim 實體
    fn new(_runtime_id: &str, args: &Flags, _config: &mut shim::Config) -> Self {
        Shim {
            id: args.id.to_string(),
            namespace: args.namespace.to_string(),
        }
    }

    /// 啟動 Shim
    fn start_shim(&mut self, opts: containerd_shim::StartOpts) -> shim::Result<String> {
        /// 使用當前目錄作為工作目錄
        let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?;
        /// 讀取 runtime spec
        let spec = Spec::load(dir.join("config.json").to_str().unwrap()).map_err(|err| {
            shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err))
        })?;

        /// 讀取 annotations
        let default = HashMap::new() as HashMap<String, String>;
        let annotations = spec.annotations().as_ref().unwrap_or(&default);

        /// 讀取 sandbox id
        let sandbox = annotations
            .get("io.kubernetes.cri.sandbox-id")
            .unwrap_or(&opts.id)
            .to_string();

        let client = Client::connect("unix:///run/io.containerd.wasmwasi.v1/manager.sock")?;
        let mc = ManagerClient::new(client);

        /// 建立或連接 sandbox
        let addr = match mc.create(
            context::Context::default(),
            &sandbox::CreateRequest {
                id: sandbox.clone(),
                working_directory: dir.as_path().to_str().unwrap().to_string(),
                ttrpc_address: opts.ttrpc_address.clone(),
                ..Default::default()
            },
        ) {
            Ok(res) => res.socket_path,
            Err(_) => {
                let res = mc.connect(
                    context::Context::default(),
                    &sandbox::ConnectRequest {
                        id: sandbox,
                        ttrpc_address: opts.ttrpc_address,
                        ..Default::default()
                    },
                )?;
                res.socket_path
            }
        };

        shim::util::write_address(&addr)?;

        Ok(addr)
    }

    fn wait(&mut self) {
        todo!()
    }

    /// 你看不到那行英文註解,如果看到了一定是眼睛業障重
    fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T {
        todo!() // but not really, haha
    }

    /// 刪除 Shim
    fn delete_shim(&mut self) -> shim::Result<api::DeleteResponse> {
        /// 使用當前目錄作為工作目錄
        let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?;
        /// 讀取 runtime spec
        let spec = Spec::load(dir.join("config.json").to_str().unwrap()).map_err(|err| {
            shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err))
        })?;

        /// 讀取 annotations
        let default = HashMap::new() as HashMap<String, String>;
        let annotations = spec.annotations().as_ref().unwrap_or(&default);

        /// 讀取 sandbox id
        let sandbox = annotations
            .get("io.kubernetes.cri.sandbox-id")
            .unwrap_or(&self.id)
            .to_string();
        /// 如果 sandbox id 不等於 shim id,則直接回傳
        if sandbox != self.id {
            return Ok(api::DeleteResponse::default());
        }
        /// 連接到 Manager 服務
        let client = Client::connect("unix:///run/io.containerd.wasmwasi.v1/manager.sock")?;
        let mc = ManagerClient::new(client);
        /// 刪除 sandbox
        mc.delete(
            context::Context::default(),
            &sandbox::DeleteRequest {
                id: sandbox,
                namespace: self.namespace.clone(),
                ..Default::default()
            },
        )?;

        // TODO: write pid, exit code, etc to disk so we can use it here.
        // TODO: 將 pid、exit code 等資訊寫入磁碟,以便在這裡使用。
        Ok(api::DeleteResponse::default())
    }
}


上一篇
Wasm+containerd-shim-wasm+sandbox - part 2
下一篇
Wasm+containerd-shim-wasm+sandbox - part 4
系列文
關於 WebAssembly 也能變成 Container 的這檔事15
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言