Skip to content

Instantly share code, notes, and snippets.

@StefH
Created October 10, 2018 07:31
Show Gist options
  • Save StefH/9c746f55f0c5209a56fcdbdb557ca448 to your computer and use it in GitHub Desktop.
Save StefH/9c746f55f0c5209a56fcdbdb557ca448 to your computer and use it in GitHub Desktop.
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using System;
using System.Text;
using System.Threading.Tasks;
namespace EventHubExampleNew
{
/// <summary>
/// Code based on:
/// * https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-send
/// * https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-getstarted-receive-eph
/// </summary>
class Program
{
// Connection String for the namespace can be obtained from the Azure portal under the 'Shared access policies' section.
private const string EventHubConnectionString = "***";
private const string EventHubName = "***";
private const string StorageConnectionString = "***";
private const string StorageContainerName = "***";
static void Main(string[] args)
{
SendAsync().GetAwaiter().GetResult();
Console.WriteLine("Sending is done, press ENTER start receiving");
Console.ReadLine();
ReceiveAsync().GetAwaiter().GetResult();
}
private static async Task SendAsync()
{
// Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
// Typically, the connection string should have the entity path in it, but this simple scenario
// uses the connection string from the namespace.
// Else you get a `Unhandled Exception: System.ArgumentException: The argument EntityPath is null or white space.`
var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
{
EntityPath = EventHubName
};
var client = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
for (int i = 0; i < 10; i++)
{
string message = $"event hub message {i}";
Console.WriteLine($"Sending message `{message}`");
var eventData = new EventData(Encoding.UTF8.GetBytes(message));
await client.SendAsync(eventData);
await Task.Delay(TimeSpan.FromMilliseconds(50));
}
await client.CloseAsync();
}
private static async Task ReceiveAsync()
{
Console.WriteLine("Registering EventProcessor...");
var eventProcessorHost = new EventProcessorHost(
EventHubName,
PartitionReceiver.DefaultConsumerGroupName,
EventHubConnectionString,
StorageConnectionString,
StorageContainerName);
// Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine();
Console.WriteLine("Stopping");
// Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
// TODO : Never arrive here ???
Console.WriteLine("Stopped (UnregisterEventProcessorAsync)");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment