iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0
Rust

Rust 實戰專案集:30 個漸進式專案從工具到服務系列 第 24

服務健康監控 - 監控系統服務狀態並發送警報

  • 分享至 

  • xImage
  •  

前言

今天我們要做一個服務監控工具,定期檢查多個服務的健康狀況,並且在服務異常時發送警報
這個工具在我做網頁專案時非常實用,當然網路上已經有很多類似的工具,例如 Grafana / Prometheus 也提供相關功能
但是一樣,我們是以學習為目的做開發,所以我在這裡展示如何用 Rust 實現這項功能

內容看起來可能和我們 Day13 的專案有點像
但今天會著重在更進一步的情境,像是透過 Email、Slack、Webhook 等方式發送警報

今日學習目標

  • 支援多種健康檢查方式(HTTP、TCP、自定義命令)
  • 可配置的檢查間隔和超時設定
  • 多種警報通知方式(Email、Slack、Webhook)
  • 服務狀態歷史記錄
  • 簡單的 Web Dashboard 查看監控狀態
  • 自動重試機制和故障轉移

雖然先前的專案已經用過這些套件,但還是簡單介紹一下每個依賴(Dependencies)的功用

reqwest: HTTP 客戶端,用於 HTTP 健康檢查
tokio: 異步運行時
serde: 序列化和反序列化配置
chrono: 時間處理
axum: Web 框架,提供 Dashboard API
lettre: Email 發送
rusqlite: SQLite 資料庫,存儲歷史記錄

依賴

Cargo.toml

[package]
name = "service_health_monitor"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.35", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
axum = "0.7"
tower-http = { version = "0.5", features = ["cors", "trace"] }
tracing = "0.1"
# `EnvFilter` (used when initialising logging in main.rs) is only compiled
# when the `env-filter` feature is enabled.
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
lettre = "0.11"
rusqlite = { version = "0.30", features = ["bundled"] }
anyhow = "1.0"
config = "0.13"

開始實作

一樣,我們先把資料結構弄出來

資料結構

src/models.rs

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Health state recorded for a monitored service.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum HealthStatus {
    /// The check succeeded.
    Healthy,
    /// The check failed, returned an error, or timed out.
    Unhealthy,
    /// No conclusive result is available (for example an unsupported HTTP
    /// method, or no stored history for the service).
    Unknown,
}

/// Kind of health check to run against a service.
///
/// Serialized as an internally tagged enum: the `type` field selects the
/// variant (`"Http"`, `"Tcp"` or `"Command"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum CheckType {
    /// Send an HTTP request and compare the response status code.
    Http {
        /// Target URL of the request.
        url: String,
        /// HTTP method name; the checker accepts GET, POST and HEAD (case-insensitive).
        method: String,
        /// Status code that counts as healthy.
        expected_status: u16,
        /// Maximum time to wait for the response, in seconds.
        timeout_secs: u64,
    },
    /// Open a TCP connection to `host:port`.
    Tcp {
        /// Host name or IP address to connect to.
        host: String,
        /// TCP port to connect to.
        port: u16,
        /// Maximum time to wait for the connection, in seconds.
        timeout_secs: u64,
    },
    /// Run an external program; a successful exit status counts as healthy.
    Command {
        /// Program to execute.
        command: String,
        /// Arguments passed to the program.
        args: Vec<String>,
        /// Maximum time to wait for the program, in seconds.
        timeout_secs: u64,
    },
}

/// Configuration of a single alert notification channel.
///
/// Serialized as an internally tagged enum: the `type` field selects the
/// variant (`"Email"`, `"Slack"` or `"Webhook"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AlertConfig {
    /// Deliver the alert as an e-mail through an SMTP server.
    Email {
        /// SMTP server host name.
        smtp_server: String,
        /// SMTP server port.
        smtp_port: u16,
        /// User name for SMTP authentication.
        username: String,
        /// Password for SMTP authentication.
        password: String,
        /// Sender mailbox.
        from: String,
        /// Recipient mailboxes.
        to: Vec<String>,
    },
    /// Post the alert to a Slack incoming webhook.
    Slack {
        /// URL the Slack payload is posted to.
        webhook_url: String,
    },
    /// Post the alert as JSON to an arbitrary HTTP endpoint.
    Webhook {
        /// URL the JSON payload is posted to.
        url: String,
        /// Extra request headers as `(name, value)` pairs.
        headers: Vec<(String, String)>,
    },
}

/// Configuration of one monitored service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceConfig {
    /// Service name; also the key under which check results are stored.
    pub name: String,
    /// Free-form description shown alongside the service status.
    pub description: String,
    /// Health check to run for this service.
    pub check: CheckType,
    /// Time between two check cycles, in seconds.
    pub check_interval_secs: u64,
    /// Number of additional attempts made after a non-healthy result.
    pub retry_count: u32,
    /// Pause between two attempts of the same cycle, in seconds.
    pub retry_delay_secs: u64,
    /// Channels notified when an alert is raised for this service.
    pub alerts: Vec<AlertConfig>,
}

/// Outcome of a single health check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
    /// Name of the service that was checked.
    pub service_name: String,
    /// Resulting health state.
    pub status: HealthStatus,
    /// Human-readable detail describing the outcome.
    pub message: String,
    /// Elapsed time of the check in milliseconds.
    pub response_time_ms: u64,
    /// UTC time at which the result was produced.
    pub timestamp: DateTime<Utc>,
}

/// Aggregated status of a service, derived from the stored check history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceStatus {
    /// Service name.
    pub name: String,
    /// Service description taken from the configuration.
    pub description: String,
    /// Status of the most recent stored check.
    pub current_status: HealthStatus,
    /// Timestamp of the most recent stored check.
    pub last_check: DateTime<Utc>,
    /// Timestamp of the most recent healthy check, if there is one.
    pub last_healthy: Option<DateTime<Utc>>,
    /// Number of most recent checks in a row that were unhealthy.
    pub consecutive_failures: u32,
    /// Share of healthy checks, as a percentage (0.0 to 100.0).
    pub uptime_percentage: f64,
}

/// Top-level configuration of the monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitorConfig {
    /// Services to monitor.
    pub services: Vec<ServiceConfig>,
    /// Path of the SQLite database file that stores check history.
    pub database_path: String,
    /// TCP port the dashboard API listens on.
    pub dashboard_port: u16,
}

這裡我們做一個 health checker

src/checker.rs

use anyhow::{Context, Result};
use chrono::Utc;
use std::process::Command;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::time::timeout;

use crate::models::{CheckType, HealthCheckResult, HealthStatus};

/// Runs HTTP, TCP and command health checks.
///
/// Cloning is cheap: `reqwest::Client` is reference counted internally, so
/// clones share the same connection pool.
#[derive(Debug, Clone)]
pub struct HealthChecker {
    /// HTTP client reused for every HTTP check.
    client: reqwest::Client,
}

impl HealthChecker {
    /// Creates a checker whose HTTP client has a default timeout of 30 seconds.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP client cannot be constructed.
    pub fn new() -> Self {
        Self {
            client: reqwest::Client::builder()
                .timeout(Duration::from_secs(30))
                .build()
                .expect("failed to build the HTTP client for health checks"),
        }
    }

    /// Runs `check` once and reports the outcome for `service_name`.
    ///
    /// The returned result carries the elapsed wall-clock time of the check in
    /// milliseconds and the UTC time at which the check finished.
    pub async fn check(
        &self,
        service_name: &str,
        check: &CheckType,
    ) -> HealthCheckResult {
        let start = Instant::now();

        let (status, message) = match check {
            CheckType::Http {
                url,
                method,
                expected_status,
                timeout_secs,
            } => self.check_http(url, method, *expected_status, *timeout_secs).await,

            CheckType::Tcp {
                host,
                port,
                timeout_secs,
            } => self.check_tcp(host, *port, *timeout_secs).await,

            CheckType::Command {
                command,
                args,
                timeout_secs,
            } => self.check_command(command, args, *timeout_secs).await,
        };

        // `as_millis` yields a u128; saturate instead of silently truncating.
        let response_time_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

        HealthCheckResult {
            service_name: service_name.to_string(),
            status,
            message,
            response_time_ms,
            timestamp: Utc::now(),
        }
    }

    /// Sends an HTTP request and compares the response status with `expected_status`.
    ///
    /// Only GET, POST and HEAD are supported (case-insensitive); any other
    /// method yields `HealthStatus::Unknown`.
    async fn check_http(
        &self,
        url: &str,
        method: &str,
        expected_status: u16,
        timeout_secs: u64,
    ) -> (HealthStatus, String) {
        let request = match method.to_uppercase().as_str() {
            "GET" => self.client.get(url),
            "POST" => self.client.post(url),
            "HEAD" => self.client.head(url),
            _ => return (HealthStatus::Unknown, format!("Unsupported HTTP method: {}", method)),
        };

        let limit = Duration::from_secs(timeout_secs);
        // The per-request timeout replaces the client-wide 30 s default, which
        // would otherwise cut off checks configured with a longer timeout.
        let request = request.timeout(limit);

        match timeout(limit, request.send()).await {
            Ok(Ok(response)) => {
                let status_code = response.status().as_u16();
                if status_code == expected_status {
                    (
                        HealthStatus::Healthy,
                        format!("HTTP {} OK (expected {})", status_code, expected_status),
                    )
                } else {
                    (
                        HealthStatus::Unhealthy,
                        format!(
                            "HTTP {} (expected {})",
                            status_code, expected_status
                        ),
                    )
                }
            }
            // The request-level timer may fire just before the outer one;
            // report both cases with the same message.
            Ok(Err(e)) if e.is_timeout() => (
                HealthStatus::Unhealthy,
                format!("HTTP request timeout after {}s", timeout_secs),
            ),
            Ok(Err(e)) => (
                HealthStatus::Unhealthy,
                format!("HTTP request failed: {}", e),
            ),
            Err(_) => (
                HealthStatus::Unhealthy,
                format!("HTTP request timeout after {}s", timeout_secs),
            ),
        }
    }

    /// Tries to open a TCP connection to `host:port` within `timeout_secs` seconds.
    async fn check_tcp(
        &self,
        host: &str,
        port: u16,
        timeout_secs: u64,
    ) -> (HealthStatus, String) {
        let addr = format!("{}:{}", host, port);

        match timeout(
            Duration::from_secs(timeout_secs),
            TcpStream::connect(&addr),
        )
        .await
        {
            Ok(Ok(_)) => (
                HealthStatus::Healthy,
                format!("TCP connection to {} successful", addr),
            ),
            Ok(Err(e)) => (
                HealthStatus::Unhealthy,
                format!("TCP connection to {} failed: {}", addr, e),
            ),
            Err(_) => (
                HealthStatus::Unhealthy,
                format!("TCP connection timeout after {}s", timeout_secs),
            ),
        }
    }

    /// Runs `command` with `args`; a successful exit status counts as healthy.
    ///
    /// The child is spawned through `tokio::process` with `kill_on_drop`, so a
    /// command that exceeds `timeout_secs` is killed when the timeout drops the
    /// pending future instead of lingering in the background.
    async fn check_command(
        &self,
        command: &str,
        args: &[String],
        timeout_secs: u64,
    ) -> (HealthStatus, String) {
        let command_str = format!("{} {}", command, args.join(" "));

        let mut std_command = Command::new(command);
        std_command.args(args);

        let mut child_command = tokio::process::Command::from(std_command);
        child_command
            .stdin(std::process::Stdio::null())
            .kill_on_drop(true);

        match timeout(Duration::from_secs(timeout_secs), child_command.output()).await {
            Ok(Ok(output)) => {
                if output.status.success() {
                    (
                        HealthStatus::Healthy,
                        format!("Command '{}' executed successfully", command_str),
                    )
                } else {
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    (
                        HealthStatus::Unhealthy,
                        format!("Command '{}' failed: {}", command_str, stderr),
                    )
                }
            }
            Ok(Err(e)) => (
                HealthStatus::Unhealthy,
                format!("Failed to execute command '{}': {}", command_str, e),
            ),
            Err(_) => (
                HealthStatus::Unhealthy,
                format!("Command timeout after {}s", timeout_secs),
            ),
        }
    }
}

製作警報系統

src/alerter.rs

use anyhow::Result;
use lettre::message::{header, MultiPart, SinglePart};
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};
use serde_json::json;

use crate::models::{AlertConfig, HealthCheckResult, HealthStatus};

/// Delivers alerts over e-mail, Slack and generic webhooks.
///
/// Cloning is cheap: `reqwest::Client` is reference counted internally, so
/// clones share the same connection pool.
#[derive(Debug, Clone)]
pub struct Alerter {
    /// HTTP client used for Slack and webhook deliveries.
    client: reqwest::Client,
}

impl Alerter {
    /// Creates an alerter whose HTTP client gives up after 30 seconds, so an
    /// unresponsive Slack or webhook endpoint cannot stall the caller forever.
    ///
    /// # Panics
    ///
    /// Panics if the HTTP client cannot be constructed.
    pub fn new() -> Self {
        Self {
            client: reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(30))
                .build()
                .expect("failed to build the HTTP client for alert delivery"),
        }
    }

    /// Sends `result` through the channel described by `config`.
    ///
    /// # Errors
    ///
    /// Returns an error when the message cannot be built or the delivery fails.
    pub async fn send_alert(
        &self,
        config: &AlertConfig,
        result: &HealthCheckResult,
    ) -> Result<()> {
        match config {
            AlertConfig::Email {
                smtp_server,
                smtp_port,
                username,
                password,
                from,
                to,
            } => {
                self.send_email(
                    smtp_server,
                    *smtp_port,
                    username,
                    password,
                    from,
                    to,
                    result,
                )
                .await
            }
            AlertConfig::Slack { webhook_url } => {
                self.send_slack(webhook_url, result).await
            }
            AlertConfig::Webhook { url, headers } => {
                self.send_webhook(url, headers, result).await
            }
        }
    }

    /// Sends a plain-text e-mail describing `result` to every address in `to`.
    ///
    /// # Errors
    ///
    /// Fails when an address cannot be parsed, the message cannot be built, or
    /// the SMTP delivery fails.
    async fn send_email(
        &self,
        smtp_server: &str,
        smtp_port: u16,
        username: &str,
        password: &str,
        from: &str,
        to: &[String],
        result: &HealthCheckResult,
    ) -> Result<()> {
        let subject = format!(
            "[{}] Service {} Health Alert",
            if result.status == HealthStatus::Healthy {
                "RECOVERED"
            } else {
                "CRITICAL"
            },
            result.service_name
        );

        let body = format!(
            r#"
Service Health Alert

Service: {}
Status: {:?}
Message: {}
Response Time: {}ms
Timestamp: {}

This is an automated alert from the Service Health Monitor.
            "#,
            result.service_name,
            result.status,
            result.message,
            result.response_time_ms,
            result.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
        );

        let mut email_builder = Message::builder()
            .from(from.parse()?)
            .subject(subject);

        for recipient in to {
            email_builder = email_builder.to(recipient.parse()?);
        }

        let email = email_builder
            .multipart(
                MultiPart::alternative()
                    .singlepart(
                        SinglePart::builder()
                            .header(header::ContentType::TEXT_PLAIN)
                            .body(body),
                    )
            )?;

        let creds = Credentials::new(username.to_string(), password.to_string());

        // Port 465 expects TLS from the first byte (`relay`); other submission
        // ports such as 587 start in plaintext and upgrade via STARTTLS.
        let transport = if smtp_port == 465 {
            SmtpTransport::relay(smtp_server)?
        } else {
            SmtpTransport::starttls_relay(smtp_server)?
        };

        let mailer = transport
            .port(smtp_port)
            .credentials(creds)
            .build();

        // `SmtpTransport::send` blocks, so run it off the async worker threads.
        tokio::task::spawn_blocking(move || mailer.send(&email))
            .await??;

        Ok(())
    }

    /// Posts `result` to a Slack incoming webhook as a coloured attachment.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent or Slack answers with a 4xx/5xx status.
    async fn send_slack(
        &self,
        webhook_url: &str,
        result: &HealthCheckResult,
    ) -> Result<()> {
        let color = match result.status {
            HealthStatus::Healthy => "good",
            HealthStatus::Unhealthy => "danger",
            HealthStatus::Unknown => "warning",
        };

        let status_emoji = match result.status {
            HealthStatus::Healthy => "✅",
            HealthStatus::Unhealthy => "🔴",
            HealthStatus::Unknown => "⚠️",
        };

        let payload = json!({
            "attachments": [{
                "color": color,
                "title": format!("{} Service: {}", status_emoji, result.service_name),
                "fields": [
                    {
                        "title": "Status",
                        "value": format!("{:?}", result.status),
                        "short": true
                    },
                    {
                        "title": "Response Time",
                        "value": format!("{}ms", result.response_time_ms),
                        "short": true
                    },
                    {
                        "title": "Message",
                        "value": &result.message,
                        "short": false
                    },
                    {
                        "title": "Timestamp",
                        "value": result.timestamp.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
                        "short": false
                    }
                ],
                "footer": "Service Health Monitor",
                "ts": result.timestamp.timestamp()
            }]
        });

        // A completed request is not necessarily a delivered alert: turn
        // 4xx/5xx answers into errors so the caller can log the failure.
        self.client
            .post(webhook_url)
            .json(&payload)
            .send()
            .await?
            .error_for_status()?;

        Ok(())
    }

    /// Posts `result` as JSON to `url`, adding the configured `headers`.
    ///
    /// # Errors
    ///
    /// Fails when the request cannot be sent or the endpoint answers with a
    /// 4xx/5xx status.
    async fn send_webhook(
        &self,
        url: &str,
        headers: &[(String, String)],
        result: &HealthCheckResult,
    ) -> Result<()> {
        let mut request = self.client.post(url);

        for (key, value) in headers {
            request = request.header(key, value);
        }

        let payload = json!({
            "service_name": result.service_name,
            "status": result.status,
            "message": result.message,
            "response_time_ms": result.response_time_ms,
            "timestamp": result.timestamp.to_rfc3339()
        });

        request
            .json(&payload)
            .send()
            .await?
            .error_for_status()?;

        Ok(())
    }
}

資料庫存儲

src/database.rs

use anyhow::Result;
use chrono::{DateTime, Duration, Utc};
use rusqlite::{params, Connection};
use std::sync::{Arc, Mutex};

use crate::models::{HealthCheckResult, HealthStatus, ServiceStatus};

/// SQLite-backed store for health check history.
pub struct Database {
    /// Shared connection; each method locks the mutex before using it.
    conn: Arc<Mutex<Connection>>,
}

impl Database {
    pub fn new(path: &str) -> Result<Self> {
        let conn = Connection::open(path)?;
        
        conn.execute(
            "CREATE TABLE IF NOT EXISTS health_checks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                service_name TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT NOT NULL,
                response_time_ms INTEGER NOT NULL,
                timestamp TEXT NOT NULL
            )",
            [],
        )?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_service_timestamp 
             ON health_checks(service_name, timestamp DESC)",
            [],
        )?;

        Ok(Self {
            conn: Arc::new(Mutex::new(conn)),
        })
    }

    pub fn insert_check_result(&self, result: &HealthCheckResult) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        
        conn.execute(
            "INSERT INTO health_checks (service_name, status, message, response_time_ms, timestamp)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                &result.service_name,
                format!("{:?}", result.status),
                &result.message,
                result.response_time_ms,
                result.timestamp.to_rfc3339(),
            ],
        )?;

        Ok(())
    }

    pub fn get_service_status(&self, service_name: &str, description: &str) -> Result<ServiceStatus> {
        let conn = self.conn.lock().unwrap();

        let mut stmt = conn.prepare(
            "SELECT status, message, response_time_ms, timestamp 
             FROM health_checks 
             WHERE service_name = ?1 
             ORDER BY timestamp DESC 
             LIMIT 1"
        )?;

        let result = stmt.query_row(params![service_name], |row| {
            let status_str: String = row.get(0)?;
            let timestamp_str: String = row.get(3)?;
            
            Ok((
                match status_str.as_str() {
                    "Healthy" => HealthStatus::Healthy,
                    "Unhealthy" => HealthStatus::Unhealthy,
                    _ => HealthStatus::Unknown,
                },
                DateTime::parse_from_rfc3339(&timestamp_str)
                    .unwrap()
                    .with_timezone(&Utc),
            ))
        });

        let (current_status, last_check) = result.unwrap_or((HealthStatus::Unknown, Utc::now()));

        let last_healthy = self.get_last_healthy_time(service_name)?;
        let consecutive_failures = self.get_consecutive_failures(service_name)?;
        let uptime_percentage = self.calculate_uptime(service_name, Duration::hours(24))?;

        Ok(ServiceStatus {
            name: service_name.to_string(),
            description: description.to_string(),
            current_status,
            last_check,
            last_healthy,
            consecutive_failures,
            uptime_percentage,
        })
    }

    fn get_last_healthy_time(&self, service_name: &str) -> Result<Option<DateTime<Utc>>> {
        let conn = self.conn.lock().unwrap();

        let mut stmt = conn.prepare(
            "SELECT timestamp 
             FROM health_checks 
             WHERE service_name = ?1 AND status = 'Healthy'
             ORDER BY timestamp DESC 
             LIMIT 1"
        )?;

        let result = stmt.query_row(params![service_name], |row| {
            let timestamp_str: String = row.get(0)?;
            Ok(DateTime::parse_from_rfc3339(&timestamp_str)
                .unwrap()
                .with_timezone(&Utc))
        });

        Ok(result.ok())
    }

    fn get_consecutive_failures(&self, service_name: &str) -> Result<u32> {
        let conn = self.conn.lock().unwrap();

        let mut stmt = conn.prepare(
            "SELECT status 
             FROM health_checks 
             WHERE service_name = ?1 
             ORDER BY timestamp DESC"
        )?;

        let mut rows = stmt.query(params![service_name])?;
        let mut count = 0u32;

        while let Some(row) = rows.next()? {
            let status: String = row.get(0)?;
            if status == "Unhealthy" {
                count += 1;
            } else {
                break;
            }
        }

        Ok(count)
    }

    fn calculate_uptime(&self, service_name: &str, duration: Duration) -> Result<f64> {
        let conn = self.conn.lock().unwrap();
        let since = Utc::now() - duration;

        let mut stmt = conn.prepare(
            "SELECT COUNT(*) as total,
                    SUM(CASE WHEN status = 'Healthy' THEN 1 ELSE 0 END) as healthy
             FROM health_checks 
             WHERE service_name = ?1 AND timestamp >= ?2"
        )?;

        let (total, healthy): (i64, i64) = stmt.query_row(
            params![service_name, since.to_rfc3339()],
            |row| Ok((row.get(0)?, row.get(1)?))
        )?;

        if total == 0 {
            Ok(100.0)
        } else {
            Ok((healthy as f64 / total as f64) * 100.0)
        }
    }

    pub fn get_recent_checks(
        &self,
        service_name: &str,
        limit: usize,
    ) -> Result<Vec<HealthCheckResult>> {
        let conn = self.conn.lock().unwrap();

        let mut stmt = conn.prepare(
            "SELECT service_name, status, message, response_time_ms, timestamp
             FROM health_checks 
             WHERE service_name = ?1 
             ORDER BY timestamp DESC 
             LIMIT ?2"
        )?;

        let results = stmt
            .query_map(params![service_name, limit], |row| {
                let status_str: String = row.get(1)?;
                let timestamp_str: String = row.get(4)?;

                Ok(HealthCheckResult {
                    service_name: row.get(0)?,
                    status: match status_str.as_str() {
                        "Healthy" => HealthStatus::Healthy,
                        "Unhealthy" => HealthStatus::Unhealthy,
                        _ => HealthStatus::Unknown,
                    },
                    message: row.get(2)?,
                    response_time_ms: row.get(3)?,
                    timestamp: DateTime::parse_from_rfc3339(&timestamp_str)
                        .unwrap()
                        .with_timezone(&Utc),
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;

        Ok(results)
    }

    pub fn cleanup_old_records(&self, days: i64) -> Result<usize> {
        let conn = self.conn.lock().unwrap();
        let cutoff = Utc::now() - Duration::days(days);

        let deleted = conn.execute(
            "DELETE FROM health_checks WHERE timestamp < ?1",
            params![cutoff.to_rfc3339()],
        )?;

        Ok(deleted)
    }
}

監控

src/monitor.rs

use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{interval, Duration};
use tracing::{error, info, warn};

use crate::alerter::Alerter;
use crate::checker::HealthChecker;
use crate::database::Database;
use crate::models::{HealthStatus, MonitorConfig, ServiceConfig, ServiceStatus};

/// Periodically checks every configured service, stores the results and
/// sends alerts.
pub struct Monitor {
    /// Monitor configuration. Crate-visible because the API handlers read the
    /// service definitions through `monitor.config`.
    pub(crate) config: MonitorConfig,
    /// Runs the individual health checks.
    checker: HealthChecker,
    /// Delivers alert notifications.
    alerter: Alerter,
    /// Persistent store for check history.
    database: Arc<Database>,
    /// In-memory alerting state, keyed by service name.
    service_states: Arc<RwLock<HashMap<String, ServiceState>>>,
}

/// Alerting state the monitor keeps in memory for one service.
struct ServiceState {
    /// Status recorded by the previous check cycle (`Unknown` before the first one).
    last_status: HealthStatus,
    /// Whether the most recently dispatched alert was for an `Unhealthy` result.
    alert_sent: bool,
}

impl Monitor {
    pub fn new(config: MonitorConfig) -> Result<Self> {
        let database = Arc::new(Database::new(&config.database_path)?);
        
        Ok(Self {
            config,
            checker: HealthChecker::new(),
            alerter: Alerter::new(),
            database,
            service_states: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    pub async fn start(&self) -> Result<()> {
        info!("Starting health monitor for {} services", self.config.services.len());

        let mut handles = vec![];

        for service in &self.config.services {
            let service = service.clone();
            let checker = self.checker.clone();
            let alerter = self.alerter.clone();
            let database = Arc::clone(&self.database);
            let service_states = Arc::clone(&self.service_states);

            let handle = tokio::spawn(async move {
                Self::monitor_service(
                    service,
                    checker,
                    alerter,
                    database,
                    service_states,
                )
                .await
            });

            handles.push(handle);
        }

        self.start_cleanup_task();

        for handle in handles {
            if let Err(e) = handle.await {
                error!("Service monitoring task failed: {}", e);
            }
        }

        Ok(())
    }

    async fn monitor_service(
        service: ServiceConfig,
        checker: HealthChecker,
        alerter: Alerter,
        database: Arc<Database>,
        service_states: Arc<RwLock<HashMap<String, ServiceState>>>,
    ) {
        info!("Starting monitor for service: {}", service.name);

        let mut interval = interval(Duration::from_secs(service.check_interval_secs));

        loop {
            interval.tick().await;

            let mut retry_count = 0;
            let mut last_result = None;

            while retry_count <= service.retry_count {
                let result = checker.check(&service.name, &service.check).await;
                
                if result.status == HealthStatus::Healthy {
                    last_result = Some(result);
                    break;
                }

                if retry_count < service.retry_count {
                    warn!(
                        "Service {} check failed (attempt {}/{}), retrying...",
                        service.name,
                        retry_count + 1,
                        service.retry_count + 1
                    );
                    tokio::time::sleep(Duration::from_secs(service.retry_delay_secs)).await;
                }

                last_result = Some(result);
                retry_count += 1;
            }

            if let Some(result) = last_result {
                if let Err(e) = database.insert_check_result(&result) {
                    error!("Failed to save check result: {}", e);
                }

                let mut states = service_states.write().await;
                let state = states
                    .entry(service.name.clone())
                    .or_insert_with(|| ServiceState {
                        last_status: HealthStatus::Unknown,
                        alert_sent: false,
                    });

                let status_changed = state.last_status != result.status;
                let should_alert = match result.status {
                    HealthStatus::Unhealthy => !state.alert_sent,
                    HealthStatus::Healthy => state.alert_sent,
                    HealthStatus::Unknown => false,
                };

                if status_changed || should_alert {
                    info!(
                        "Service {} status: {:?} -> {:?}",
                        service.name, state.last_status, result.status
                    );

                    for alert_config in &service.alerts {
                        if let Err(e) = alerter.send_alert(alert_config, &result).await {
                            error!("Failed to send alert: {}", e);
                        } else {
                            info!("Alert sent for service: {}", service.name);
                        }
                    }

                    state.alert_sent = result.status == HealthStatus::Unhealthy;
                }

                state.last_status = result.status;
            }
        }
    }

    fn start_cleanup_task(&self) {
        let database = Arc::clone(&self.database);
        
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(86400)); // 每天執行一次
            
            loop {
                interval.tick().await;
                
                match database.cleanup_old_records(30) {
                    Ok(deleted) => {
                        info!("Cleaned up {} old records", deleted);
                    }
                    Err(e) => {
                        error!("Failed to cleanup old records: {}", e);
                    }
                }
            }
        });
    }

    pub async fn get_all_service_status(&self) -> Result<Vec<ServiceStatus>> {
        let mut statuses = Vec::new();

        for service in &self.config.services {
            let status = self.database.get_service_status(&service.name, &service.description)?;
            statuses.push(status);
        }

        Ok(statuses)
    }

    pub fn get_database(&self) -> Arc<Database> {
        Arc::clone(&self.database)
    }
}

impl HealthChecker {
    /// Produces a checker for a spawned monitoring task.
    ///
    /// The `client` field is private to the module that declares
    /// `HealthChecker`, so it cannot be read from this module; the copy is
    /// built through the public constructor instead.
    fn clone(&self) -> Self {
        Self::new()
    }
}

impl Alerter {
    /// Produces an alerter for a spawned monitoring task.
    ///
    /// The `client` field is private to the module that declares `Alerter`,
    /// so it cannot be read from this module; the copy is built through the
    /// public constructor instead.
    fn clone(&self) -> Self {
        Self::new()
    }
}

這裡實作 Web API

src/api.rs

use axum::{
    extract::{Path, State},
    http::StatusCode,
    response::Json,
    routing::get,
    Router,
};
use serde_json::{json, Value};
use std::sync::Arc;
use tower_http::cors::CorsLayer;

use crate::database::Database;
use crate::monitor::Monitor;

/// Shared state handed to every API handler.
pub struct ApiState {
    /// Monitor that provides the service configuration and aggregated statuses.
    pub monitor: Arc<Monitor>,
    /// Database queried directly for per-service status and history.
    pub database: Arc<Database>,
}

/// Builds the dashboard router: registers the four read-only endpoints,
/// applies a permissive CORS layer and attaches the shared `state`.
pub fn create_router(state: Arc<ApiState>) -> Router {
    let routes = Router::new()
        .route("/api/health", get(health_check))
        .route("/api/services", get(get_services))
        .route("/api/services/:name", get(get_service_detail))
        .route("/api/services/:name/history", get(get_service_history));

    let cors = CorsLayer::permissive();

    routes.layer(cors).with_state(state)
}

async fn health_check() -> Json<Value> {
    Json(json!({
        "status": "ok",
        "timestamp": chrono::Utc::now().to_rfc3339()
    }))
}

/// Lists the aggregated status of every configured service.
///
/// Answers with HTTP 500 and an error message when the statuses cannot be loaded.
async fn get_services(
    State(state): State<Arc<ApiState>>,
) -> Result<Json<Value>, (StatusCode, String)> {
    let services = state
        .monitor
        .get_all_service_status()
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to get services: {}", e),
            )
        })?;

    let body = json!({
        "services": services,
        "timestamp": chrono::Utc::now().to_rfc3339()
    });

    Ok(Json(body))
}

/// Returns the status of one service together with part of its configuration.
///
/// Answers with HTTP 404 when no configured service is called `name`, and
/// with HTTP 500 when the status cannot be loaded from the database.
async fn get_service_detail(
    State(state): State<Arc<ApiState>>,
    Path(name): Path<String>,
) -> Result<Json<Value>, (StatusCode, String)> {
    // Look up the matching service configuration first.
    let configured = state
        .monitor
        .config
        .services
        .iter()
        .find(|candidate| candidate.name == name);

    let Some(service_config) = configured else {
        return Err((
            StatusCode::NOT_FOUND,
            format!("Service '{}' not found", name),
        ));
    };

    let status = state
        .database
        .get_service_status(&name, &service_config.description)
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to get service status: {}", e),
            )
        })?;

    let body = json!({
        "service": status,
        "config": {
            "check_interval_secs": service_config.check_interval_secs,
            "retry_count": service_config.retry_count,
        },
        "timestamp": chrono::Utc::now().to_rfc3339()
    });

    Ok(Json(body))
}

/// Returns up to 100 of the most recent check results stored for `name`.
///
/// Answers with HTTP 500 and an error message when the history cannot be loaded.
async fn get_service_history(
    State(state): State<Arc<ApiState>>,
    Path(name): Path<String>,
) -> Result<Json<Value>, (StatusCode, String)> {
    let history = state
        .database
        .get_recent_checks(&name, 100)
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to get service history: {}", e),
            )
        })?;

    let body = json!({
        "service_name": name,
        "history": history,
        "count": history.len(),
        "timestamp": chrono::Utc::now().to_rfc3339()
    });

    Ok(Json(body))
}

main.rs

mod alerter;
mod api;
mod checker;
mod database;
mod models;
mod monitor;

use anyhow::Result;
use std::sync::Arc;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use crate::api::{create_router, ApiState};
use crate::models::MonitorConfig;
use crate::monitor::Monitor;

/// Entry point: sets up logging, loads the configuration, starts the monitor
/// in a background task and serves the dashboard API.
#[tokio::main]
async fn main() -> Result<()> {
    // Log filter comes from the environment, falling back to "info".
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "info".into());

    tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer())
        .init();

    let config = load_config()?;
    let dashboard_port = config.dashboard_port;

    let monitor = Arc::new(Monitor::new(config)?);
    let database = monitor.get_database();

    let app = create_router(Arc::new(ApiState {
        monitor: Arc::clone(&monitor),
        database,
    }));

    // The monitoring loops run next to the HTTP server.
    let background_monitor = Arc::clone(&monitor);
    tokio::spawn(async move {
        if let Err(e) = background_monitor.start().await {
            tracing::error!("Monitor error: {}", e);
        }
    });

    let bind_address = format!("0.0.0.0:{}", dashboard_port);
    let listener = tokio::net::TcpListener::bind(bind_address).await?;

    tracing::info!("Dashboard API listening on http://0.0.0.0:{}", dashboard_port);

    axum::serve(listener, app).await?;

    Ok(())
}

fn load_config() -> Result<MonitorConfig> {
    let config_str = std::fs::read_to_string("config.json")?;
    let config: MonitorConfig = serde_json::from_str(&config_str)?;
    Ok(config)
}

配置

config.json.example

{
  "services": [
    {
      "name": "web-api",
      "description": "Main Web API Server",
      "check": {
        "type": "Http",
        "url": "https://api.example.com/health",
        "method": "GET",
        "expected_status": 200,
        "timeout_secs": 10
      },
      "check_interval_secs": 60,
      "retry_count": 3,
      "retry_delay_secs": 5,
      "alerts": [
        {
          "type": "Slack",
          "webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        },
        {
          "type": "Email",
          "smtp_server": "smtp.gmail.com",
          "smtp_port": 587,
          "username": "your-email@gmail.com",
          "password": "your-app-password",
          "from": "monitor@example.com",
          "to": ["admin@example.com", "devops@example.com"]
        }
      ]
    },
    {
      "name": "database",
      "description": "PostgreSQL Database",
      "check": {
        "type": "Tcp",
        "host": "db.example.com",
        "port": 5432,
        "timeout_secs": 5
      },
      "check_interval_secs": 30,
      "retry_count": 2,
      "retry_delay_secs": 3,
      "alerts": [
        {
          "type": "Webhook",
          "url": "https://your-webhook-endpoint.com/alert",
          "headers": [
            ["Authorization", "Bearer YOUR_TOKEN"],
            ["Content-Type", "application/json"]
          ]
        }
      ]
    },
    {
      "name": "backup-service",
      "description": "Daily Backup Service",
      "check": {
        "type": "Command",
        "command": "systemctl",
        "args": ["is-active", "backup.service"],
        "timeout_secs": 10
      },
      "check_interval_secs": 300,
      "retry_count": 1,
      "retry_delay_secs": 10,
      "alerts": [
        {
          "type": "Slack",
          "webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        }
      ]
    },
    {
      "name": "redis-cache",
      "description": "Redis Cache Server",
      "check": {
        "type": "Tcp",
        "host": "localhost",
        "port": 6379,
        "timeout_secs": 5
      },
      "check_interval_secs": 60,
      "retry_count": 2,
      "retry_delay_secs": 5,
      "alerts": [
        {
          "type": "Slack",
          "webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        }
      ]
    }
  ],
  "database_path": "health_monitor.db",
  "dashboard_port": 8080
}

使用

cargo build --release

準備配置文件

# 複製範例配置
cp config.json.example config.json

# 編輯配置文件
nvim config.json

運行監控

./target/release/service_health_monitor

上一篇
記憶體使用分析器 - 分析程式記憶體使用模式
系列文
Rust 實戰專案集:30 個漸進式專案從工具到服務24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言