Skip to content

Instantly share code, notes, and snippets.

@sharpe5
Forked from zelid/gist:6965002
Created December 23, 2016 11:34
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save sharpe5/890a378ef8278a81792039201d4d690b to your computer and use it in GitHub Desktop.
Examples of BulkInsert for PostgreSQL, MySQL and MS SQL using ServiceStack OrmLite. Work in progress...
public static void BulkInsertNpgsql<T>(this IDbConnection dbConn, IEnumerable<T> list, IEnumerable<string> insertFields = null)
{
if (list == null) return;
if (list.Count() < 1) return;
var objWithAttributes = list.FirstOrDefault();
var modelDef = OrmLiteConfig.GetModelDefinition(objWithAttributes.GetType());
if (insertFields == null) insertFields = new List<string>();
var sbColumnNames = new StringBuilder();
var sbColumnValues = new StringBuilder();
var targetTableName = "";
bool isFirstRecord = true;
var insertFieldsCount = insertFields.Count();
foreach (var record in list)
{
if (isFirstRecord) targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);
var defsCount = modelDef.FieldDefinitions.Count;
var counter = 0;
foreach (var fieldDef in modelDef.FieldDefinitions)
{
counter += 1;
if (fieldDef.IsComputed) continue;
if (fieldDef.AutoIncrement) continue;
//insertFields contains Attribute "Name" of fields to insert ( that's how expressions work )
if (insertFieldsCount > 0 && !insertFields.Contains(fieldDef.Name)) continue;
if (sbColumnNames.Length > 0 && isFirstRecord) sbColumnNames.Append(",");
try
{
if (isFirstRecord) sbColumnNames.Append(OrmLiteConfig.DialectProvider.GetQuotedColumnName(fieldDef.FieldName));
var stringValue = "";
var value = fieldDef.GetValue(record);
if (value == null) stringValue = "\\N";
else if (fieldDef.FieldType == typeof(DateTime))
{
var dateValue = (DateTime)value;
const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff";
stringValue = dateValue.ToString(iso8601Format);
}
else if (fieldDef.FieldType == typeof(Guid))
{
var guidValue = (Guid)value;
stringValue = guidValue.ToString();
}
else
{
stringValue = value.ToString();
}
sbColumnValues.Append(stringValue);
if (counter < defsCount)
{
sbColumnValues.Append("\t");
}
}
catch (Exception ex)
{
throw;
}
}
sbColumnValues.Append("\r\n");
isFirstRecord = false;
}
var strColumnValues = sbColumnValues.ToString();
byte[] byteArrayColumnValues = System.Text.Encoding.UTF8.GetBytes(strColumnValues);
MemoryStream msColumnValues = new MemoryStream(byteArrayColumnValues);
//var connection = dbConn as NpgsqlConnection;
var ormliteConn = dbConn as OrmLiteConnection;
var connection = ormliteConn.DbConnection as NpgsqlConnection;
//var connection = (NpgsqlConnection)dbConn.Database.ToDbConnection();
//var connection = dbConn.DbConnection;
using (var command = new NpgsqlCommand(string.Format("COPY {0} ({1}) FROM STDIN DELIMITER '\t' ", targetTableName, sbColumnNames), connection))
{
var cin = new NpgsqlCopyIn(command, connection, msColumnValues);
try
{
cin.Start();
}
catch (Exception e)
{
try
{
cin.Cancel("Undo copy");
}
catch (NpgsqlException e2)
{
// we should get an error in response to our cancel request:
if (!("" + e2).Contains("Undo copy"))
{
throw new Exception("Failed to cancel copy: " + e2 + " upon failure: " + e);
}
}
throw e;
}
}
}
private static string ToMySqlBulkInsertValue<T>(this FieldDefinition fieldDef, T record)
{
var stringValue = "";
var value = fieldDef.GetValue(record);
if (value == null) stringValue = "";
else if (fieldDef.FieldType == typeof(DateTime))
{
var dateValue = (DateTime)value;
const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff";
stringValue = dateValue.ToString(iso8601Format);
}
else if (fieldDef.FieldType == typeof(Guid))
{
var guidValue = (Guid)value;
stringValue = guidValue.ToString();
}
else
{
stringValue = value.ToString();
}
return stringValue;
}
// http://theonetechnologies.com/outsourcing/post/mysql-bulk-data-import-using-net-connector-mysqlbulkloader-class.aspx
public static void BulkInsertMySql<T>(this IDbConnection myConnection, IEnumerable<T> list/*, IEnumerable<string> insertFields = null*/)
{
if (list == null) return;
if (list.Count() < 1) return;
var objWithAttributes = list.FirstOrDefault();
var modelDef = OrmLiteConfig.GetModelDefinition(objWithAttributes.GetType());
var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);
var Server = HttpContext.Current.Server;
string strFile = targetTableName + "_" + DateTime.Now.Ticks.ToString() + ".csv";
//Create directory if not exist... Make sure directory has required rights..
if (!Directory.Exists(Server.MapPath("~/TempFolder/")))
Directory.CreateDirectory(Server.MapPath("~/TempFolder/"));
strFile = Server.MapPath("~/TempFolder/") + strFile;
//If file does not exist then create it and right data into it..
if (!File.Exists(strFile))
{
FileStream fs = new FileStream(strFile, FileMode.Create, FileAccess.Write);
fs.Close();
fs.Dispose();
}
bool isFirstRecord = true;
StreamWriter sw = new StreamWriter(strFile, false);
//var insertFieldsCount = insertFields.Count();
var defsCount = modelDef.FieldDefinitions.Count;
foreach (var record in list)
{
if (isFirstRecord)
{
sw.Write(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.Name)));
sw.Write(sw.NewLine);
isFirstRecord = false;
}
sw.Write(string.Join("\t", modelDef.FieldDefinitions.Select(x => x.ToMySqlBulkInsertValue<T>(record))));
sw.Write(sw.NewLine);
}
sw.Close();
sw.Dispose();
var connection = myConnection as MySqlConnection;
var bl = new MySqlBulkLoader(connection);
bl.TableName = targetTableName;
bl.FieldTerminator = "\t";
bl.LineTerminator = sw.NewLine;
bl.FileName = strFile;
bl.NumberOfLinesToSkip = 1;
var inserted = bl.Load();
}
public static void BulkInsertMsSql<T>(this IDbConnection dbConn, IEnumerable<T> list)
{
if (list == null) return;
if (list.Count() < 1) return;
var objWithAttributes = list.FirstOrDefault();
var modelDef = OrmLiteConfig.GetModelDefinition(objWithAttributes.GetType());
var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);
using (var bulkCopy = new SqlBulkCopy(dbConn as SqlConnection))
{
bulkCopy.BatchSize = list.Count();
bulkCopy.DestinationTableName = targetTableName;
var table = new DataTable();
foreach (var fieldDef in modelDef.FieldDefinitions)
{
if (fieldDef.IsComputed) continue;
if (fieldDef.AutoIncrement) continue;
bulkCopy.ColumnMappings.Add(fieldDef.Name, fieldDef.Name);
table.Columns.Add(fieldDef.Name, Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType);
}
foreach (var record in list)
{
//var values = new List<object>();
//var dataRows = new List<DataRow>();
DataRow row = table.NewRow();
foreach (var fieldDef in modelDef.FieldDefinitions)
{
if (fieldDef.IsComputed) continue;
if (fieldDef.AutoIncrement) continue;
//values.Add(fieldDef.GetValue(record));
//values.Add(SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType));
//row.
row[fieldDef.Name] = SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType); //Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType;
}
table.Rows.Add(row);
//table.Rows.Add(values);
}
bulkCopy.WriteToServer(table);
}
}
@sharpe5
Copy link
Author

sharpe5 commented Dec 23, 2016

To get this compiling with ServiceStack v4.5.4, update GetModelDefinition to GetModelMetadata, and update ConvertDbValue to FromDbValue. Not sure if this works, but at least it compiles!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment