今天我們透過 python 來實作 RabbitMQ
Linux
sudo apt-get install rabbitmq-server
sudo systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2024-10-03 18:32:27 CST; 18min ago
Main PID: 1276808 (beam.smp)
Tasks: 27 (limit: 9402)
Memory: 91.5M
CPU: 9.778s
CGroup: /system.slice/rabbitmq-server.service
├─1276808 /usr/lib/erlang/erts-12.2.1/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -sbwt none -sbwtdcpu no>
├─1276819 erl_child_setup 65536
├─1276884 inet_gethost 4
└─1276885 inet_gethost 4
通常安裝完就會看到服務已經啟動了,若沒有的話再執行下面指令
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
接著先安裝 pika 套件
pip install pika
Direct 模式
send.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 建立一個Queue(如果已存在就不會新增)
channel.queue_declare(queue='hello')
# 發送訊息到指定的 Queue
channel.basic_publish(exchange='', routing_key='hello', body='Hello RabbitMQ!')
print(" [x] Sent 'Hello RabbitMQ!'")
connection.close()
receive.py
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
接著我們打開一個新的 terminal 先執行 receive.py,持續的聽取這 Hello 的 queue
python3 receive.py
[*] Waiting for messages. To exit press CTRL+C
我們再打開另一個 terminal,執行 send.py
python3 send.py
[x] Sent 'Hello RabbitMQ!'
這時候 receive.py 的 terminal 就會 print 出收到的訊息
python3 receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello RabbitMQ!'
簡單實作了之後我們繼續到更完整的 message model,也就是有 Exchange 的模式
首先我們先執行
sudo rabbitmqctl list_exchanges
你會看到一些 amq.*
的 Exchange,這些都是一些 default 的,我們暫時不會用到。
在上面的案例中沒有提到任何 Exchange 也能傳送到 Queue 是因為我們用預設的 Exchange 也就是空字串。
在 publish/subscribe 中的 send 和 receive 都不像上面的例子般需要指定 queue,因為是透過 Exchange 來傳送給 binding 的 receiver。
emit_log.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()
exchange_type='fanout'
指的是將接收到的每一條訊息廣播到所有與這個 Exchang binding 的queue,而不需考慮任何的 routing key 。
receive_log.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print("queue_name", queue_name)
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
如果沒有宣告 Exchange 的話,這些訊息會不知道送去哪而遺失。exclusive=True
代表 Queue 是獨佔的,這個連線 close 後,queue 也會跟著刪除。result = channel.queue_declare(queue='', exclusive=True)
指的是會自動生成一個獨佔的 queue,只有現在的連接可以使用這個 queue,並在連接結束後自動刪除這個 queue。channel.queue_bind(exchange='logs', queue=queue_name)
指的是將 logs exchange 和 queue 做綁定,所以只要透過 logs exchange 傳送訊息就會傳送到我們現在的 queue。
我們一樣先在另一個 terminal 執行 receive_log.py
,可以看到 queue 這時就是自動生成的。
python receive_log.py
queue name amq.gen-GhRBExkF5LgYtJy0w6nB3w
[*] Waiting for logs. To exit press CTRL+C
接著執行 emit_log.py
python3 emit_log.py
[x] Sent info: Hello World!
再回到 receive_log.py
的 terminal,我們就可以看到接收到啦!
[x] b'info: Hello World!'
執行後可以透過 sudo rabbitmqctl list_bindings
看到 logs 和 自動生成的 queue 產生 binding。
sudo rabbitmqctl list_bindings
source_name source_kind destination_name destination_kind routing_key arguments
exchange amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ queue amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ []
exchange hello queue hello []
logs exchange amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ queue amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ []
以上就是使用 Python 實作的例子,其他模式的例子,如果大家有興趣也可以去 官網 練習