Skip to content

Instantly share code, notes, and snippets.

@meziantou
Created February 1, 2017 00:28
Show Gist options
  • Save meziantou/174e2791dec966be837746750b87d069 to your computer and use it in GitHub Desktop.
Save meziantou/174e2791dec966be837746750b87d069 to your computer and use it in GitHub Desktop.
ObjectDataReader (BulkInsert)
/// <summary>
/// Forward-only <see cref="DbDataReader"/> over a sequence of <typeparamref name="T"/>.
/// Every public, readable, non-indexer instance property of <typeparamref name="T"/>
/// becomes one column; the column name is the property name and the ordinal is the
/// position in which reflection returned the property.
/// </summary>
/// <remarks>
/// The reader does not dispose the enumerator it is given; the caller keeps ownership of it.
/// </remarks>
/// <typeparam name="T">Type of the objects exposed as rows.</typeparam>
public class ObjectDataReader<T> : DbDataReader
{
    private readonly IEnumerator<T> _iterator;
    private readonly IDictionary<string, int> _propertyNameToOrdinal = new Dictionary<string, int>();
    private readonly IDictionary<int, string> _ordinalToPropertyName = new Dictionary<int, string>();
    private Func<T, object>[] _getPropertyValueFuncs;
    private Type[] _propertyTypes;
    private bool _isClosed;

    /// <summary>Creates a reader over <paramref name="items"/>.</summary>
    /// <param name="items">Enumerator providing the rows; it is advanced by <see cref="Read"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    public ObjectDataReader(IEnumerator<T> items)
    {
        _iterator = items ?? throw new ArgumentNullException(nameof(items));
        Initialize();
    }

    /// <summary>
    /// Builds the name/ordinal maps, the declared column types and one compiled getter per column.
    /// </summary>
    private void Initialize()
    {
        // Instance properties only: a static property cannot be read through an instance expression.
        var properties = typeof(T).GetProperties(
            System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance);
        var getters = new List<Func<T, object>>(properties.Length);
        var types = new List<Type>(properties.Length);

        foreach (var property in properties)
        {
            // Indexers need arguments and write-only properties have no value to read,
            // so neither can be exposed as a column.
            if (!property.CanRead || property.GetIndexParameters().Length != 0)
            {
                continue;
            }

            // A property hidden with "new" can be reported twice under the same name;
            // keep the first one reflection returned so the name map stays unambiguous.
            if (_propertyNameToOrdinal.ContainsKey(property.Name))
            {
                continue;
            }

            int ordinal = getters.Count;
            _propertyNameToOrdinal.Add(property.Name, ordinal);
            _ordinalToPropertyName.Add(ordinal, property.Name);

            // Bind to the PropertyInfo itself (not its name) so the getter reads exactly this property.
            var parameterExpression = Expression.Parameter(typeof(T), "x");
            var body = Expression.Convert(Expression.Property(parameterExpression, property), typeof(object));
            getters.Add(Expression.Lambda<Func<T, object>>(body, parameterExpression).Compile());

            // Nullable<X> values box to X (or null), so X is reported as the column type.
            types.Add(Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType);
        }

        _getPropertyValueFuncs = getters.ToArray();
        _propertyTypes = types.ToArray();
    }

    /// <summary>Gets the value of the column at <paramref name="ordinal"/> for the current row.</summary>
    public override object this[int ordinal] => GetValue(ordinal);

    /// <summary>Gets the value of the column called <paramref name="name"/> for the current row.</summary>
    public override object this[string name] => GetValue(GetOrdinal(name));

    /// <summary>Always 0: rows are never nested.</summary>
    public override int Depth => 0;

    /// <summary>Number of columns, i.e. number of exposed properties.</summary>
    public override int FieldCount => _getPropertyValueFuncs.Length;

    /// <summary>Always true: the enumerator is not peeked, so an empty sequence is not detected here.</summary>
    public override bool HasRows => true;

    /// <summary>True once <see cref="Close"/> has been called.</summary>
    public override bool IsClosed => _isClosed;

    /// <summary>Always -1: this reader never inserts, updates or deletes rows.</summary>
    public override int RecordsAffected => -1;

    /// <summary>Marks the reader as closed. The underlying enumerator is left untouched.</summary>
    public override void Close()
    {
        _isClosed = true;
    }

    public override bool GetBoolean(int ordinal)
    {
        return (bool)GetValue(ordinal);
    }

    public override byte GetByte(int ordinal)
    {
        return (byte)GetValue(ordinal);
    }

    /// <summary>Not implemented.</summary>
    public override long GetBytes(int ordinal, long dataOffset, byte[] buffer, int bufferOffset, int length)
    {
        throw new NotImplementedException();
    }

    public override char GetChar(int ordinal)
    {
        return (char)GetValue(ordinal);
    }

    /// <summary>Not implemented.</summary>
    public override long GetChars(int ordinal, long dataOffset, char[] buffer, int bufferOffset, int length)
    {
        throw new NotImplementedException();
    }

    /// <summary>Not implemented.</summary>
    public override string GetDataTypeName(int ordinal)
    {
        throw new NotImplementedException();
    }

    public override DateTime GetDateTime(int ordinal)
    {
        return (DateTime)GetValue(ordinal);
    }

    public override decimal GetDecimal(int ordinal)
    {
        return (decimal)GetValue(ordinal);
    }

    public override double GetDouble(int ordinal)
    {
        return (double)GetValue(ordinal);
    }

    /// <summary>Not implemented.</summary>
    public override IEnumerator GetEnumerator()
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Returns the declared type of the property behind the column (the underlying type for
    /// nullable value types). Does not depend on the current row, so it can be called before
    /// the first <see cref="Read"/>.
    /// </summary>
    /// <exception cref="IndexOutOfRangeException"><paramref name="ordinal"/> is not a valid column index.</exception>
    public override Type GetFieldType(int ordinal)
    {
        return _propertyTypes[ordinal];
    }

    public override float GetFloat(int ordinal)
    {
        return (float)GetValue(ordinal);
    }

    public override Guid GetGuid(int ordinal)
    {
        return (Guid)GetValue(ordinal);
    }

    public override short GetInt16(int ordinal)
    {
        return (short)GetValue(ordinal);
    }

    public override int GetInt32(int ordinal)
    {
        return (int)GetValue(ordinal);
    }

    public override long GetInt64(int ordinal)
    {
        return (long)GetValue(ordinal);
    }

    /// <summary>Returns the property name behind the column at <paramref name="ordinal"/>.</summary>
    /// <exception cref="IndexOutOfRangeException"><paramref name="ordinal"/> is not a valid column index.</exception>
    public override string GetName(int ordinal)
    {
        if (_ordinalToPropertyName.TryGetValue(ordinal, out var name))
        {
            return name;
        }

        throw new IndexOutOfRangeException("No column exists at ordinal " + ordinal + ".");
    }

    /// <summary>Returns the ordinal of the column called <paramref name="name"/> (case-sensitive).</summary>
    /// <exception cref="IndexOutOfRangeException">No column has that name.</exception>
    public override int GetOrdinal(string name)
    {
        if (_propertyNameToOrdinal.TryGetValue(name, out var ordinal))
        {
            return ordinal;
        }

        // Throw rather than return -1, so an unknown column is reported here
        // instead of surfacing later as an opaque array-index failure.
        throw new IndexOutOfRangeException("No column is named '" + name + "'.");
    }

    public override string GetString(int ordinal)
    {
        return (string)GetValue(ordinal);
    }

    /// <summary>
    /// Reads the property behind <paramref name="ordinal"/> from the enumerator's current item.
    /// A null property value is returned as null, not as <see cref="DBNull.Value"/>.
    /// </summary>
    /// <exception cref="IndexOutOfRangeException"><paramref name="ordinal"/> is not a valid column index.</exception>
    public override object GetValue(int ordinal)
    {
        var func = _getPropertyValueFuncs[ordinal];
        return func(_iterator.Current);
    }

    /// <summary>
    /// Copies the current row into <paramref name="values"/>, replacing null with <see cref="DBNull.Value"/>.
    /// </summary>
    /// <returns>Number of entries written: the smaller of the array length and <see cref="FieldCount"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
    public override int GetValues(object[] values)
    {
        if (values == null)
        {
            throw new ArgumentNullException(nameof(values));
        }

        int max = Math.Min(values.Length, FieldCount);
        for (var i = 0; i < max; i++)
        {
            // Each property getter is invoked exactly once per field.
            values[i] = GetValue(i) ?? DBNull.Value;
        }

        return max;
    }

    /// <summary>True when the current row's value for the column is null or <see cref="DBNull"/>.</summary>
    public override bool IsDBNull(int ordinal)
    {
        var value = GetValue(ordinal);
        return value == null || value is DBNull;
    }

    /// <summary>Always false: there is only one result set.</summary>
    public override bool NextResult()
    {
        return false;
    }

    /// <summary>Advances the underlying enumerator; returns false once the sequence is exhausted.</summary>
    public override bool Read()
    {
        return _iterator.MoveNext();
    }
}
/// <summary>
/// Sample console program that bulk-inserts generated <see cref="Customer"/> rows into the
/// "Customer" table through <see cref="ObjectDataReader{T}"/> and <c>SqlBulkCopy</c>.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        // GetAwaiter().GetResult() rethrows the original exception; Task.Wait() would wrap it
        // in an AggregateException and hide the real failure.
        InsertAsync().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Opens a connection to the local "Sample" database and streams one million generated
    /// customers into the "Customer" table, printing progress every 1000 rows.
    /// </summary>
    /// <param name="ct">Token used to cancel opening the connection and the bulk copy.</param>
    public static async Task InsertAsync(CancellationToken ct = default(CancellationToken))
    {
        using (var connection = new SqlConnection())
        {
            connection.ConnectionString = "Server=(local);Database=Sample;Trusted_Connection=True;";
            await connection.OpenAsync(ct);

            using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, null))
            {
                // Generate is a lazy iterator: customers are created as the reader advances.
                var customers = Customer.Generate(1000000);
                using (var enumerator = customers.GetEnumerator())
                using (var customerReader = new ObjectDataReader<Customer>(enumerator))
                {
                    bulk.DestinationTableName = "Customer";

                    // Map each source property to the destination column of the same name.
                    bulk.ColumnMappings.Add(nameof(Customer.Id), "Id");
                    bulk.ColumnMappings.Add(nameof(Customer.FirstName), "FirstName");
                    bulk.ColumnMappings.Add(nameof(Customer.LastName), "LastName");
                    bulk.ColumnMappings.Add(nameof(Customer.DateOfBirth), "DateOfBirth");

                    bulk.EnableStreaming = true;
                    bulk.BatchSize = 10000;
                    bulk.NotifyAfter = 1000;
                    bulk.SqlRowsCopied += (sender, e) => Console.WriteLine("RowsCopied: " + e.RowsCopied);

                    await bulk.WriteToServerAsync(customerReader, ct);
                }
            }
        }
    }
}
/// <summary>Sample row type used to demonstrate bulk insertion.</summary>
public class Customer
{
    /// <summary>Unique identifier of the customer.</summary>
    public Guid Id { get; set; }

