今天有一種情境,假設我們需要去匹次處理大量任務,每個任務都可能會有發生 error 的風險(資料錯誤或是暫時性第三方錯誤或是任何非預期原因)
例如: 假設我們今天有十萬個商品,都需要透過 API 去抓取他們的資料,然後根據情況進行某種商品資料更新,那一般會如何處理呢?
第二種方式看似可以解決這個問題,但今天任務一多時,如果只有一個 instance 在處理這些任務,則處理效率會十分緩慢。然而若是開多個 instance 來分擔這些任務時,則需要擔心 race condition 的問題,若是沒處理好,可能會導致任務被重複執行的錯誤狀況。
並且針對 error 的任務,事後若要重新執行,也相對麻煩。
Task Queue (Message Queue) 可以幫助我們解決上述的問題,並且是十分容易擴展服務的方式,而本篇介紹的 Bull 便是 node 生態系中一種 task queue 的解決方案。
什麼是 bull 呢?我們先看一下官網的介紹:
The fastest, most reliable, Redis-based queue for Node.
Carefully written for rock solid stability and atomicity.
簡單說,bull 是一種基於 redis 的 queue,可以幫助我們解決剛剛以上的問題,類似的工具有非常多,例如 RabitMQ 或 AWS 的 SQS 等。
透過使用 bull,我們可以在 task queue 中加入我們想要處理的任務。
以上述例子為例,則為 product ids。然後任務將分別被派發給處理這些任務的對象,簡稱 processor 或是 consumer。
值得開心的是,processor 可以是複數個,言下之意是,我們可以由多個 processors 來消耗及處理我們的任務,這些 processors 不只可以是位於不同主機的 instances,並且 bull 會協助我們派發這些任務,避免任務被重複執行。
另外,task queue 還能幫助我們設定每個任務之間的延遲、retry 次數、rate limite 等。暫停任務跟刪除任務也是可以做到的,使我們的任務變得更可掌控。
另外,在這邊推薦一個 bull 的 admin dashboard,目前在工作專案已使用幾週,可視化的介面非常親民方便,並且可以方便地重啟失敗的任務。
https://github.com/felixmosh/bull-board
另外,對 task queue 有興趣深入瞭解的,可以參考這篇莫力全大大的文章
Task queue 是高併發架構常見的其中一個解決方案,可以幫助我們有條不亂的處理大量非同步任務,並且使系統更容易被管理跟追蹤,配合 node 搭配 pm2 的使用,更是如虎添翼,可以在近乎無痛的情況下開啟多個 consumers 來同時處理任務。
以下是我使用 bulljs 配合上一篇的 node-cron 設計的一個管理 cron job 的 class,任務統一存在 redis 做管理,如果是以 datetime 設定,則會被視為一次性任務,執行後就會刪除。
import { CronJob } from 'cron';
import moment from 'moment-timezone';
import mainQueue from '../queue/queue';
import { redis } from './redis';
class CronManager {
private jobs: CronJob[] = [];
private redis_prefix = process.env.SITE_PREFIX;
constructor() {
this.refreshJobs();
}
async refreshJobs() {
const allJobs = await this.getCronJobList();
this.jobs.forEach(job => {
job.stop();
});
this.jobs = allJobs
.map(job => {
const { id, action, payload, type, value } = job;
const cron = type === 'DATETIME' ? moment(value).toDate() : value;
try {
return new CronJob(
cron,
() => {
const data = {
action,
payload: payload,
};
if (type === 'DATETIME') {
this.deleteCronJob(id);
}
mainQueue.add('main', data);
},
null,
true,
'Asia/Taipei',
);
} catch (error) {
return null;
}
})
.filter(item => item !== null);
}
async getCronJobList() {
const all = await redis.get(`${this.redis_prefix}_conjob`);
return all ? JSON.parse(all) : [];
}
async deleteCronJob(jobId: number) {
const all = await this.getCronJobList();
const newAll = all.filter(({ id }) => {
return Number(id) !== Number(jobId);
});
await redis.set(`${this.redis_prefix}_conjob`, JSON.stringify(newAll));
this.refreshJobs();
}
}
export default new CronManager();