Skip to content

Instantly share code, notes, and snippets.

@zelid
Last active August 3, 2023 17:23
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save zelid/6965002 to your computer and use it in GitHub Desktop.
Save zelid/6965002 to your computer and use it in GitHub Desktop.
Examples of BulkInsert for PostgreSQL, MySQL and MS SQL using ServiceStack OrmLite. Work in progress...
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the PostgreSQL table mapped to the record type by
/// streaming tab-delimited text rows through <c>COPY ... FROM STDIN</c>.
/// Computed and auto-increment fields are never written.
/// </summary>
/// <typeparam name="T">Record type. The model definition is resolved from the runtime type of the first record.</typeparam>
/// <param name="dbConn">An <c>OrmLiteConnection</c> wrapping an <c>NpgsqlConnection</c>, or an <c>NpgsqlConnection</c> itself.</param>
/// <param name="list">Records to insert. The method returns without doing anything when this is null or empty.</param>
/// <param name="insertFields">
/// Optional property names (<c>FieldDefinition.Name</c>) restricting which fields are written.
/// Null or empty means every non-computed, non-auto-increment field.
/// </param>
/// <exception cref="ArgumentException">Thrown when <paramref name="dbConn"/> does not resolve to an <c>NpgsqlConnection</c>.</exception>
public static void BulkInsertNpgsql<T>(this IDbConnection dbConn, IEnumerable<T> list, IEnumerable<string> insertFields = null)
{
    if (list == null) return;

    // Materialize once so a lazy sequence is not enumerated several times.
    var records = list as IList<T> ?? list.ToList();
    if (records.Count < 1) return;

    // Accept either the OrmLite wrapper or the raw provider connection.
    var connection = dbConn as NpgsqlConnection;
    if (connection == null)
    {
        var ormliteConn = dbConn as OrmLiteConnection;
        if (ormliteConn != null) connection = ormliteConn.DbConnection as NpgsqlConnection;
    }
    if (connection == null)
    {
        throw new ArgumentException("The connection must be an NpgsqlConnection or an OrmLiteConnection wrapping one.", "dbConn");
    }

    var modelDef = OrmLiteConfig.GetModelDefinition(records[0].GetType());
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);

    // insertFields contains the "Name" of the fields to insert (that's how expressions work).
    var requestedNames = new HashSet<string>(insertFields ?? Enumerable.Empty<string>());

    // The set of written columns depends only on the model, so it is computed once.
    var fieldDefs = new List<FieldDefinition>();
    foreach (var fieldDef in modelDef.FieldDefinitions)
    {
        if (fieldDef.IsComputed) continue;
        if (fieldDef.AutoIncrement) continue;
        if (requestedNames.Count > 0 && !requestedNames.Contains(fieldDef.Name)) continue;
        fieldDefs.Add(fieldDef);
    }

    var columnNames = string.Join(",", fieldDefs.Select(f => OrmLiteConfig.DialectProvider.GetQuotedColumnName(f.FieldName)));

    var sbColumnValues = new StringBuilder();
    foreach (var record in records)
    {
        // Separators go BETWEEN written values only, so a skipped trailing field
        // can no longer leave a dangling tab at the end of the row.
        for (var i = 0; i < fieldDefs.Count; i++)
        {
            if (i > 0) sbColumnValues.Append('\t');
            sbColumnValues.Append(ToNpgsqlCopyValue(fieldDefs[i].GetValue(record)));
        }
        sbColumnValues.Append("\r\n");
    }

    byte[] byteArrayColumnValues = System.Text.Encoding.UTF8.GetBytes(sbColumnValues.ToString());
    using (var msColumnValues = new MemoryStream(byteArrayColumnValues))
    using (var command = new NpgsqlCommand(string.Format("COPY {0} ({1}) FROM STDIN DELIMITER '\t' ", targetTableName, columnNames), connection))
    {
        var cin = new NpgsqlCopyIn(command, connection, msColumnValues);
        try
        {
            cin.Start();
        }
        catch (Exception e)
        {
            try
            {
                cin.Cancel("Undo copy");
            }
            catch (NpgsqlException e2)
            {
                // we should get an error in response to our cancel request:
                if (!("" + e2).Contains("Undo copy"))
                {
                    throw new InvalidOperationException("Failed to cancel copy: " + e2 + " upon failure: " + e, e);
                }
            }
            // Rethrow the original failure without resetting its stack trace.
            throw;
        }
    }
}

/// <summary>
/// Formats a single field value for the COPY text format: null becomes <c>\N</c>, values are
/// rendered with the invariant culture, and backslash, tab, line feed and carriage return are
/// backslash-escaped so they cannot be mistaken for delimiters.
/// </summary>
private static string ToNpgsqlCopyValue(object value)
{
    if (value == null) return "\\N";

    string text;
    if (value is DateTime)
    {
        // Checking the runtime value (not the declared field type) also covers DateTime? fields.
        text = ((DateTime)value).ToString("yyyy-MM-dd HH:mm:ss.fff", System.Globalization.CultureInfo.InvariantCulture);
    }
    else
    {
        var formattable = value as IFormattable;
        text = formattable != null
            ? formattable.ToString(null, System.Globalization.CultureInfo.InvariantCulture)
            : value.ToString();
    }

    // Backslash must be escaped first so the escapes added afterwards are not doubled.
    return (text ?? "")
        .Replace("\\", "\\\\")
        .Replace("\t", "\\t")
        .Replace("\n", "\\n")
        .Replace("\r", "\\r");
}
/// <summary>
/// Formats the value of <paramref name="fieldDef"/> on <paramref name="record"/> as one field of a
/// tab-delimited file for MySQL <c>LOAD DATA</c>.
/// </summary>
/// <param name="fieldDef">Field whose value is read from the record.</param>
/// <param name="record">Record to read the value from.</param>
/// <returns>
/// <c>\N</c> for null, which <c>LOAD DATA</c> reads as NULL when backslash is the escape
/// character. Otherwise the invariant-culture text of the value, with backslash, tab, line feed
/// and carriage return backslash-escaped.
/// </returns>
private static string ToMySqlBulkInsertValue<T>(this FieldDefinition fieldDef, T record)
{
    var value = fieldDef.GetValue(record);

    // An empty field would be loaded as '' or 0 rather than NULL.
    if (value == null) return "\\N";

    string stringValue;
    if (value is DateTime)
    {
        // Checking the runtime value (not the declared field type) also covers DateTime? fields.
        const string iso8601Format = "yyyy-MM-dd HH:mm:ss.fff";
        stringValue = ((DateTime)value).ToString(iso8601Format, System.Globalization.CultureInfo.InvariantCulture);
    }
    else
    {
        // Invariant culture keeps decimal separators machine-readable regardless of the thread culture.
        var formattable = value as IFormattable;
        stringValue = formattable != null
            ? formattable.ToString(null, System.Globalization.CultureInfo.InvariantCulture)
            : value.ToString();
    }

    // Backslash must be escaped first so the escapes added afterwards are not doubled.
    return (stringValue ?? "")
        .Replace("\\", "\\\\")
        .Replace("\t", "\\t")
        .Replace("\n", "\\n")
        .Replace("\r", "\\r");
}
// http://theonetechnologies.com/outsourcing/post/mysql-bulk-data-import-using-net-connector-mysqlbulkloader-class.aspx
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the MySQL table mapped to the record type by writing
/// the records to a temporary tab-delimited file and loading it with <c>MySqlBulkLoader</c>.
/// Computed fields are not written. The temporary file is deleted afterwards.
/// </summary>
/// <typeparam name="T">Record type. The model definition is resolved from the runtime type of the first record.</typeparam>
/// <param name="myConnection">A <c>MySqlConnection</c>, or an <c>OrmLiteConnection</c> wrapping one.</param>
/// <param name="list">Records to insert. The method returns without doing anything when this is null or empty.</param>
/// <exception cref="ArgumentException">Thrown when <paramref name="myConnection"/> does not resolve to a <c>MySqlConnection</c>.</exception>
public static void BulkInsertMySql<T>(this IDbConnection myConnection, IEnumerable<T> list/*, IEnumerable<string> insertFields = null*/)
{
    if (list == null) return;

    // Materialize once so a lazy sequence is not enumerated several times.
    var records = list as IList<T> ?? list.ToList();
    if (records.Count < 1) return;

    // Accept either the raw provider connection or the OrmLite wrapper.
    var connection = myConnection as MySqlConnection;
    if (connection == null)
    {
        var ormliteConn = myConnection as OrmLiteConnection;
        if (ormliteConn != null) connection = ormliteConn.DbConnection as MySqlConnection;
    }
    if (connection == null)
    {
        throw new ArgumentException("The connection must be a MySqlConnection or an OrmLiteConnection wrapping one.", "myConnection");
    }

    var modelDef = OrmLiteConfig.GetModelDefinition(records[0].GetType());
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);
    var fieldDefs = modelDef.FieldDefinitions.Where(f => !f.IsComputed).ToList();

    // Use the web application's temp folder when running inside a request,
    // otherwise fall back to the system temp directory.
    var httpContext = HttpContext.Current;
    var tempDir = httpContext != null ? httpContext.Server.MapPath("~/TempFolder/") : Path.GetTempPath();
    // Make sure the directory has the required rights.
    Directory.CreateDirectory(tempDir);

    // A GUID avoids the collisions possible with tick-based names under concurrent calls.
    var strFile = Path.Combine(tempDir, "bulk_" + Guid.NewGuid().ToString("N") + ".csv");

    // Same terminator a StreamWriter uses by default; captured up front so the
    // loader never has to read it from a disposed writer.
    var lineTerminator = Environment.NewLine;

    try
    {
        using (var sw = new StreamWriter(strFile, false))
        {
            // Header row; the loader skips it (NumberOfLinesToSkip = 1).
            sw.Write(string.Join("\t", fieldDefs.Select(x => x.Name)));
            sw.Write(lineTerminator);

            foreach (var record in records)
            {
                sw.Write(string.Join("\t", fieldDefs.Select(x => x.ToMySqlBulkInsertValue<T>(record))));
                sw.Write(lineTerminator);
            }
        }

        var bl = new MySqlBulkLoader(connection);
        bl.TableName = targetTableName;
        bl.FieldTerminator = "\t";
        bl.LineTerminator = lineTerminator;
        bl.FileName = strFile;
        bl.NumberOfLinesToSkip = 1;

        // Name the target columns explicitly so the load does not depend on the
        // table's physical column order matching the model's field order.
        foreach (var fieldDef in fieldDefs)
        {
            bl.Columns.Add(OrmLiteConfig.DialectProvider.GetQuotedColumnName(fieldDef.FieldName));
        }

        bl.Load();
    }
    finally
    {
        // Always remove the temporary file, whether or not the load succeeded.
        if (File.Exists(strFile)) File.Delete(strFile);
    }
}
/// <summary>
/// Bulk-inserts <paramref name="list"/> into the SQL Server table mapped to the record type using
/// <c>SqlBulkCopy</c>. Computed and auto-increment fields are not written, and null values are
/// sent as <c>DBNull</c>.
/// </summary>
/// <typeparam name="T">Record type. The model definition is resolved from the runtime type of the first record.</typeparam>
/// <param name="dbConn">A <c>SqlConnection</c>, or an <c>OrmLiteConnection</c> wrapping one.</param>
/// <param name="list">Records to insert. The method returns without doing anything when this is null or empty.</param>
/// <exception cref="ArgumentException">Thrown when <paramref name="dbConn"/> does not resolve to a <c>SqlConnection</c>.</exception>
public static void BulkInsertMsSql<T>(this IDbConnection dbConn, IEnumerable<T> list)
{
    if (list == null) return;

    // Materialize once so a lazy sequence is not enumerated several times.
    var records = list as IList<T> ?? list.ToList();
    if (records.Count < 1) return;

    // Accept either the raw provider connection or the OrmLite wrapper.
    var connection = dbConn as SqlConnection;
    if (connection == null)
    {
        var ormliteConn = dbConn as OrmLiteConnection;
        if (ormliteConn != null) connection = ormliteConn.DbConnection as SqlConnection;
    }
    if (connection == null)
    {
        throw new ArgumentException("The connection must be a SqlConnection or an OrmLiteConnection wrapping one.", "dbConn");
    }

    var modelDef = OrmLiteConfig.GetModelDefinition(records[0].GetType());
    var targetTableName = OrmLiteConfig.DialectProvider.GetQuotedTableName(modelDef);
    var fieldDefs = modelDef.FieldDefinitions.Where(f => !f.IsComputed && !f.AutoIncrement).ToList();

    using (var bulkCopy = new SqlBulkCopy(connection))
    using (var table = new DataTable())
    {
        bulkCopy.BatchSize = records.Count;
        bulkCopy.DestinationTableName = targetTableName;

        foreach (var fieldDef in fieldDefs)
        {
            // The DataTable column carries the property name; the destination is the
            // database column name, which differs when the field is aliased.
            bulkCopy.ColumnMappings.Add(fieldDef.Name, fieldDef.FieldName);
            // DataTable columns cannot be declared as Nullable<T>, so use the underlying type.
            table.Columns.Add(fieldDef.Name, Nullable.GetUnderlyingType(fieldDef.FieldType) ?? fieldDef.FieldType);
        }

        foreach (var record in records)
        {
            DataRow row = table.NewRow();
            foreach (var fieldDef in fieldDefs)
            {
                var converted = SqlServerDialect.Provider.ConvertDbValue(fieldDef.GetValue(record), fieldDef.FieldType);
                // A DataRow rejects null ("Cannot set Column ... to be null. Please use DBNull instead.").
                row[fieldDef.Name] = converted ?? DBNull.Value;
            }
            table.Rows.Add(row);
        }

        bulkCopy.WriteToServer(table);
    }
}
@sharpe5
Copy link

sharpe5 commented Dec 23, 2016

To get this compiling with ServiceStack v4.5.12, update GetModelDefinition to GetModelMetadata, and update ConvertDbValue to FromDbValue. This works absolutely perfectly with Visual Studio 2017 and SQL Server 2016, and has been battle tested in anger with billions of points of data across hundreds of tables.

If anybody wants the complete final version of code that I am using, please let me know.

@barrychapman
Copy link

Is there one that will do bulk inserts on MySQL?

@mythz
Copy link

mythz commented Oct 20, 2018

Thanks for the examples, is this released under an OSS license?

@rpm61
Copy link

rpm61 commented May 9, 2019

I am getting "Cannot set Column 'foo' to be null. Please use DBNull instead.".

@jklemmack
Copy link

I am getting "Cannot set Column 'foo' to be null. Please use DBNull instead.".

Welcome to the persnickety world of SqlBulkCopy ☹️

@zelid
Copy link
Author

zelid commented Mar 2, 2021

Thanks for the examples, is this released under an OSS license?

Yep. OSS. @mythz Feel free to integrate it into OrmLite if needed.

Funny thing: I had completely forgotten about this Gist and found a link to it on https://forums.servicestack.net/t/how-to-insert-multiple-records-with-1-insert/6494/2 while I was googling how to do a bulk insert with OrmLite ;-)

@zelid
Copy link
Author

zelid commented Mar 2, 2021

@sharpe5 please also note that the Postgres bulk insert API has changed - http://www.npgsql.org/doc/copy.html
I will try to update the code with your changes and new Bulk Insert API

@mythz
Copy link

mythz commented Mar 3, 2021

@zelid Thanks for the update!

When you update the gist can you add the OSS license you're releasing it under e.g. // MIT or if you want to make it freely available you can release it to public domain with // Any copyright is dedicated to the Public Domain..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment