Last active
June 27, 2017 05:56
-
-
Save cloudolaf/c9d4d50c7a507e56baae929c294817dd to your computer and use it in GitHub Desktop.
ADF demo - DataDownLoaderActivity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Data.SqlClient; | |
using System.IO; | |
using System.Linq; | |
using System.Net.Http; | |
using System.Text; | |
using System.Threading.Tasks; | |
using Microsoft.Azure.Management.DataFactories.Models; | |
using Microsoft.Azure.Management.DataFactories.Runtime; | |
using Newtonsoft.Json; | |
namespace DataDownloaderActivityNS | |
{ | |
public class DataDownloaderActivity : IDotNetActivity | |
{ | |
private string _dataStorageAccountKey; | |
private string _dataStorageAccountName; | |
private string _dataStorageContainer; | |
private IActivityLogger _logger; | |
public IDictionary<string, string> Execute( | |
IEnumerable<LinkedService> linkedServices, | |
IEnumerable<Dataset> datasets, | |
Activity activity, | |
IActivityLogger logger) | |
{ | |
logger.Write("DataDownloaderActivity started"); | |
var inputDataSet = datasets.Single(ds => ds.Name == activity.Inputs[0].Name); | |
var outputDataSet = datasets.Single(ds => ds.Name == activity.Outputs[0].Name); | |
var inputService = linkedServices.Single(s => s.Name == inputDataSet.Properties.LinkedServiceName); | |
var tableInputProperties = inputDataSet.Properties.TypeProperties as AzureSqlTableDataset; | |
var sqlServiceInputProperties = inputService.Properties.TypeProperties as AzureSqlDatabaseLinkedService; | |
logger.Write("Reading config data"); | |
var dnActivity = activity.TypeProperties as DotNetActivity; | |
var uri = dnActivity.ExtendedProperties["output:Uri"]; | |
logger.Write("Creating column mappings"); | |
var columnMappings = dnActivity.ExtendedProperties["columnMappings"].Split(','); | |
// column key-value pair contains output column as key, input column as value | |
var columnDictionary = columnMappings.ToDictionary(s => s.Split(':')[1], s => s.Split(':')[0]); | |
logger.Write("Creating SQL connection"); | |
try | |
{ | |
using (var connection = new SqlConnection(sqlServiceInputProperties.ConnectionString)) | |
{ | |
var chunkIndex = 0; | |
var columnNames = inputDataSet.Properties.Structure.Select(s => s.Name).ToList(); | |
var command = new SqlCommand( | |
$"SELECT {string.Join(", ", columnNames)} FROM {tableInputProperties.TableName}", | |
connection); | |
connection.Open(); | |
var reader = command.ExecuteReader(); | |
if (reader.HasRows) | |
{ | |
var sb = new StringBuilder(); | |
var sw = new StringWriter(sb); | |
var chunkSize = 250; | |
using (JsonWriter jsonWriter = new JsonTextWriter(sw)) | |
{ | |
while (reader.Read()) | |
{ | |
if (chunkIndex % chunkSize == 0) | |
jsonWriter.WriteStartArray(); | |
chunkIndex++; | |
jsonWriter.WriteStartObject(); | |
foreach (var dataElement in outputDataSet.Properties.Structure) | |
if (columnDictionary.ContainsKey(dataElement.Name)) | |
{ | |
var inputColumn = columnDictionary[dataElement.Name]; | |
var index = columnNames.IndexOf(inputColumn); | |
if (index >= 0) | |
{ | |
jsonWriter.WritePropertyName(dataElement.Name); | |
jsonWriter.WriteValue(reader[index]); | |
} | |
} | |
jsonWriter.WriteEndObject(); | |
if (chunkIndex % chunkSize == 0) | |
{ | |
jsonWriter.WriteEndArray(); | |
AsyncHelper.RunSync(() => SendJson(logger, sw.ToString(), uri)); | |
sb.Clear(); | |
} | |
} | |
// if the end of the chunk hasn't been reached before the data is all read, end the chunk | |
if (chunkIndex % chunkSize != 0) | |
{ | |
jsonWriter.WriteEndArray(); | |
AsyncHelper.RunSync(() => SendJson(logger, sw.ToString(), uri)); | |
} | |
} | |
} | |
else | |
{ | |
logger.Write("No rows found."); | |
} | |
reader.Close(); | |
logger.Write($"Completed run succesfully with {chunkIndex} rows"); | |
} | |
} | |
catch (Exception ex) | |
{ | |
logger.Write(ex.ToString()); | |
} | |
return new Dictionary<string, string>(); | |
} | |
private async Task SendJson(IActivityLogger logger, string json, string uri) | |
{ | |
var httpClient = new HttpClient(); | |
using (var response = | |
await httpClient.PostAsync(uri, new StringContent(json, Encoding.UTF8, "application/json"))) | |
{ | |
try | |
{ | |
response.EnsureSuccessStatusCode(); | |
} | |
catch (Exception e) | |
{ | |
logger.Write(e.Message); | |
throw; | |
} | |
} | |
Console.WriteLine(json); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment