// Created September 23, 2016 05:17
// Azure Stream Analytics Code
using Microsoft.ServiceBus.Messaging;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SendingEvents2EventHub
class Program
/// <summary>
/// Console entry point. Both sample calls below are commented out, so the
/// program does nothing as written; uncomment one of them to run that scenario.
/// </summary>
/// <param name="args">Command-line arguments; not read.</param>
static void Main(string[] args)
{
    // SendEvents(10);
    // CheckIfServiceBusGotEvents();
}
/// <summary>
/// Sends <paramref name="count"/> JSON-serialized <c>MyInput</c> events to the
/// "wabaceventhub" event hub, one synchronous send per event.
/// </summary>
/// <param name="count">Number of events to send; zero or a negative value sends nothing.</param>
static void SendEvents(int count)
{
    var eventHubName = "wabaceventhub";
    // The endpoint host and key are empty in this listing and must be filled in before running.
    var connectionString = "Endpoint=sb://;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
    var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
    try
    {
        Console.Write($"Sending {count} messages");

        // A single payload object is reused; every property is overwritten before each send.
        MyInput e = new MyInput();
        Random r = new Random();
        for (int i = 0; i < count; i++)
        {
            e.DeviceName = $"Console App - {i}";
            e.DateTime = DateTime.Now.ToString("o");
            e.temperature = r.Next(0, 100);   // upper bound is exclusive: 0..99
            e.speed = r.Next(200, 300);       // upper bound is exclusive: 200..299

            var msg = Newtonsoft.Json.JsonConvert.SerializeObject(e);
            eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(msg)));
        }
    }
    finally
    {
        // Release the client's connection even when a send throws.
        eventHubClient.Close();
    }
}
/// <summary>
/// Starts an <c>EventProcessorHost</c> on the "wabaceventhub" event hub using
/// <c>MyEventProcessor</c>, and keeps receiving until the user presses Enter.
/// </summary>
static void CheckIfEventHubGotEvents()
{
    var eventHubName = "wabaceventhub";
    // Endpoint/key values are empty in this listing and must be filled in before running.
    var eventHubConnectionString = "Endpoint=sb://;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
    var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=wabac;AccountKey=";

    // A fresh GUID gives every run its own host name.
    string eventProcessorHostName = Guid.NewGuid().ToString();
    EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);

    var options = new EventProcessorOptions();
    options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };

    // Without registering a processor the host never receives anything.
    eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>(options).Wait();

    Console.WriteLine("Receiving. Press enter key to stop worker.");
    // Block here so the host keeps running until the user asks to stop.
    Console.ReadLine();

    // Unregister so the processors are closed before the method returns.
    eventProcessorHost.UnregisterEventProcessorAsync().Wait();
}
/// <summary>
/// Listens on the "wabacservicebusqueue" Service Bus queue and prints the body
/// and id of every message received until the user presses Enter.
/// </summary>
static void CheckIfServiceBusGotEvents()
{
    // Endpoint/key values are empty in this listing and must be filled in before running.
    var connectionString = "Endpoint=sb://;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=";
    var queueName = "wabacservicebusqueue";
    var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
    try
    {
        Console.WriteLine("Receiving from ServiceBus Queue. Press enter key to stop worker.");

        // OnMessage only registers the callback; delivery happens in the background.
        client.OnMessage(message =>
        {
            var msg = message.GetBody<String>();
            Console.WriteLine($"Message body: {msg}");
            Console.WriteLine($"Message id: {message.MessageId}");
        });

        // Keep the method (and therefore the message pump) alive until Enter is pressed.
        Console.ReadLine();
    }
    finally
    {
        // Stop the pump and release the connection.
        client.Close();
    }
}
/// <summary>
/// Stores a JSON result in Redis under the device name it contains, then reads
/// the same key back and prints it to confirm the write.
/// </summary>
/// <param name="val">JSON text that deserializes to <c>MyOutput</c>; stored verbatim as the value.</param>
static void Save2Redis(string val)
{
    var msg = Newtonsoft.Json.JsonConvert.DeserializeObject<MyOutput>(val);

    // The device name is the Redis key, so a payload without one cannot be stored.
    if (string.IsNullOrEmpty(msg?.devicename))
    {
        Console.WriteLine($"Skipping Redis save: no devicename in {val}");
        return;
    }

    IDatabase cache = Connection.GetDatabase();
    Console.WriteLine($"Trying to save {val} to Redis...");
    cache.StringSet(msg.devicename, val);

    Console.WriteLine("checking if Redis save worked....");
    // Read back the key that was just written (previously this queried an unrelated fixed key).
    string r = cache.StringGet(msg.devicename);
    Console.WriteLine($"Saved value is {r}");
}
/// <summary>
/// Prints the Redis values stored under the keys "Console App - 0" through
/// "Console App - (count - 1)".
/// </summary>
/// <param name="count">Number of consecutive keys to look up, starting at index 0.</param>
static void CheckRedisCache(int count)
{
    IDatabase redis = Connection.GetDatabase();

    int index = 0;
    while (index < count)
    {
        string cacheKey = $"Console App - {index}";
        RedisValue stored = redis.StringGet(cacheKey);
        Console.WriteLine($"From Redis Cache key={cacheKey} : Value={stored}");
        index++;
    }
}
// Holds the Redis multiplexer; Lazy<T> defers the Connect call until Value is first read.
// Host and password are empty in this listing and must be filled in before running.
private static Lazy<ConnectionMultiplexer> lazyConnection =
    new Lazy<ConnectionMultiplexer>(
        () => ConnectionMultiplexer.Connect(",password=,ssl=True,abortConnect=False"));

/// <summary>
/// Gets the Redis connection held by <c>lazyConnection</c>, creating it on first access.
/// </summary>
public static ConnectionMultiplexer Connection => lazyConnection.Value;
/// <summary>
/// Payload that <c>SendEvents</c> serializes to JSON and sends to the event hub.
/// Property names and declaration order are part of the serialized output.
/// </summary>
internal class MyInput
{
    /// <summary>Timestamp as text; <c>SendEvents</c> fills it with a round-trip ("o") formatted time.</summary>
    public string DateTime { get; set; }

    /// <summary>Name identifying the sender of the event.</summary>
    public string DeviceName { get; set; }

    /// <summary>Temperature reading.</summary>
    public int temperature { get; set; }

    /// <summary>Speed reading.</summary>
    public int speed { get; set; }
}
/// <summary>
/// Result record that <c>Save2Redis</c> deserializes from JSON. Example payload:
/// {"devicename":"Console App - 8","avgtemp":62.0,"avgspeed":207.0}
/// </summary>
internal class MyOutput
{
    /// <summary>Device the averages belong to; <c>Save2Redis</c> uses it as the Redis key.</summary>
    public string devicename { get; set; }

    /// <summary>Value of the "avgtemp" field.</summary>
    public double avgtemp { get; set; }

    /// <summary>Value of the "avgspeed" field.</summary>
    public double avgspeed { get; set; }
}
/// <summary>
/// Event processor that prints every received event to the console and
/// checkpoints its partition at most once every five minutes.
/// </summary>
class MyEventProcessor : IEventProcessor
{
    // Measures the time since the processor opened or last checkpointed.
    Stopwatch checkpointStopWatch;

    /// <summary>
    /// Logs the shutdown and, when the reason is <c>CloseReason.Shutdown</c>,
    /// writes a final checkpoint.
    /// </summary>
    /// <param name="context">Partition this processor instance was handling.</param>
    /// <param name="reason">Why the processor is being closed.</param>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    /// <summary>
    /// Logs the partition and starting offset, and starts the checkpoint timer.
    /// </summary>
    /// <param name="context">Partition assigned to this processor instance.</param>
    /// <returns>An already-completed task.</returns>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        // The stopwatch must actually run; a stopped one reports zero elapsed time
        // forever and the five-minute checkpoint below would never trigger.
        this.checkpointStopWatch = Stopwatch.StartNew();
        return Task.FromResult<object>(null);
    }

    /// <summary>
    /// Prints each event in the batch, then checkpoints if at least five minutes
    /// have passed since the processor opened or last checkpointed.
    /// </summary>
    /// <param name="context">Partition the events were read from.</param>
    /// <param name="messages">Batch of events to process; bodies are decoded as UTF-8 text.</param>
    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());
            Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'",
                context.Lease.PartitionId, data));
        }

        // Checkpoint every 5 minutes, so that a restarted worker resumes from at most 5 minutes back.
        // Events cannot be deleted from the event hub; checkpointing is what lets the next
        // client start with the events that were added after this call.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            // Begin a new five-minute window; otherwise every later batch would checkpoint.
            this.checkpointStopWatch.Restart();
        }
    }
}
