Created
September 23, 2016 05:17
-
-
Save pratapprashant/8b3bd33e752399776f89aec1f55e7d87 to your computer and use it in GitHub Desktop.
Azure Stream Analytics Code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.ServiceBus.Messaging; | |
using StackExchange.Redis; | |
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace SendingEvents2EventHub | |
{ | |
class Program | |
{ | |
/// <summary>
/// Entry point of the demo. Every step is disabled by default; uncomment the
/// one call you want to run and rebuild.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    // Push sample readings into the Event Hub:
    // SendEvents(10);

    // Print whatever arrives on the Event Hub:
    //CheckIfEventHubGotEvents();

    // Print whatever arrives on the Service Bus queue and copy it to Redis:
    // CheckIfServiceBusGotEvents();

    // Read back the per-device keys from Redis:
    //CheckRedisCache(10);
}
/// <summary>
/// Sends <paramref name="count"/> JSON-serialized <see cref="MyInput"/> readings
/// to the Event Hub, pausing one second after each send.
/// </summary>
/// <param name="count">Number of events to send. Zero or a negative value sends nothing.</param>
static void SendEvents(int count)
{
    var eventHubName = "wabaceventhub";
    // NOTE(review): the SharedAccessKey value is blank here and has to be filled in
    // before this can connect; prefer loading it from configuration over source.
    var connectionString = "Endpoint=sb://wabaceventhubs.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
    var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
    try
    {
        Console.Write($"Sending {count} messages");
        Random r = new Random();
        for (int i = 0; i < count; i++)
        {
            // A fresh payload per event keeps each message independent of the previous one.
            var e = new MyInput
            {
                DeviceName = $"Console App - {i}",
                DateTime = DateTime.Now.ToString("o"),
                temperature = r.Next(0, 100),
                speed = r.Next(200, 300)
            };
            var msg = Newtonsoft.Json.JsonConvert.SerializeObject(e);
            eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(msg)));
            Console.Write(".");
            System.Threading.Thread.Sleep(1000);
        }
        Console.WriteLine("done");
    }
    finally
    {
        // Release the client even when a send fails part-way through.
        eventHubClient.Close();
    }
}
/// <summary>
/// Registers <see cref="MyEventProcessor"/> against the Event Hub's default
/// consumer group, waits for the user to press Enter, then unregisters it.
/// Exceptions reported by the processor host are written to the console.
/// </summary>
static void CheckIfEventHubGotEvents()
{
    const string hubName = "wabaceventhub";
    const string hubConnection = "Endpoint=sb://wabaceventhubs.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
    const string storageConnection = "DefaultEndpointsProtocol=https;AccountName=wabac;AccountKey=";

    // A random host name is generated for every run.
    var host = new EventProcessorHost(
        Guid.NewGuid().ToString(),
        hubName,
        EventHubConsumerGroup.DefaultGroupName,
        hubConnection,
        storageConnection);

    var processorOptions = new EventProcessorOptions();
    processorOptions.ExceptionReceived += (sender, e) => Console.WriteLine(e.Exception);

    host.RegisterEventProcessorAsync<MyEventProcessor>(processorOptions).Wait();

    Console.WriteLine("Receiving. Press enter key to stop worker.");
    Console.ReadLine();

    host.UnregisterEventProcessorAsync().Wait();
}
/// <summary>
/// Listens on the Service Bus queue until the user presses Enter. Each received
/// message body and id is printed, and the body is passed to <see cref="Save2Redis"/>.
/// </summary>
static void CheckIfServiceBusGotEvents()
{
    // NOTE(review): the SharedAccessKey value is blank here and has to be filled in
    // before this can connect; prefer loading it from configuration over source.
    var connectionString = "Endpoint=sb://wabacservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
    var queueName = "wabacservicebusqueue";
    var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
    try
    {
        Console.WriteLine("Receiving from ServiceBus Queue. Press enter key to stop worker.");
        client.OnMessage(message =>
        {
            var msg = message.GetBody<String>();
            Console.WriteLine($"Message body: {msg}");
            Console.WriteLine($"Message id: {message.MessageId}");
            Save2Redis(msg);
        });
        Console.ReadLine();
    }
    finally
    {
        // Previously the client was never closed after the user asked to stop.
        client.Close();
    }
}
/// <summary>
/// Stores the raw JSON payload in Redis under the payload's <c>devicename</c>
/// and reads the same key back to confirm the write.
/// </summary>
/// <param name="val">JSON text that deserializes to <see cref="MyOutput"/>.</param>
/// <exception cref="ArgumentException">
/// The payload deserializes to null or carries no <c>devicename</c>, so there is no key to store it under.
/// </exception>
static void Save2Redis(string val)
{
    var msg = Newtonsoft.Json.JsonConvert.DeserializeObject<MyOutput>(val);
    if (msg == null || msg.devicename == null)
    {
        // Fail with a clear message instead of a NullReferenceException further down.
        throw new ArgumentException("Payload does not contain a devicename to use as the Redis key.", nameof(val));
    }

    IDatabase cache = Connection.GetDatabase();
    Console.WriteLine($"Trying to save {val} to Redis...");
    cache.StringSet(msg.devicename, val);

    Console.WriteLine("checking if Redis save worked....");
    // Read back the key that was just written; the check previously read the
    // unrelated key "StreamAnalyticsResults" and so never reflected this write.
    string r = cache.StringGet(msg.devicename);
    Console.WriteLine($"Saved value is {r}");
}
/// <summary>
/// Prints the Redis values stored under the keys "Console App - 0" through
/// "Console App - (count - 1)".
/// </summary>
/// <param name="count">Number of consecutive keys to look up; zero or negative prints nothing.</param>
static void CheckRedisCache(int count)
{
    IDatabase redis = Connection.GetDatabase();
    int index = 0;
    while (index < count)
    {
        string cacheKey = $"Console App - {index}";
        RedisValue cached = redis.StringGet(cacheKey);
        Console.WriteLine($"From Redis Cache key={cacheKey} : Value={cached}");
        index++;
    }
}
/// <summary>
/// Lazily created Redis connection. The field is readonly so the shared
/// instance cannot be replaced after initialization.
/// </summary>
// NOTE(review): the password in the connection string is blank and must be supplied
// before this can connect; prefer loading it from configuration over source.
private static readonly Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(
    () => ConnectionMultiplexer.Connect("WabacRedis.redis.cache.windows.net:6380,password=,ssl=True,abortConnect=False"));

/// <summary>
/// The Redis connection, created on first access and reused afterwards.
/// </summary>
public static ConnectionMultiplexer Connection => lazyConnection.Value;
} | |
/// <summary>
/// Reading that is serialized to JSON and sent to the Event Hub.
/// Property names and their order are kept as-is because they shape the JSON.
/// </summary>
internal class MyInput
{
    /// <summary>Timestamp of the reading, carried as text.</summary>
    public string DateTime { get; set; }

    /// <summary>Name identifying the sender of the reading.</summary>
    public string DeviceName { get; set; }

    /// <summary>Temperature value of the reading.</summary>
    public int temperature { get; set; }

    /// <summary>Speed value of the reading.</summary>
    public int speed { get; set; }
}
/// <summary>
/// Aggregated result that is deserialized from JSON. Sample payload:
/// <c>{"devicename":"Console App - 8","avgtemp":62.0,"avgspeed":207.0}</c>
/// </summary>
internal class MyOutput
{
    /// <summary>Device the averages belong to.</summary>
    public string devicename { get; set; }

    /// <summary>Average temperature for the device.</summary>
    public double avgtemp { get; set; }

    /// <summary>Average speed for the device.</summary>
    public double avgspeed { get; set; }
}
/// <summary>
/// Event processor that prints every received event to the console and
/// checkpoints its partition at most once every five minutes.
/// </summary>
class MyEventProcessor : IEventProcessor
{
    // Measures the time since this processor was opened or last checkpointed while processing.
    Stopwatch _sinceLastCheckpoint;

    /// <summary>
    /// Logs the shutdown and, when the reason is <see cref="CloseReason.Shutdown"/>,
    /// checkpoints before returning.
    /// </summary>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);

        if (reason != CloseReason.Shutdown)
        {
            return;
        }

        await context.CheckpointAsync();
    }

    /// <summary>
    /// Logs the partition and offset being opened and starts the checkpoint timer.
    /// </summary>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        _sinceLastCheckpoint = Stopwatch.StartNew();
        return Task.FromResult<object>(null);
    }

    /// <summary>
    /// Prints each event's UTF-8 body, then checkpoints if more than five
    /// minutes have passed since the timer was last (re)started.
    /// </summary>
    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData received in messages)
        {
            string body = Encoding.UTF8.GetString(received.GetBytes());
            Console.WriteLine("Message received. Partition: '{0}', Data: '{1}'", context.Lease.PartitionId, body);
        }

        // Checkpoint at most every 5 minutes, so a restarted worker resumes from
        // roughly 5 minutes back. Events cannot be deleted from the Event Hub;
        // checkpointing is what lets the next client start after this point.
        if (_sinceLastCheckpoint.Elapsed <= TimeSpan.FromMinutes(5))
        {
            return;
        }

        await context.CheckpointAsync();
        _sinceLastCheckpoint.Restart();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment