這是 message queue 系列文第三篇,以下附上前兩篇連結
讓任務排隊吧:Message Queue — (1)
讓任務排隊吧:Message Queue — (2)
第一篇文章中介紹了 message queue 基本概念,第二篇則介紹 RabbitMQ 基本元件與屬性,這篇則是要運用 RabbitMQ 來做簡單的 send email 應用,利用簡單的 use case 了解非同步任務的處理方式。
RabbitMQ 的官網範例已提供非常多 publisher 與 consumer 的溝通方式,包含錯誤處理等也都有涵蓋到,但是當初我在學習官網範例時,卻不知道在實際開發中如何融入 RabbitMQ。因此今天想示範一個搭配 Restful API 的簡單範例(實際狀況當然不會這麼簡單,這個範例的目的是讓跟我ㄧ樣的初學者可以了解 Message Queue 實際處理非同步事件的狀況,如有大神路過請見諒)
回到正題,還記得第一篇中提到非同步任務我舉了什麼例子嗎?
傳送 Email,你在送出 email 之後不會特別去確認信件真的傳遞到對方信箱,而是會繼續去做其他事情。
今天就要來使用 RabbitMQ 搭配 Node.js Express 嘗試做到傳送 email 這件非同步任務。
要使用 RabbitMQ 前我們必須先安裝它,安裝的方式可以透過安裝檔案、Docker、K8S…等,詳細各種方法可以參考官方文檔,自己則推薦新手直接下載檔案或是透過 Docker 較為簡單。
首先建立一個資料夾後跑 npm init 起始一個 npm 專案,接著安裝需要用到的套件
$yarn add amqplib body-parser express nodemailer
amqplib 為讓 node.js 可以使用 RabbitMQ 的套件,nodemailer 則是處理寄送 email 的第三方套件
檔案架構如下
app.js
worker.js
- models
   messageQueue.js
   nodemailer.js
app.js 為建構 express.js web server 的檔案,也在該檔中定義了 API endpoint。
***worker.js 為實際上利用 nodemailer 套件處理 email 發送的 worker,也就是 message queue 架構中的 consumer,這邊為了方便直接在同個 repo 建立一個檔案當作 worker ,實際應用中如 microservice, consumer 則會是一個獨立的 mail service。
models folder 則是將會用到的 RabbitMQ client 與 nodemailer client 用 class 的形式封裝,增加程式碼的可維護性與擴展性。
首先來看 models 資料夾中的兩個 class 吧
models/messageQueue.js:
const amqp = require('amqplib');
class MessageQueueService {
	constructor(CONN_URL) {
		this.connection_url = CONN_URL;
	}
	async connect() {
		const connection = await amqp.connect(this.connection_url);
		this.channel = await connection.createChannel();
	}
	async publishToQueue(queueName, data) {
		await this.channel.assertQueue(queueName);
		this.channel.sendToQueue(queueName, Buffer.from(data));
	}
	closeChannel() {
		this.channel.close();
		console.log(`Closing rabbitmq channel`);
	}
}
module.exports = MessageQueueService;
( 這邊假設讀者是讀過官網範例或相關資料,是了解 RabbitMQ 基本操作的。)
models/nodemailer.js:
const nodemailer = require('nodemailer');
async function main() {
	let testAccount = await nodemailer.createTestAccount();
	let transporter = nodemailer.createTransport({
		host: "smtp.ethereal.email",
		port: 587,
		secure: false, 
		auth: {
		  user: testAccount.user, 
		  pass: testAccount.pass
		}
	  });
	return transporter;
}
module.exports = main;
為了避免設定帳號與處理 SMTP 協定等複雜過程,這邊使用 nodemailer 的 test account 進行測試(email 不會真的寄出)。
接下來先一次看完 app.js 與 worker.js 吧
app.js:
const express = require('express');
const bodyParser = require('body-parser');
const MessageQueueService = require('./models/messageQueue')
const app = express();
app.use(bodyParser.urlencoded({ extended: false }))
app.use(bodyParser.json())
app.get('/', (req, res) => {
	res.send('test route');
})
app.post('/email', async (req, res) => {
	let { queueName, email } = req.body;
	const messageQueue = new MessageQueueService('amqp://localhost:5672');
	await messageQueue.connect();
	messageQueue.publishToQueue(queueName, email);
	res.status(200).send({
		"message": "email sent successfully."
	})
})
app.listen(5000, () => {
	console.log('server listening on port 5000...');
})
worker.js:
const amqp = require('amqplib');
const mailservice = require('./models/nodemailer');
async function connect() {
	try {
		const connection = await amqp.connect('amqp://localhost:5672');
		const channel = await connection.createChannel();
		await channel.assertQueue("email-service");
		channel.prefetch(1);
		channel.consume("email-service", async (message) => {
			const transporter = await mailservice();
			const targetEmail = message.content.toString();
			let mailOptions = {
				from: 'kylemo860617@gmail.com',
				to: targetEmail,
				subject: 'Email sample from Kyle Mo',
				text: 'Hello world!!!!!'
			  };
			transporter.sendMail(mailOptions, function(error, info){
			if (error) {
				console.log(error);
			} else {
				console.log('Email sent: ' + info.response);
			}
			});
			console.log('Recieved job message: ', message.content.toString())
			// 確認接收並 dequeue
			channel.ack(message);
		})
		console.log("I am waiting for jobs to do....")
	} catch (err) {
		console.log(err);
	}
}
connect();
worker.js 中的邏輯似乎有點難以理解,它其實是去 RabbitMQ queue 中取出從 API 傳送來的訊息做進一步的處理而已。如果你對 RabbitMQ 的程式操作還不熟悉,程式碼沒辦法完全看懂也沒關係,以上範例的最大特點可以濃縮為以下的話:
有了 message queue 架構後,client 呼叫的 API endpoint不再需要處理複雜的邏輯與任務(以我們的例子來說,就是寄送 email 這件事),API endpoint 做的事只剩下簡單的接受任務,將使用者指定的訊息傳到 queue 中,並回傳給使用者傳送成功的 message,剩下實際寄送 email 的任務則交由 consumer 在 queue 中取出訊息後進行,如此一來使用者呼叫 API endpoint 後就不需要等待 mail 真的被寄送後再收到 response 了,也許只單純寄送一封信並沒有問題,但是如果需求是批次寄送千封甚至萬封的 email,就非常有可能造成 request 被 block 住的情況,message queue 則可以解決上述的問題,很有趣吧!
以上透過一個簡單的小範例示範 message queue 在現實中可能的應用,不過要再三強調的是這個範例在現實中不太可能會這樣寫,現實中還需要注意很多錯誤處理、效能,甚至要注意 queue 會不會爆掉等問題,這個範例純粹是希望跟我ㄧ樣的初學者可以了解 message queue 可能的應用方向,最後也希望這三篇關於 message queue 的系列文可以幫助到跟我一樣初學這個概念的人可以更快理解它的原理。
https://github.com/kylemocode/2020-it-iron-man-challenge
(其中的 message-queue folder)
想盡辦法當好一個Junior Backend Developer
用舒服的姿勢開發 Python Project