Created
April 16, 2019 13:04
-
-
Save danield137/5825a6199f008d8784bdc56230f2e71c to your computer and use it in GitHub Desktop.
Log Azure Storage Event Via Event Grid Subscription
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.EventGrid; | |
using Microsoft.Azure.EventHubs.Processor; | |
using Newtonsoft.Json; | |
using CloseReason = Microsoft.Azure.EventHubs.Processor.CloseReason; | |
using EventData = Microsoft.Azure.EventHubs.EventData; | |
using EventPosition = Microsoft.Azure.EventHubs.EventPosition; | |
using EventProcessorOptions = Microsoft.Azure.EventHubs.Processor.EventProcessorOptions; | |
using IEventProcessor = Microsoft.Azure.EventHubs.Processor.IEventProcessor; | |
using PartitionContext = Microsoft.Azure.EventHubs.Processor.PartitionContext; | |
namespace EventHubTester | |
{ | |
/// <summary>
/// Event Hubs partition processor that treats every received message body as a UTF-8
/// JSON batch of Event Grid events and appends each event, re-serialized as one JSON
/// line, to a log file that is created when the partition is opened.
/// </summary>
internal class SimpleEventProcessor : IEventProcessor
{
    /// <summary>Minimum time between two checkpoints taken while processing events.</summary>
    private static readonly TimeSpan CheckpointInterval = TimeSpan.FromSeconds(10);

    // Measures the time elapsed since the last checkpoint (started in OpenAsync).
    private Stopwatch checkpointStopWatch;

    // Per-partition log file; opened in OpenAsync and closed in CloseAsync.
    private StreamWriter fileStream;

    /// <summary>Writes the message of an error reported for this partition to the console.</summary>
    /// <param name="context">Partition the error was reported for (not used).</param>
    /// <param name="error">The reported error.</param>
    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"ProcessErrorAsync {error.Message}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Closes the log file and, when the processor is closed because of a shutdown,
    /// writes a final checkpoint.
    /// </summary>
    /// <param name="context">Partition being closed.</param>
    /// <param name="reason">Why the processor is being closed.</param>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        fileStream?.Close();
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    /// <summary>
    /// Creates the log file for this partition and starts the checkpoint timer.
    /// </summary>
    /// <param name="context">Partition being opened.</param>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        // The Unix timestamp in the name gives every open of a partition its own file.
        fileStream = new StreamWriter($"events+{context.PartitionId}+{DateTimeOffset.Now.ToUnixTimeSeconds()}.log");
        checkpointStopWatch = Stopwatch.StartNew();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Logs every Event Grid event contained in the received messages and checkpoints
    /// once <see cref="CheckpointInterval"/> has elapsed since the previous checkpoint.
    /// </summary>
    /// <param name="context">Partition the messages were received from.</param>
    /// <param name="messages">Received batch.</param>
    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // NOTE(review): when the host is configured to invoke the processor after a
        // receive timeout, the batch may carry no events (possibly null) - confirm
        // against the SDK; the guard makes either case harmless.
        if (messages != null)
        {
            // One subscriber per batch is enough; it was previously re-created per event.
            var eventGridSubscriber = new EventGridSubscriber();
            foreach (var eventData in messages)
            {
                try
                {
                    // Body is an ArraySegment: decode only the bytes that belong to it,
                    // not the whole backing array.
                    var body = eventData.Body;
                    var payload = Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);
                    foreach (var e in eventGridSubscriber.DeserializeEventGridEvents(payload))
                    {
                        fileStream.WriteLine(JsonConvert.SerializeObject(e));
                    }
                    fileStream.Flush();
                    Console.WriteLine($"{eventData.SystemProperties.Offset} {eventData.SystemProperties.EnqueuedTimeUtc} {context.PartitionId} {context.Lease}");
                }
                catch (Exception ex)
                {
                    // Still best-effort (one bad message must not stop the batch), but
                    // the failure is reported instead of being swallowed silently.
                    Console.WriteLine($"Failed to log a message from partition '{context.PartitionId}': {ex}");
                }
            }
        }

        // Checkpoint periodically so that a restarted worker resumes close to where it stopped.
        if (checkpointStopWatch.Elapsed > CheckpointInterval)
        {
            await context.CheckpointAsync();
            Console.WriteLine("Checkpoint");
            checkpointStopWatch.Restart();
        }
    }
}
/// <summary>
/// Console entry point: registers <see cref="SimpleEventProcessor"/> with an
/// <see cref="EventProcessorHost"/> and keeps receiving until Enter is pressed.
/// The connection settings below are empty placeholders and must be filled in.
/// </summary>
internal class Program
{
    // Placeholder for the storage account name; it was referenced but never declared.
    private const string account = "";

    // Plain concatenation keeps this a compile-time constant on every C# language version.
    private const string url = "https://" + account + ".blob.core.windows.net/data/test.csv";

    /// <summary>
    /// Builds the processor host, registers the event processor, waits for Enter and
    /// unregisters again. Any failure is written to the console.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    private static void Main(string[] args)
    {
        var eventHubConnectionString = "";
        var eventHubName = "";
        var storageAccountName = account;
        var storageAccountKey = "";
        var storageConnectionString = $"DefaultEndpointsProtocol=https;AccountName={storageAccountName};AccountKey={storageAccountKey}";

        // Parse once, culture-independently, and keep the value in UTC so the starting
        // position does not depend on the machine's culture or time zone.
        var initialEnqueuedTime = DateTime.Parse(
            "2018-12-05T07:50:52.52522Z",
            System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.AdjustToUniversal);

        var eventProcessorOptions = EventProcessorOptions.DefaultOptions;
        eventProcessorOptions.InitialOffsetProvider = partition => EventPosition.FromEnqueuedTime(initialEnqueuedTime);
        eventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true;
        eventProcessorOptions.MaxBatchSize = 4096;
        eventProcessorOptions.PrefetchCount = 1000;
        eventProcessorOptions.ReceiveTimeout = TimeSpan.FromSeconds(30);

        try
        {
            // Constructed inside the try block so that invalid settings are reported
            // like every other failure instead of crashing the process.
            var eventProcessorHost = new EventProcessorHost(
                Guid.NewGuid().ToString(),
                eventHubName,
                "$Default",
                eventHubConnectionString,
                storageConnectionString,
                $"lm-testing-{new Random().Next(10000)}");

            eventProcessorHost.PartitionManagerOptions = new PartitionManagerOptions
            {
                RenewInterval = TimeSpan.FromSeconds(25),
                LeaseDuration = TimeSpan.FromSeconds(60)
            };

            Console.WriteLine("Registering EventProcessor...");
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(eventProcessorOptions).GetAwaiter().GetResult();

            Console.WriteLine("Receiving. Press enter key to stop worker.");
            Console.ReadLine();

            // GetAwaiter().GetResult() rethrows the original exception; Wait() would
            // wrap it in an AggregateException with an uninformative message.
            eventProcessorHost.UnregisterEventProcessorAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            // A single handler replaces two identical ones; the full exception is
            // printed so the stack trace and inner exceptions are not lost.
            Console.WriteLine(ex);
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Microsoft.WindowsAzure.Storage; | |
using Microsoft.WindowsAzure.Storage.Blob; | |
namespace AzureStorageTester | |
{ | |
internal class Program | |
{ | |
/// <summary>
/// Resolves the "testint" container of the configured storage account and runs the
/// append-, block- and page-blob scenarios against it, in that order.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
private static void Main(string[] args)
{
    // The connection string is an empty placeholder and must be filled in before running.
    var targetContainer = CloudStorageAccount
        .Parse("")
        .CreateCloudBlobClient()
        .GetContainerReference("testint");

    DoAppendBlobOps(targetContainer);
    DoBlockBlobOps(targetContainer);
    DoPageBlobOps(targetContainer);
}
/// <summary>
/// Generates <paramref name="count"/> block ids, each the Base64 encoding of a new GUID.
/// </summary>
/// <param name="count">Number of ids to generate; zero or a negative value yields an empty list.</param>
/// <returns>A new list holding the generated ids.</returns>
public static List<string> GetBlockIdList(int count)
{
    var ids = new List<string>();
    for (var remaining = count; remaining > 0; remaining--)
    {
        var rawId = Guid.NewGuid().ToByteArray();
        ids.Add(Convert.ToBase64String(rawId));
    }

    return ids;
}
/// <summary>
/// Uploads <paramref name="blockCount"/> blocks, each containing the UTF-8 text "data",
/// to <paramref name="blob"/> and optionally commits them as the blob's block list.
/// </summary>
/// <param name="blob">Block blob that receives the blocks.</param>
/// <param name="blockCount">Number of blocks to upload.</param>
/// <param name="commit">When true, the uploaded block ids are committed with a block list.</param>
internal static async Task CreateForTestAsync(CloudBlockBlob blob, int blockCount, bool commit = true)
{
    var blockIds = GetBlockIdList(blockCount);
    var payload = Encoding.UTF8.GetBytes("data");

    foreach (var blockId in blockIds)
    {
        // A fresh stream per block so that every upload reads the payload from the start.
        using (var content = new MemoryStream(payload))
        {
            await blob.PutBlockAsync(blockId, content, null);
        }
    }

    if (!commit)
    {
        return;
    }

    await blob.PutBlockListAsync(blockIds);
}
/// <summary>
/// Exercises page-blob operations in <paramref name="container"/>: upload from a byte
/// array, copy, create, and write through an opened stream.
/// </summary>
/// <param name="container">Container in which the test blobs are written.</param>
private static void DoPageBlobOps(CloudBlobContainer container)
{
    // One 512-byte buffer; NOTE(review): page blob sizes are expected to be multiples
    // of 512 bytes - confirm against the storage documentation before changing this.
    const int pageSize = 512;
    var bytes = new byte[pageSize];
    new Random().NextBytes(bytes);

    // GetAwaiter().GetResult() is used throughout so that a failure surfaces as the
    // original exception instead of an AggregateException (which Wait() would throw).
    var pageBlob = container.GetPageBlobReference("pageBlob_upload");
    pageBlob.UploadFromByteArrayAsync(bytes, 0, bytes.Length).GetAwaiter().GetResult();

    pageBlob = container.GetPageBlobReference("pageBlob_copy");
    pageBlob.StartCopyAsync(container.GetPageBlobReference("pageBlob_upload")).GetAwaiter().GetResult();

    pageBlob = container.GetPageBlobReference("pageBlob_create");
    pageBlob.CreateAsync(pageSize * 10).GetAwaiter().GetResult();

    pageBlob = container.GetPageBlobReference("pageBlob_open");
    // using guarantees the stream is closed even when Write throws.
    using (var stream = pageBlob.OpenWriteAsync(bytes.Length).GetAwaiter().GetResult())
    {
        stream.Write(bytes, 0, bytes.Length);
    }
}
/// <summary>
/// Exercises block-blob operations in <paramref name="container"/>: text upload, empty
/// upload, copy, write through an opened stream, and block upload with a block list
/// (performed twice on the same blob).
/// </summary>
/// <param name="container">Container in which the test blobs are written.</param>
private static void DoBlockBlobOps(CloudBlobContainer container)
{
    // GetAwaiter().GetResult() is used throughout so that a failure surfaces as the
    // original exception instead of an AggregateException (which Wait() would throw).
    var blockBlob = container.GetBlockBlobReference("blockBlob_upload");
    blockBlob.UploadTextAsync("data").GetAwaiter().GetResult();

    blockBlob = container.GetBlockBlobReference("blockBlob_empty");
    blockBlob.UploadTextAsync("").GetAwaiter().GetResult();

    blockBlob = container.GetBlockBlobReference("blockBlob_copy");
    blockBlob.StartCopyAsync(container.GetBlockBlobReference("blockBlob_upload")).GetAwaiter().GetResult();

    // Random payload for the stream-based write.
    var bytes = new byte[512];
    new Random().NextBytes(bytes);

    blockBlob = container.GetBlockBlobReference("blockBlob_open");
    // using guarantees the stream is closed even when Write throws.
    using (var stream = blockBlob.OpenWriteAsync().GetAwaiter().GetResult())
    {
        stream.Write(bytes, 0, bytes.Length);
    }

    // The MemoryStream that used to be filled here was never read or disposed; removed.

    // Create or replace.
    blockBlob = container.GetBlockBlobReference("blockBlob_putblock");
    CreateForTestAsync(blockBlob, 3).GetAwaiter().GetResult();

    // Write the same blob again with a different number of blocks.
    CreateForTestAsync(blockBlob, 6).GetAwaiter().GetResult();
}
/// <summary>
/// Exercises append-blob operations in <paramref name="container"/>: text upload, copy,
/// create-or-replace, two text appends, and write through an opened stream.
/// </summary>
/// <param name="container">Container in which the test blobs are written.</param>
private static void DoAppendBlobOps(CloudBlobContainer container)
{
    // GetAwaiter().GetResult() is used throughout so that a failure surfaces as the
    // original exception instead of an AggregateException (which Wait() would throw).
    var appendBlob = container.GetAppendBlobReference("appendBlob_upload");
    appendBlob.UploadTextAsync("data").GetAwaiter().GetResult();

    appendBlob = container.GetAppendBlobReference("appendBlob_copy");
    appendBlob.StartCopyAsync(container.GetAppendBlobReference("appendBlob_upload")).GetAwaiter().GetResult();

    // Create or replace.
    appendBlob = container.GetAppendBlobReference("appendBlob_create");
    appendBlob.CreateOrReplaceAsync().GetAwaiter().GetResult();

    // Random payload shared by the append and stream scenarios.
    var bytes = new byte[1024];
    new Random().NextBytes(bytes);

    appendBlob = container.GetAppendBlobReference("appendBlob_append");
    appendBlob.CreateOrReplaceAsync().GetAwaiter().GetResult();
    // NOTE(review): decoding random bytes as UTF-8 is lossy, so the appended text is
    // not the raw buffer; presumably any payload is acceptable here - confirm.
    var text = Encoding.UTF8.GetString(bytes);
    appendBlob.AppendTextAsync(text).GetAwaiter().GetResult();
    appendBlob.AppendTextAsync(text).GetAwaiter().GetResult();

    appendBlob = container.GetAppendBlobReference("appendBlob_open");
    // using guarantees the stream is closed even when Write throws.
    using (var stream = appendBlob.OpenWriteAsync(true).GetAwaiter().GetResult())
    {
        stream.Write(bytes, 0, bytes.Length);
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment