Skip to content

Instantly share code, notes, and snippets.

@pshrosbree
Last active February 14, 2023 11:06
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pshrosbree/74c8c4b4744c00cf3d92939952808d1e to your computer and use it in GitHub Desktop.
Save pshrosbree/74c8c4b4744c00cf3d92939952808d1e to your computer and use it in GitHub Desktop.
Deserializing Azure event hub capture files from Apache Avro with C#
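/*
NuGet references (versions known to work with this sample):
  Microsoft.Hadoop.Avro2 (tested with 1.2.1)
  WindowsAzure.Storage (tested with 8.3.0)
Namespaces to import (as using directives or LINQPad namespace imports):
  Microsoft.Hadoop.Avro.Container
  Microsoft.WindowsAzure.Storage
*/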
/// <summary>
/// Opens one Azure Event Hubs capture blob, reads it with the generic Avro container
/// reader, wraps every record in an <see cref="AvroEventData"/> and dumps it.
/// </summary>
/// <remarks>
/// Replace the three placeholder strings before running. <c>Dump()</c> is presumably
/// LINQPad's output extension, so this script appears to be intended for LINQPad.
/// </remarks>
void Main()
{
    // Placeholders: substitute real values before running.
    var connectionString = "<Azure event hub capture storage account connection string>";
    var containerName = "<Azure event hub capture container name>";
    var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";

    // Walk account -> blob client -> container -> block blob to get the capture file.
    var captureBlob = CloudStorageAccount.Parse(connectionString)
        .CreateCloudBlobClient()
        .GetContainerReference(containerName)
        .GetBlockBlobReference(blobName);

    using (var blobStream = captureBlob.OpenRead())
    {
        using (var avroReader = AvroContainer.CreateGenericReader(blobStream))
        {
            // MoveNext() advances the container reader; Current.Objects yields the
            // generic records it currently holds.
            while (avroReader.MoveNext())
            {
                foreach (dynamic avroRecord in avroReader.Current.Objects)
                {
                    AvroEventData eventData = new AvroEventData(avroRecord);
                    eventData.Dump();
                }
            }
        }
    }
}
/// <summary>
/// Strongly-typed copy of one record read from an Azure Event Hubs capture Avro file.
/// </summary>
/// <remarks>
/// Populated from a generic (dynamic) Avro record that exposes the members
/// SequenceNumber, Offset, EnqueuedTimeUtc, SystemProperties, Properties and Body.
/// </remarks>
public struct AvroEventData
{
    /// <summary>
    /// Copies the fields of a generic Avro record into this struct.
    /// </summary>
    /// <param name="record">
    /// Dynamic record. SequenceNumber is cast to <see cref="long"/>, Offset and
    /// EnqueuedTimeUtc to <see cref="string"/>, SystemProperties and Properties to
    /// <c>Dictionary&lt;string, object&gt;</c>, and Body to <c>byte[]</c>. A failing cast
    /// throws at runtime.
    /// </param>
    /// <remarks>
    /// If EnqueuedTimeUtc cannot be parsed as a date/time, <see cref="EnqueuedTimeUtc"/>
    /// is left as <c>default(DateTime)</c> rather than throwing.
    /// </remarks>
    public AvroEventData(dynamic record)
    {
        SequenceNumber = (long) record.SequenceNumber;
        Offset = (string) record.Offset;

        // Parse with the invariant culture so the result does not depend on the machine's
        // regional settings (e.g. month/day order). AssumeUniversal treats a timestamp
        // without an offset as UTC; AdjustToUniversal converts one that carries an offset
        // (or 'Z') to UTC instead of local time. Either way the result has Kind == Utc,
        // matching the property name.
        DateTime.TryParse(
            (string) record.EnqueuedTimeUtc,
            System.Globalization.CultureInfo.InvariantCulture,
            System.Globalization.DateTimeStyles.AssumeUniversal
                | System.Globalization.DateTimeStyles.AdjustToUniversal,
            out var enqueuedTimeUtc);
        EnqueuedTimeUtc = enqueuedTimeUtc;

        SystemProperties = (Dictionary<string, object>) record.SystemProperties;
        Properties = (Dictionary<string, object>) record.Properties;
        Body = (byte[]) record.Body;
    }

    /// <summary>Value of the record's SequenceNumber field.</summary>
    public long SequenceNumber { get; set; }

    /// <summary>Value of the record's Offset field.</summary>
    public string Offset { get; set; }

    /// <summary>Enqueued time parsed as UTC; <c>default(DateTime)</c> if parsing failed.</summary>
    public DateTime EnqueuedTimeUtc { get; set; }

    /// <summary>Value of the record's SystemProperties field.</summary>
    public Dictionary<string, object> SystemProperties { get; set; }

    /// <summary>Value of the record's Properties field.</summary>
    public Dictionary<string, object> Properties { get; set; }

    /// <summary>Value of the record's Body field.</summary>
    public byte[] Body { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment