Skip to content

Instantly share code, notes, and snippets.

@ilyalukyanov
Created October 27, 2017 04:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ilyalukyanov/682e981dbb7cf74e10fd70c99332458d to your computer and use it in GitHub Desktop.
Save ilyalukyanov/682e981dbb7cf74e10fd70c99332458d to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Orleans.Streams;
namespace StreamConsumer
{
internal class Program
{
private static void Main(string[] args)
{
const string deploymentId = "my-orleans-silo";
const string connectionString = "UseDevelopmentStorage=true";
var config = new ClientConfiguration
{
DeploymentId = deploymentId,
PropagateActivityId = true,
DefaultTraceLevel = Severity.Error,
ClientName = "StreamConsumer"
};
var ip = Dns
.GetHostAddresses(Environment.MachineName)
.First(a => a.AddressFamily == AddressFamily.InterNetwork);
config.Gateways.Add(new IPEndPoint(ip, 30000));
config.AddAzureQueueStreamProviderV2("AzureQueueStreams", connectionString, 8, deploymentId);
var client = new ClientBuilder()
.UseConfiguration(config)
.Build();
using (client)
{
client.Connect().Wait();
var stream = client.GetStreamProvider("AzureQueueStreams")
.GetStream<string>(Guid.Empty, "test");
var subscription = stream.SubscribeAsync(OnNextAsync).Result;
Console.WriteLine("Ready");
var key = Console.ReadKey();
switch (key.Key)
{
case ConsoleKey.U:
subscription.UnsubscribeAsync().Wait();
break;
}
}
}
private static Task OnNextAsync(string message, StreamSequenceToken token)
{
Console.WriteLine($"Received: {message}");
return Task.CompletedTask;
}
}
}
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using Orleans;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
namespace StreamProducer
{
internal class Program
{
private static void Main(string[] args)
{
const string deploymentId = "my-orleans-silo";
const string connectionString = "UseDevelopmentStorage=true";
var config = new ClientConfiguration
{
DeploymentId = deploymentId,
PropagateActivityId = true,
DefaultTraceLevel = Severity.Error,
ClientName = "StreamProducer"
};
var ip = Dns
.GetHostAddresses(Environment.MachineName)
.First(a => a.AddressFamily == AddressFamily.InterNetwork);
config.Gateways.Add(new IPEndPoint(ip, 30000));
config.AddAzureQueueStreamProviderV2("AzureQueueStreams", connectionString, 8, deploymentId);
var client = new ClientBuilder()
.UseConfiguration(config)
.Build();
using (client)
{
client.Connect().Wait();
var stream = client.GetStreamProvider("AzureQueueStreams")
.GetStream<string>(Guid.Empty, "test");
Console.WriteLine("Ready");
while (true)
{
var key = Console.ReadKey();
switch (key.Key)
{
case ConsoleKey.Q: return;
}
stream.OnNextAsync(DateTime.UtcNow.ToString("O"));
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment