Skip to content

Instantly share code, notes, and snippets.

@cloudolaf
Last active June 27, 2017 05:56
Show Gist options
  • Save cloudolaf/c9d4d50c7a507e56baae929c294817dd to your computer and use it in GitHub Desktop.
Save cloudolaf/c9d4d50c7a507e56baae929c294817dd to your computer and use it in GitHub Desktop.
ADF demo - DataDownLoaderActivity
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;
using Newtonsoft.Json;
namespace DataDownloaderActivityNS
{
/// <summary>
/// Custom Data Factory .NET activity that reads all rows of the input Azure SQL table and
/// POSTs them as JSON arrays of at most <see cref="ChunkSize"/> objects to the URI configured
/// in the activity's extended properties.
/// </summary>
/// <remarks>
/// Extended properties read by the activity:
/// <c>output:Uri</c> - endpoint that receives the JSON chunks;
/// <c>columnMappings</c> - comma-separated <c>inputColumn:outputColumn</c> pairs.
/// </remarks>
public class DataDownloaderActivity : IDotNetActivity
{
    /// <summary>Maximum number of rows sent in a single HTTP request.</summary>
    private const int ChunkSize = 250;

    // A single client is shared by all requests: creating one per call and never disposing it
    // keeps a connection pool alive per chunk and can exhaust sockets on large tables.
    private static readonly HttpClient SharedHttpClient = new HttpClient();

    /// <summary>
    /// Runs the activity: resolves the input/output datasets, queries the input table and
    /// sends the rows to the configured URI in chunks.
    /// </summary>
    /// <param name="linkedServices">Linked services available to the activity.</param>
    /// <param name="datasets">Datasets available to the activity.</param>
    /// <param name="activity">The activity definition; its first input and first output are used.</param>
    /// <param name="logger">Logger that receives progress and error messages.</param>
    /// <returns>An empty dictionary.</returns>
    /// <exception cref="InvalidOperationException">
    /// The input dataset, its linked service or the activity is not of the expected type, or a
    /// column mapping entry is malformed.
    /// </exception>
    public IDictionary<string, string> Execute(
        IEnumerable<LinkedService> linkedServices,
        IEnumerable<Dataset> datasets,
        Activity activity,
        IActivityLogger logger)
    {
        logger.Write("DataDownloaderActivity started");

        var inputDataSet = datasets.Single(ds => ds.Name == activity.Inputs[0].Name);
        var outputDataSet = datasets.Single(ds => ds.Name == activity.Outputs[0].Name);
        var inputService = linkedServices.Single(s => s.Name == inputDataSet.Properties.LinkedServiceName);

        // The "as" casts yield null for unexpected types; fail with a descriptive error
        // instead of a NullReferenceException further down.
        var tableInputProperties = inputDataSet.Properties.TypeProperties as AzureSqlTableDataset;
        if (tableInputProperties == null)
        {
            throw new InvalidOperationException(
                $"Input dataset '{inputDataSet.Name}' is not an Azure SQL table dataset.");
        }

        var sqlServiceInputProperties = inputService.Properties.TypeProperties as AzureSqlDatabaseLinkedService;
        if (sqlServiceInputProperties == null)
        {
            throw new InvalidOperationException(
                $"Linked service '{inputService.Name}' is not an Azure SQL database linked service.");
        }

        logger.Write("Reading config data");
        var dnActivity = activity.TypeProperties as DotNetActivity;
        if (dnActivity == null)
        {
            throw new InvalidOperationException("The activity is not a DotNetActivity.");
        }

        var uri = dnActivity.ExtendedProperties["output:Uri"];

        logger.Write("Creating column mappings");
        // column key-value pair contains output column as key, input column as value
        var columnDictionary = ParseColumnMappings(dnActivity.ExtendedProperties["columnMappings"]);

        logger.Write("Creating SQL connection");
        try
        {
            var columnNames = inputDataSet.Properties.Structure.Select(s => s.Name).ToList();

            // Column and table names come from the dataset definition; identifiers cannot be
            // supplied as SQL parameters, so they are placed into the statement text directly.
            using (var connection = new SqlConnection(sqlServiceInputProperties.ConnectionString))
            using (var command = new SqlCommand(
                $"SELECT {string.Join(", ", columnNames)} FROM {tableInputProperties.TableName}",
                connection))
            {
                connection.Open();

                var rowCount = 0;
                using (var reader = command.ExecuteReader())
                {
                    if (reader.HasRows)
                    {
                        var outputColumns = ResolveOutputColumns(outputDataSet, columnDictionary, columnNames);
                        rowCount = SendRows(reader, outputColumns, uri, logger);
                    }
                    else
                    {
                        logger.Write("No rows found.");
                    }
                }

                logger.Write($"Completed run succesfully with {rowCount} rows");
            }
        }
        catch (Exception ex)
        {
            // Pass the exception as an argument rather than as the format string so that
            // braces in its text cannot be misread as format placeholders.
            logger.Write("{0}", ex);

            // Rethrow so the failure reaches the caller instead of the method returning
            // normally after a failed or partial run.
            throw;
        }

        return new Dictionary<string, string>();
    }

    /// <summary>
    /// Parses comma-separated <c>inputColumn:outputColumn</c> pairs into a dictionary keyed by
    /// output column name with the input column name as value.
    /// </summary>
    private static Dictionary<string, string> ParseColumnMappings(string mappings)
    {
        var result = new Dictionary<string, string>();
        foreach (var mapping in mappings.Split(','))
        {
            var parts = mapping.Split(':');
            if (parts.Length < 2)
            {
                throw new InvalidOperationException(
                    $"Invalid column mapping '{mapping}'; expected 'inputColumn:outputColumn'.");
            }

            result.Add(parts[1], parts[0]);
        }

        return result;
    }

    /// <summary>
    /// Resolves, once per run, each mapped output column to the ordinal of its input column in
    /// the SELECT list. Output columns without a mapping, or whose input column is not part of
    /// the input structure, are skipped.
    /// </summary>
    private static List<KeyValuePair<string, int>> ResolveOutputColumns(
        Dataset outputDataSet,
        IDictionary<string, string> columnDictionary,
        IList<string> columnNames)
    {
        var resolved = new List<KeyValuePair<string, int>>();
        foreach (var dataElement in outputDataSet.Properties.Structure)
        {
            string inputColumn;
            if (!columnDictionary.TryGetValue(dataElement.Name, out inputColumn))
            {
                continue;
            }

            var ordinal = columnNames.IndexOf(inputColumn);
            if (ordinal >= 0)
            {
                resolved.Add(new KeyValuePair<string, int>(dataElement.Name, ordinal));
            }
        }

        return resolved;
    }

    /// <summary>
    /// Serializes every row of <paramref name="reader"/> as a JSON object and posts the objects
    /// to <paramref name="uri"/> in arrays of at most <see cref="ChunkSize"/> rows.
    /// </summary>
    /// <returns>The number of rows read.</returns>
    private int SendRows(
        SqlDataReader reader,
        IList<KeyValuePair<string, int>> outputColumns,
        string uri,
        IActivityLogger logger)
    {
        var rowCount = 0;
        var buffer = new StringBuilder();

        using (var jsonWriter = new JsonTextWriter(new StringWriter(buffer)))
        {
            while (reader.Read())
            {
                // First row of a chunk opens a new array.
                if (rowCount % ChunkSize == 0)
                {
                    jsonWriter.WriteStartArray();
                }

                rowCount++;

                jsonWriter.WriteStartObject();
                foreach (var column in outputColumns)
                {
                    jsonWriter.WritePropertyName(column.Key);
                    jsonWriter.WriteValue(reader[column.Value]);
                }

                jsonWriter.WriteEndObject();

                // Last row of a full chunk closes the array and sends it.
                if (rowCount % ChunkSize == 0)
                {
                    jsonWriter.WriteEndArray();
                    SendChunk(jsonWriter, buffer, uri, logger);
                }
            }

            // if the end of the chunk hasn't been reached before the data is all read, end the chunk
            if (rowCount % ChunkSize != 0)
            {
                jsonWriter.WriteEndArray();
                SendChunk(jsonWriter, buffer, uri, logger);
            }
        }

        return rowCount;
    }

    /// <summary>
    /// Posts the JSON accumulated in <paramref name="buffer"/> and empties the buffer so the
    /// next chunk starts from a clean slate.
    /// </summary>
    private void SendChunk(JsonWriter jsonWriter, StringBuilder buffer, string uri, IActivityLogger logger)
    {
        jsonWriter.Flush();
        var json = buffer.ToString();
        AsyncHelper.RunSync(() => SendJson(logger, json, uri));
        buffer.Clear();
    }

    /// <summary>
    /// POSTs <paramref name="json"/> to <paramref name="uri"/> as <c>application/json</c> (UTF-8)
    /// and echoes the payload to standard output once the request has succeeded.
    /// </summary>
    /// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
    private async Task SendJson(IActivityLogger logger, string json, string uri)
    {
        using (var content = new StringContent(json, Encoding.UTF8, "application/json"))
        using (var response = await SharedHttpClient.PostAsync(uri, content))
        {
            try
            {
                response.EnsureSuccessStatusCode();
            }
            catch (HttpRequestException e)
            {
                logger.Write("{0}", e.Message);
                throw;
            }
        }

        Console.WriteLine(json);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment