iT邦幫忙

第 12 屆 iThome 鐵人賽

DAY 20
0

Day 20 GCP 公有雲_雲端事件消息傳遞服務實戰 - Pub/Sub 組建測試之路

本日重點與方向 (TAG): GCP、Google Cloud Platform、Pub/Sub、Publisher、Subscriber、Tranciver、Reciver、Topic
今天就延續先前開始的 Google Cloud Platform (GCP) 上搭建服務的作法,今天會向 GCP 上的 Pub/Sub 事件傳遞系統的一些設定與流程為主,Pub/Sub 執行流程主要是基於 Publish/Subscriber (廢話)概念,但是在實作上面還有一層 Message Pool,主要提供消息重發與保留尚未發送訊息的保留行為,確認要有人能夠消化(具有事件處理者的意思)這個訊息,因為專案開發也有機會用到,比如說基於事件觸發(Event Trigger)去做某件事情,將訊息 Pass 給對應的接收端去撈訊息內的特定位置的資料等功能,或是作為 GCP 上的各個服務的連結使用(如: GKE 做資料預處理後,將特定傳資料出來,提供給 GCE 上的 VM 環境去開專有服務等等的),當然消息傳遞也是有一些對應的功能可以配置,Pub/Sub 提供一些發送轉發與主調提取訊息的作法,也有 Topic 作為通道分離的政策,給予一些篩選資料的機制存在,總之今天就大概弄一下發送接收的作法,有 GCP Web UI、gcloud-cli、python3 的作法,基本上就都是照著參考網站的實作,基礎操縱今天隨手弄一弄就只要能接到進去就是可以開搞了,有問題就再來底下問看看吧。

Pub / Sub 組建

Pub/Sub 架構

https://cloud.google.com/pubsub/docs/overview?hl=zh-tw

  • Publish / Subscriber (1 to N, N to 1)

  • Publish / Subscriber 流程概念

GCP Web UI 組建

  • 按一下漢堡條,把 Pub/Sub 功能找出來

設定 Topic (消息通道識別名稱)

  • 進入 Pub/Sub 管理頁面有,找到主題(Topic),選建立主題

  • 設定一下主題名稱,名稱會變成一個主題 ID 提供訂閱使用 /project/專案名稱/topics/主題名稱

  • 確認一下主題生成的狀態

  • 點進去 Topic 名稱後可以看狀態

設定 Subscrition (接收資料的一個項目)

這邊的 Subscriotions 雖然是 Topic 端的接收的一個項目,但他也有一些篩選機制提供你設定,讓你只抓 Topic 上滿足特定內容的訊息,作爲 Topic 端的 Subscriotions,它其實同時也是真正的 subscriber (程式端)的訂閱端,你的 Subscriber 要收資料還是要對到它

  • 進入 Pub/Sub 管理頁面有,找到訂閱項目(Subscrition),選建立訂閱項目

  • 設定訂閱項目
    • 訂閱項目名稱: 會生成訂閱項目 ID projects/專案名稱/subscriotions
    • 選取 Cloud Pub/Sub 主題: 選擇訂閱的 Topic 主題(這邊就選剛剛的 Topic 做測試)
    • 傳送類型:
      • 提取: 程式去提取資料個概念
      • 推送: 設定一個 https 基底的 API 提供給 subscription 在接收資料時呼叫使用,也可適用於 GCP 內部服務開出的 API (如: Cloud Run, GKE, GCS, GCE)
    • 訂閱項目有效期: 訂閱項目的活躍保留天數 (多久沒傳遞訊息,之後會自動刪除的天數)
    • 確認期限: 訊息被 subscriber 捕抓或是提取後,需要多久的反應回饋時間,從提取到確認的時間差(確認服務處理的狀態),這邊可以作為事件被提取後,作完後續流程的確認,超過時間差就會返回到 Message Pool,提供給其他 Subscriber 提取實作。
    • 訂閱項目篩選器: 篩選你的訊息內容,之後才抓進來到你的 subscription 之中
      • attributes.<item-name> = <item-info>
      • hasPrefix(<item-name>, <item-info>)
    • 訊息保留時間: 訊息被發送後,到 subscription 且未被 subscriber 提取或是使用,保留在 Message Pool 的時間
    • 無效信件: 他會轉發你的訊息到其他的 topic 之上的設定

Topic 發送訊息測試

要用 GCP Web UI 發訊息測試的話,要先設定好你的 subscription,不然他抓不到你具有 subscription 端會認為發出訊息無效,就不會幫你送出了。

  • 找到你的 Topic,選擇發布訊息

  • 設定你的訊息內容、訊息的屬性(key:value),之後按送出即可。

這邊的屬性可以對應的 subscription 的項目篩選功能。

  • 檢閱一下你的 Topic 送出訊息的狀態

Subscription 接收訊息檢視

這邊需要確認一下你的 Topic 有把資料打出來,我們在 subscription 接收端也符合規則(可以先關掉項目篩選),之後就看一下訊息進來到 subscription 的狀態與相關訊息。

  • 先去一下 訂閱項目(Subscription) 的頁面

  • 找一下檢視訊息按鈕,按下去之後就能找到你的訊息了。

  • 這邊要先按一下提取,這邊才會去找尋你的訊息做顯示。

使用 gcloud-cli 進行 Pub/Sub 功能配置

建立 Topic

gcloud pubsub topics create <topic=id> --project <project-id>

建立 Subscription

gcloud pubsub subscriptions create --topic <topic-id> <subscription-id> --project <project-id>

發送訊息

gcloud pubsub topics publish <topic-id> --message <message-info> --project <project-id>

接收訊息

gcloud pubsub subscriptions pull --auto-ack <subscription-id> --project <project-id>

Subscription 訊息推送實作

建置一個接收端 Server

  • 安裝一些必要的套件
    • python3 flask 後端 Server
    • ngrok 區網 http 轉 外網 https 的代理服務
apt-get install python3 python3-pip ngrok
pip3 install flask
  • Server 端程式碼
pub_sub-push-recive.py
---
from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=['POST'])
def hello():
    print(request.json)
    return "Hello World!"

if __name__ == '__main__':
    app.run()
  • 執行 Server
python3 pub_sub-push-recive.py
  • 跑 ngrok 代理服務
ngrok http 5000

設定 subscription 配置

配置都相同,就是改一下變成 推送消息模式 (Push Mode),把接收端設定成你剛剛 ngrok 搞出來的 URL。

  • 照著改一下

對 Topic 打資料做一下測試

看一下接收端的狀態

Python3 程式端接收實作

https://cloud.google.com/pubsub/docs/pull

  • 安裝套件
apt-get install python3 python3-pip
pip3 install --upgrade google-cloud-pubsub
  • 配置一下你的 Service Account

這邊需要先搞到你的 GCP 的 Service Account 的帳戶訊息 json 檔案

export GOOGLE_APPLICATION_CREDENTIALS="sa-json-file-path"
  • 發送端
from google.cloud import pubsub_v1

project_id = "project-id"
topic_id="topic-id"

publisher = pubsub_v1.PublisherClient()
# Conbine the topic_path = 'projects/{project_id}/topic/{topic_id}'
topic_path = publisher.topic_path(project_id, topic_id)
print(topic_path)

for n in range(1, 3):
    data = u"Message number {}".format(n)
    data = data.encode("utf-8")
    # username="gcp" & multiAttribute="test" is the message attributes.
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp", multiAttribute="test"
    )
    print(future.result())
  • 接收端
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

project_id = "project-id"
subscription_id = "subscription-id"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# Conbine the subscription_path = 'projects/{project_id}/subscriptions/{subscription_id}'
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

上一篇
Day 19 GCP 公有雲_雲端容器化服務實戰 - Cloud Run 組建之路
下一篇
Day 21 基於事件消息傳遞服務拓展實戰 - KEDA 組建測試之路 RabbitMQ 篇
系列文
基於付費公有雲與開源機房自建私有雲之雲端應用服務測試兼叢集與機房託管服務實戰之勇者崎嶇波折且劍還掉在路上的試煉之路30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言