Web 服務、資料庫、快取伺服器等。手動啟動、停止和監控這些程序既繁瑣又容易出錯。
今天我們將使用 Rust 做一個一個程序管理器,類似於 PM2 或 Supervisor
cargo new process_manager
cd process_manager
cargo.toml
[dependencies]
tokio = { version = "1.35", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
clap = { version = "4.4", features = ["derive"] }
chrono = "0.4"
anyhow = "1.0"
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::process::Child;
use tokio::sync::RwLock;
use std::sync::Arc;
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProcessStatus {
Running,
Stopped,
Crashed,
Restarting,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessConfig {
pub name: String,
pub command: String,
pub args: Vec<String>,
pub cwd: Option<String>,
pub env: HashMap<String, String>,
pub auto_restart: bool,
pub max_restarts: u32,
pub restart_delay_ms: u64,
}
#[derive(Debug)]
pub struct ProcessInfo {
pub config: ProcessConfig,
pub status: ProcessStatus,
pub pid: Option<u32>,
pub restart_count: u32,
pub started_at: Option<DateTime<Utc>>,
pub stopped_at: Option<DateTime<Utc>>,
#[allow(dead_code)]
child: Option<Child>,
}
use std::process::Stdio;
use tokio::process::Command;
use anyhow::{Result, Context};
pub struct ProcessManager {
processes: Arc<RwLock<HashMap<String, ProcessInfo>>>,
}
impl ProcessManager {
pub fn new() -> Self {
Self {
processes: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn add_process(&self, config: ProcessConfig) -> Result<()> {
let mut processes = self.processes.write().await;
if processes.contains_key(&config.name) {
anyhow::bail!("Process '{}' already exists", config.name);
}
let info = ProcessInfo {
config,
status: ProcessStatus::Stopped,
pid: None,
restart_count: 0,
started_at: None,
stopped_at: None,
child: None,
};
processes.insert(info.config.name.clone(), info);
Ok(())
}
pub async fn start_process(&self, name: &str) -> Result<()> {
let mut processes = self.processes.write().await;
let info = processes.get_mut(name)
.context(format!("Process '{}' not found", name))?;
if matches!(info.status, ProcessStatus::Running) {
anyhow::bail!("Process '{}' is already running", name);
}
self.spawn_process(info).await?;
println!("✓ Started process '{}'", name);
Ok(())
}
async fn spawn_process(&self, info: &mut ProcessInfo) -> Result<()> {
let mut cmd = Command::new(&info.config.command);
cmd.args(&info.config.args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null());
if let Some(cwd) = &info.config.cwd {
cmd.current_dir(cwd);
}
for (key, value) in &info.config.env {
cmd.env(key, value);
}
let mut child = cmd.spawn()
.context("Failed to spawn process")?;
info.pid = child.id();
info.status = ProcessStatus::Running;
info.started_at = Some(Utc::now());
info.child = Some(child.into());
Ok(())
}
pub async fn stop_process(&self, name: &str) -> Result<()> {
let mut processes = self.processes.write().await;
let info = processes.get_mut(name)
.context(format!("Process '{}' not found", name))?;
if !matches!(info.status, ProcessStatus::Running) {
anyhow::bail!("Process '{}' is not running", name);
}
if let Some(mut child) = info.child.take() {
child.kill().await
.context("Failed to kill process")?;
}
info.status = ProcessStatus::Stopped;
info.stopped_at = Some(Utc::now());
info.pid = None;
println!("✓ Stopped process '{}'", name);
Ok(())
}
pub async fn restart_process(&self, name: &str) -> Result<()> {
self.stop_process(name).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
self.start_process(name).await?;
Ok(())
}
pub async fn list_processes(&self) -> Vec<(String, ProcessStatus, Option<u32>)> {
let processes = self.processes.read().await;
processes
.iter()
.map(|(name, info)| {
(name.clone(), info.status.clone(), info.pid)
})
.collect()
}
pub async fn get_process_info(&self, name: &str) -> Option<ProcessConfig> {
let processes = self.processes.read().await;
processes.get(name).map(|info| info.config.clone())
}
}
impl ProcessManager {
pub async fn monitor_processes(&self) {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
let mut processes = self.processes.write().await;
for (name, info) in processes.iter_mut() {
if !matches!(info.status, ProcessStatus::Running) {
continue;
}
// 檢查程序是否仍在運行
if let Some(child) = &mut info.child {
match child.try_wait() {
Ok(Some(_status)) => {
// 程序已結束
println!("⚠ Process '{}' crashed", name);
info.status = ProcessStatus::Crashed;
info.stopped_at = Some(Utc::now());
if info.config.auto_restart
&& info.restart_count < info.config.max_restarts {
info.restart_count += 1;
println!("↻ Restarting process '{}' (attempt {}/{})",
name, info.restart_count, info.config.max_restarts);
tokio::time::sleep(
tokio::time::Duration::from_millis(
info.config.restart_delay_ms
)
).await;
if let Err(e) = self.spawn_process(info).await {
eprintln!("✗ Failed to restart '{}': {}", name, e);
}
}
}
Ok(None) => {
// 程序仍在運行
}
Err(e) => {
eprintln!("✗ Error checking process '{}': {}", name, e);
}
}
}
}
}
}
pub fn spawn_monitor(self: Arc<Self>) {
tokio::spawn(async move {
self.monitor_processes().await;
});
}
}
use clap::{Parser, Subcommand};
#[derive(Parser)]
#[command(name = "pm")]
#[command(about = "Process Manager - Manage and monitor background processes")]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Start a new process
Start {
/// Process name
name: String,
/// Command to execute
command: String,
/// Command arguments
#[arg(trailing_var_arg = true)]
args: Vec<String>,
/// Enable auto-restart
#[arg(long)]
auto_restart: bool,
},
/// Stop a running process
Stop {
/// Process name
name: String,
},
/// Restart a process
Restart {
/// Process name
name: String,
},
/// List all processes
List,
/// Show process information
Info {
/// Process name
name: String,
},
/// Remove a process
Delete {
/// Process name
name: String,
},
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let manager = Arc::new(ProcessManager::new());
// 啟動監控任務
manager.clone().spawn_monitor();
match cli.command {
Commands::Start { name, command, args, auto_restart } => {
let config = ProcessConfig {
name: name.clone(),
command,
args,
cwd: None,
env: HashMap::new(),
auto_restart,
max_restarts: 10,
restart_delay_ms: 1000,
};
manager.add_process(config).await?;
manager.start_process(&name).await?;
}
Commands::Stop { name } => {
manager.stop_process(&name).await?;
}
Commands::Restart { name } => {
manager.restart_process(&name).await?;
}
Commands::List => {
let processes = manager.list_processes().await;
println!("\n{:<20} {:<15} {:<10}", "NAME", "STATUS", "PID");
println!("{:-<45}", "");
for (name, status, pid) in processes {
let status_str = format!("{:?}", status);
let pid_str = pid.map(|p| p.to_string())
.unwrap_or_else(|| "-".to_string());
println!("{:<20} {:<15} {:<10}", name, status_str, pid_str);
}
println!();
}
Commands::Info { name } => {
if let Some(config) = manager.get_process_info(&name).await {
println!("\nProcess: {}", config.name);
println!("Command: {}", config.command);
println!("Args: {:?}", config.args);
println!("Auto Restart: {}", config.auto_restart);
println!("Max Restarts: {}", config.max_restarts);
println!();
} else {
println!("Process '{}' not found", name);
}
}
Commands::Delete { name } => {
manager.stop_process(&name).await.ok();
println!("✓ Deleted process '{}'", name);
}
}
// 保持程式運行以便監控
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
Ok(())
}
# 啟動一個 Web 服務並啟用自動重啟
cargo run -- start web-server python3 -m http.server 8080 --auto-restart
# 列出所有程序
cargo run -- list
# 查看程序資訊
cargo run -- info web-server
# 重啟程序
cargo run -- restart web-server
# 停止程序
cargo run -- stop web-server
# 刪除程序
cargo run -- delete web-server
use std::fs::OpenOptions;
use std::io::Write;
impl ProcessManager {
async fn setup_logging(&self, info: &mut ProcessInfo) -> Result<()> {
let log_dir = format!("logs/{}", info.config.name);
std::fs::create_dir_all(&log_dir)?;
let stdout_path = format!("{}/stdout.log", log_dir);
let stderr_path = format!("{}/stderr.log", log_dir);
// 設定日誌檔案
// 在實際實作中,需要將子程序的輸出重定向到這些檔案
Ok(())
}
}
use serde_json;
use std::fs;
#[derive(Serialize, Deserialize)]
struct ManagerConfig {
processes: Vec<ProcessConfig>,
}
impl ProcessManager {
pub async fn load_config(&self, path: &str) -> Result<()> {
let content = fs::read_to_string(path)?;
let config: ManagerConfig = serde_json::from_str(&content)?;
for process_config in config.processes {
self.add_process(process_config).await?;
}
Ok(())
}
pub async fn save_config(&self, path: &str) -> Result<()> {
let processes = self.processes.read().await;
let configs: Vec<ProcessConfig> = processes
.values()
.map(|info| info.config.clone())
.collect();
let config = ManagerConfig { processes: configs };
let content = serde_json::to_string_pretty(&config)?;
fs::write(path, content)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct ProcessStats {
pub cpu_percent: f64,
pub memory_mb: f64,
pub uptime_seconds: u64,
}
impl ProcessManager {
pub async fn get_process_stats(&self, name: &str) -> Option<ProcessStats> {
let processes = self.processes.read().await;
let info = processes.get(name)?;
if let Some(pid) = info.pid {
// 使用 sysinfo 或類似的 crate 來獲取資源使用情況
// 這裡簡化處理
Some(ProcessStats {
cpu_percent: 0.0,
memory_mb: 0.0,
uptime_seconds: info.started_at?
.signed_duration_since(Utc::now())
.num_seconds()
.abs() as u64,
})
} else {
None
}
}
}
o#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_add_and_start_process() {
let manager = ProcessManager::new();
let config = ProcessConfig {
name: "test".to_string(),
command: "echo".to_string(),
args: vec!["hello".to_string()],
cwd: None,
env: HashMap::new(),
auto_restart: false,
max_restarts: 0,
restart_delay_ms: 1000,
};
manager.add_process(config).await.unwrap();
manager.start_process("test").await.unwrap();
let processes = manager.list_processes().await;
assert_eq!(processes.len(), 1);
}
#[tokio::test]
async fn test_stop_process() {
let manager = ProcessManager::new();
let config = ProcessConfig {
name: "test".to_string(),
command: "sleep".to_string(),
args: vec!["10".to_string()],
cwd: None,
env: HashMap::new(),
auto_restart: false,
max_restarts: 0,
restart_delay_ms: 1000,
};
manager.add_process(config).await.unwrap();
manager.start_process("test").await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
manager.stop_process("test").await.unwrap();
}
}
在幾天就結束了!繼續努力😴