Skip to content

Instantly share code, notes, and snippets.

@sakapon
Last active September 18, 2015 02:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sakapon/b18a04d8da9153324065 to your computer and use it in GitHub Desktop.
Save sakapon/b18a04d8da9153324065 to your computer and use it in GitHub Desktop.
EventHubsSample / ReceiverWpf
using System;
using System.Configuration;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
namespace ReceiverWpf
{
/// <summary>
/// Application model of the receiver. On construction it subscribes to
/// <see cref="StaticEventProcessor.MessageArrived"/> and registers
/// <see cref="StaticEventProcessor"/> with a new <see cref="EventProcessorHost"/>.
/// </summary>
public class AppModel
{
    // Held in a field (instead of a constructor local) so the host stays reachable
    // for as long as this model is alive.
    readonly EventProcessorHost host;

    /// <summary>
    /// Reads the Event Hubs and storage connection strings from the application
    /// settings, creates the processor host and starts registering the event processor.
    /// Registration runs asynchronously; a failure is written to the trace output.
    /// </summary>
    public AppModel()
    {
        StaticEventProcessor.MessageArrived += s =>
        {
            // Each message is expected to be a JSON object with "id" and "text" members.
            // The extracted values are placeholders for application-specific handling.
            dynamic obj = JsonConvert.DeserializeObject(s);
            int id = obj.id;
            string text = obj.text;
        };

        var hostName = string.Format("Host-{0:yyyyMMdd-HHmmss}", DateTime.Now);
        var eventHubName = "sakapon-event-201508";
        var eventHubConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        var storageConnectionString = ConfigurationManager.AppSettings["StorageConnection"];

        // Receives an event once for one consumer group.
        // EventHubConsumerGroup.DefaultGroupName is "$Default".
        host = new EventProcessorHost(hostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);

        // A constructor cannot await the registration, so observe the task explicitly:
        // without this continuation a failed registration would go completely unnoticed.
        host.RegisterEventProcessorAsync<StaticEventProcessor>()
            .ContinueWith(
                t => Trace.TraceError("Event processor registration failed: {0}", t.Exception),
                TaskContinuationOptions.OnlyOnFaulted);
    }
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
namespace ReceiverWpf
{
/// <summary>
/// Event processor that decodes every received event body as UTF-8 text and
/// publishes it through the static <see cref="MessageArrived"/> event.
/// </summary>
public class StaticEventProcessor : IEventProcessor
{
    /// <summary>Raised once per received event, with the event body decoded as UTF-8.</summary>
    public static event Action<string> MessageArrived;

    // Static, hence shared by all processor instances: messages are decoded and
    // handed to subscribers one at a time.
    static readonly object dispatchGate = new object();

    /// <summary>
    /// Logs that the processor is closing and, when the reason is
    /// <see cref="CloseReason.Shutdown"/>, writes a checkpoint.
    /// </summary>
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Debug.WriteLine("Partition {0}: Processor closing. Reason: {1}", context.Lease.PartitionId, reason);

        if (reason != CloseReason.Shutdown)
        {
            return;
        }

        await context.CheckpointAsync();
    }

    /// <summary>Logs the partition id and lease offset; completes synchronously.</summary>
    public Task OpenAsync(PartitionContext context)
    {
        Debug.WriteLine("Partition {0}: Processor opening. Offset: {1}", context.Lease.PartitionId, context.Lease.Offset);
        return Task.FromResult<object>(null);
    }

    /// <summary>
    /// Decodes each event in the batch, logs it, raises <see cref="MessageArrived"/>
    /// for it, and writes a checkpoint once the whole batch has been handled.
    /// </summary>
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            lock (dispatchGate)
            {
                var text = Encoding.UTF8.GetString(eventData.GetBytes());
                Debug.WriteLine("Partition {0}: Message received. {1}", context.Lease.PartitionId, text);
                RaiseMessageArrived(text);
            }
        }

        await context.CheckpointAsync();
    }

    // Invokes the current subscribers, if any, using a snapshot of the delegate.
    static void RaiseMessageArrived(string text)
    {
        var subscribers = MessageArrived;
        if (subscribers != null)
        {
            subscribers(text);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment