Skip to content

Instantly share code, notes, and snippets.

@pratapprashant
Created September 23, 2016 05:17
Show Gist options
  • Save pratapprashant/8b3bd33e752399776f89aec1f55e7d87 to your computer and use it in GitHub Desktop.
Save pratapprashant/8b3bd33e752399776f89aec1f55e7d87 to your computer and use it in GitHub Desktop.
Azure Stream Analytics Code
using Microsoft.ServiceBus.Messaging;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SendingEvents2EventHub
{
class Program
{
static void Main(string[] args)
{
// SendEvents(10);
//CheckIfEventHubGotEvents();
// CheckIfServiceBusGotEvents();
//CheckRedisCache(10);
}
static void SendEvents(int count)
{
var eventHubName = "wabaceventhub";
var connectionString = "Endpoint=sb://wabaceventhubs.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
Console.Write($"Sending {count} messages");
MyInput e = new MyInput();
Random r = new Random();
for (int i = 0; i < count; i++)
{
e.DeviceName = $"Console App - {i}";
e.DateTime = DateTime.Now.ToString("o");
e.temperature = r.Next(0, 100);
e.speed = r.Next(200, 300);
var msg = Newtonsoft.Json.JsonConvert.SerializeObject(e);
//var msg = $"Event ({i}) from console app at {DateTime.Now.ToString("o")}";
eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(msg)));
Console.Write(".");
System.Threading.Thread.Sleep(1000);
}
Console.WriteLine("done");
}
static void CheckIfEventHubGotEvents()
{
var eventHubName = "wabaceventhub";
var eventHubConnectionString = "Endpoint=sb://wabaceventhubs.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=wabac;AccountKey=";
string eventProcessorHostName = Guid.NewGuid().ToString();
EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
var options = new EventProcessorOptions();
options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>(options).Wait();
Console.WriteLine("Receiving. Press enter key to stop worker.");
Console.ReadLine();
eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}
static void CheckIfServiceBusGotEvents()
{
var connectionString = "Endpoint=sb://wabacservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
var queueName = "wabacservicebusqueue";
var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
Console.WriteLine("Receiving from ServiceBus Queue. Press enter key to stop worker.");
client.OnMessage(message =>
{
var msg = message.GetBody<String>();
Console.WriteLine(String.Format("Message body: {0}", msg));
Console.WriteLine(String.Format("Message id: {0}", message.MessageId));
Save2Redis(msg);
});
Console.ReadLine();
}
static void Save2Redis(string val)
{
var msg = Newtonsoft.Json.JsonConvert.DeserializeObject<MyOutput>(val);
IDatabase cache = Connection.GetDatabase();
Console.WriteLine($"Trying to save {val} to Redis...");
cache.StringSet(msg.devicename, val);
Console.WriteLine("checking if Redis save worked....");
string r = cache.StringGet("StreamAnalyticsResults");
Console.WriteLine($"Saved value is {r}");
}
static void CheckRedisCache(int count)
{
IDatabase cache = Connection.GetDatabase();
for (int i = 0; i < count; i++)
{
var key = $"Console App - {i}";
var val = cache.StringGet(key);
Console.WriteLine($"From Redis Cache key={key} : Value={val}");
}
}
private static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
{
return ConnectionMultiplexer.Connect("WabacRedis.redis.cache.windows.net:6380,password=,ssl=True,abortConnect=False");
});
public static ConnectionMultiplexer Connection
{
get
{
return lazyConnection.Value;
}
}
}
class MyInput
{
public string DateTime { get; set; }
public string DeviceName { get; set; }
public int temperature { get; set; }
public int speed { get; set; }
}
class MyOutput
{
//{"devicename":"Console App - 8","avgtemp":62.0,"avgspeed":207.0}
public string devicename { get; set; }
public double avgtemp { get; set; }
public double avgspeed { get; set; }
}
class MyEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string data = Encoding.UTF8.GetString(eventData.GetBytes());
Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
context.Lease.PartitionId, data));
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
// - there is no option to delete events in the eventhub...you need to call CheckPoint() so that next client will get events that got added after this call
if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
this.checkpointStopWatch.Restart();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment