Created
August 28, 2019 09:57
-
-
Save sandervandevelde/0be3d3313a85f3fc5343efb3e2d242f9 to your computer and use it in GitHub Desktop.
BlockBlob Generator module for testing IoT Edge Blob storage module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Runtime.Loader; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.Devices.Client; | |
using Microsoft.Azure.Devices.Client.Transport.Mqtt; | |
using Microsoft.WindowsAzure.Storage; | |
using Microsoft.WindowsAzure.Storage.Blob; | |
using System.Collections.Generic; | |
using System.Linq; | |
class Program | |
{ | |
private static MemoryStream _stream; | |
private static StreamWriter _writer; | |
private static string _currentFileName; | |
private const int _interval = 5000; | |
private static UInt16 _counter; | |
private static CloudBlobContainer _container; | |
static void Main(string[] args) | |
{ | |
Init().Wait(); | |
var cts = new CancellationTokenSource(); | |
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel(); | |
Console.CancelKeyPress += (sender, cpe) => cts.Cancel(); | |
WhenCancelled(cts.Token).Wait(); | |
} | |
public static Task WhenCancelled(CancellationToken cancellationToken) | |
{ | |
var tcs = new TaskCompletionSource<bool>(); | |
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs); | |
return tcs.Task; | |
} | |
static async Task Init() | |
{ | |
MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only); | |
ITransportSettings[] settings = { mqttSetting }; | |
ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings); | |
await ioTHubModuleClient.OpenAsync(); | |
Console.WriteLine("IoT Hub module Generator client initialized."); | |
// Blob container construction and thread start | |
var account = CloudStorageAccount.Parse( | |
"DefaultEndpointsProtocol=https;BlobEndpoint=http://blob:11002/blobaccount;AccountName=blobaccount;AccountKey=[local storage key]"); | |
var client = account.CreateCloudBlobClient(); | |
_container = client.GetContainerReference("updatedblockblobcontainer"); | |
_container.CreateIfNotExistsAsync().Wait(); | |
var thread = new Thread(() => ThreadBody(ioTHubModuleClient)); | |
thread.Start(); | |
} | |
private static void ThreadBody(object userContext) | |
{ | |
while (true) | |
{ | |
WriteBlockBlob().GetAwaiter().GetResult(); | |
Thread.Sleep(_interval); | |
} | |
} | |
private static async Task WriteBlockBlob() | |
{ | |
_counter += 1; | |
var now = DateTime.UtcNow; | |
var filename = "File" + now.ToString("yyyyMMddHHmm"); | |
var blob = _container.GetBlockBlobReference(filename); | |
var blockList = new List<string>(); | |
if (blob.ExistsAsync().Result) | |
{ | |
IEnumerable<ListBlockItem> existingBlockList = await blob.DownloadBlockListAsync(); | |
blockList.AddRange(existingBlockList.Select(a => a.Name)); | |
} | |
string blockId = Convert.ToBase64String(ASCIIEncoding.ASCII.GetBytes(now.ToString())); | |
if (filename != _currentFileName) | |
{ | |
if (!string.IsNullOrEmpty( _currentFileName)) | |
{ | |
_writer.Close(); | |
_stream.Close(); | |
} | |
_stream = new MemoryStream(); | |
_writer = new StreamWriter(_stream); | |
_currentFileName = filename; | |
} | |
_writer.Write($"We add a test for counter {_counter} at {now:yyyyMMddHHmmss}"); | |
_writer.Flush(); | |
_stream.Position = 0; | |
await blob.PutBlockAsync(blockId, _stream, null); | |
blockList.Add(blockId); | |
await blob.PutBlockListAsync(blockList.ToArray()); | |
Console.WriteLine($"Block {_counter} written for file {filename} at {now:yyyy-MM-dd HH:mm:ss}"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment