目前實作到 29 天了,做點不太一樣的東西,製作檔案同步系統
這個系統包含客戶端和伺服器端,能夠自動偵測檔案變更並即時同步到雲端,同時支援多個客戶端之間的檔案同步
cargo.toml
[package]
name = "file-sync-service"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.35", features = ["full"] }
axum = "0.7"
tower = "0.4"
tower-http = { version = "0.5", features = ["fs", "cors"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
notify = "6.1"
sha2 = "0.10"
hex = "0.4"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.6", features = ["v4", "serde"] }
futures = "0.3"
tokio-tungstenite = "0.21"
dashmap = "5.5"
walkdir = "2.4"
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
[dev-dependencies]
tempfile = "3.8"
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use std::path::PathBuf;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileMetadata {
pub id: String,
pub path: PathBuf,
pub hash: String,
pub size: u64,
pub modified: DateTime<Utc>,
pub version: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncEvent {
FileCreated { metadata: FileMetadata },
FileModified { metadata: FileMetadata },
FileDeleted { path: PathBuf },
DirectoryCreated { path: PathBuf },
DirectoryDeleted { path: PathBuf },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncMessage {
pub event: SyncEvent,
pub timestamp: DateTime<Utc>,
pub client_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
pub files: Vec<FileMetadata>,
pub last_sync: DateTime<Utc>,
}
use sha2::{Sha256, Digest};
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;
pub fn calculate_file_hash(path: &Path) -> io::Result<String> {
let mut file = File::open(path)?;
let mut hasher = Sha256::new();
let mut buffer = [0u8; 8192];
loop {
let n = file.read(&mut buffer)?;
if n == 0 {
break;
}
hasher.update(&buffer[..n]);
}
Ok(hex::encode(hasher.finalize()))
}
pub fn create_file_metadata(path: &Path) -> io::Result<FileMetadata> {
let metadata = std::fs::metadata(path)?;
let hash = calculate_file_hash(path)?;
Ok(FileMetadata {
id: uuid::Uuid::new_v4().to_string(),
path: path.to_path_buf(),
hash,
size: metadata.len(),
modified: metadata.modified()?.into(),
version: 1,
})
}
有監控才有同步
use notify::{
Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
};
use tokio::sync::mpsc;
use std::path::Path;
pub struct FileWatcher {
watcher: RecommendedWatcher,
event_tx: mpsc::Sender<SyncEvent>,
}
impl FileWatcher {
pub fn new(event_tx: mpsc::Sender<SyncEvent>) -> anyhow::Result<Self> {
let tx = event_tx.clone();
let watcher = RecommendedWatcher::new(
move |result: Result<Event, notify::Error>| {
if let Ok(event) = result {
let _ = tx.blocking_send(Self::process_event(event));
}
},
Config::default(),
)?;
Ok(Self {
watcher,
event_tx,
})
}
pub fn watch(&mut self, path: &Path) -> anyhow::Result<()> {
self.watcher.watch(path, RecursiveMode::Recursive)?;
Ok(())
}
fn process_event(event: Event) -> SyncEvent {
match event.kind {
EventKind::Create(_) => {
if let Some(path) = event.paths.first() {
if path.is_file() {
if let Ok(metadata) = create_file_metadata(path) {
return SyncEvent::FileCreated { metadata };
}
} else {
return SyncEvent::DirectoryCreated {
path: path.clone(),
};
}
}
}
EventKind::Modify(_) => {
if let Some(path) = event.paths.first() {
if path.is_file() {
if let Ok(metadata) = create_file_metadata(path) {
return SyncEvent::FileModified { metadata };
}
}
}
}
EventKind::Remove(_) => {
if let Some(path) = event.paths.first() {
return SyncEvent::FileDeleted {
path: path.clone(),
};
}
}
_ => {}
}
// 預設事件
SyncEvent::FileDeleted {
path: PathBuf::new(),
}
}
}
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
routing::get,
Router,
};
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
pub struct SyncServer {
clients: Arc<DashMap<String, broadcast::Sender<SyncMessage>>>,
file_store: Arc<DashMap<String, FileMetadata>>,
}
impl SyncServer {
pub fn new() -> Self {
Self {
clients: Arc::new(DashMap::new()),
file_store: Arc::new(DashMap::new()),
}
}
pub fn router(self) -> Router {
let state = Arc::new(self);
Router::new()
.route("/ws", get(ws_handler))
.route("/files", get(list_files))
.route("/sync-state", get(get_sync_state))
.with_state(state)
}
pub async fn broadcast_event(&self, message: SyncMessage) {
for client in self.clients.iter() {
let _ = client.value().send(message.clone());
}
}
pub fn update_file(&self, metadata: FileMetadata) {
self.file_store.insert(metadata.id.clone(), metadata);
}
pub fn remove_file(&self, id: &str) {
self.file_store.remove(id);
}
}
async fn ws_handler(
ws: WebSocketUpgrade,
State(server): State<Arc<SyncServer>>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_socket(socket, server))
}
async fn handle_socket(socket: WebSocket, server: Arc<SyncServer>) {
let client_id = uuid::Uuid::new_v4().to_string();
let (tx, mut rx) = broadcast::channel(100);
server.clients.insert(client_id.clone(), tx);
let (mut sender, mut receiver) = socket.split();
// 接收客戶端訊息
let server_clone = server.clone();
let receive_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
if let Message::Text(text) = msg {
if let Ok(sync_msg) = serde_json::from_str::<SyncMessage>(&text) {
match sync_msg.event {
SyncEvent::FileCreated { ref metadata } |
SyncEvent::FileModified { ref metadata } => {
server_clone.update_file(metadata.clone());
}
SyncEvent::FileDeleted { .. } => {
// 處理檔案刪除
}
_ => {}
}
server_clone.broadcast_event(sync_msg).await;
}
}
}
});
// 發送訊息給客戶端
let send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
if sender.send(Message::Text(json)).await.is_err() {
break;
}
}
}
});
tokio::select! {
_ = receive_task => {},
_ = send_task => {},
}
server.clients.remove(&client_id);
}
async fn list_files(
State(server): State<Arc<SyncServer>>,
) -> impl IntoResponse {
let files: Vec<FileMetadata> = server
.file_store
.iter()
.map(|entry| entry.value().clone())
.collect();
axum::Json(files)
}
async fn get_sync_state(
State(server): State<Arc<SyncServer>>,
) -> impl IntoResponse {
let state = SyncState {
files: server
.file_store
.iter()
.map(|entry| entry.value().clone())
.collect(),
last_sync: Utc::now(),
};
axum::Json(state)
}
use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures::{SinkExt, StreamExt};
pub struct SyncClient {
server_url: String,
sync_dir: PathBuf,
client_id: String,
}
impl SyncClient {
pub fn new(server_url: String, sync_dir: PathBuf) -> Self {
Self {
server_url,
sync_dir,
client_id: uuid::Uuid::new_v4().to_string(),
}
}
pub async fn start(&self) -> anyhow::Result<()> {
let (ws_stream, _) = connect_async(&self.server_url).await?;
let (mut write, mut read) = ws_stream.split();
// 啟動檔案監控
let (event_tx, mut event_rx) = mpsc::channel(100);
let mut watcher = FileWatcher::new(event_tx)?;
watcher.watch(&self.sync_dir)?;
// 發送本地事件到伺服器
let client_id = self.client_id.clone();
let send_task = tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
let message = SyncMessage {
event,
timestamp: Utc::now(),
client_id: client_id.clone(),
};
if let Ok(json) = serde_json::to_string(&message) {
let _ = write.send(Message::Text(json)).await;
}
}
});
// 接收伺服器事件並同步到本地
let sync_dir = self.sync_dir.clone();
let my_client_id = self.client_id.clone();
let receive_task = tokio::spawn(async move {
while let Some(Ok(msg)) = read.next().await {
if let Message::Text(text) = msg {
if let Ok(sync_msg) = serde_json::from_str::<SyncMessage>(&text) {
// 忽略自己發送的事件
if sync_msg.client_id == my_client_id {
continue;
}
Self::apply_sync_event(&sync_dir, sync_msg.event).await;
}
}
}
});
tokio::select! {
_ = send_task => {},
_ = receive_task => {},
}
Ok(())
}
async fn apply_sync_event(sync_dir: &Path, event: SyncEvent) {
match event {
SyncEvent::FileCreated { metadata } |
SyncEvent::FileModified { metadata } => {
let local_path = sync_dir.join(&metadata.path);
// 這裡需要從伺服器下載檔案內容
println!("需要同步檔案: {:?}", local_path);
}
SyncEvent::FileDeleted { path } => {
let local_path = sync_dir.join(&path);
if local_path.exists() {
let _ = std::fs::remove_file(local_path);
}
}
SyncEvent::DirectoryCreated { path } => {
let local_path = sync_dir.join(&path);
let _ = std::fs::create_dir_all(local_path);
}
SyncEvent::DirectoryDeleted { path } => {
let local_path = sync_dir.join(&path);
let _ = std::fs::remove_dir_all(local_path);
}
}
}
pub async fn initial_sync(&self) -> anyhow::Result<()> {
// 獲取伺服器端的檔案狀態
let client = reqwest::Client::new();
let response = client
.get(format!("{}/sync-state", self.server_url))
.send()
.await?;
let state: SyncState = response.json().await?;
// 比對本地檔案並同步
for remote_file in state.files {
let local_path = self.sync_dir.join(&remote_file.path);
if !local_path.exists() {
println!("需要下載: {:?}", remote_file.path);
// 下載檔案
} else if let Ok(local_meta) = create_file_metadata(&local_path) {
if local_meta.hash != remote_file.hash {
println!("需要更新: {:?}", remote_file.path);
// 更新檔案
}
}
}
Ok(())
}
}
use tracing_subscriber;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
println!("使用方式:");
println!(" {} server [port]", args[0]);
println!(" {} client <server-url> <sync-dir>", args[0]);
return Ok(());
}
match args[1].as_str() {
"server" => {
let port = args.get(2)
.and_then(|p| p.parse().ok())
.unwrap_or(3000);
start_server(port).await?;
}
"client" => {
if args.len() < 4 {
println!("請提供伺服器 URL 和同步目錄");
return Ok(());
}
let server_url = args[2].clone();
let sync_dir = PathBuf::from(&args[3]);
start_client(server_url, sync_dir).await?;
}
_ => {
println!("未知命令: {}", args[1]);
}
}
Ok(())
}
async fn start_server(port: u16) -> anyhow::Result<()> {
let server = SyncServer::new();
let app = server.router();
let addr = format!("0.0.0.0:{}", port);
println!("🚀 同步伺服器啟動於 {}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
async fn start_client(server_url: String, sync_dir: PathBuf) -> anyhow::Result<()> {
if !sync_dir.exists() {
std::fs::create_dir_all(&sync_dir)?;
}
let client = SyncClient::new(server_url, sync_dir);
println!("🔄 執行初始同步...");
client.initial_sync().await?;
println!("👀 開始監控檔案變更...");
client.start().await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::fs::File;
use std::io::Write;
#[test]
fn test_file_hash() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.txt");
let mut file = File::create(&file_path).unwrap();
file.write_all(b"Hello, World!").unwrap();
let hash = calculate_file_hash(&file_path).unwrap();
assert!(!hash.is_empty());
}
#[test]
fn test_file_metadata() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.txt");
let mut file = File::create(&file_path).unwrap();
file.write_all(b"Test content").unwrap();
let metadata = create_file_metadata(&file_path).unwrap();
assert_eq!(metadata.size, 12);
assert!(!metadata.hash.is_empty());
}
#[tokio::test]
async fn test_sync_server() {
let server = SyncServer::new();
let metadata = FileMetadata {
id: "test-1".to_string(),
path: PathBuf::from("test.txt"),
hash: "abc123".to_string(),
size: 100,
modified: Utc::now(),
version: 1,
};
server.update_file(metadata.clone());
assert!(server.file_store.contains_key("test-1"));
server.remove_file("test-1");
assert!(!server.file_store.contains_key("test-1"));
}
}
啟動 server
cargo run -- server 3000
啟動 client
# 客戶端 1
cargo run -- client ws://localhost:3000/ws ./sync_folder_1
# 客戶端 2
cargo run -- client ws://localhost:3000/ws ./sync_folder_2
測試同步
# 在 sync_folder_1 中建立檔案
echo "Hello" > ./sync_folder_1/test.txt
# 檔案會自動同步到 sync_folder_2
cat ./sync_folder_2/test.txt # 輸出: Hello