Skip to content

Instantly share code, notes, and snippets.

@SamVanhoutte
Created December 18, 2020 07:27
Show Gist options
  • Save SamVanhoutte/6eb486b4893d5ca2f78010966c5596dd to your computer and use it in GitHub Desktop.
Save SamVanhoutte/6eb486b4893d5ca2f78010966c5596dd to your computer and use it in GitHub Desktop.
Data lake file transmitter
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Files.DataLake;
using System.IO;
using System.Net;
using Azure;
using Azure.Storage;
using Azure.Storage.Files.DataLake.Models;
using Azure.Storage.Sas;
using ServiceStack;
namespace Savanh.Adapters.DataLake
{
/// <summary>
/// Base class for adapters that serialize records of type <typeparamref name="T"/> to CSV and
/// store them in a Data Lake directory through an inner <see cref="DataLakeTransmitter"/>.
/// </summary>
/// <typeparam name="T">The record type; it is converted to CSV with ServiceStack's <c>ToCsv</c> extension.</typeparam>
public abstract class DataLakeTransmitter<T> where T : new()
{
    private readonly DataLakeTransmitter _dataLakeClient;

    /// <summary>
    /// Creates a transmitter that writes into <paramref name="directoryName"/> of the storage
    /// account described by <paramref name="settings"/>.
    /// </summary>
    /// <param name="settings">Storage account name and key used to authenticate.</param>
    /// <param name="directoryName">Directory that all files of this transmitter live in.</param>
    /// <remarks>
    /// Protected because an abstract type can only be constructed through a derived class (CA1012).
    /// </remarks>
    protected DataLakeTransmitter(StorageSettings settings, string directoryName)
    {
        _dataLakeClient = new DataLakeTransmitter(settings, directoryName);
    }

    /// <summary>Ensures the transmitter's directory exists and returns a client for it.</summary>
    /// <returns>A client for the transmitter's directory.</returns>
    protected async Task<DataLakeDirectoryClient> CreateDirectoryIfNotExists()
    {
        return await _dataLakeClient.CreateDirectoryIfNotExists().ConfigureAwait(false);
    }

    /// <summary>
    /// Appends <paramref name="record"/> as CSV to the end of <paramref name="fileName"/>,
    /// creating the file when it does not exist yet.
    /// </summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <param name="record">The record to serialize.</param>
    /// <param name="includeHeader">When <c>true</c>, the CSV header line is written before the data row.</param>
    /// <returns><c>true</c> once the data has been appended and flushed.</returns>
    protected async Task<bool> AppendToFile(string fileName, T record, bool includeHeader = false)
    {
        var recordContent = GetRecord(record, includeHeader);
        return await _dataLakeClient.AppendToFile(fileName, recordContent).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes <paramref name="record"/> as CSV, without a header line, into a newly created
    /// <paramref name="fileName"/>.
    /// </summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <param name="record">The record to serialize.</param>
    /// <returns><c>true</c> once the data has been written and flushed.</returns>
    protected async Task<bool> WriteFile(string fileName, T record)
    {
        var recordContent = GetRecord(record, false);
        return await _dataLakeClient.WriteText(fileName, recordContent).ConfigureAwait(false);
    }

    /// <summary>
    /// Serializes <paramref name="record"/> to CSV, optionally dropping the first (header) line.
    /// </summary>
    private static string GetRecord(T record, bool includeHeader)
    {
        var recordText = record.ToCsv();
        if (includeHeader)
        {
            return recordText;
        }

        // The first line of the ToCsv output is treated as the header; skip it and keep the rest.
        // StringReader splits on \r, \n and \r\n exactly like the StreamReader used previously,
        // without the UTF-8 encode/decode round trip.
        using (var reader = new StringReader(recordText))
        {
            reader.ReadLine();
            return reader.ReadToEnd();
        }
    }
}
/// <summary>
/// Reads and writes files in one directory of an Azure Data Lake Storage Gen2 file system,
/// authenticating with the storage account's shared key.
/// </summary>
/// <remarks>
/// The service, file system and directory clients are created lazily on first use and cached on
/// the instance. No locking is performed around that initialization.
/// </remarks>
public class DataLakeTransmitter
{
    private const string DefaultFileSystemName = "traiders";

    // Service error codes that this class treats as expected outcomes rather than failures.
    private const string BlobNotFoundErrorCode = "BlobNotFound";
    private const string ContainerAlreadyExistsErrorCode = "ContainerAlreadyExists";

    private readonly StorageSettings _settings;
    private readonly string _directoryName;
    private readonly string _fileSystemName;
    private DataLakeServiceClient _dataLakeService;
    private DataLakeFileSystemClient _fileSystemClient;
    private DataLakeDirectoryClient _directory;

    /// <summary>
    /// Creates a transmitter for <paramref name="directoryName"/> in the default "traiders" file system.
    /// </summary>
    /// <param name="settings">Storage account name and key used to authenticate.</param>
    /// <param name="directoryName">Directory that all files of this transmitter live in.</param>
    /// <exception cref="ArgumentNullException"><paramref name="settings"/> is <c>null</c>.</exception>
    public DataLakeTransmitter(StorageSettings settings, string directoryName)
        : this(settings, directoryName, DefaultFileSystemName)
    {
    }

    /// <summary>
    /// Creates a transmitter for <paramref name="directoryName"/> in the file system <paramref name="fileSystemName"/>.
    /// </summary>
    /// <param name="settings">Storage account name and key used to authenticate.</param>
    /// <param name="directoryName">Directory that all files of this transmitter live in.</param>
    /// <param name="fileSystemName">File system (container) that holds the directory; created on first use if missing.</param>
    /// <exception cref="ArgumentNullException"><paramref name="settings"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="fileSystemName"/> is <c>null</c> or empty.</exception>
    public DataLakeTransmitter(StorageSettings settings, string directoryName, string fileSystemName)
    {
        if (string.IsNullOrEmpty(fileSystemName))
        {
            throw new ArgumentException("A file system name is required.", nameof(fileSystemName));
        }

        _settings = settings ?? throw new ArgumentNullException(nameof(settings));
        _directoryName = directoryName;
        _fileSystemName = fileSystemName;
    }

    /// <summary>Lazily created client for the account's DFS endpoint.</summary>
    private DataLakeServiceClient DataLakeService
    {
        get
        {
            if (_dataLakeService == null)
            {
                var dfsUri = new Uri($"https://{_settings.AccountName}.dfs.core.windows.net");
                _dataLakeService = new DataLakeServiceClient(dfsUri, CreateSharedKeyCredential());
            }

            return _dataLakeService;
        }
    }

    /// <summary>Builds the shared-key credential used both for requests and for signing SAS tokens.</summary>
    private StorageSharedKeyCredential CreateSharedKeyCredential()
    {
        return new StorageSharedKeyCredential(_settings.AccountName, _settings.AccountKey);
    }

    /// <summary>
    /// Checks whether <paramref name="fileName"/> exists in the directory and has content.
    /// Ensures the directory exists as a side effect.
    /// </summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <returns><c>true</c> when the file exists with a content length greater than zero; otherwise <c>false</c>.</returns>
    /// <exception cref="RequestFailedException">Any storage failure other than the file not being found.</exception>
    public async Task<bool> CheckFileExist(string fileName)
    {
        var directory = await CreateDirectoryIfNotExists().ConfigureAwait(false);
        var fileClient = directory.GetFileClient(fileName);
        try
        {
            var fileProperties = (await fileClient.GetPropertiesAsync().ConfigureAwait(false)).Value;
            return fileProperties.ContentLength > 0;
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == BlobNotFoundErrorCode)
        {
            return false;
        }
    }

    /// <summary>
    /// Returns the cached file system client, creating the file system on first use when it does not exist.
    /// </summary>
    /// <exception cref="RequestFailedException">Creation failed for a reason other than the file system already existing.</exception>
    private async Task<DataLakeFileSystemClient> GetFileSystemAsync()
    {
        if (_fileSystemClient != null)
        {
            return _fileSystemClient;
        }

        DataLakeFileSystemClient fileSystem;
        try
        {
            fileSystem = (await DataLakeService.CreateFileSystemAsync(_fileSystemName).ConfigureAwait(false)).Value;
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == ContainerAlreadyExistsErrorCode)
        {
            // Already provisioned: just address the existing file system.
            fileSystem = DataLakeService.GetFileSystemClient(_fileSystemName);
        }

        _fileSystemClient = fileSystem;
        return _fileSystemClient;
    }

    /// <summary>
    /// Ensures the transmitter's directory exists (creating the file system and directory when
    /// missing) and returns a cached client for it.
    /// </summary>
    /// <returns>A client for the transmitter's directory.</returns>
    /// <exception cref="RequestFailedException">The existence probe or creation failed for any reason other than "not found".</exception>
    public async Task<DataLakeDirectoryClient> CreateDirectoryIfNotExists()
    {
        if (_directory != null)
        {
            return _directory;
        }

        var fileSystem = await GetFileSystemAsync().ConfigureAwait(false);
        var directory = fileSystem.GetDirectoryClient(_directoryName);
        try
        {
            // Probe the directory; a missing path is reported as BlobNotFound.
            await directory.GetPropertiesAsync().ConfigureAwait(false);
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == BlobNotFoundErrorCode)
        {
            directory = (await fileSystem.CreateDirectoryAsync(_directoryName).ConfigureAwait(false)).Value;
        }

        // Only cache the client once existence has been confirmed or the directory was created.
        _directory = directory;
        return _directory;
    }

    /// <summary>
    /// Appends <paramref name="recordContent"/>, UTF-8 encoded, to the end of
    /// <paramref name="fileName"/>, creating the file when it does not exist.
    /// </summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <param name="recordContent">Text to append.</param>
    /// <returns><c>true</c> once the data has been appended and flushed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recordContent"/> is <c>null</c>.</exception>
    /// <exception cref="RequestFailedException">A storage operation failed for a reason other than the file being missing.</exception>
    public async Task<bool> AppendToFile(string fileName, string recordContent)
    {
        if (recordContent == null)
        {
            throw new ArgumentNullException(nameof(recordContent));
        }

        var directory = await CreateDirectoryIfNotExists().ConfigureAwait(false);
        var fileClient = directory.GetFileClient(fileName);
        long fileLength;
        try
        {
            fileLength = (await fileClient.GetPropertiesAsync().ConfigureAwait(false)).Value.ContentLength;
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == BlobNotFoundErrorCode)
        {
            fileClient = (await directory.CreateFileAsync(fileName).ConfigureAwait(false)).Value;
            fileLength = 0;
        }

        // Append/flush positions are byte offsets, so measure the encoded bytes, not the
        // string's char count (they differ for any non-ASCII content).
        var recordBytes = Encoding.UTF8.GetBytes(recordContent);
        if (recordBytes.Length == 0)
        {
            // Nothing to append; the file exists (it was found or just created).
            return true;
        }

        using (var recordStream = new MemoryStream(recordBytes))
        {
            await fileClient.AppendAsync(recordStream, fileLength).ConfigureAwait(false);
        }

        await fileClient.FlushAsync(fileLength + recordBytes.Length).ConfigureAwait(false);
        return true;
    }

    /// <summary>Writes <paramref name="recordContent"/>, UTF-8 encoded, into a newly created <paramref name="fileName"/>.</summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <param name="recordContent">Text to write.</param>
    /// <param name="contentType">Optional Content-Type stored with the file.</param>
    /// <returns><c>true</c> once the data has been written and flushed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recordContent"/> is <c>null</c>.</exception>
    public async Task<bool> WriteText(string fileName, string recordContent, string contentType = null)
    {
        if (recordContent == null)
        {
            throw new ArgumentNullException(nameof(recordContent));
        }

        return await WriteBytes(fileName, Encoding.UTF8.GetBytes(recordContent), contentType).ConfigureAwait(false);
    }

    /// <summary>Writes <paramref name="recordContent"/> into a newly created <paramref name="fileName"/>.</summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <param name="recordContent">Bytes to write.</param>
    /// <param name="contentType">Optional Content-Type stored with the file; ignored when null or empty.</param>
    /// <returns><c>true</c> once the data has been written and flushed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recordContent"/> is <c>null</c>.</exception>
    public async Task<bool> WriteBytes(string fileName, byte[] recordContent, string contentType = null)
    {
        if (recordContent == null)
        {
            throw new ArgumentNullException(nameof(recordContent));
        }

        var directory = await CreateDirectoryIfNotExists().ConfigureAwait(false);
        DataLakeFileClient fileClient = (await directory.CreateFileAsync(fileName).ConfigureAwait(false)).Value;

        // The file was just created, so the data starts at offset 0.
        if (recordContent.Length > 0)
        {
            using (var recordStream = new MemoryStream(recordContent))
            {
                await fileClient.AppendAsync(recordStream, 0).ConfigureAwait(false);
            }
        }

        var httpHeaders = string.IsNullOrEmpty(contentType)
            ? null
            : new PathHttpHeaders { ContentType = contentType };
        await fileClient.FlushAsync(recordContent.Length, httpHeaders: httpHeaders).ConfigureAwait(false);
        return true;
    }

    /// <summary>
    /// Generates a read-only, HTTPS-only SAS URI for <paramref name="fileName"/>.
    /// </summary>
    /// <param name="fileName">Name of the file inside the transmitter's directory.</param>
    /// <param name="validHours">Number of hours (from now) the link stays valid; must be positive.</param>
    /// <returns>The file URI with a SAS query string appended.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="validHours"/> is zero or negative.</exception>
    /// <remarks>
    /// The token is a service SAS scoped to this single file. The previous account SAS granted read
    /// access to every blob in the storage account to anyone holding the link.
    /// </remarks>
    public async Task<Uri> GenerateSasUri(string fileName, int validHours = 1)
    {
        if (validHours <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(validHours), validHours, "The SAS lifetime must be at least one hour.");
        }

        var directory = await CreateDirectoryIfNotExists().ConfigureAwait(false);
        var fileClient = directory.GetFileClient(fileName);

        var now = DateTimeOffset.UtcNow;
        var sasBuilder = new DataLakeSasBuilder
        {
            FileSystemName = fileClient.FileSystemName,
            Path = fileClient.Path,
            Resource = "b", // "b" = a single file (as opposed to "c" for the whole file system)
            Protocol = SasProtocol.Https,
            // Start slightly in the past to tolerate clock skew between client and service.
            StartsOn = now.AddHours(-1),
            ExpiresOn = now.AddHours(validHours),
            // IPAddress.None for both ends means "no IP restriction".
            IPRange = new SasIPRange(IPAddress.None, IPAddress.None)
        };
        sasBuilder.SetPermissions(DataLakeSasPermissions.Read);

        var sasUri = new UriBuilder(fileClient.Uri)
        {
            Query = sasBuilder.ToSasQueryParameters(CreateSharedKeyCredential()).ToString()
        };
        return sasUri.Uri;
    }

    /// <summary>Downloads the full content of <paramref name="reportName"/>.</summary>
    /// <param name="reportName">Name of the file inside the transmitter's directory.</param>
    /// <returns>The file's bytes.</returns>
    public async Task<byte[]> ReadFile(string reportName)
    {
        var directory = await CreateDirectoryIfNotExists().ConfigureAwait(false);
        var fileClient = directory.GetFileClient(reportName);
        using (var buffer = new MemoryStream())
        {
            await fileClient.ReadToAsync(buffer).ConfigureAwait(false);
            // ToArray copies the whole buffer regardless of the stream position.
            return buffer.ToArray();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment