Skip to content

Instantly share code, notes, and snippets.

@pawanrao
Created February 28, 2020 19:02
Show Gist options
  • Save pawanrao/6c0c682e490b625a2db7d08098712b3c to your computer and use it in GitHub Desktop.
Save pawanrao/6c0c682e490b625a2db7d08098712b3c to your computer and use it in GitHub Desktop.
Orleans Stream Error Reproducer
class TestGrain : Grain, ITestGrain {
private const string StreamProvider = "test_provider"
private const string StreamNamespace = "test_namespace"
public override async Task OnActivateAsync() {
var streamProvider = this.GetStreamProvider(StreamProvider);
var stream = streamProvider.GetStream<Foo>(this.GetPrimaryKey(), StreamNamespace);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (subscriptionHandles != null && subscriptionHandles.Any()) {
foreach (var handle in subscriptionHandles) {
await handle.ResumeAsync(this.Handle);
}
}
await base.OnActivateAsync();
}
public async Task Configure() {
var streamProvider = this.GetStreamProvider(StreamProvider);
var stream = streamProvider.GetStream<Foo>(this.GetPrimaryKey(), StreamNamespace);
await stream.SubscribeAsync(this.Handle);
}
public Task Handle(Foo foobar) {
Console.Writeline("Handled stream");
await Cleanup();
return Task.CompletedTask;
}
private async Task Cleanup() {
var streamProvider = this.GetStreamProvider(StreamProvider);
var stream = streamProvider.GetStream<Foo>(this.GetPrimaryKey(), StreamNamespace);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (subscriptionHandles != null && subscriptionHandles.Any()) {
foreach (var handle in subscriptionHandles) {
await handle.UnsubscribeAsync();
}
}
}
}
[Test]
public async Task Test_Stream_Publish() {
var guid = new Guid()
var stream = this.Silo.AddStreamProbe<Foo>(new Guid(), "test_namespace", "test_provider");
var grain = this.Silo.CreateGrainAsync<TestGrain>(guid);
await stream.OnNextAsync(new Foo())
//Asset Foo.bar is set
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment