Created: August 4, 2020 09:10
Save kobi/f56ab3cbb30cf79f04b3d08384ad4e81 to your computer and use it in GitHub Desktop.
Example code for Entity Framework 6 – Bulk Insert and Returning of Generated Primary Keys https://kobikobi.wordpress.com/2018/02/05/entity-framework-6-bulk-insert-and-returning-of-generated-primary-keys/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
using System.Data; | |
using System.Data.Common; | |
using System.Data.Entity; | |
using System.Data.SqlClient; | |
using System.Linq; | |
using System.Linq.Expressions; | |
using EntityFramework.Utilities; | |
using log4net; | |
/// <summary>
/// Implemented by entities whose database-generated primary key should be
/// written back onto the object after a bulk insert.
/// </summary>
public interface IHasPrimaryKey
{
    /// <summary>The entity's database-generated primary key value.</summary>
    long PrimaryKey { get; set; }

    //this is needed because EntityFramework.Utilities doesn't reveal this internal information to the SQL provider.
    // https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/EFBatchOperation.cs#L119
    /// <summary>Name of the CLR property that maps to the primary key column.</summary>
    string PrimaryKeyPropertyName { get; }
}
/// <summary>
/// Bulk-insert extension methods over <see cref="DbContext"/>, built on
/// EntityFramework.Utilities, with deadlock retry and split-and-retry failure handling.
/// (Generic type parameters restored; they were stripped in the pasted copy.)
/// </summary>
public static class BulkOperationsExtensions
{
    static BulkOperationsExtensions()
    {
        SetProvider();
    }

    private static void SetProvider()
    {
        // Replace the library's default SQL provider with ours so bulk inserts
        // can optionally return the generated primary keys.
        Configuration.Providers.Clear();
        Configuration.Providers.Add(new BulkInsertEntityFrameworkUtitlitiesSqlProvider());
    }

    private static readonly object SyncLock = new object();

    private static void SynchronizedInitContextMetadata(DbContext context)
    {
        lock (SyncLock)
        {
            //this method is not thread safe, so we lock it.
            EfMappingFactory.GetMappingsForContext(context);
        }
    }

    /// <summary>
    /// Bulk-inserts <paramref name="items"/> into the table behind <paramref name="dbSet"/>.
    /// Generated primary keys are NOT read back.
    /// </summary>
    public static void InsertAll<TEntity>(this DbContext context, IDbSet<TEntity> dbSet, IList<TEntity> items,
        BulkFailurePolicy failurePolicy = BulkFailurePolicy.AllOrNothing)
        where TEntity : class
    {
        SynchronizedInitContextMetadata(context);
        var wrapper = new BulkInsertionCollectionMetadata<TEntity>(items) { SetGeneratedRecordIds = false, };
        RetryOnFailure(wrapper, data => EFBatchOperation.For(context, dbSet).InsertAll(data), failurePolicy);
    }

    /// <summary>
    /// Bulk-inserts <paramref name="items"/> and writes the database-generated primary
    /// keys back onto each entity via <see cref="IHasPrimaryKey.PrimaryKey"/>.
    /// </summary>
    public static void InsertAllAndReturnRecordIds<TEntity>(this DbContext context, IDbSet<TEntity> dbSet,
        IList<TEntity> items,
        BulkFailurePolicy failurePolicy = BulkFailurePolicy.AllOrNothing) where TEntity : class, IHasPrimaryKey
    {
        SynchronizedInitContextMetadata(context);
        var wrapper = new BulkInsertionCollectionMetadata<TEntity>(items) { SetGeneratedRecordIds = true, };
        RetryOnFailure(wrapper, data => EFBatchOperation.For(context, dbSet).InsertAll(data), failurePolicy);
    }

    private const int MaxRetries = 80;
    private const int MaxRowsPerInsertAction = 3000;

    /// <summary>
    /// Runs <paramref name="insertAction"/> with the configured failure policy:
    /// deadlocks are retried up to <see cref="MaxRetries"/> times (then rethrown);
    /// other failures either rethrow (AllOrNothing) or bisect-and-retry until
    /// single failing rows are isolated and logged (IgnoreFailures).
    /// </summary>
    private static void RetryOnFailure<TEntity>(BulkInsertionCollectionMetadata<TEntity> items,
        Action<BulkInsertionCollectionMetadata<TEntity>> insertAction, BulkFailurePolicy failurePolicy,
        int retryCounter = 0)
    {
        // If there are too many items, split the load to smaller batches.
        // This is only allowed if we don't have to roll back after failures.
        if (failurePolicy == BulkFailurePolicy.IgnoreFailures && items.Count > MaxRowsPerInsertAction)
        {
            var split = items.Split();
            foreach (var smallerCollection in split)
            {
                RetryOnFailure(smallerCollection, insertAction, failurePolicy, retryCounter);
            }
            return;
        }
        try
        {
            insertAction(items);
        }
        //We are catching all exceptions. We can also get InvalidOperationException or others, not just SqlException.
        catch (Exception exception) when (exception.Message.Contains("deadlock"))
        {
            Log.Error($"Encountered deadlock while ingesting records of type {typeof(TEntity).Name} to database.", exception);
            //never ignore deadlock related failures, we will throw this exception.
            if (retryCounter > MaxRetries) throw;
            RetryOnFailure(items, insertAction, failurePolicy, retryCounter + 1);
        }
        //any other exception - split and retry
        catch (Exception exception)
        {
            if (items.Count == 1 && failurePolicy == BulkFailurePolicy.IgnoreFailures)
            {
                // A single row failed and failures are tolerated: log and drop it.
                Log.Error($"Problem ingesting record of type {typeof(TEntity).Name} to database.", exception);
                return;
            }
            if (failurePolicy == BulkFailurePolicy.AllOrNothing)
            {
                Log.Error($"Problem ingesting records of type {typeof(TEntity).Name} to database.", exception);
                throw;
            }
            //else - split the list in two and retry each half (terminates because each half shrinks).
            var split = items.Split();
            foreach (var smallerCollection in split)
            {
                RetryOnFailure(smallerCollection, insertAction, failurePolicy, retryCounter + 1);
            }
        }
    }

    private static readonly ILog Log = LogManager.GetLogger(typeof(BulkOperationsExtensions));
}
/// <summary>
/// Controls how bulk-insert failures are handled.
/// </summary>
public enum BulkFailurePolicy
{
    /// <summary>
    /// The collection will be inserted as a whole, or all rows will be rolled back and an exception will be thrown.
    /// </summary>
    AllOrNothing,
    /// <summary>
    /// On failure, the collection will be divided and retried, until individual rows fail.
    /// </summary>
    IgnoreFailures,
}
/// <summary>
/// Wraps the caller's item list so that extra per-operation metadata (whether
/// generated record IDs should be written back) can ride along through
/// EntityFramework.Utilities, whose provider API only receives the items as an
/// <see cref="IEnumerable{T}"/> — there is no other channel for parameters.
/// </summary>
internal class BulkInsertionCollectionMetadata<T> : IList<T>
{
    private readonly IList<T> _items;

    public BulkInsertionCollectionMetadata(IList<T> items)
    {
        _items = items;
    }

    /// <summary>When true, the SQL provider writes generated primary keys back onto the entities.</summary>
    public bool SetGeneratedRecordIds { get; set; }

    // All list members delegate straight to the wrapped collection.
    public IEnumerator<T> GetEnumerator() => _items.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)_items).GetEnumerator();
    public void Add(T item) => _items.Add(item);
    public void Clear() => _items.Clear();
    public bool Contains(T item) => _items.Contains(item);
    public void CopyTo(T[] array, int arrayIndex) => _items.CopyTo(array, arrayIndex);
    public bool Remove(T item) => _items.Remove(item);
    public int Count => _items.Count;
    public bool IsReadOnly => _items.IsReadOnly;
    public int IndexOf(T item) => _items.IndexOf(item);
    public void Insert(int index, T item) => _items.Insert(index, item);
    public void RemoveAt(int index) => _items.RemoveAt(index);

    public T this[int index]
    {
        get => _items[index];
        set => _items[index] = value;
    }

    /// <summary>
    /// Splits the collection into two halves (the first half gets Count / 2 items),
    /// each copy preserving the <see cref="SetGeneratedRecordIds"/> flag.
    /// </summary>
    public BulkInsertionCollectionMetadata<T>[] Split()
    {
        var itemsMiddle = _items.Count / 2;
        return new[]
        {
            CopyWithOtherItems(_items.Take(itemsMiddle).ToList()),
            CopyWithOtherItems(_items.Skip(itemsMiddle).ToList()),
        };
    }

    // Clone the wrapper around a different backing list, keeping the metadata flag.
    private BulkInsertionCollectionMetadata<T> CopyWithOtherItems(IList<T> items)
        => new BulkInsertionCollectionMetadata<T>(items)
        {
            SetGeneratedRecordIds = SetGeneratedRecordIds
        };
}
/// <summary>
/// Override provider to fix some bugs and to support writing database-generated
/// primary keys back onto the entities after a bulk insert.
/// </summary>
///
/// https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/SqlQueryProvider.cs
///
public class BulkInsertEntityFrameworkUtitlitiesSqlProvider : SqlQueryProvider, EntityFramework.Utilities.IQueryProvider
{
    /// <summary>
    /// Entry point called by EFBatchOperation.InsertAll. Dispatches to the
    /// ID-returning path when <paramref name="items"/> is a
    /// <see cref="BulkInsertionCollectionMetadata{T}"/> with SetGeneratedRecordIds set.
    /// </summary>
    public new void InsertItems<T>(IEnumerable<T> items, string schema, string tableName, IList<ColumnMapping> properties, DbConnection storeConnection, int? batchSize)
    {
        var con = (SqlConnection)storeConnection;
        var wrapperData = items as BulkInsertionCollectionMetadata<T>;
        bool shouldReturnRecordIds = wrapperData?.SetGeneratedRecordIds == true;
        if (shouldReturnRecordIds)
        {
            BulkInsertAllAndReturnIds(wrapperData, schema, tableName, properties, con, batchSize);
        }
        else
        {
            BulkInsertAll(items, schema, tableName, properties, con, batchSize, SqlBulkCopyOptions.KeepNulls);
        }
    }

