Created
February 28, 2020 19:02
-
-
Save pawanrao/6c0c682e490b625a2db7d08098712b3c to your computer and use it in GitHub Desktop.
Orleans Stream Error Reproducer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Grain that listens to the <c>Foo</c> stream identified by its own primary key
/// (provider "test_provider", namespace "test_namespace") and removes every
/// subscription on that stream as soon as a single item has been handled.
/// </summary>
class TestGrain : Grain, ITestGrain {
    private const string StreamProvider = "test_provider";
    private const string StreamNamespace = "test_namespace";

    /// <summary>
    /// Re-attaches <see cref="Handle"/> to every subscription handle the stream
    /// reports for this grain, then runs the base activation logic.
    /// </summary>
    public override async Task OnActivateAsync() {
        var streamProvider = this.GetStreamProvider(StreamProvider);
        var stream = streamProvider.GetStream<Foo>(this.GetPrimaryKey(), StreamNamespace);
        var subscriptionHandles = await stream.GetAllSubscriptionHandles();

        // An empty list simply yields zero iterations, so only null needs a guard.
        if (subscriptionHandles != null) {
            foreach (var handle in subscriptionHandles) {
                await handle.ResumeAsync(this.Handle);
            }
        }

        await base.OnActivateAsync();
    }

    /// <summary>
    /// Subscribes <see cref="Handle"/> to this grain's stream.
    /// </summary>
    public async Task Configure() {
        var streamProvider = this.GetStreamProvider(StreamProvider);
        var stream = streamProvider.GetStream<Foo>(this.GetPrimaryKey(), StreamNamespace);
        await stream.SubscribeAsync(this.Handle);
    }

    /// <summary>
    /// Stream callback: logs that an item arrived and then unsubscribes from the stream.
    /// </summary>
    /// <param name="foobar">The item delivered by the stream (not inspected).</param>
    /// <remarks>
    /// NOTE(review): the Orleans SubscribeAsync/ResumeAsync extension overloads may expect a
    /// two-argument callback (item plus sequence token) - confirm that this single-argument
    /// method group binds against the Orleans version in use.
    /// </remarks>
    public async Task Handle(Foo foobar) {
        Console.WriteLine("Handled stream");
        // The method must be async to await the cleanup; the returned task completes
        // once every subscription has been removed.
        await Cleanup();
    }

    /// <summary>
    /// Unsubscribes every subscription handle the stream reports for this grain.
    /// </summary>
    private async Task Cleanup() {
        var streamProvider = this.GetStreamProvider(StreamProvider);
        var stream = streamProvider.GetStream<Foo>(this.GetPrimaryKey(), StreamNamespace);
        var subscriptionHandles = await stream.GetAllSubscriptionHandles();

        if (subscriptionHandles != null) {
            foreach (var handle in subscriptionHandles) {
                await handle.UnsubscribeAsync();
            }
        }
    }
}
/// <summary>
/// Activates a <c>TestGrain</c> and publishes one <c>Foo</c> on the stream probe
/// registered for the same id, namespace and provider the grain uses.
/// </summary>
[Test]
public async Task Test_Stream_Publish() {
    // A single id is shared by the probe and the grain so both address the same stream
    // (previously two separate `new Guid()` values only matched because both were Guid.Empty).
    var guid = Guid.NewGuid();
    var stream = this.Silo.AddStreamProbe<Foo>(guid, "test_namespace", "test_provider");

    // Await creation so the grain's activation has finished before anything is published.
    // NOTE(review): nothing here calls grain.Configure(), so the test itself never creates
    // a subscription - confirm whether that is intended for the reproducer.
    var grain = await this.Silo.CreateGrainAsync<TestGrain>(guid);

    await stream.OnNextAsync(new Foo());

    // Assert Foo.bar is set
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment