iT邦幫忙

2024 iThome 鐵人賽

DAY 22
0
Software Development

埋藏在後端工程下的地雷與寶藏系列 第 22

Day-22 | Message Queue - RabbitMQ (2) feat. Python

  • 分享至 

  • xImage
  •  

今天我們透過 python 來實作 RabbitMQ

安裝

Linux

sudo apt-get install rabbitmq-server
sudo systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ Messaging Server
     Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
     Active: active (running) since Thu 2024-10-03 18:32:27 CST; 18min ago
   Main PID: 1276808 (beam.smp)
      Tasks: 27 (limit: 9402)
     Memory: 91.5M
        CPU: 9.778s
     CGroup: /system.slice/rabbitmq-server.service
             ├─1276808 /usr/lib/erlang/erts-12.2.1/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -sbwt none -sbwtdcpu no>
             ├─1276819 erl_child_setup 65536
             ├─1276884 inet_gethost 4
             └─1276885 inet_gethost 4

通常安裝完就會看到服務已經啟動了,若沒有的話再執行下面指令

sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server

接著先安裝 pika 套件

pip install pika

Direct 模式

send.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 建立一個Queue(如果已存在就不會新增)
channel.queue_declare(queue='hello')

# 發送訊息到指定的 Queue
channel.basic_publish(exchange='', routing_key='hello', body='Hello RabbitMQ!')
print(" [x] Sent 'Hello RabbitMQ!'")

connection.close()

receive.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

接著我們打開一個新的 terminal 先執行 receive.py,持續的聽取這 Hello 的 queue

python3 receive.py
[*] Waiting for messages. To exit press CTRL+C

我們再打開另一個 terminal,執行 send.py

python3 send.py
[x] Sent 'Hello RabbitMQ!'

這時候 receive.py 的 terminal 就會 print 出收到的訊息

python3 receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received b'Hello RabbitMQ!'

簡單實作了之後我們繼續到更完整的 message model,也就是有 Exchange 的模式

Publish/Subscribe 模式

首先我們先執行

sudo rabbitmqctl list_exchanges

你會看到一些 amq.* 的 Exchange,這些都是一些 default 的,我們暫時不會用到。
在上面的案例中沒有提到任何 Exchange 也能傳送到 Queue 是因為我們用預設的 Exchange 也就是空字串。
在 publish/subscribe 中的 send 和 receive 都不像上面的例子般需要指定 queue,因為是透過 Exchange 來傳送給 binding 的 receiver。

emit_log.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

exchange_type='fanout' 指的是將接收到的每一條訊息廣播到所有與這個 Exchang binding 的queue,而不需考慮任何的 routing key 。

receive_log.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print("queue_name", queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(f" [x] {body}")

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

如果沒有宣告 Exchange 的話,這些訊息會不知道送去哪而遺失。
exclusive=True 代表 Queue 是獨佔的,這個連線 close 後,queue 也會跟著刪除。
result = channel.queue_declare(queue='', exclusive=True) 指的是會自動生成一個獨佔的 queue,只有現在的連接可以使用這個 queue,並在連接結束後自動刪除這個 queue。
channel.queue_bind(exchange='logs', queue=queue_name) 指的是將 logs exchange 和 queue 做綁定,所以只要透過 logs exchange 傳送訊息就會傳送到我們現在的 queue。

我們一樣先在另一個 terminal 執行 receive_log.py,可以看到 queue 這時就是自動生成的。

python receive_log.py
queue name amq.gen-GhRBExkF5LgYtJy0w6nB3w
[*] Waiting for logs. To exit press CTRL+C

接著執行 emit_log.py

python3 emit_log.py
[x] Sent info: Hello World!

再回到 receive_log.py 的 terminal,我們就可以看到接收到啦!

[x] b'info: Hello World!'

執行後可以透過 sudo rabbitmqctl list_bindings 看到 logs 和 自動生成的 queue 產生 binding。

 sudo rabbitmqctl list_bindings
 source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ  queue   amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ  []
        exchange        hello   queue   hello   []
logs    exchange        amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ  queue   amq.gen-Qn9ZzK2bURLd8wbeZoVCYQ  []

以上就是使用 Python 實作的例子,其他模式的例子,如果大家有興趣也可以去 官網 練習

Reference


上一篇
Day-21 | Message Queue - RabbitMQ
下一篇
Day-23 |自動化部署的好朋友 Ansible
系列文
埋藏在後端工程下的地雷與寶藏30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言