隱式訂閱的寫法相較於顯式訂閱的寫法,就變得很簡單,只需在Grain Identity是GUID的Grain類別宣告上加掛 ImplicitStreamSubscriptionAttribute 屬性 ,不需要自行呼叫 SubscribeAsync()
,並且在萬一Silo故障Grain在別的機器上重啟時,也不需要在 OnActivateAsync()
的Grain生命週期反應函式裡呼叫 ResumeAsync
恢復訂閱的繁瑣事項。
[ImplicitStreamSubscription("demo-streaming-namespace")]
public class ConsumerGrain : Grain , ...
{
// Grain implementation...
}
不過像上面這種寫法有一個缺點,就是只能訂閱一個namespace是 demo-streaming-namespace
的事件流,如果要動態訂閱或訂閱多個事件流,屬性可用另一種接受實作 IStreamNamespacePredicate 介面的任意自訂類別型態之參數,該介面有一個 bool IsMatch (string streamNamespace)
需要開發者自行實作的方法,就可以在此方法內動態判斷。
而在實際使用時,通常Grain實作類別還會多實作一個 IStreamSubscriptionObserver
介面,這樣可以在事件流訂閱時,Orleans Runtime自動會呼叫該介面的 OnSubscribed ()
方法,如此可以在此方法的實作內,取得訂閱事件的handle以便呼叫 ResumeAsync()
來恢復此Grain的事件流訂閱,並且指派事件收到時真正執行消化事件資料進行處理的實作 IAsyncObserver<T>
介面的物件。
[ImplicitStreamSubscription("demo-streaming-namespace")]
public class ConsumerGrain : Grain , IStreamSubscriptionObserver
{
private readonly LoggerObserver _observer;
// other Grain methods...
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<int>();
await handle.ResumeAsync(_observer);
}
}
internal class LoggerObserver : IAsyncObserver<string>
{
// class implementation...
}
using Orleans;
namespace RpcDemo.Interfaces.EventStreams;
public interface IConsumerGrain : IGrainWithGuidKey, IStreamSubscriptionObserver
{
}
此介面由於之後要套用的Grain實作專案沒有其他的RPC方法,所以內部宣告是空的。namespace RpcDemo.Interfaces.EventStreams;
[Serializable]
public record struct StreamDto(int Serial, string Message, DateTimeOffset Timestamp);
public static class StreamConstant
{
public const string DefaultStreamProviderName = "MyDefaultStreamProvider";
public const string ImplicitSubscribeStreamNamespace = "event-streaming-02";
}
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Streams;
using Orleans.Streams.Core;
using RpcDemo.Interfaces.EventStreams;
namespace RpcDemo.Grains.EventStreams;
[ImplicitStreamSubscription(StreamConstant.ImplicitSubscribeStreamNamespace)]
public class ConsumerGrain : Grain, IConsumerGrain
{
private readonly LoggerObserver _observer;
public ConsumerGrain(ILogger<ConsumerGrain> logger)
{
_observer = new LoggerObserver(this.GetPrimaryKey().ToString(), logger);
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<StreamDto>();
await handle.ResumeAsync(_observer);
}
}
internal class LoggerObserver : IAsyncObserver<StreamDto>
{
private readonly ILogger _logger;
private readonly string _grainPrimaryKey;
// ReSharper disable once ContextualLoggerProblem
public LoggerObserver(string grainPrimaryKey, ILogger logger)
{
_grainPrimaryKey = grainPrimaryKey;
_logger = logger;
}
public Task OnNextAsync(StreamDto item, StreamSequenceToken? token = null)
{
_logger.LogInformation("Grain {0} receive : {1}", _grainPrimaryKey, item);
return Task.CompletedTask;
}
public Task OnCompletedAsync()
{
_logger.LogInformation("call OnCompletedAsync()");
return Task.CompletedTask;
}
public Task OnErrorAsync(Exception ex)
{
_logger.LogError(ex, "call OnErrorAsync()");
return Task.CompletedTask;
}
}
此Grain實作類別,宣告了一個 LoggerObserver
類別的成員,在 OnSubscribed()
方法內,呼叫 ResumeAsync()
恢復既有訂閱(抑或起始新訂閱)時,指派 LoggerObserver
類別的物件作為事件收到時的處理者。ConsumerGrain實作完成,來在測試專案裡加上對應的測試:
在 src/tests 的 EventStreamGrains.Tests 測試專案中,新增一個 ConsumerGrainTest.cs 程式碼檔案,內容如下:
using Moq;
using Orleans;
using Orleans.Hosting;
using Orleans.Providers;
using Orleans.TestingHost;
using Orleans.Timers;
using RpcDemo.Grains.EventStreams;
using RpcDemo.Interfaces.EventStreams;
namespace EventStreamGrains.Tests;
public class ConsumerGrainTest
{
private static Mock<ILogger<ConsumerGrain>>? _loggerMock;
#region Test Silo Setup
private class TestSiloAndClientConfigurator : ISiloConfigurator, IClientBuilderConfigurator
{
public static Func<object, Task>? TimerTick { get; private set; }
public void Configure(ISiloBuilder siloBuilder)
{
_loggerMock = new Mock<ILogger<ConsumerGrain>>();
var loggerFactorMock = new Mock<ILoggerFactory>();
loggerFactorMock.Setup(x => x.CreateLogger(It.IsAny<string>())).Returns(_loggerMock.Object);
var mockTimerRegistry = new Mock<ITimerRegistry>();
mockTimerRegistry.Setup(x =>
x.RegisterTimer(It.IsAny<Grain>(),
It.IsAny<Func<object, Task>>(), It.IsAny<object>(), It.IsAny<TimeSpan>(), It.IsAny<TimeSpan>()))
.Returns(new Mock<IDisposable>().Object)
.Callback(
(Grain targetGrain, Func<object, Task>? timerTick, object _, TimeSpan _, TimeSpan _) =>
{
// Hook producer's every second message producing timer,
// so we can invoke it later in Test method.
if (targetGrain is ProducerGrain && timerTick != null)
{
TimerTick = timerTick;
}
});
siloBuilder
.AddMemoryGrainStorage("PubSubStore")
.AddMemoryStreams<DefaultMemoryMessageBodySerializer>(StreamConstant.DefaultStreamProviderName)
.ConfigureServices(services =>
{
services.AddSingleton(loggerFactorMock.Object);
services.AddSingleton(mockTimerRegistry.Object);
});
}
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
{
clientBuilder.AddMemoryStreams<DefaultMemoryMessageBodySerializer>(StreamConstant.DefaultStreamProviderName);
}
}
#endregion
[Fact]
public async Task Test_ConsumerGrain_Receive()
{
// Arrange
var builder = new TestClusterBuilder();
builder.AddSiloBuilderConfigurator<TestSiloAndClientConfigurator>();
var testCluster = builder.Build();
await testCluster.DeployAsync();
var key = Guid.NewGuid();
var producer = testCluster.GrainFactory.GetGrain<IProducerGrain>("sender1");
var consumer = testCluster.GrainFactory.GetGrain<IConsumerGrain>("receiver1");
// Act
await producer.StartProducing(StreamConstant.ImplicitSubscribeStreamNamespace, key);
//Manual Invoke Timer to force produce message to consumer
var timerTick = TestSiloAndClientConfigurator.TimerTick;
Assert.NotNull(timerTick);
await timerTick.Invoke(new object());
await timerTick.Invoke(new object());
await producer.StopProducing();
//Give some time for stream to deliver message
await Task.Delay(TimeSpan.FromSeconds(0.3));
await testCluster.StopAllSilosAsync();
// Assert
Assert.NotNull(_loggerMock);
_loggerMock.VerifyLog(logger =>
logger.LogInformation("Grain {0} receive: {1}",
It.IsAny<string>(), It.IsAny<StreamDto>()), Times.Exactly(2));
}
}
這個和昨天測試顯式訂閱Grain的測試程式碼幾乎一樣,除了Silo配置時可以省略掉先前顯式訂閱時需儲存的訂閱狀態資料之外,在測試程式碼Act的階段也省掉了呼叫接收端Grain訂閱事件流RPC方法的動作。
Program.cs
檔案內配置SiloBuilder ApplicationParts的配置程式碼新增 ConsumerGrain
的註冊:
siloBuilder.ConfigureApplicationParts(parts =>
{
parts.AddApplicationPart(typeof(ManualConsumerGrain).Assembly).WithReferences();
parts.AddApplicationPart(typeof(ProducerGrain).Assembly).WithReferences();
parts.AddApplicationPart(typeof(ConsumerGrain).Assembly).WithReferences();
});
Program.cs
檔案:
IConsumerGrain
的註冊:
.ConfigureApplicationParts(parts =>
{
parts.AddApplicationPart(typeof(IProducerGrain).Assembly).WithReferences();
parts.AddApplicationPart(typeof(IManualConsumerGrain).Assembly).WithReferences();
parts.AddApplicationPart(typeof(IConsumerGrain).Assembly).WithReferences();
})
await receiver2.UnSubscribe();
修改增加另外測試隱式訂閱的訊息發送程式碼:
Log.Logger.Information("\r\nPress any key to demo implicit stream subscription\r\n");
Console.ReadKey();
await producer.StartProducing(StreamConstant.ImplicitSubscribeStreamNamespace, key);
Log.Logger.Information("Stopped streaming in Producer Grain, press any key to disconnect from Silo and exit");
Console.ReadKey();
await producer.StopProducing();
await client.Close();
執行起來到最後的步驟,應該可以看到Silo產生的訊息接收log類似如下:
[15:24:18 INF] Grain dbd4a9d2-ebc7-4114-a2a5-abcc61c24ddd receive: StreamDto { Serial = 49, Message = #0049 from ProducerGrain:sender1, Timestamp = 10/6/2022 7:24:18 AM +00:00 }
Orleans RPC Client端訂閱事件流的程式碼寫法,就是在Client端呼叫 GetStreamProvider("A Stream Provider Name").GetStream<T>(a_GUID, "a-stream-namespace")
來取得事件流,並且呼叫 SubscribeAsync()
來訂閱事件流,指派事件收到時真正執行消化事件資料進行處理的實作 IAsyncObserver<T>
介面的物件。
而在運營(ops)方面,Client端要能夠訂閱事件流,在Silo端使用的 Stream Provider,也必須要在Client端配置,以使Client端能夠連接到和Silo端同樣的底層訊息佇列系統,才能使用。
而其實之前第18天講的Grain Observer事件發送機制,也可以讓外界有實作 IGrainObserver
的物件訂閱來動作,只是這樣的寫法,在Orleans的RPC Client端呼叫沒有作用,而寫在Silo端的話,要注意事件訂閱物件在取消訂閱後是否有記憶體洩漏的問題,所以在此不示範一般物件訂閱Grain Observer事件發送機制的實作程式。
建議使用事件流訂閱的方式來讓Grain驅動外部程式反應的方法,因為此種方式可以讓外部程式在不需要知道Grain實體參考的情況下,就可以訂閱事件流,並且因為有底層實際訊息佇列系統來分派發送事件流訊息,可靠性較Grain Observer高。
以下示範使用RPC Client端訂閱的寫法:
Program.cs
檔案,為 clientBuilder
配置程式碼新增Azure queue storage stream provider的設定,修改為:
var clientBuilder = new ClientBuilder()
.UseLocalhostClustering()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "client1";
options.ServiceId = "Stream-Demo";
})
.AddAzureQueueStreams(StreamConstant.DefaultStreamProviderName,
(OptionsBuilder<AzureQueueOptions> optionsBuilder) =>
{
optionsBuilder.Configure(options => { options.ConfigureQueueServiceClient("UseDevelopmentStorage=true"); });
})
.ConfigureApplicationParts(parts =>
{
parts.AddApplicationPart(typeof(IProducerGrain).Assembly).WithReferences();
parts.AddApplicationPart(typeof(IManualConsumerGrain).Assembly).WithReferences();
parts.AddApplicationPart(typeof(IConsumerGrain).Assembly).WithReferences();
})
.ConfigureLogging(logging => logging.AddSerilog());
最前面的引用命名空間需要增加這些:
using Microsoft.Extensions.Options;
using Orleans.Hosting;
using Orleans.Streams;
Program.cs
檔案,從呼叫停止隱式訂閱範例的producer的RPC方法之後,最後段改為:
Log.Logger.Information("\r\nPress any key to demo client-side stream subscription\r\n");
Console.ReadKey();
var stream = client.GetStreamProvider(StreamConstant.DefaultStreamProviderName)
.GetStream<StreamDto>(key, StreamConstant.ImplicitSubscribeStreamNamespace);
await stream.SubscribeAsync((dto, _) =>
{
Log.Logger.Information("Received message from stream: {dto}", dto);
return Task.CompletedTask;
});
await producer.StartProducing(StreamConstant.ImplicitSubscribeStreamNamespace, key);
Log.Logger.Information("\r\nPress any key to stop streaming in Producer Grain\r\n");
Console.ReadKey();
await producer.StopProducing();
Log.Logger.Information("Stopped streaming in Producer Grain, press any key to disconnect from Silo and exit");
Console.ReadKey();
await client.Close();
其中那個 await stream.SubscribeAsync()
的訂閱事件流API,除了像在Silo端Grain實作程式碼內定義 Task OnNextAsync(int item, StreamSequenceToken? token = null){ ... }
自訂函式來接收事件訊息並處理之外,也可像上面的範例程式碼一樣,使用Lambda運算式來產生對應 Func<StreamDto, StreamSequenceToken, Task>
型態的事件訊息處理匿名函式。將Silo和Client端都執行起來到最後的步驟,應該可以看到Client端Console視窗產生的訊息接收log類似如下:
[16:40:28 INF] Received message from stream: StreamDto { Serial = 8, Message = #0008 from ProducerGrain:sender1, Timestamp = 10/6/2022 8:40:28 AM +00:00 }
完整範例的程式碼:
https://github.com/windperson/OrleansRpcDemo/tree/day21
明天來說明Orleans 3.x版最新提供的功能:Grain RPC的Transaction機制。