iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0
Rust

Rust 實戰專案集:30 個漸進式專案從工具到服務系列 第 26

程序管理器 - 管理和監控後台程序

  • 分享至 

  • xImage
  •  

前言

在開發與部署服務時,我們常需要同時管理多個後台程序,例如 Web 服務、資料庫、快取伺服器等。手動啟動、停止和監控這些程序既繁瑣又容易出錯。
今天我們將使用 Rust 實作一個程序管理器,功能類似於 PM2 或 Supervisor。

今日學習目標

  • 啟動和停止程序
  • 監控程序狀態
  • 自動重啟崩潰的程序
  • 記錄程序日誌
  • 提供簡單的 CLI 介面

啟動專案!!

# Create a new binary crate and enter its directory
cargo new process_manager
cd process_manager

依賴

Cargo.toml

[dependencies]
# Async runtime: process spawning (tokio::process), timers, tokio::sync::RwLock
tokio = { version = "1.35", features = ["full"] }
# Derive (de)serialization for ProcessConfig / ProcessStatus
serde = { version = "1.0", features = ["derive"] }
# JSON config files (load_config / save_config)
serde_json = "1.0"
# Command-line parsing via derive macros
clap = { version = "4.4", features = ["derive"] }
# Timestamps for started_at / stopped_at
chrono = "0.4"
# Application error type with .context()
anyhow = "1.0"

定義資料結構

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::process::Child;
use tokio::sync::RwLock;
use std::sync::Arc;
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ProcessStatus {
    Running,
    Stopped,
    Crashed,
    Restarting,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessConfig {
    pub name: String,
    pub command: String,
    pub args: Vec<String>,
    pub cwd: Option<String>,
    pub env: HashMap<String, String>,
    pub auto_restart: bool,
    pub max_restarts: u32,
    pub restart_delay_ms: u64,
}

#[derive(Debug)]
pub struct ProcessInfo {
    pub config: ProcessConfig,
    pub status: ProcessStatus,
    pub pid: Option<u32>,
    pub restart_count: u32,
    pub started_at: Option<DateTime<Utc>>,
    pub stopped_at: Option<DateTime<Utc>>,
    #[allow(dead_code)]
    child: Option<Child>,
}

製作管理器核心 Process Manager

use std::process::Stdio;
use tokio::process::Command;
use anyhow::{Result, Context};

pub struct ProcessManager {
    processes: Arc<RwLock<HashMap<String, ProcessInfo>>>,
}

impl ProcessManager {
    pub fn new() -> Self {
        Self {
            processes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn add_process(&self, config: ProcessConfig) -> Result<()> {
        let mut processes = self.processes.write().await;
        
        if processes.contains_key(&config.name) {
            anyhow::bail!("Process '{}' already exists", config.name);
        }

        let info = ProcessInfo {
            config,
            status: ProcessStatus::Stopped,
            pid: None,
            restart_count: 0,
            started_at: None,
            stopped_at: None,
            child: None,
        };

        processes.insert(info.config.name.clone(), info);
        Ok(())
    }

    pub async fn start_process(&self, name: &str) -> Result<()> {
        let mut processes = self.processes.write().await;
        
        let info = processes.get_mut(name)
            .context(format!("Process '{}' not found", name))?;

        if matches!(info.status, ProcessStatus::Running) {
            anyhow::bail!("Process '{}' is already running", name);
        }

        self.spawn_process(info).await?;
        
        println!("✓ Started process '{}'", name);
        Ok(())
    }

    async fn spawn_process(&self, info: &mut ProcessInfo) -> Result<()> {
        let mut cmd = Command::new(&info.config.command);
        
        cmd.args(&info.config.args)
           .stdout(Stdio::piped())
           .stderr(Stdio::piped())
           .stdin(Stdio::null());

        if let Some(cwd) = &info.config.cwd {
            cmd.current_dir(cwd);
        }

        for (key, value) in &info.config.env {
            cmd.env(key, value);
        }

        let mut child = cmd.spawn()
            .context("Failed to spawn process")?;

        info.pid = child.id();
        info.status = ProcessStatus::Running;
        info.started_at = Some(Utc::now());
        info.child = Some(child.into());

        Ok(())
    }

    pub async fn stop_process(&self, name: &str) -> Result<()> {
        let mut processes = self.processes.write().await;
        
        let info = processes.get_mut(name)
            .context(format!("Process '{}' not found", name))?;

        if !matches!(info.status, ProcessStatus::Running) {
            anyhow::bail!("Process '{}' is not running", name);
        }

        if let Some(mut child) = info.child.take() {
            child.kill().await
                .context("Failed to kill process")?;
        }

        info.status = ProcessStatus::Stopped;
        info.stopped_at = Some(Utc::now());
        info.pid = None;

        println!("✓ Stopped process '{}'", name);
        Ok(())
    }

    pub async fn restart_process(&self, name: &str) -> Result<()> {
        self.stop_process(name).await?;
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        self.start_process(name).await?;
        Ok(())
    }

    pub async fn list_processes(&self) -> Vec<(String, ProcessStatus, Option<u32>)> {
        let processes = self.processes.read().await;
        processes
            .iter()
            .map(|(name, info)| {
                (name.clone(), info.status.clone(), info.pid)
            })
            .collect()
    }

    pub async fn get_process_info(&self, name: &str) -> Option<ProcessConfig> {
        let processes = self.processes.read().await;
        processes.get(name).map(|info| info.config.clone())
    }
}

加入自動重啟機制

impl ProcessManager {
    /// Background loop that never returns. Every 5 seconds it detects
    /// processes that have exited and auto-restarts those allowed to.
    ///
    /// The registry lock is held only while scanning and while respawning.
    /// Each process's `restart_delay_ms` back-off is awaited with the lock
    /// released, so other operations are not blocked during a restart delay.
    pub async fn monitor_processes(&self) {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

            for (name, delay_ms) in self.collect_exited().await {
                tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
                self.respawn_if_restarting(&name).await;
            }
        }
    }

    /// Handles every `Running` process whose child has exited:
    /// - marks it `Crashed` and clears its pid and child handle;
    /// - if auto-restart is enabled and budget remains, moves it to
    ///   `Restarting`.
    ///
    /// Returns `(name, restart_delay_ms)` for each process scheduled to restart.
    async fn collect_exited(&self) -> Vec<(String, u64)> {
        let mut processes = self.processes.write().await;
        let mut scheduled = Vec::new();

        for (name, info) in processes.iter_mut() {
            if !matches!(info.status, ProcessStatus::Running) {
                continue;
            }
            let Some(child) = info.child.as_mut() else {
                continue;
            };

            match child.try_wait() {
                Ok(Some(_status)) => {
                    // The process has exited.
                    println!("⚠ Process '{}' crashed", name);
                    info.status = ProcessStatus::Crashed;
                    info.stopped_at = Some(Utc::now());
                    // The child is gone; don't keep reporting its stale pid.
                    info.pid = None;
                    info.child = None;

                    if info.config.auto_restart
                        && info.restart_count < info.config.max_restarts
                    {
                        info.restart_count += 1;
                        info.status = ProcessStatus::Restarting;
                        println!(
                            "↻ Restarting process '{}' (attempt {}/{})",
                            name, info.restart_count, info.config.max_restarts
                        );
                        scheduled.push((name.clone(), info.config.restart_delay_ms));
                    }
                }
                Ok(None) => {
                    // Still running.
                }
                Err(e) => {
                    eprintln!("✗ Error checking process '{}': {}", name, e);
                }
            }
        }

        scheduled
    }

    /// Respawns `name` only if it is still `Restarting`.
    ///
    /// If it was stopped, started, or removed during the back-off delay, it is
    /// left alone.
    async fn respawn_if_restarting(&self, name: &str) {
        let mut processes = self.processes.write().await;
        let Some(info) = processes.get_mut(name) else {
            return;
        };
        if !matches!(info.status, ProcessStatus::Restarting) {
            return;
        }
        if let Err(e) = self.spawn_process(info).await {
            info.status = ProcessStatus::Crashed;
            eprintln!("✗ Failed to restart '{}': {}", name, e);
        }
    }

    /// Runs [`ProcessManager::monitor_processes`] on a detached Tokio task.
    pub fn spawn_monitor(self: Arc<Self>) {
        tokio::spawn(async move {
            self.monitor_processes().await;
        });
    }
}

CLI 介面

use clap::{Parser, Subcommand};

/// Top-level command-line interface of `pm`.
#[derive(Parser)]
#[command(name = "pm")]
#[command(about = "Process Manager - Manage and monitor background processes")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

/// Subcommands supported by `pm`.
#[derive(Subcommand)]
enum Commands {
    /// Start a new process
    Start {
        /// Process name
        name: String,
        /// Command to execute
        command: String,
        /// Command arguments
        // `allow_hyphen_values` lets child arguments such as `-m` through
        // instead of rejecting them as unknown `pm` flags. The list is
        // trailing, so `pm` flags (e.g. `--auto-restart`) must come before `name`.
        #[arg(trailing_var_arg = true, allow_hyphen_values = true)]
        args: Vec<String>,
        /// Enable auto-restart
        #[arg(long)]
        auto_restart: bool,
    },
    /// Stop a running process
    Stop {
        /// Process name
        name: String,
    },
    /// Restart a process
    Restart {
        /// Process name
        name: String,
    },
    /// List all processes
    List,
    /// Show process information
    Info {
        /// Process name
        name: String,
    },
    /// Remove a process
    Delete {
        /// Process name
        name: String,
    },
}

main.rs - 主程式

/// CLI entry point: parses arguments, runs one subcommand, then exits.
///
/// NOTE(review): every invocation creates a fresh, empty `ProcessManager`.
/// Processes registered by an earlier `pm start` are therefore not visible to
/// a later `pm list`/`stop`/`restart`/`info`/`delete`. Cross-invocation
/// management would need persisted state (e.g. `save_config`/`load_config`
/// plus stored pids) or a long-running daemon.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();
    let manager = Arc::new(ProcessManager::new());

    // Background monitor (polls every 5 s). It lives only as long as this
    // process, which exits shortly after the subcommand below finishes.
    manager.clone().spawn_monitor();

    match cli.command {
        Commands::Start { name, command, args, auto_restart } => {
            let config = ProcessConfig {
                name: name.clone(),
                command,
                args,
                cwd: None,
                env: HashMap::new(),
                auto_restart,
                max_restarts: 10,
                restart_delay_ms: 1000,
            };

            manager.add_process(config).await?;
            manager.start_process(&name).await?;
        }
        Commands::Stop { name } => {
            manager.stop_process(&name).await?;
        }
        Commands::Restart { name } => {
            manager.restart_process(&name).await?;
        }
        Commands::List => {
            let processes = manager.list_processes().await;

            println!("\n{:<20} {:<15} {:<10}", "NAME", "STATUS", "PID");
            println!("{:-<45}", "");

            for (name, status, pid) in processes {
                let status_str = format!("{:?}", status);
                let pid_str = pid
                    .map(|p| p.to_string())
                    .unwrap_or_else(|| "-".to_string());

                println!("{:<20} {:<15} {:<10}", name, status_str, pid_str);
            }
            println!();
        }
        Commands::Info { name } => {
            if let Some(config) = manager.get_process_info(&name).await {
                println!("\nProcess: {}", config.name);
                println!("Command: {}", config.command);
                println!("Args: {:?}", config.args);
                println!("Auto Restart: {}", config.auto_restart);
                println!("Max Restarts: {}", config.max_restarts);
                println!();
            } else {
                println!("Process '{}' not found", name);
            }
        }
        Commands::Delete { name } => {
            // Best-effort stop: "not running" is fine when deleting.
            manager.stop_process(&name).await.ok();

            // Unregister the process so it no longer appears in the registry.
            let removed = manager.processes.write().await.remove(&name);
            if removed.is_some() {
                println!("✓ Deleted process '{}'", name);
            } else {
                println!("Process '{}' not found", name);
            }
        }
    }

    // Short grace period before exiting. It is shorter than the monitor's
    // 5 s poll interval, so the monitor does not complete a check here.
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}

開始使用

# NOTE: every `pm` invocation starts with an empty in-memory registry, so the
# later commands cannot see a process started by an earlier invocation.

# Start a web server with auto-restart enabled.
# `pm` flags such as `--auto-restart` must come before the process name:
# everything after the command is passed to the child verbatim.
cargo run -- start --auto-restart web-server python3 -m http.server 8080

# List all processes
cargo run -- list

# Show process information
cargo run -- info web-server

# Restart the process
cargo run -- restart web-server

# Stop the process
cargo run -- stop web-server

# Delete the process
cargo run -- delete web-server

(額外)加入 log 管理

use std::fs::OpenOptions;
use std::io::Write;

impl ProcessManager {
    async fn setup_logging(&self, info: &mut ProcessInfo) -> Result<()> {
        let log_dir = format!("logs/{}", info.config.name);
        std::fs::create_dir_all(&log_dir)?;

        let stdout_path = format!("{}/stdout.log", log_dir);
        let stderr_path = format!("{}/stderr.log", log_dir);

        // 設定日誌檔案
        // 在實際實作中,需要將子程序的輸出重定向到這些檔案
        
        Ok(())
    }
}

(額外) 增加 configuration

use serde_json;
use std::fs;

#[derive(Serialize, Deserialize)]
struct ManagerConfig {
    processes: Vec<ProcessConfig>,
}

impl ProcessManager {
    pub async fn load_config(&self, path: &str) -> Result<()> {
        let content = fs::read_to_string(path)?;
        let config: ManagerConfig = serde_json::from_str(&content)?;

        for process_config in config.processes {
            self.add_process(process_config).await?;
        }

        Ok(())
    }

    pub async fn save_config(&self, path: &str) -> Result<()> {
        let processes = self.processes.read().await;
        let configs: Vec<ProcessConfig> = processes
            .values()
            .map(|info| info.config.clone())
            .collect();

        let config = ManagerConfig { processes: configs };
        let content = serde_json::to_string_pretty(&config)?;
        fs::write(path, content)?;

        Ok(())
    }
}

增加資源監控

/// Snapshot of a managed process's resource usage.
#[derive(Debug, Clone)]
pub struct ProcessStats {
    /// CPU usage in percent. Placeholder: currently always `0.0`.
    pub cpu_percent: f64,
    /// Memory usage in MB (per the field name). Placeholder: currently always `0.0`.
    pub memory_mb: f64,
    /// Seconds since the process was last started.
    pub uptime_seconds: u64,
}

impl ProcessManager {
    /// Returns stats for `name`, or `None` in any of these cases:
    /// - the name is unknown;
    /// - no pid is recorded for it;
    /// - it has never been started.
    ///
    /// Only `uptime_seconds` is computed. CPU and memory would need a
    /// system-info source (e.g. the `sysinfo` crate) and are reported as `0.0`.
    pub async fn get_process_stats(&self, name: &str) -> Option<ProcessStats> {
        let processes = self.processes.read().await;
        let info = processes.get(name)?;

        // The pid is recorded on spawn and cleared on stop; no pid, no stats.
        if info.pid.is_none() {
            return None;
        }

        // Uptime is now - started_at. If the wall clock moved backwards, clamp
        // to 0 instead of reporting the absolute value of a negative duration.
        let elapsed = Utc::now().signed_duration_since(info.started_at?);
        let uptime_seconds = u64::try_from(elapsed.num_seconds()).unwrap_or(0);

        Some(ProcessStats {
            cpu_percent: 0.0,
            memory_mb: 0.0,
            uptime_seconds,
        })
    }
}

寫測試

#[cfg(test)]
mod tests {
    use super::*;

    /// Config named "test" that runs `command args...` and never auto-restarts.
    fn test_config(command: &str, args: &[&str]) -> ProcessConfig {
        ProcessConfig {
            name: "test".to_string(),
            command: command.to_string(),
            args: args.iter().map(|a| a.to_string()).collect(),
            cwd: None,
            env: HashMap::new(),
            auto_restart: false,
            max_restarts: 0,
            restart_delay_ms: 1000,
        }
    }

    #[tokio::test]
    async fn test_add_and_start_process() {
        let manager = ProcessManager::new();

        manager.add_process(test_config("echo", &["hello"])).await.unwrap();
        manager.start_process("test").await.unwrap();

        let processes = manager.list_processes().await;
        assert_eq!(processes.len(), 1);
        assert_eq!(processes[0].0, "test");
        assert!(matches!(processes[0].1, ProcessStatus::Running));
    }

    #[tokio::test]
    async fn test_add_duplicate_fails() {
        let manager = ProcessManager::new();

        manager.add_process(test_config("echo", &["hello"])).await.unwrap();
        assert!(manager.add_process(test_config("echo", &["hello"])).await.is_err());
    }

    #[tokio::test]
    async fn test_stop_process() {
        let manager = ProcessManager::new();

        manager.add_process(test_config("sleep", &["10"])).await.unwrap();
        manager.start_process("test").await.unwrap();

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        manager.stop_process("test").await.unwrap();

        let processes = manager.list_processes().await;
        assert!(matches!(processes[0].1, ProcessStatus::Stopped));
        assert_eq!(processes[0].2, None);
    }
}

小語

再幾天就結束了!繼續努力😴


上一篇
備份自動化工具 - 定期備份檔案到雲端儲存
系列文
Rust 實戰專案集:30 個漸進式專案從工具到服務26
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言