Skip to content

Instantly share code, notes, and snippets.

@kobi
Created August 4, 2020 09:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kobi/f56ab3cbb30cf79f04b3d08384ad4e81 to your computer and use it in GitHub Desktop.
Save kobi/f56ab3cbb30cf79f04b3d08384ad4e81 to your computer and use it in GitHub Desktop.
Example code for Entity Framework 6 – Bulk Insert and Returning of Generated Primary Keys https://kobikobi.wordpress.com/2018/02/05/entity-framework-6-bulk-insert-and-returning-of-generated-primary-keys/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Data.Entity;
using System.Data.SqlClient;
using System.Linq;
using System.Linq.Expressions;
using EntityFramework.Utilities;
using log4net;
/// <summary>
/// Implemented by entities whose database-generated primary key should be written
/// back onto the object after a bulk insert.
/// </summary>
public interface IHasPrimaryKey
{
/// <summary>The primary key value; the bulk-insert provider assigns the generated ID here.</summary>
long PrimaryKey { get; set; }
//this is needed because EntityFramework.Utilities doesn't reveal this internal information to the SQL provider.
// https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/EFBatchOperation.cs#L119
/// <summary>Name of the CLR property that maps to the primary key column (used to find the key column mapping).</summary>
string PrimaryKeyPropertyName { get; }
}
/// <summary>
/// DbContext extension methods for bulk inserts via EntityFramework.Utilities,
/// with deadlock retries and an optional split-and-retry failure policy.
/// </summary>
public static class BulkOperationsExtensions
{
    static BulkOperationsExtensions()
    {
        SetProvider();
    }

    /// <summary>Replaces the default EntityFramework.Utilities providers with our fixed SQL provider.</summary>
    private static void SetProvider()
    {
        Configuration.Providers.Clear();
        Configuration.Providers.Add(new BulkInsertEntityFrameworkUtitlitiesSqlProvider());
    }

    private static readonly object SyncLock = new object();

    /// <summary>
    /// Initializes the EF mapping metadata for <paramref name="context"/> under a lock.
    /// </summary>
    private static void SynchronizedInitContextMetadata(DbContext context)
    {
        lock (SyncLock)
        {
            //this method is not thread safe, so we lock it.
            EfMappingFactory.GetMappingsForContext(context);
        }
    }

    /// <summary>
    /// Bulk-inserts <paramref name="items"/> into <paramref name="dbSet"/> without
    /// reading back the database-generated primary keys.
    /// </summary>
    public static void InsertAll<TEntity>(this DbContext context, IDbSet<TEntity> dbSet, IList<TEntity> items,
        BulkFailurePolicy failurePolicy = BulkFailurePolicy.AllOrNothing)
        where TEntity : class
    {
        SynchronizedInitContextMetadata(context);
        var wrapper = new BulkInsertionCollectionMetadata<TEntity>(items) { SetGeneratedRecordIds = false, };
        RetryOnFailure(wrapper, data => EFBatchOperation.For(context, dbSet).InsertAll(data), failurePolicy);
    }

    /// <summary>
    /// Bulk-inserts <paramref name="items"/> and writes each database-generated primary key
    /// back onto the corresponding entity (via <see cref="IHasPrimaryKey"/>).
    /// </summary>
    public static void InsertAllAndReturnRecordIds<TEntity>(this DbContext context, IDbSet<TEntity> dbSet,
        IList<TEntity> items,
        BulkFailurePolicy failurePolicy = BulkFailurePolicy.AllOrNothing) where TEntity : class, IHasPrimaryKey
    {
        SynchronizedInitContextMetadata(context);
        var wrapper = new BulkInsertionCollectionMetadata<TEntity>(items) { SetGeneratedRecordIds = true, };
        RetryOnFailure(wrapper, data => EFBatchOperation.For(context, dbSet).InsertAll(data), failurePolicy);
    }

    private const int MaxRetries = 80;
    private const int MaxRowsPerInsertAction = 3000;

    /// <summary>
    /// Runs <paramref name="insertAction"/> with deadlock retries. Under
    /// <see cref="BulkFailurePolicy.IgnoreFailures"/>, oversized collections are split up
    /// front, and failing collections are repeatedly halved until single failing rows are
    /// logged and skipped. Under <see cref="BulkFailurePolicy.AllOrNothing"/>, non-deadlock
    /// failures are rethrown immediately.
    /// </summary>
    private static void RetryOnFailure<TEntity>(BulkInsertionCollectionMetadata<TEntity> items,
        Action<BulkInsertionCollectionMetadata<TEntity>> insertAction, BulkFailurePolicy failurePolicy,
        int retryCounter = 0)
    {
        //nothing to insert - also prevents endless splitting of an empty collection below.
        if (items.Count == 0) return;
        // If there are too many items, split the load into smaller chunks.
        // This is only allowed if we don't have to roll back after failures.
        if (failurePolicy == BulkFailurePolicy.IgnoreFailures && items.Count > MaxRowsPerInsertAction)
        {
            var split = items.Split();
            foreach (var smallerCollection in split)
            {
                RetryOnFailure(smallerCollection, insertAction, failurePolicy, retryCounter);
            }
            return;
        }
        try
        {
            insertAction(items);
        }
        //We are catching all exceptions. We can also get InvalidOperationException or others, not just SqlException.
        catch (Exception exception) when (exception.Message.Contains("deadlock"))
        {
            Log.Error($"Encountered deadlock while ingesting records of type {typeof(TEntity).Name} to database.", exception);
            //never ignore deadlock related failures, we will throw this exception.
            if (retryCounter > MaxRetries) throw;
            RetryOnFailure(items, insertAction, failurePolicy, retryCounter + 1);
        }
        //any other exception - split and retry
        catch (Exception exception)
        {
            if (items.Count == 1 && failurePolicy == BulkFailurePolicy.IgnoreFailures)
            {
                //a single row failed - log it and move on.
                Log.Error($"Problem ingesting record of type {typeof(TEntity).Name} to database.", exception);
                return;
            }
            if (failurePolicy == BulkFailurePolicy.AllOrNothing)
            {
                Log.Error($"Problem ingesting records of type {typeof(TEntity).Name} to database.", exception);
                throw;
            }
            //else - split the list in two and retry each half.
            var split = items.Split();
            foreach (var smallerCollection in split)
            {
                RetryOnFailure(smallerCollection, insertAction, failurePolicy, retryCounter + 1);
            }
        }
    }

    private static readonly ILog Log = LogManager.GetLogger(typeof(BulkOperationsExtensions));
}
/// <summary>
/// Controls what happens when a bulk insert fails part-way through.
/// </summary>
public enum BulkFailurePolicy
{
/// <summary>
/// The collection will be inserted as a whole, or all rows will be rolled back and an exception will be thrown.
/// </summary>
AllOrNothing,
/// <summary>
/// On failure, the collection will be divided and retried, until individual failing rows are skipped.
/// </summary>
IgnoreFailures,
}
/// <summary>
/// An IList wrapper that carries extra bulk-insert metadata (whether generated record IDs
/// should be written back). This class is used because we cannot pass additional parameters
/// through EntityFramework.Utilities to the SQL provider - only the items collection flows
/// through, so the flag rides along on the collection itself.
/// </summary>
internal class BulkInsertionCollectionMetadata<T> : IList<T>
{
    private readonly IList<T> _items;

    public BulkInsertionCollectionMetadata(IList<T> items)
    {
        _items = items;
    }

    /// <summary>When true, the SQL provider writes the database-generated IDs back onto the entities.</summary>
    public bool SetGeneratedRecordIds { get; set; }

    //IList<T> members simply delegate to the wrapped list.
    public IEnumerator<T> GetEnumerator() => _items.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_items).GetEnumerator();
    public void Add(T item) => _items.Add(item);
    public void Clear() => _items.Clear();
    public bool Contains(T item) => _items.Contains(item);
    public void CopyTo(T[] array, int arrayIndex) => _items.CopyTo(array, arrayIndex);
    public bool Remove(T item) => _items.Remove(item);
    public int Count => _items.Count;
    public bool IsReadOnly => _items.IsReadOnly;
    public int IndexOf(T item) => _items.IndexOf(item);
    public void Insert(int index, T item) => _items.Insert(index, item);
    public void RemoveAt(int index) => _items.RemoveAt(index);

    public T this[int index]
    {
        get => _items[index];
        set => _items[index] = value;
    }

    /// <summary>Splits the collection into two halves, preserving the metadata flag on each half.</summary>
    public BulkInsertionCollectionMetadata<T>[] Split()
    {
        var itemsMiddle = _items.Count / 2;
        return new[]
        {
            CopyWithOtherItems(_items.Take(itemsMiddle).ToList()),
            CopyWithOtherItems(_items.Skip(itemsMiddle).ToList()),
        };
    }

    /// <summary>Creates a new wrapper around <paramref name="items"/> carrying the same flag.</summary>
    private BulkInsertionCollectionMetadata<T> CopyWithOtherItems(IList<T> items)
        => new BulkInsertionCollectionMetadata<T>(items)
        {
            SetGeneratedRecordIds = SetGeneratedRecordIds
        };
}
/// <summary>
/// Overrides the EntityFramework.Utilities SQL provider to fix some bugs and to add
/// support for returning the database-generated primary keys of bulk-inserted rows.
/// (The typo in the class name is preserved - renaming would break existing references.)
/// Based on:
/// https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/SqlQueryProvider.cs
/// </summary>
public class BulkInsertEntityFrameworkUtitlitiesSqlProvider : SqlQueryProvider, EntityFramework.Utilities.IQueryProvider
{
    /// <summary>
    /// Called by EFBatchOperation.InsertAll. When <paramref name="items"/> is a
    /// <see cref="BulkInsertionCollectionMetadata{T}"/> with SetGeneratedRecordIds set,
    /// the generated primary keys are read back onto the entities; otherwise this is a
    /// plain SqlBulkCopy insert.
    /// </summary>
    public new void InsertItems<T>(IEnumerable<T> items, string schema, string tableName, IList<ColumnMapping> properties, DbConnection storeConnection, int? batchSize)
    {
        var con = (SqlConnection)storeConnection;
        var wrapperData = items as BulkInsertionCollectionMetadata<T>;
        bool shouldReturnRecordIds = wrapperData?.SetGeneratedRecordIds == true;
        if (shouldReturnRecordIds)
        {
            BulkInsertAllAndReturnIds(wrapperData, schema, tableName, properties, con, batchSize);
        }
        else
        {
            BulkInsertAll(items, schema, tableName, properties, con, batchSize, SqlBulkCopyOptions.KeepNulls);
        }
    }

    /// <summary>Streams <paramref name="items"/> into the destination table using SqlBulkCopy.</summary>
    private static void BulkInsertAll<T>(IEnumerable<T> items, string schema, string tableName, IList<ColumnMapping> properties,
        SqlConnection connection, int? batchSize, SqlBulkCopyOptions insertOptions)
    {
        using (var reader = new EFDataReader<T>(items, properties))
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }
            //NOTE(review): SqlBulkCopy receives the connection STRING, so it opens its own
            //connection rather than reusing `connection` - same as the upstream provider.
            using (SqlBulkCopy copy = new SqlBulkCopy(connection.ConnectionString, insertOptions))
            {
                copy.BulkCopyTimeout = 0; //no timeout - large loads may run long
                copy.BatchSize = Math.Min(reader.RecordsAffected, batchSize ?? 15000); //default batch size
                if (tableName.StartsWith("#"))
                {
                    //temp tables ("#...") must not be schema-qualified or bracketed
                    copy.DestinationTableName = tableName;
                }
                else if (!string.IsNullOrWhiteSpace(schema))
                {
                    copy.DestinationTableName = $"[{schema}].[{tableName}]";
                }
                else
                {
                    copy.DestinationTableName = "[" + tableName + "]";
                }
                copy.NotifyAfter = 0;
                //map reader columns to database columns by position
                foreach (var i in Enumerable.Range(0, reader.FieldCount))
                {
                    copy.ColumnMappings.Add(i, properties[i].NameInDatabase);
                }
                copy.WriteToServer(reader);
                copy.Close();
            }
        }
    }

    /// <summary>
    /// Bulk-inserts through a uniquely-named staging table, then moves the rows into the
    /// real table with an OUTPUT clause so the generated primary keys can be written back
    /// onto the entities in the order the items were supplied.
    /// </summary>
    private static void BulkInsertAllAndReturnIds<T>(BulkInsertionCollectionMetadata<T> items, string schema,
        string tableName,
        IList<ColumnMapping> properties, SqlConnection connection, int? batchSize)
    {
        if (items.Count == 0) return;
        //Set temporary, unique, negative IDs. They keep the staging rows in input order
        //(the "order by" below relies on them) and mark rows that never got a real ID
        //(cleaned up in the finally block).
        long dummyValue = -1000 - items.Count;
        foreach (var item in items)
        {
            ((IHasPrimaryKey)item).PrimaryKey = dummyValue;
            dummyValue++;
        }
        try
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }
            //create an empty staging table with the same structure as the target table.
            using (var tempTable = new TempTable(connection, tableName, schema))
            {
                var createTempTableSql = $"Select * Into {tempTable.TableName} From {tableName} Where 1 = 2";
                using (var command = new SqlCommand(createTempTableSql, connection))
                {
                    command.ExecuteNonQuery();
                }
                //bulk insert to the staging table, keeping our dummy identity values.
                BulkInsertAll(items, schema, tempTable.TableName, properties, connection, batchSize,
                    SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.KeepIdentity);
                //note: IsPrimaryKey is not populated in InsertAll, so we locate the key
                //column by the property name the entity reports.
                // https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/EFBatchOperation.cs#L129
                string primaryKeyNameOnObject = ((IHasPrimaryKey)items.First()).PrimaryKeyPropertyName;
                var primaryKey = properties.Single(c => c.NameOnObject == primaryKeyNameOnObject);
                var otherColumns = properties.Where(p => p != primaryKey);
                var allValueColumns = String.Join(", ", otherColumns.Select(c => "[" + c.NameInDatabase + "]"));
                //insert to real table and get new IDs.
                //ordering by the dummy key guarantees the record IDs are generated in input order.
                var migrateAndReturnIds =
                    $@"
insert into {tableName} ({allValueColumns})
OUTPUT inserted.{primaryKey.NameInDatabase}
select {allValueColumns} from {tempTable.TableName} temp
order by temp.{primaryKey.NameInDatabase}
";
                var newlyGeneratedIds = new List<long>(items.Count);
                using (var migrateDataCommand = new SqlCommand(migrateAndReturnIds, connection)
                {
                    CommandTimeout = 0 //no timeout for the migration step
                })
                using (var recordIdReader = migrateDataCommand.ExecuteReader())
                {
                    while (recordIdReader.Read())
                    {
                        var newId = recordIdReader.GetInt64(0);
                        newlyGeneratedIds.Add(newId);
                    }
                }
                //set IDs on entities.
                if (newlyGeneratedIds.Count != items.Count)
                {
                    throw new MissingPrimaryKeyException("There are fewer generated record IDs than the " +
                                                         "number of items inserted to the database.");
                }
                //The OUTPUT clause does not guarantee row order, but the values were generated
                //in the same order as `items`, so sorting the IDs aligns them with the items.
                newlyGeneratedIds.Sort();
                for (int i = 0; i < newlyGeneratedIds.Count; i++)
                {
                    ((IHasPrimaryKey)items[i]).PrimaryKey = newlyGeneratedIds[i];
                }
            }
        }
        finally
        {
            //make sure the ID is 0 (rather than a leftover dummy) if the row wasn't inserted.
            foreach (var item in items)
            {
                var entity = (IHasPrimaryKey)item;
                if (entity.PrimaryKey < 0) entity.PrimaryKey = 0;
            }
        }
    }

    /// <summary>Extracts the property name from a lambda like <c>x =&gt; x.SomeProperty</c>.</summary>
    private static string GetPropertyName<T>(Expression<Func<T, object>> propertyLambda)
    {
        var temp = propertyLambda.Body;
        //unwrap Convert nodes added when a value-type property is boxed to object
        while (temp is UnaryExpression unary)
        {
            temp = unary.Operand;
        }
        return (temp as MemberExpression)?.Member.Name;
    }

    /// <summary>
    /// Bulk-updates the columns named in <paramref name="updateSpecification"/> by staging
    /// the incoming rows in a temporary table and joining it to the target table on the
    /// primary key.
    /// </summary>
    public new void UpdateItems<T>(IEnumerable<T> items, string schema, string tableName,
        IList<ColumnMapping> properties, DbConnection storeConnection, int? batchSize,
        UpdateSpecification<T> updateSpecification)
    {
        var columnsToUpdate = updateSpecification.Properties.Select(GetPropertyName).ToDictionary(x => x);
        //keep only the primary key columns plus the columns being updated.
        var filtered = properties.Where(p => columnsToUpdate.ContainsKey(p.NameOnObject) || p.IsPrimaryKey).ToList();
        var columns = filtered.Select(c => "[" + c.NameInDatabase + "] " + c.DataType);
        var pkConstraint = string.Join(", ",
            properties.Where(p => p.IsPrimaryKey).Select(c => "[" + c.NameInDatabase + "]"));
        var con = (SqlConnection)storeConnection;
        if (con.State != ConnectionState.Open)
        {
            con.Open();
        }
        var tempTable = new TempTable(con, tableName, schema);
        var createTempTableSql =
            $"CREATE TABLE {schema}.[{tempTable.TableName}]({string.Join(", ", columns)}, PRIMARY KEY ({pkConstraint}))";
        var setters = string.Join(",",
            filtered.Where(c => !c.IsPrimaryKey)
                .Select(c => "[" + c.NameInDatabase + "] = TEMP.[" + c.NameInDatabase + "]"));
        var pks =
            properties.Where(p => p.IsPrimaryKey)
                .Select(x => "ORIG.[" + x.NameInDatabase + "] = TEMP.[" + x.NameInDatabase + "]");
        var filter = string.Join(" and ", pks);
        var mergeCommand = string.Format(@"UPDATE [{0}]
SET
{3}
FROM
[{0}] ORIG
INNER JOIN
[{1}] TEMP
ON
{2}", tableName, tempTable.TableName, filter, setters);
        using (var createCommand = new SqlCommand(createTempTableSql, con))
        using (var mCommand = new SqlCommand(mergeCommand, con))
        using (tempTable) //drops the staging table when done
        {
            createCommand.ExecuteNonQuery();
            InsertItems(items, schema, tempTable.TableName, filtered, storeConnection, batchSize);
            mCommand.ExecuteNonQuery();
        }
    }

    //consider using a temp table: "#". This doesn't work with bulk insert.
    private static string GetTempTableName(string baseTableName)
        => "temp_" + baseTableName + "_" + DateTime.UtcNow.ToString("yyyyMMdd_HHmmss_fff") + "_" +
           Guid.NewGuid().ToString("N");

    /// <summary>
    /// Holds a uniquely-named staging table name and drops the table (best-effort) on
    /// Dispose. Note: the table itself is created by the caller, not by this class.
    /// </summary>
    private class TempTable : IDisposable
    {
        private readonly SqlConnection _connection;
        private readonly string _dbSchema;

        public string TableName { get; }

        public TempTable(SqlConnection connection, string baseTableName, string dbSchema)
        {
            _connection = connection;
            _dbSchema = dbSchema;
            TableName = GetTempTableName(baseTableName);
        }

        public void Dispose()
        {
            try
            {
                using (var dCommand = new SqlCommand($"DROP table {_dbSchema}.[{TableName}]", _connection))
                {
                    dCommand.ExecuteNonQuery();
                }
            }
            catch
            {
                //best effort - an orphaned staging table is uniquely named and can be cleaned up later.
            }
        }
    }
}
//Presumably the EF-generated half of the entity (virtual property suggests change-tracking
//proxies) - shown here as part of the example; confirm against the real model.
public partial class Spaceship
{
//identity column; the bulk-insert code writes the generated value here via IHasPrimaryKey.
public virtual long SpaceshipId { get; set; }
}
/// <summary>
/// Wires the Spaceship entity into the bulk-insert infrastructure by exposing its
/// identity column through IHasPrimaryKey. The interface is implemented explicitly
/// so these members do not clutter the entity's public surface.
/// </summary>
partial class Spaceship : IHasPrimaryKey
{
    long IHasPrimaryKey.PrimaryKey
    {
        get { return SpaceshipId; }
        set { SpaceshipId = value; }
    }

    string IHasPrimaryKey.PrimaryKeyPropertyName
    {
        get { return nameof(SpaceshipId); }
    }
}
//Example usage: bulk-insert `purchasedSpaceships` into the Spaceships set. With
//IgnoreFailures, failing rows are split out and skipped, and each successfully
//inserted entity gets its generated SpaceshipId written back.
context.InsertAllAndReturnRecordIds(context.Spaceships,
purchasedSpaceships, BulkFailurePolicy.IgnoreFailures);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment