本日重點與方向 (TAG): GCP、Google Cloud Platform、Pub/Sub、Publisher、Subscriber、Tranciver、Reciver、Topic
今天就延續先前開始的 Google Cloud Platform (GCP) 上搭建服務的作法,今天會向 GCP 上的 Pub/Sub 事件傳遞系統的一些設定與流程為主,Pub/Sub 執行流程主要是基於 Publish/Subscriber (廢話)概念,但是在實作上面還有一層 Message Pool,主要提供消息重發與保留尚未發送訊息的保留行為,確認要有人能夠消化(具有事件處理者的意思)這個訊息,因為專案開發也有機會用到,比如說基於事件觸發(Event Trigger)去做某件事情,將訊息 Pass 給對應的接收端去撈訊息內的特定位置的資料等功能,或是作為 GCP 上的各個服務的連結使用(如: GKE 做資料預處理後,將特定傳資料出來,提供給 GCE 上的 VM 環境去開專有服務等等的),當然消息傳遞也是有一些對應的功能可以配置,Pub/Sub 提供一些發送轉發與主調提取訊息的作法,也有 Topic 作為通道分離的政策,給予一些篩選資料的機制存在,總之今天就大概弄一下發送接收的作法,有 GCP Web UI、gcloud-cli、python3 的作法,基本上就都是照著參考網站的實作,基礎操縱今天隨手弄一弄就只要能接到進去就是可以開搞了,有問題就再來底下問看看吧。
主題
(Topic),選建立主題
/project/專案名稱/topics/主題名稱
這邊的 Subscriotions 雖然是 Topic 端的接收的一個項目,但他也有一些篩選機制提供你設定,讓你只抓 Topic 上滿足特定內容的訊息,作爲 Topic 端的 Subscriotions,它其實同時也是真正的 subscriber (程式端)的訂閱端,你的 Subscriber 要收資料還是要對到它
訂閱項目
(Subscrition),選建立訂閱項目
projects/專案名稱/subscriotions
attributes.<item-name> = <item-info>
hasPrefix(<item-name>, <item-info>)
要用 GCP Web UI 發訊息測試的話,要先設定好你的 subscription,不然他抓不到你具有 subscription 端會認為發出訊息無效,就不會幫你送出了。
發布訊息
這邊的屬性可以對應的 subscription 的項目篩選功能。
這邊需要確認一下你的 Topic 有把資料打出來,我們在 subscription 接收端也符合規則(可以先關掉項目篩選),之後就看一下訊息進來到 subscription 的狀態與相關訊息。
檢視訊息
按鈕,按下去之後就能找到你的訊息了。提取
,這邊才會去找尋你的訊息做顯示。gcloud pubsub topics create <topic=id> --project <project-id>
gcloud pubsub subscriptions create --topic <topic-id> <subscription-id> --project <project-id>
gcloud pubsub topics publish <topic-id> --message <message-info> --project <project-id>
gcloud pubsub subscriptions pull --auto-ack <subscription-id> --project <project-id>
apt-get install python3 python3-pip ngrok
pip3 install flask
pub_sub-push-recive.py
---
from flask import Flask, request
app = Flask(__name__)
@app.route('/', methods=['POST'])
def hello():
print(request.json)
return "Hello World!"
if __name__ == '__main__':
app.run()
python3 pub_sub-push-recive.py
ngrok http 5000
配置都相同,就是改一下變成 推送消息模式 (Push Mode),把接收端設定成你剛剛 ngrok 搞出來的 URL。
apt-get install python3 python3-pip
pip3 install --upgrade google-cloud-pubsub
這邊需要先搞到你的 GCP 的 Service Account 的帳戶訊息 json 檔案
export GOOGLE_APPLICATION_CREDENTIALS="sa-json-file-path"
from google.cloud import pubsub_v1
project_id = "project-id"
topic_id="topic-id"
publisher = pubsub_v1.PublisherClient()
# Conbine the topic_path = 'projects/{project_id}/topic/{topic_id}'
topic_path = publisher.topic_path(project_id, topic_id)
print(topic_path)
for n in range(1, 3):
data = u"Message number {}".format(n)
data = data.encode("utf-8")
# username="gcp" & multiAttribute="test" is the message attributes.
future = publisher.publish(
topic_path, data, origin="python-sample", username="gcp", multiAttribute="test"
)
print(future.result())
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
project_id = "project-id"
subscription_id = "subscription-id"
# Number of seconds the subscriber should listen for messages
timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# Conbine the subscription_path = 'projects/{project_id}/subscriptions/{subscription_id}'
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print("Received message: {}".format(message))
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()