Skip to content

Instantly share code, notes, and snippets.

@danield137
Created April 16, 2019 13:04
Show Gist options
  • Save danield137/5825a6199f008d8784bdc56230f2e71c to your computer and use it in GitHub Desktop.
Save danield137/5825a6199f008d8784bdc56230f2e71c to your computer and use it in GitHub Desktop.
Log Azure Storage Event Via Event Grid Subscription
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid;
using Microsoft.Azure.EventHubs.Processor;
using Newtonsoft.Json;
using CloseReason = Microsoft.Azure.EventHubs.Processor.CloseReason;
using EventData = Microsoft.Azure.EventHubs.EventData;
using EventPosition = Microsoft.Azure.EventHubs.EventPosition;
using EventProcessorOptions = Microsoft.Azure.EventHubs.Processor.EventProcessorOptions;
using IEventProcessor = Microsoft.Azure.EventHubs.Processor.IEventProcessor;
using PartitionContext = Microsoft.Azure.EventHubs.Processor.PartitionContext;
namespace EventHubTester
{
/// <summary>
/// Event Hubs partition processor that deserializes each incoming message as a batch of
/// Event Grid events, appends every event as one JSON line to a per-partition log file,
/// and periodically checkpoints its position.
/// </summary>
internal class SimpleEventProcessor : IEventProcessor
{
    /// <summary>Minimum time between two checkpoints taken while processing events.</summary>
    private static readonly TimeSpan CheckpointInterval = TimeSpan.FromSeconds(10);

    // Measures the time elapsed since the last checkpoint (started in OpenAsync).
    private Stopwatch checkpointStopWatch;

    // Per-partition log file; created in OpenAsync and released in CloseAsync.
    private StreamWriter fileStream;

    /// <summary>Writes the error message reported for this partition to the console.</summary>
    /// <param name="context">Partition the error belongs to (not used).</param>
    /// <param name="error">The reported exception.</param>
    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"ProcessErrorAsync {error.Message}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Releases the log file and, when the processor is closing because of a shutdown,
    /// takes a final checkpoint.
    /// </summary>
    /// <param name="context">Partition being closed.</param>
    /// <param name="reason">Why the processor is being closed.</param>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);

        // The writer may be null if OpenAsync never completed.
        fileStream?.Dispose();
        fileStream = null;

        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    /// <summary>
    /// Creates the log file for this partition and starts the checkpoint timer.
    /// The file name contains the partition id and the current Unix time in seconds.
    /// </summary>
    /// <param name="context">Partition being opened.</param>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        fileStream = new StreamWriter($"events+{context.PartitionId}+{DateTimeOffset.Now.ToUnixTimeSeconds()}.log");
        checkpointStopWatch = Stopwatch.StartNew();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs every Event Grid event contained in <paramref name="messages"/> and checkpoints
    /// once <see cref="CheckpointInterval"/> has elapsed since the previous checkpoint.
    /// </summary>
    /// <param name="context">Partition the batch was received from.</param>
    /// <param name="messages">Batch of Event Hubs messages; a null batch is treated as empty.</param>
    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // NOTE(review): the host in this file enables InvokeProcessorAfterReceiveTimeout, so this
        // method is also invoked when nothing was received; guard in case the batch arrives as
        // null rather than empty — confirm against the Event Hubs processor documentation.
        if (messages != null)
        {
            var eventGridSubscriber = new EventGridSubscriber();
            foreach (var eventData in messages)
            {
                try
                {
                    // Body is an ArraySegment: decode only the bytes that belong to this message,
                    // not the whole backing array.
                    var body = eventData.Body;
                    var payload = Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);

                    var eventGridEvents = eventGridSubscriber.DeserializeEventGridEvents(payload);
                    foreach (var e in eventGridEvents)
                    {
                        fileStream.WriteLine(JsonConvert.SerializeObject(e));
                    }

                    fileStream.Flush();
                    Console.WriteLine($"{eventData.SystemProperties.Offset} {eventData.SystemProperties.EnqueuedTimeUtc} {context.PartitionId} {context.Lease}");
                }
                catch (Exception ex)
                {
                    // Best effort: one bad message must not stop the rest of the batch,
                    // but the failure is reported instead of being silently dropped.
                    Console.WriteLine($"Failed to process event on partition '{context.PartitionId}': {ex.Message}");
                }
            }
        }

        // Checkpoint at most once per CheckpointInterval so that a restarted worker resumes
        // from roughly that far back. Messages that failed above are not retried.
        if (checkpointStopWatch.Elapsed > CheckpointInterval)
        {
            await context.CheckpointAsync();
            Console.WriteLine("Checkpoint");
            checkpointStopWatch.Restart();
        }
    }
}
/// <summary>
/// Console host that registers <see cref="SimpleEventProcessor"/> with an Event Hub,
/// keeps receiving until Enter is pressed, and then unregisters the processor.
/// </summary>
internal class Program
{
    // Placeholder for the storage account name; fill in before running.
    // NOTE(review): the original referenced an undeclared `account` identifier here,
    // presumably a redacted value — confirm the intended account name.
    private const string Account = "";

    // NOTE(review): not referenced anywhere in this class; kept from the original.
    private const string url = "https://" + Account + ".blob.core.windows.net/data/test.csv";

    /// <summary>Program entry point.</summary>
    /// <param name="args">Command-line arguments (not used).</param>
    private static void Main(string[] args)
    {
        // Connection settings are intentionally blank placeholders; supply real values
        // from configuration rather than committing secrets to source.
        var eventHubConnectionString = "";
        var eventHubName = "";
        var storageAccountName = Account;
        var storageAccountKey = "";
        var storageConnectionString = $"DefaultEndpointsProtocol=https;AccountName={storageAccountName};AccountKey={storageAccountKey}";

        var eventProcessorHostName = Guid.NewGuid().ToString();

        // The last argument gets a random suffix on every run.
        // NOTE(review): this looks like the lease container name, which would make each run
        // start without previously stored checkpoints — confirm this is intended.
        var eventProcessorHost = new EventProcessorHost(
            eventProcessorHostName,
            eventHubName,
            "$Default",
            eventHubConnectionString,
            storageConnectionString,
            $"lm-testing-{new Random().Next(10000)}");

        Console.WriteLine("Registering EventProcessor...");

        // Parse once, culture-invariant, and keep the value in UTC (the literal carries a 'Z').
        var initialEnqueuedTimeUtc = DateTime.Parse(
            "2018-12-05T07:50:52.52522Z",
            System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.AdjustToUniversal);

        var eventProcessorOptions = EventProcessorOptions.DefaultOptions;
        eventProcessorOptions.InitialOffsetProvider = partition => EventPosition.FromEnqueuedTime(initialEnqueuedTimeUtc);
        eventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true;
        eventProcessorOptions.MaxBatchSize = 4096;
        eventProcessorOptions.PrefetchCount = 1000;
        eventProcessorOptions.ReceiveTimeout = TimeSpan.FromSeconds(30);

        eventProcessorHost.PartitionManagerOptions = new PartitionManagerOptions
        {
            RenewInterval = TimeSpan.FromSeconds(25),
            LeaseDuration = TimeSpan.FromSeconds(60)
        };

        try
        {
            // GetAwaiter().GetResult() rethrows the original exception instead of wrapping it
            // in an AggregateException the way Wait() does.
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(eventProcessorOptions)
                .GetAwaiter()
                .GetResult();

            Console.WriteLine("Receiving. Press enter key to stop worker.");
            Console.ReadLine();

            eventProcessorHost.UnregisterEventProcessorAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // Covers configuration errors as well as any other failure; only the message is reported.
            Console.WriteLine(ex.Message);
        }
    }
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace AzureStorageTester
{
/// <summary>
/// Console program that performs a series of append-, block- and page-blob operations
/// (upload, copy, create, open-for-write, put-block) against the "testint" container.
/// </summary>
internal class Program
{
    // Single shared generator for the random payloads; the program is single-threaded.
    private static readonly Random Rng = new Random();

    /// <summary>Program entry point: runs the append, block and page blob scenarios in order.</summary>
    /// <param name="args">Command-line arguments (not used).</param>
    private static void Main(string[] args)
    {
        // The connection string is an intentionally blank placeholder; supply a real one before running.
        var account = CloudStorageAccount.Parse("");
        var blobClient = account.CreateCloudBlobClient();
        var container = blobClient.GetContainerReference("testint");

        DoAppendBlobOps(container);
        DoBlockBlobOps(container);
        DoPageBlobOps(container);
    }

    /// <summary>Generates <paramref name="count"/> block ids, each the Base64 form of a new GUID.</summary>
    /// <param name="count">Number of ids to generate; zero or negative yields an empty list.</param>
    /// <returns>The generated block ids.</returns>
    public static List<string> GetBlockIdList(int count)
    {
        var blocks = new List<string>();
        for (var i = 0; i < count; i++)
        {
            blocks.Add(Convert.ToBase64String(Guid.NewGuid().ToByteArray()));
        }

        return blocks;
    }

    /// <summary>
    /// Uploads <paramref name="blockCount"/> blocks containing the text "data" to
    /// <paramref name="blob"/> and optionally commits them as the blob's block list.
    /// </summary>
    /// <param name="blob">Target block blob.</param>
    /// <param name="blockCount">Number of blocks to upload.</param>
    /// <param name="commit">When true, the uploaded blocks are committed with a block list.</param>
    internal static async Task CreateForTestAsync(CloudBlockBlob blob, int blockCount, bool commit = true)
    {
        var buffer = Encoding.UTF8.GetBytes("data");
        var blocks = GetBlockIdList(blockCount);

        foreach (var block in blocks)
        {
            // A fresh stream per block so every upload starts at position 0.
            using (var stream = new MemoryStream(buffer))
            {
                await blob.PutBlockAsync(block, stream, null);
            }
        }

        if (commit)
        {
            await blob.PutBlockListAsync(blocks);
        }
    }

    /// <summary>Returns a new array of <paramref name="length"/> random bytes.</summary>
    private static byte[] CreateRandomBytes(int length)
    {
        var bytes = new byte[length];
        Rng.NextBytes(bytes);
        return bytes;
    }

    /// <summary>Exercises page blob upload, copy, create and open-for-write.</summary>
    /// <param name="container">Container the blobs are created in.</param>
    private static void DoPageBlobOps(CloudBlobContainer container)
    {
        var bytes = CreateRandomBytes(512);

        var uploaded = container.GetPageBlobReference("pageBlob_upload");
        uploaded.UploadFromByteArrayAsync(bytes, 0, bytes.Length).GetAwaiter().GetResult();

        var copied = container.GetPageBlobReference("pageBlob_copy");
        copied.StartCopyAsync(container.GetPageBlobReference("pageBlob_upload")).GetAwaiter().GetResult();

        var created = container.GetPageBlobReference("pageBlob_create");
        created.CreateAsync(512 * 10).GetAwaiter().GetResult();

        var opened = container.GetPageBlobReference("pageBlob_open");
        // Dispose the write stream even if the write throws.
        using (var stream = opened.OpenWriteAsync(bytes.Length).GetAwaiter().GetResult())
        {
            stream.Write(bytes, 0, bytes.Length);
        }
    }

    /// <summary>Exercises block blob upload (text and empty), copy, open-for-write and put-block.</summary>
    /// <param name="container">Container the blobs are created in.</param>
    private static void DoBlockBlobOps(CloudBlobContainer container)
    {
        var uploaded = container.GetBlockBlobReference("blockBlob_upload");
        uploaded.UploadTextAsync("data").GetAwaiter().GetResult();

        var empty = container.GetBlockBlobReference("blockBlob_empty");
        empty.UploadTextAsync("").GetAwaiter().GetResult();

        var copied = container.GetBlockBlobReference("blockBlob_copy");
        copied.StartCopyAsync(container.GetBlockBlobReference("blockBlob_upload")).GetAwaiter().GetResult();

        var opened = container.GetBlockBlobReference("blockBlob_open");
        var bytes = CreateRandomBytes(512);
        // Dispose the write stream even if the write throws.
        using (var stream = opened.OpenWriteAsync().GetAwaiter().GetResult())
        {
            stream.Write(bytes, 0, bytes.Length);
        }

        // Create the blob from 3 blocks, then overwrite it with a new list of 6 blocks.
        var putBlock = container.GetBlockBlobReference("blockBlob_putblock");
        CreateForTestAsync(putBlock, 3).GetAwaiter().GetResult();
        CreateForTestAsync(putBlock, 6).GetAwaiter().GetResult();
    }

    /// <summary>Exercises append blob upload, copy, create-or-replace, append and open-for-write.</summary>
    /// <param name="container">Container the blobs are created in.</param>
    private static void DoAppendBlobOps(CloudBlobContainer container)
    {
        var uploaded = container.GetAppendBlobReference("appendBlob_upload");
        uploaded.UploadTextAsync("data").GetAwaiter().GetResult();

        var copied = container.GetAppendBlobReference("appendBlob_copy");
        copied.StartCopyAsync(container.GetAppendBlobReference("appendBlob_upload")).GetAwaiter().GetResult();

        var created = container.GetAppendBlobReference("appendBlob_create");
        created.CreateOrReplaceAsync().GetAwaiter().GetResult();

        var bytes = CreateRandomBytes(1024);

        var appended = container.GetAppendBlobReference("appendBlob_append");
        appended.CreateOrReplaceAsync().GetAwaiter().GetResult();
        // NOTE(review): random bytes are decoded as UTF-8 text, so the appended content is not
        // the raw 1024 bytes — presumably acceptable for a test payload; confirm.
        var text = Encoding.UTF8.GetString(bytes);
        appended.AppendTextAsync(text).GetAwaiter().GetResult();
        appended.AppendTextAsync(text).GetAwaiter().GetResult();

        var opened = container.GetAppendBlobReference("appendBlob_open");
        // Dispose the write stream even if the write throws.
        using (var stream = opened.OpenWriteAsync(true).GetAwaiter().GetResult())
        {
            stream.Write(bytes, 0, bytes.Length);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment