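Previously the Query object kept its view in an in-memory HashMap, so an update was just a plain in-place write. This time I'm hooking up SurrealDB, which means reading and writing through Surreal's API (SQL).
The flow is roughly: the cqrs crate receives a command; if it executes successfully, the resulting events are applied to the aggregate and then dispatched through the queries to each downstream view.
cqrs.execute(command)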
--> store.load(aggregate)
--> handle(command)
--> apply(aggregate)
--> store.commit(events)
--> query.dispatch(events)
--> view.update
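To make the flow above concrete, here is a minimal std-only sketch of the same pipeline. Everything in it (Framework, Aggregate, Command, Event) is a hypothetical stand-in I made up for illustration, not the cqrs crate's actual types, and the view is just a HashMap of available copies.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for illustration; these are NOT the cqrs crate's types.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Ingested { copies: u32 },
    Lent,
}

pub enum Command {
    Ingest { copies: u32 },
    Lend,
}

#[derive(Default)]
pub struct Aggregate {
    pub available: u32,
}

impl Aggregate {
    // handle(command): validate against the current state and emit events.
    pub fn handle(&self, cmd: Command) -> Result<Vec<Event>, String> {
        match cmd {
            Command::Ingest { copies } => Ok(vec![Event::Ingested { copies }]),
            Command::Lend if self.available == 0 => Err("no copies available".to_string()),
            Command::Lend => Ok(vec![Event::Lent]),
        }
    }

    // apply(aggregate): fold one event into the aggregate state.
    pub fn apply(&mut self, event: &Event) {
        match event {
            Event::Ingested { copies } => self.available += *copies,
            Event::Lent => self.available -= 1,
        }
    }
}

#[derive(Default)]
pub struct Framework {
    store: HashMap<String, Vec<Event>>, // event store, keyed by aggregate id
    pub view: HashMap<String, u32>,     // downstream view: available copies
}

impl Framework {
    pub fn execute(&mut self, id: &str, cmd: Command) -> Result<(), String> {
        // store.load(aggregate): rebuild state by replaying stored events.
        let mut aggregate = Aggregate::default();
        for e in self.store.get(id).into_iter().flatten() {
            aggregate.apply(e);
        }
        // handle(command): a rejected command stops here, nothing is stored.
        let events = aggregate.handle(cmd)?;
        // apply(aggregate)
        for e in &events {
            aggregate.apply(e);
        }
        // store.commit(events)
        self.store
            .entry(id.to_string())
            .or_default()
            .extend(events.iter().cloned());
        // query.dispatch(events) --> view.update
        self.dispatch(id, &events);
        Ok(())
    }

    fn dispatch(&mut self, id: &str, events: &[Event]) {
        let available = self.view.entry(id.to_string()).or_insert(0);
        for e in events {
            match e {
                Event::Ingested { copies } => *available += *copies,
                Event::Lent => *available -= 1,
            }
        }
    }
}
```

The point of the sketch is that the view is only touched after the command has succeeded and the events have been committed, so a rejected command (lending with zero copies) never reaches the view.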
First, create BookDto:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BookDto {
    pub book_id: String,
    pub title: String,
    pub isbn10: String,
    pub description: String,
    pub copies: u32,
    pub available_copies: u32,
}
Add the BookQuery object:
#[derive(Debug, Clone)]
pub struct BookQuery {
    pub db: Surreal<Db>,
}
impl BookQuery {
    // Insert a new row into the books table. Errors are discarded here.
    pub async fn insert_book(&self, book_dto: &BookDto) {
        let result: Content<'_, Db, &BookDto, Vec<BookDto>> = self.db
            .create("books")
            .content(book_dto);
        let _ = result.await.ok();
    }
    // New copies arrived: raise both the total and the available count.
    pub async fn ingest_book(&self, copies: u32, id: String) {
        let ql = format!("UPDATE books SET copies = copies + {} \
            , available_copies = available_copies + {} \
            where book_id = '{}';", copies, copies, id);
        let _ = self.db
            .query(ql)
            .await.ok();
    }
    // One copy lent out: decrement the available count.
    pub async fn lent_book(&self, id: String) {
        let ql = format!("UPDATE books SET available_copies = available_copies - 1 \
            where book_id = '{}';", id);
        let _ = self.db
            .query(ql).await
            .ok();
    }
    // One copy returned: increment the available count.
    pub async fn returned_book(&self, id: String) {
        let ql = format!("UPDATE books SET available_copies = available_copies + 1 \
            where book_id = '{}';", id);
        let _ = self.db
            .query(ql).await
            .ok();
    }
}
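Since the statements above are assembled with format!, it is worth seeing exactly what string ends up being sent. The sketch below is std-only; ingest_sql and is_safe_id are hypothetical helpers I added for illustration (they are not part of the BookQuery code), and the allowed-character rule in is_safe_id is just an assumption about what book ids look like.

```rust
// Hypothetical helper mirroring the format! call in ingest_book,
// written on one line so the resulting string is easy to read.
pub fn ingest_sql(copies: u32, id: &str) -> String {
    format!(
        "UPDATE books SET copies = copies + {}, available_copies = available_copies + {} WHERE book_id = '{}';",
        copies, copies, id
    )
}

// Hypothetical guard (an assumption, not from the original code):
// only accept ids made of ASCII letters, digits, '-' and '_',
// so an id can never close the quoted literal in the statement.
pub fn is_safe_id(id: &str) -> bool {
    !id.is_empty()
        && id
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
}
```

With an id like test-book-1 the statement comes out as expected, but an id containing a single quote would end the string literal early and change the query. For this demo the ids come from my own commands, so it is fine; for anything real, the surrealdb query builder's bind method is probably the better route, because the value is then passed as a parameter instead of being spliced into the text.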
Implement the Query trait:
#[async_trait]
impl Query<Book> for BookQuery {
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<Book>]) {
        for event in events {
            match &event.payload {
                // A new book starts with zero copies until some are ingested.
                BookEvent::BookCreated { id, title, isbn10, description } => {
                    let book_dto = BookDto {
                        book_id: id.to_string(),
                        title: title.to_string(),
                        isbn10: isbn10.to_string(),
                        description: description.to_string(),
                        copies: 0,
                        available_copies: 0,
                    };
                    self.insert_book(&book_dto).await;
                }
                BookEvent::BookIngested { id, copies } => {
                    self.ingest_book(*copies, id.to_string()).await;
                }
                // The lend/return record is not needed for this view, only the book id.
                BookEvent::BookLent(_record) => {
                    self.lent_book(aggregate_id.to_string()).await;
                }
                BookEvent::BookReturned(_record) => {
                    self.returned_book(aggregate_id.to_string()).await;
                }
            }
        }
    }
}
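The projection logic in dispatch can also be tried without a database. The following is a minimal std-only sketch under a few assumptions: BookEvent is a simplified hypothetical version (no isbn10/description, and the lend/return records are dropped), BookDto is trimmed to match, and project is a made-up function that plays the role of dispatch against a HashMap instead of SurrealDB.

```rust
use std::collections::HashMap;

// Trimmed-down, hypothetical versions of the types used in this post.
#[derive(Debug, Clone, PartialEq)]
pub struct BookDto {
    pub book_id: String,
    pub title: String,
    pub copies: u32,
    pub available_copies: u32,
}

pub enum BookEvent {
    BookCreated { id: String, title: String },
    BookIngested { id: String, copies: u32 },
    BookLent,
    BookReturned,
}

// Plays the role of dispatch: fold each event into the in-memory view.
pub fn project(view: &mut HashMap<String, BookDto>, aggregate_id: &str, events: &[BookEvent]) {
    for event in events {
        match event {
            BookEvent::BookCreated { id, title } => {
                view.insert(
                    id.clone(),
                    BookDto {
                        book_id: id.clone(),
                        title: title.clone(),
                        copies: 0,
                        available_copies: 0,
                    },
                );
            }
            BookEvent::BookIngested { id, copies } => {
                if let Some(book) = view.get_mut(id) {
                    book.copies += *copies;
                    book.available_copies += *copies;
                }
            }
            BookEvent::BookLent => {
                if let Some(book) = view.get_mut(aggregate_id) {
                    // saturating_sub is an extra guard in this sketch only;
                    // the SQL version simply subtracts 1.
                    book.available_copies = book.available_copies.saturating_sub(1);
                }
            }
            BookEvent::BookReturned => {
                if let Some(book) = view.get_mut(aggregate_id) {
                    book.available_copies += 1;
                }
            }
        }
    }
}
```

Feeding it create, ingest 2, lend, lend, return should walk available_copies through 2, 1, 0, 1 while copies stays at 2. Each arm does the same arithmetic as the corresponding UPDATE statement, just on a HashMap entry.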
Register it in main:
let book_query = BookQuery { db: db.clone() };
let book_cqrs = CqrsFramework::new(
    book_store,
    vec![Box::new(book_query)],
    book_service.clone());
Testing:
After creating the book and ingesting copies
======== Books ========
[
    BookDto {
        book_id: "test-book-1",
        title: "Rust 語言開發實戰",
        isbn10: "1234567890",
        description: "使用rust,併同cqrs框架,實現event sourcing",
        copies: 2,
        available_copies: 2,
    },
]
After the first loan
======== Books ========
[
    BookDto {
        book_id: "test-book-1",
        title: "Rust 語言開發實戰",
        isbn10: "1234567890",
        description: "使用rust,併同cqrs框架,實現event sourcing",
        copies: 2,
        available_copies: 1,
    },
]
After the second loan
======== Books ========
[
    BookDto {
        book_id: "test-book-1",
        title: "Rust 語言開發實戰",
        isbn10: "1234567890",
        description: "使用rust,併同cqrs框架,實現event sourcing",
        copies: 2,
        available_copies: 0,
    },
]
After one return
======== Books ========
[
    BookDto {
        book_id: "test-book-1",
        title: "Rust 語言開發實戰",
        isbn10: "1234567890",
        description: "使用rust,併同cqrs框架,實現event sourcing",
        copies: 2,
        available_copies: 1,
    },
]
The book view (the Query side) gets updated as the events come through. The reader vs. book query will be handled later.