Created
September 26, 2016 13:54
-
-
Save samlii/a646660ced448fa1d8dd6642da358f3e to your computer and use it in GitHub Desktop.
A way to do a bulk update (upsert: update existing rows, insert missing ones) in PostgreSQL using a DataTable.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Data; | |
using System.Linq; | |
using Dapper; | |
using Newtonsoft.Json; | |
using Npgsql; | |
using NpgsqlTypes; | |
namespace DataExtensions
{
    /// <summary>
    /// Extension methods that push the contents of a <see cref="DataTable"/> into a
    /// PostgreSQL table in bulk: rows are copied into a temporary table and then merged
    /// into the target table with one UPDATE and one INSERT statement.
    /// </summary>
    public static class PostgresBulkUpdate
    {
        /// <summary>
        /// Bulk "upserts" every row of <paramref name="dataTable"/> into the database table
        /// named by <see cref="DataTable.TableName"/>. Rows whose primary key already exists
        /// in the target table are updated; all other rows are inserted.
        /// </summary>
        /// <param name="dataTable">
        /// Source rows. Its <see cref="DataTable.TableName"/> is used as the target table name
        /// and its <see cref="DataTable.PrimaryKey"/> columns are used to match rows.
        /// </param>
        /// <param name="connection">
        /// Connection to use. If it is not open it is opened here and closed again before
        /// returning; an already open connection is left open.
        /// </param>
        /// <param name="columnMap">
        /// Optional map from DataTable column name to database column name. Columns that
        /// are not in the map keep their DataTable name.
        /// </param>
        /// <param name="columnTypes">
        /// Optional map from the runtime type of a cell value to the <see cref="NpgsqlDbType"/>
        /// it must be written as. Values mapped to <see cref="NpgsqlDbType.Json"/> are
        /// serialized with <see cref="JsonConvert.SerializeObject(object)"/> first.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="dataTable"/> or <paramref name="connection"/> is null.
        /// </exception>
        /// <exception cref="ArgumentException">
        /// <paramref name="dataTable"/> has no primary key columns.
        /// </exception>
        /// <remarks>
        /// Table and column names are interpolated into the SQL text unquoted, so they must
        /// come from trusted input and must be valid unquoted PostgreSQL identifiers.
        /// </remarks>
        public static void Update(this DataTable dataTable, NpgsqlConnection connection,
            Dictionary<string, string> columnMap = null, Dictionary<Type, NpgsqlDbType> columnTypes = null)
        {
            if (dataTable == null)
            {
                throw new ArgumentNullException(nameof(dataTable));
            }

            if (connection == null)
            {
                throw new ArgumentNullException(nameof(connection));
            }

            if (dataTable.PrimaryKey == null || dataTable.PrimaryKey.Length == 0)
            {
                throw new ArgumentException("No primary keys specified", nameof(dataTable));
            }

            columnMap = columnMap ?? new Dictionary<string, string>();
            columnTypes = columnTypes ?? new Dictionary<Type, NpgsqlDbType>();

            var tableName = dataTable.TableName;
            var tempTableName = $"{tableName}_tmp";

            var columnNames = dataTable.Columns.Cast<DataColumn>()
                .Select(column => MapColumnName(columnMap, column.ColumnName))
                .ToList();
            var allColumns = string.Join(",", columnNames);
            var newColumns = string.Join(",", columnNames.Select(c => $"new.{c}"));

            // Key columns must go through the same mapping as every other column;
            // otherwise a mapped primary key would be referenced by its DataTable name,
            // which does not exist in the database.
            var whereClause = string.Join(" AND ",
                dataTable.PrimaryKey
                    .Select(key => MapColumnName(columnMap, key.ColumnName))
                    .Select(c => $"orig.{c} = new.{c}"));

            var wasClosed = connection.State != ConnectionState.Open;
            if (wasClosed)
            {
                connection.Open();
            }

            try
            {
                // Empty temp table with the same column definitions as the target columns.
                connection.Execute(
                    $"CREATE TEMP TABLE {tempTableName} AS SELECT {allColumns} FROM {tableName} LIMIT 0;");
                try
                {
                    using (var writer =
                        connection.BeginBinaryImport($"COPY {tempTableName}({allColumns}) FROM STDIN (FORMAT BINARY)"))
                    {
                        foreach (DataRow row in dataTable.Rows)
                        {
                            WriteRow(writer, row, columnTypes);
                        }

                        // NOTE(review): Npgsql 4.0+ requires writer.Complete() here, otherwise the
                        // import is cancelled on dispose. Not added because the Npgsql version this
                        // file targets is not visible from here - confirm and add if applicable.
                    }

                    using (var trans = connection.BeginTransaction(IsolationLevel.ReadCommitted))
                    {
                        // Block concurrent writers so the UPDATE + INSERT pair behaves as one upsert.
                        connection.Execute(
                            $"LOCK TABLE {tableName} IN EXCLUSIVE MODE", transaction: trans);
                        connection.Execute(
                            $"UPDATE {tableName} orig SET ({allColumns}) = ({newColumns}) FROM {tempTableName} \"new\" WHERE {whereClause}",
                            transaction: trans);
                        connection.Execute(
                            $@"INSERT INTO {tableName} ({allColumns})
SELECT {newColumns}
FROM {tempTableName} ""new""
WHERE NOT EXISTS (
SELECT NULL
FROM {tableName} orig
WHERE {whereClause})",
                            transaction: trans);
                        trans.Commit();
                    }
                }
                finally
                {
                    // Only reached once the temp table was created, so the DROP has a target.
                    connection.Execute($"DROP TABLE {tempTableName}");
                }
            }
            finally
            {
                // Runs even if CREATE or DROP throws, so a connection opened here never leaks.
                if (wasClosed)
                {
                    connection.Close();
                }
            }
        }

        /// <summary>
        /// Returns the database column name for <paramref name="columnName"/>: the mapped
        /// name when <paramref name="columnMap"/> has an entry, otherwise the name itself.
        /// </summary>
        private static string MapColumnName(Dictionary<string, string> columnMap, string columnName)
        {
            string mapped;
            return columnMap.TryGetValue(columnName, out mapped) ? mapped : columnName;
        }

        /// <summary>
        /// Writes one <see cref="DataRow"/> to the binary importer, one value per column in
        /// <see cref="DataRow.ItemArray"/> order.
        /// </summary>
        private static void WriteRow(NpgsqlBinaryImporter writer, DataRow row,
            Dictionary<Type, NpgsqlDbType> columnTypes)
        {
            writer.StartRow();
            foreach (var value in row.ItemArray)
            {
                // Write database NULLs explicitly instead of handing DBNull to the
                // type-inferring Write overload.
                if (value == null || value is DBNull)
                {
                    writer.WriteNull();
                    continue;
                }

                NpgsqlDbType dbType;
                if (!columnTypes.TryGetValue(value.GetType(), out dbType))
                {
                    // No explicit mapping: let Npgsql infer the PostgreSQL type from the CLR type.
                    writer.Write(value);
                }
                else if (dbType == NpgsqlDbType.Json)
                {
                    // JSON columns receive the value's serialized text, not the object itself.
                    writer.Write(JsonConvert.SerializeObject(value), NpgsqlDbType.Json);
                }
                else
                {
                    writer.Write(value, dbType);
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment