Wasm+containerd-shim-wasm+sandbox - part 3
sandbox
資料夾結構
sandbox
├── cli.rs (part 2)
├── error.rs (part 1)
├── instance.rs
├── instance_utils.rs
├── manager.rs (*)
├── mod.rs (part 1)
├── oci.rs
├── shim.rs
└── stdio.rs
sandbox/manager.rs
//! This experimental module implements a manager service which can be used to
//! manage multiple instances of a sandbox in-process.
//! The idea behind this module is to only need a single shim process for the entire node rather than one per pod/container.
//! 這個實驗性模組實作了一個管理服務,能夠在同一個處理程序內(in-process)管理沙盒的多個實體。
//! 這個模組的想法是整個節點只需要一個 shim 處理程序,而不是每個 pod/container 都需要各自的 shim 處理程序。
// 導入所需的模組和函式庫
use std::collections::HashMap;
use std::env::current_dir;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::thread;
use anyhow::Context;
use containerd_shim::error::Error as ShimError;
use containerd_shim::protos::shim::shim_ttrpc::{create_task, Task};
use containerd_shim::protos::ttrpc::{Client, Server};
use containerd_shim::protos::TaskClient;
use containerd_shim::publisher::RemotePublisher;
use containerd_shim::{self as shim, api, TtrpcContext, TtrpcResult};
use oci_spec::runtime::{self, Spec};
use shim::Flags;
use ttrpc::context;
use super::error::Error;
use super::instance::Instance;
use super::sandbox;
use crate::services::sandbox_ttrpc::{Manager, ManagerClient};
use crate::sys::networking::setup_namespaces;
/// Sandbox wraps an Instance and is used with the `Service` to manage multiple instances.
///
/// Implementors must also implement `Task` and be `Send + Sync`; `Service` registers each
/// sandbox it creates as a task service on a ttrpc server.
pub trait Sandbox: Task + Send + Sync {
/// The `Instance` implementation wrapped by this sandbox; its `Engine` associated type is
/// what `new` receives.
type Instance: Instance;
/// Builds a sandbox from the values carried by a create request.
///
/// `Service::create` passes the request's `namespace`, `containerd_address` and `id`,
/// a clone of the service-wide engine, and a `RemotePublisher` built from the request's
/// ttrpc address.
fn new(
namespace: String,
containerd_address: String,
id: String,
engine: <Self::Instance as Instance>::Engine,
publisher: RemotePublisher,
) -> Self;
}
/// Service is a manager service which can be used to manage multiple instances of a sandbox in-process.
pub struct Service<T: Sandbox> {
// Maps a sandbox id to the socket address of the ttrpc server created for it.
sandboxes: RwLock<HashMap<String, String>>,
// Engine that is cloned into every sandbox created by this service.
engine: <T::Instance as Instance>::Engine,
// Ties the service to the sandbox type `T` without storing a value of it.
phantom: std::marker::PhantomData<T>,
}
/// Constructor for [`Service`].
impl<T: Sandbox> Service<T> {
    /// Creates a service that tracks no sandboxes yet and uses `engine` for the sandboxes
    /// it will create.
    pub fn new(engine: <T::Instance as Instance>::Engine) -> Self {
        let sandboxes = RwLock::new(HashMap::new());
        let phantom = std::marker::PhantomData;
        Self {
            sandboxes,
            engine,
            phantom,
        }
    }
}
/// `Default` for [`Service`], available whenever the sandbox's engine type is itself `Default`.
impl<T> Default for Service<T>
where
    T: Sandbox,
    <T::Instance as Instance>::Engine: Default,
{
    /// Builds a service around a default-constructed engine.
    fn default() -> Self {
        let engine = <<T::Instance as Instance>::Engine as Default>::default();
        Self::new(engine)
    }
}
/// 實作 Manager trait,來提供沙盒管理功能
impl<T: Sandbox + 'static> Manager for Service<T> {
/// 建立沙盒,啟動 ttrpc 服務
fn create(
&self,
_ctx: &TtrpcContext,
req: sandbox::CreateRequest,
) -> TtrpcResult<sandbox::CreateResponse> {
let mut sandboxes = self.sandboxes.write().unwrap();
/// 檢查 id 是否已存在,若已存在則回傳錯誤
if sandboxes.contains_key(&req.id) {
return Err(Error::AlreadyExists(req.id).into());
}
/// 建立 ttrpc 服務的 socket 位置
let sock = format!("unix://{}/shim.sock", &req.working_directory);
/// 建立 ttrpc 服務的 publisher
let publisher = RemotePublisher::new(req.ttrpc_address)?;
/// 建立沙盒
let sb = T::new(
req.namespace.clone(),
req.containerd_address.clone(),
req.id.clone(),
self.engine.clone(),
publisher,
);
let task_service = create_task(Arc::new(Box::new(sb)));
/// 建立 ttrpc 服務
let mut server = Server::new().bind(&sock)?.register_service(task_service);
sandboxes.insert(req.id.clone(), sock.clone());
/// 讀取 runtime spec
let cfg = Spec::load(
Path::new(&req.working_directory)
.join("config.json")
.to_str()
.unwrap(),
)
.map_err(|err| Error::InvalidArgument(format!("could not load runtime spec: {}", err)))?;
/// 建立一個 channel,用於接收 sandbox 啟動結果
let (tx, rx) = std::sync::mpsc::channel::<Result<(), Error>>();
let id = &req.id;
/// 啟動 sandbox
let _ = thread::Builder::new()
.name(format!("{}-sandbox-create", id))
.spawn(move || {
let r = start_sandbox(cfg, &mut server);
tx.send(r).context("could not send sandbox result").unwrap();
})
.context("failed to spawn sandbox thread")
.map_err(Error::from)?;
/// 等待 sandbox 啟動結果
rx.recv()
.context("could not receive sandbox result")
.map_err(Error::from)??;
Ok(sandbox::CreateResponse {
socket_path: sock,
..Default::default()
})
}
/// 刪除沙盒
fn delete(
&self,
_ctx: &TtrpcContext,
req: sandbox::DeleteRequest,
) -> TtrpcResult<sandbox::DeleteResponse> {
let mut sandboxes = self.sandboxes.write().unwrap();
/// 檢查 id 是否存在,若不存在則回傳錯誤
if !sandboxes.contains_key(&req.id) {
return Err(Error::NotFound(req.id).into());
}
/// 從 sandboxes 中移除 id
let sock = sandboxes.remove(&req.id).unwrap();
let c = Client::connect(&sock)?;
let tc = TaskClient::new(c);
/// 關閉 ttrpc 服務
tc.shutdown(
context::Context::default(),
&api::ShutdownRequest {
id: req.id,
now: true,
..Default::default()
},
)?;
Ok(sandbox::DeleteResponse::default())
}
}
/// Note that this changes the current thread's state.
/// You probably want to run this in a new thread.
/// 特別留意,這個函式會改變當前執行緒的狀態。
/// 你或許想要在一個新的執行緒中呼叫這個函式。
fn start_sandbox(cfg: runtime::Spec, server: &mut Server) -> Result<(), Error> {
setup_namespaces(&cfg)?;
server.start_listen().context("could not start listener")?;
Ok(())
}
/// Shim implements the containerd-shim CLI for connecting to a Manager service.
pub struct Shim {
// Id taken from the shim's command-line flags (`args.id`).
id: String,
// Namespace taken from the shim's command-line flags (`args.namespace`).
namespace: String,
}
/// `Task` implementation for `Shim` that overrides nothing: every method falls back to
/// the trait's default implementation.
impl Task for Shim {}
/// 實作 shim::Shim trait
impl shim::Shim for Shim {
type T = Self;
/// 建立一個新的 Shim 實體
fn new(_runtime_id: &str, args: &Flags, _config: &mut shim::Config) -> Self {
Shim {
id: args.id.to_string(),
namespace: args.namespace.to_string(),
}
}
/// 啟動 Shim
fn start_shim(&mut self, opts: containerd_shim::StartOpts) -> shim::Result<String> {
/// 使用當前目錄作為工作目錄
let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?;
/// 讀取 runtime spec
let spec = Spec::load(dir.join("config.json").to_str().unwrap()).map_err(|err| {
shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err))
})?;
/// 讀取 annotations
let default = HashMap::new() as HashMap<String, String>;
let annotations = spec.annotations().as_ref().unwrap_or(&default);
/// 讀取 sandbox id
let sandbox = annotations
.get("io.kubernetes.cri.sandbox-id")
.unwrap_or(&opts.id)
.to_string();
let client = Client::connect("unix:///run/io.containerd.wasmwasi.v1/manager.sock")?;
let mc = ManagerClient::new(client);
/// 建立或連接 sandbox
let addr = match mc.create(
context::Context::default(),
&sandbox::CreateRequest {
id: sandbox.clone(),
working_directory: dir.as_path().to_str().unwrap().to_string(),
ttrpc_address: opts.ttrpc_address.clone(),
..Default::default()
},
) {
Ok(res) => res.socket_path,
Err(_) => {
let res = mc.connect(
context::Context::default(),
&sandbox::ConnectRequest {
id: sandbox,
ttrpc_address: opts.ttrpc_address,
..Default::default()
},
)?;
res.socket_path
}
};
shim::util::write_address(&addr)?;
Ok(addr)
}
fn wait(&mut self) {
todo!()
}
/// 你看不到那行英文註解,如果看到了一定是眼睛業障重
fn create_task_service(&self, _publisher: RemotePublisher) -> Self::T {
todo!() // but not really, haha
}
/// 刪除 Shim
fn delete_shim(&mut self) -> shim::Result<api::DeleteResponse> {
/// 使用當前目錄作為工作目錄
let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?;
/// 讀取 runtime spec
let spec = Spec::load(dir.join("config.json").to_str().unwrap()).map_err(|err| {
shim::Error::InvalidArgument(format!("error loading runtime spec: {}", err))
})?;
/// 讀取 annotations
let default = HashMap::new() as HashMap<String, String>;
let annotations = spec.annotations().as_ref().unwrap_or(&default);
/// 讀取 sandbox id
let sandbox = annotations
.get("io.kubernetes.cri.sandbox-id")
.unwrap_or(&self.id)
.to_string();
/// 如果 sandbox id 不等於 shim id,則直接回傳
if sandbox != self.id {
return Ok(api::DeleteResponse::default());
}
/// 連接到 Manager 服務
let client = Client::connect("unix:///run/io.containerd.wasmwasi.v1/manager.sock")?;
let mc = ManagerClient::new(client);
/// 刪除 sandbox
mc.delete(
context::Context::default(),
&sandbox::DeleteRequest {
id: sandbox,
namespace: self.namespace.clone(),
..Default::default()
},
)?;
// TODO: write pid, exit code, etc to disk so we can use it here.
// TODO: 將 pid、exit code 等資訊寫入磁碟,以便在這裡使用。
Ok(api::DeleteResponse::default())
}
}