    /// <summary>Given name.</summary>
    public string FirstName { get; set; }

    /// <summary>Family name.</summary>
    public string LastName { get; set; }

    /// <summary>Date of birth.</summary>
    public DateTime DateOfBirth { get; set; }

    /// <summary>
    /// Lazily produces <paramref name="count"/> customers. Each one gets a new Guid,
    /// names suffixed with its zero-based index, and the current UTC time as date of birth.
    /// </summary>
    /// <param name="count">Number of customers to produce; zero or less produces none.</param>
    /// <returns>A deferred sequence; customers are created as it is enumerated.</returns>
    public static IEnumerable<Customer> Generate(int count)
    {
        var index = 0;
        while (index < count)
        {
            var customer = new Customer();
            customer.Id = Guid.NewGuid();
            customer.FirstName = string.Concat("FirstName", index);
            customer.LastName = string.Concat("LastName", index);
            customer.DateOfBirth = DateTime.UtcNow;

            yield return customer;
            index++;
        }
    }
}
@naidu004
Copy link

naidu004 commented Sep 17, 2018

I am getting this error after using this solution
Object must implement IConvertible

@DevOnBike
Copy link

I have one objection, when I set
bulk.EnableStreaming = true;

then my code allocates memory immediately, before anything is sent to SQL Server.
If I leave it at the default value (false), I observe that memory increases slowly while the data is being sent.
Can you reproduce this behavior?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment