Skip to content

Instantly share code, notes, and snippets.

@perfectly-panda
Last active May 19, 2018 18:49
Show Gist options
  • Save perfectly-panda/ff9e3047d8341398df4aea7557f0a82c to your computer and use it in GitHub Desktop.
Save perfectly-panda/ff9e3047d8341398df4aea7557f0a82c to your computer and use it in GitHub Desktop.
Use column names to map csv file to extract columns
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using Microsoft.VisualBasic.FileIO;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Text;
[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class CSVExtractor : IExtractor
{
private string[] delimiters;
private string sqlSet;
/// <summary/>
public CSVExtractor(string sqlSet = null, string[] delimiters = null)
{
this.sqlSet = sqlSet;
this.delimiters = new string[]{ ","};
if(delimiters != null) this.delimiters = delimiters;
}
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
using (var reader = new TextFieldParser(input.BaseStream))
{
//Set options
reader.Delimiters = this.delimiters;
reader.TrimWhiteSpace = true;
reader.HasFieldsEnclosedInQuotes = true;
//get header row
var headers = reader.ReadFields();
while (!reader.EndOfData)
{
var line = reader.ReadFields();
this.ObjectToRow(line, output, headers);
yield return output.AsReadOnly();
}
}
}
protected virtual void ObjectToRow(string[] items, IUpdatableRow row, string[] headers)
{
var sqlmap = new Dictionary<string, string>();
for (var i = 0; i < headers.Length; i++)
{
var name = row.Schema.Where(r => r.Name.Replace(" ", "").Replace("#", "").Replace("/", "").ToLower() == headers[i].ToLower()).Select(s => s.Name).FirstOrDefault();
if (name != null && items.Length > i)
{
row.Set<object>(name, items[i]);
}
else
{
if (!sqlmap.Keys.Contains(headers[i].ToLower()) && items.Length > i) sqlmap.Add(headers[i].ToLower(), items[i]);
}
}
if (this.sqlSet != null)
{
row.Set<SqlMap<string, string>>(this.sqlSet, new SqlMap<string, string>(sqlmap));
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment