Orleans的Grain除了昨天講述的Observer Pattern樣式的事件通知功能之外,事件流(event stream,以下簡稱Stream),是個能提供『穩定』的Grain與Grain、Grain與外界物件之間非同步、響應式執行動作或傳送訊息的機制。
Stream的機制類似訊息佇列(Message Queue),在Grain實作開發時,開發的思維模型可想像為底層有個事件流的機制,讓開發者能夠在Grain之間、Grain與外界物件之間,透過事件流的機制,來進行非同步、響應式的執行動作或傳送訊息:
如上圖,Sender和Receiver之間藉由訊息佇列(Message Queue)來進行溝通,Sender將訊息送入Message Queue,Receiver則從Message Queue中取出訊息來進行處理。
在Orleans的Grain實作程式碼內,可在發送和接受Stream API中的 streamNamespace
的字串參數值,來邏輯區分為不同的事件流(event stream)來分類訊息,且一個Grain可發送/訂閱多個事件流,以便實現 Grain actor 之間的多對多溝通。
在運營(Ops)的Silo配置方面,需要配置『Stream Provider』,用來串接提供事件流儲存與發送機制的實際訊息佇列系統,Orleans官方提供的Stream Provider選擇方案有:
其他第三方的開源Stream Provider還有Azure CosmosDB change feed、Kafka、RabbitMQ、Redis等等。
除了由Grain主動發送事件流的訊息之外,有些如Azure Event Hub的Stream Provider,可以撰寫自製DataAdapter,然後在Silo配置時用 .UseDataAdapter()
來註冊該DataAdapter,將外部的事件流透過DataAdapter轉換成Orleans的事件流,使Grain能接收外部事件流資料,進行處理。
.GetStreamProvider()
來取得Stream Provider的參考,然後再呼叫Stream Provider提供的 GetStream()
來取得Stream的物件實體參考,發送者Grain就可以開始發送事件流訊息了。.OnNextAsync()
來發送一筆筆訊息出去到事件流,等到全部發送完畢,不再需要Stream時,呼叫 .OnCompletedAsync()
來宣告結束全部的發送階段,或是呼叫 .OnErrorAsync()
當發送端有錯誤發生需強制結束時。OnCompleteAsync()
& OnErrorAsync()
這兩個API,只需將該stream物件參考設為null讓GC記憶體回收即可)SubscribeAsync()
擴充方法來訂閱事件流,該API需要傳入一個實作 IAsyncObserver<T>
介面的物件,或是有實作該介面的 OnNextAsync()
型態的方法/Action/Lambda表示式,在裡面撰寫訂閱者Grain接收訊息的處理邏輯。SubscribeAsync()
之後取得的 StreamSubscriptionHandle<T>
型態的訂閱處理(handle)物件,不是完全沒有用途的:當Grain本身因為Silo故障等問題而被重新啟動時,需呼叫該訂閱處理物件之 ResumeAsync()
來重新訂閱事件流,並且在訂閱者Grain不再需要事件流訊息時,呼叫 .UnsubscribeAsync()
來取消訂閱。ImplicitStreamSubscriptionAttribute
屬性,並實作 IStreamSubscriptionObserver
介面的 OnSubscribed()
方法。UnsubscribeAsync()
這個恢復訂閱的API,Orleans Runtime會自動處理好這些問題。至於Orleans Stream 在Silo的運營配置方面要怎麼做,明天以實作Orleans Steam的範例專案的方式來展示使用Azure Queue Storage作為Stream Provider。