今天將分成Queue及Topic兩個部分實作
參考https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus
接下來要講一下實戰性的部分了,在queue及topic都是處理訊息傳送的部分,所以會有程式將訊息傳送到service bus,也就會有程式從service bus中讀取出來,因此下面實作的部分,將個別說明Queue及Topic兩者在傳送與接收訊息之間的差異。本篇實作程式碼均參考官方說明文件再加以調整內容。實作過程使用JupyterNotebook,有興趣想動手試試的可以參考我的Jupyter Notebook,另外也有提供requirements.txt在自行使用JupyterNotebook時直接安裝相關套件測試。
執行前先安裝azure-servicebus套件,套件已經幫我們把最困難的部分解決了,我們只要好好的運用套件帶來的成果即可!
pip install azure-servicebus --pre
在使用程式碼前,須先取得服務的相關連線及驗證資訊。機密資訊的部分,我將他們都存在.env
檔案中,提供JupyterNotebook存取,以下程式碼可直接讀取,同時避免資訊外洩遭有心人士利用。
ConnectionString
在進入到Service Bus頁面後,按一下Shared access policies
,選擇其中一個policy,右側變顯示2個key跟2個ConnectionString,將其中一個複製起來。
Queue
QueueName
按一下Queues,此時會看到這個serviceBus中所有的queues列表(下圖只有1個),選擇一個要傳送的名稱並記起來,等一下執行傳送訊息後記得不要進到錯的queue中,這樣會看不到訊息的。
Topic
TopicName
按一下Topics,此時會看到這個serviceBus中所有的topics列表(下圖只有1個),選擇一個要傳送的名稱並記起來,接著按一下topic進入其畫面。
SubscriptionName
進入後按一下Subscriptions。如果您按照上一篇的文章動作的話,應該會跟我一樣,沒有subscription資料。點一下Subscription來新增一個:
設定名稱及最大傳送數量後,按下Create
建立新的Subscription。也可以再新增幾個來測試,當傳送訊息到Topic時,Topic底下的Subscription皆會收到同樣的訊息。同樣的要記下每一個subscription的名稱。
在前面已經知道ConnectionString及QueueName。以下程式碼為測試是否能傳送到指定的queue中。
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_sender(queue_name) as sender:
# Sending a single message
single_message = Message("Single message")
sender.send_messages(single_message)
# Sending a list of messages
messages = [Message("First message"), Message("Second message")]
sender.send_messages(messages)
送出後可以看到對應的queue裡面有3則訊息,Single message
、First message
及Second message
。
接著使用接收程式碼取得訊息:
from azure.servicebus import ServiceBusClient
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator. This is equivilent to get_streaming_message_iter().
print(str(msg))
# If it is desired to halt receiving early, one can break out of the loop here safely.
將收到剛剛送出的訊息:
如果這時候再回去看Azure上的紀錄,會發現3個訊息還是存在。是因為程式碼中少加了.complete()
的動作,要告訴service bus這則訊息已經處理完成,他才不會一直存在記憶體中。
Topic這邊我也測試一個subscription來接收看看,下面我讓他傳送6筆訊息過去:
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
i = 0
while i <= 5:
with client.get_topic_sender(topic_name) as sender:
msg=f"Data{i}"
sender.send_messages(Message(msg))
print(msg)
i += 1
輸出的訊息長這樣:
來到Azure上面,發現有33筆訊息!!!(因為我測試好幾次)
先看一下程式碼
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
# If session_id is null here, will receive from the first available session.
with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
for msg in receiver:
print(str(msg))
看起來很正常,輸出的訊息長這樣:
![image-20200921234057558](/Users/tatamo/Library/Application Support/typora-user-images/image-20200921234057558.png)
這邊訊息我有先輸出了,但他還是存在於服務中。因為還少了一個步驟!!
from azure.servicebus import ServiceBusClient, Message
import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
with ServiceBusClient.from_connection_string(connstr) as client:
# If session_id is null here, will receive from the first available session.
with client.get_subscription_receiver(topic_name, subscription_name) as receiver:
for msg in receiver:
print(str(msg))
## 少了這個!!!
msg.complete()
訊息在處理完後記得要執行.complete()
,否則會一直存在在service bus中。
全部執行complete後就歸0了。
今天主要介紹使用Queue及Topic傳送、接收訊息的實作,其實做法都差不多,只差在queue只有一個接收者,topic可以設定許多接收者。
在接收到訊息並處理完後,必須執行complete()動作,讓service bus知道這個訊息已完成,才會釋出記憶體。除了.complete()
以外,還有.abandon(), .dead_letter()及.defer()等用法,可以參考相對的連結。
★ Amos3.0 團隊系列文 ★
以下為團隊所有成員的主題,也歡迎大家前往欣賞喔!