    /// <summary>
    /// Streams <paramref name="items"/> into <paramref name="tableName"/> with SqlBulkCopy.
    /// </summary>
    private static void BulkInsertAll<T>(IEnumerable<T> items, string schema, string tableName, IList<ColumnMapping> properties,
        SqlConnection connection, int? batchSize, SqlBulkCopyOptions insertOptions)
    {
        using (var reader = new EFDataReader<T>(items, properties))
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }
            // BUG FIX: the original passed connection.ConnectionString, which makes
            // SqlBulkCopy open a SECOND connection. That defeated the explicit Open()
            // above and, for "#"-prefixed session-local temp tables (handled below),
            // the copy could not see the table at all. Reuse the live connection.
            using (SqlBulkCopy copy = new SqlBulkCopy(connection, insertOptions, null))
            {
                copy.BulkCopyTimeout = 0; // no timeout; bulk loads can be long
                copy.BatchSize = Math.Min(reader.RecordsAffected, batchSize ?? 15000); //default batch size
                if (tableName.StartsWith("#"))
                {
                    // Session-local temp table names must not be schema-qualified.
                    copy.DestinationTableName = tableName;
                }
                else if (!string.IsNullOrWhiteSpace(schema))
                {
                    copy.DestinationTableName = $"[{schema}].[{tableName}]";
                }
                else
                {
                    copy.DestinationTableName = "[" + tableName + "]";
                }
                copy.NotifyAfter = 0;
                // Map reader columns positionally onto their database column names.
                foreach (var i in Enumerable.Range(0, reader.FieldCount))
                {
                    copy.ColumnMappings.Add(i, properties[i].NameInDatabase);
                }
                copy.WriteToServer(reader);
                copy.Close();
            }
        }
    }

    /// <summary>
    /// Bulk-inserts via a staging table, then moves the rows into the real table with
    /// an OUTPUT clause so the generated identity values can be read and written back
    /// onto the entities in insertion order.
    /// </summary>
    private static void BulkInsertAllAndReturnIds<T>(BulkInsertionCollectionMetadata<T> items, string schema,
        string tableName,
        IList<ColumnMapping> properties, SqlConnection connection, int? batchSize)
    {
        if (items.Count == 0) return;
        // Seed every entity with a unique negative placeholder key. This fixes the
        // row order inside the staging table and lets the finally block detect rows
        // that were never actually inserted (their key stays negative).
        long dummyValue = -1000 - items.Count;
        //set dummy IDs
        foreach (var item in items)
        {
            ((IHasPrimaryKey)item).PrimaryKey = dummyValue;
            dummyValue++;
        }
        try
        {
            if (connection.State != ConnectionState.Open)
            {
                connection.Open();
            }
            //create staging table with the real table's structure and no rows.
            //NOTE(review): the staging table is created without schema qualification,
            //while TempTable.Dispose drops "{schema}.[name]" — these only match when
            //`schema` is the connection's default schema. Confirm before reusing with
            //non-default schemas.
            using (var tempTable = new TempTable(connection, tableName, schema))
            {
                var createTempTableSql = $"Select * Into {tempTable.TableName} From {tableName} Where 1 = 2";
                using (var command = new SqlCommand(createTempTableSql, connection))
                {
                    command.ExecuteNonQuery();
                }
                //bulk insert to temp table. KeepIdentity preserves our placeholder keys.
                BulkInsertAll(items, schema, tempTable.TableName, properties, connection, batchSize,
                    SqlBulkCopyOptions.KeepNulls | SqlBulkCopyOptions.KeepIdentity);
                //note: IsPrimaryKey is not populated in InsertAll
                // https://github.com/MikaelEliasson/EntityFramework.Utilities/blob/a5abc50b7367d64ca541b6e7e2e6018a500b6d8d/EntityFramework.Utilities/EntityFramework.Utilities/EFBatchOperation.cs#L129
                string primaryKeyNameOnObject = ((IHasPrimaryKey)items.First()).PrimaryKeyPropertyName;
                var primaryKey = properties.Single(c => c.NameOnObject == primaryKeyNameOnObject);
                var otherColumns = properties.Where(p => p != primaryKey);
                var allValueColumns = String.Join(", ", otherColumns.Select(c => "[" + c.NameInDatabase + "]"));
                //insert to real table and get new IDs.
                //this guarantees the record IDs are generated in the right order.
                var migrateAndReturnIds =
                    $@"
insert into {tableName} ({allValueColumns})
OUTPUT inserted.{primaryKey.NameInDatabase}
select {allValueColumns} from {tempTable.TableName} temp
order by temp.{primaryKey.NameInDatabase}
";
                var newlyGeneratedIds = new List<long>(items.Count);
                using (var migrateDataCommand = new SqlCommand(migrateAndReturnIds, connection)
                {
                    CommandTimeout = 0
                })
                using (var recordIdReader = migrateDataCommand.ExecuteReader())
                {
                    while (recordIdReader.Read())
                    {
                        var newId = recordIdReader.GetInt64(0);
                        newlyGeneratedIds.Add(newId);
                    }
                }
                //set IDs on entities.
                if (newlyGeneratedIds.Count != items.Count)
                {
                    throw new MissingPrimaryKeyException("There are fewer generated record IDs than the " +
                                                         "number of items inserted to the database.");
                }
                //the order of the returned IDs is not guaranteed, but the values were
                //generated in the same order as `items` (the ORDER BY above), so
                //sorting re-aligns them positionally.
                newlyGeneratedIds.Sort();
                for (int i = 0; i < newlyGeneratedIds.Count; i++)
                {
                    ((IHasPrimaryKey)items[i]).PrimaryKey = newlyGeneratedIds[i];
                }
            }
        }
        finally
        {
            //make sure the ID is 0 if the row wasn't inserted (placeholder still negative).
            foreach (var item in items)
            {
                var entity = (IHasPrimaryKey)item;
                if (entity.PrimaryKey < 0) entity.PrimaryKey = 0;
            }
        }
    }

    /// <summary>
    /// Extracts the member name from a property lambda, unwrapping the
    /// Convert node added when a value-type property is boxed to object.
    /// Returns null when the body is not a member access.
    /// </summary>
    private static string GetPropertyName<T>(Expression<Func<T, object>> propertyLambda)
    {
        var temp = propertyLambda.Body;
        while (temp is UnaryExpression)
        {
            temp = (temp as UnaryExpression).Operand;
        }
        MemberExpression member = temp as MemberExpression;
        return member?.Member.Name;
    }

    /// <summary>
    /// Bulk update: stages the changed rows into a keyed temp table, then applies a
    /// single set-based UPDATE joining on the primary key columns.
    /// </summary>
    public new void UpdateItems<T>(IEnumerable<T> items, string schema, string tableName,
        IList<ColumnMapping> properties, DbConnection storeConnection, int? batchSize,
        UpdateSpecification<T> updateSpecification)
    {
        // Only the requested columns plus the primary key travel to the staging table.
        var columnsToUpdate = updateSpecification.Properties.Select(GetPropertyName).ToDictionary(x => x);
        var filtered = properties.Where(p => columnsToUpdate.ContainsKey(p.NameOnObject) || p.IsPrimaryKey).ToList();
        var columns = filtered.Select(c => "[" + c.NameInDatabase + "] " + c.DataType);
        var pkConstraint = string.Join(", ",
            properties.Where(p => p.IsPrimaryKey).Select(c => "[" + c.NameInDatabase + "]"));
        var con = (SqlConnection)storeConnection;
        if (con.State != ConnectionState.Open)
        {
            con.Open();
        }
        var tempTable = new TempTable(con, tableName, schema);
        var createTempTableSql =
            $"CREATE TABLE {schema}.[{tempTable.TableName}]({string.Join(", ", columns)}, PRIMARY KEY ({pkConstraint}))";
        var setters = string.Join(",",
            filtered.Where(c => !c.IsPrimaryKey)
                .Select(c => "[" + c.NameInDatabase + "] = TEMP.[" + c.NameInDatabase + "]"));
        var pks =
            properties.Where(p => p.IsPrimaryKey)
                .Select(x => "ORIG.[" + x.NameInDatabase + "] = TEMP.[" + x.NameInDatabase + "]");
        var filter = string.Join(" and ", pks);
        var mergeCommand = string.Format(@"UPDATE [{0}]
                SET
                    {3}
                FROM
                    [{0}] ORIG
                INNER JOIN
                     [{1}] TEMP
                ON
                    {2}", tableName, tempTable.TableName, filter, setters);
        using (var createCommand = new SqlCommand(createTempTableSql, con))
        using (var mCommand = new SqlCommand(mergeCommand, con))
        using (tempTable)
        {
            createCommand.ExecuteNonQuery();
            InsertItems(items, schema, tempTable.TableName, filtered, storeConnection, batchSize);
            mCommand.ExecuteNonQuery();
        }
    }

    // Unique, human-readable staging-table name (timestamp + GUID avoids collisions).
    private static string GetTempTableName(string baseTableName)
        //consider using a temp table: "#". This doesn't work with bulk insert.
        => "temp_" + baseTableName + "_" + DateTime.UtcNow.ToString("yyyyMMdd_HHmmss_fff") + "_" +
           Guid.NewGuid().ToString("N");

    /// <summary>
    /// Owns the name of a staging table and best-effort drops it on dispose.
    /// The table itself is created by the caller.
    /// </summary>
    private class TempTable : IDisposable
    {
        private readonly SqlConnection _connection;
        private readonly string _dbSchema;
        public string TableName { get; }

        public TempTable(SqlConnection connection, string baseTableName, string dbSchema)
        {
            _connection = connection;
            _dbSchema = dbSchema;
            TableName = GetTempTableName(baseTableName);
        }

        public void Dispose()
        {
            try
            {
                using (var dCommand = new SqlCommand($"DROP table {_dbSchema}.[{TableName}]", _connection))
                {
                    dCommand.ExecuteNonQuery();
                }
            }
            catch
            {
                //Nothing to do. don't care. (Best-effort cleanup; never throw from Dispose.)
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example entity (the EF-generated half): declares the mapped key property.
public partial class Spaceship
{
    public virtual long SpaceshipId { get; set; }
}

// Hand-written half: adapts the entity to IHasPrimaryKey via explicit interface
// implementation, so the bulk-insert code can read/write the generated key
// without adding members to the entity's public surface.
partial class Spaceship : IHasPrimaryKey
{
    long IHasPrimaryKey.PrimaryKey
    {
        get => SpaceshipId;
        set => SpaceshipId = value;
    }

    // Property name the SQL provider uses to locate the key column mapping.
    string IHasPrimaryKey.PrimaryKeyPropertyName
        => nameof(SpaceshipId);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example usage: bulk-insert the purchased spaceships; the database-generated
// SpaceshipId is written back onto each entity. With IgnoreFailures, failing
// rows are isolated by bisection, logged, and skipped instead of aborting the batch.
context.InsertAllAndReturnRecordIds(context.Spaceships,
    purchasedSpaceships, BulkFailurePolicy.IgnoreFailures);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.