Skip to content

Instantly share code, notes, and snippets.

@samlii
Created September 26, 2016 13:54
Show Gist options
  • Save samlii/a646660ced448fa1d8dd6642da358f3e to your computer and use it in GitHub Desktop.
Save samlii/a646660ced448fa1d8dd6642da358f3e to your computer and use it in GitHub Desktop.
A way to do a bulk update (upsert: update existing rows, insert new ones) in PostgreSQL from a DataTable.
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using Dapper;
using Newtonsoft.Json;
using Npgsql;
using NpgsqlTypes;
namespace DataExtensions
{
    /// <summary>
    /// Extension methods for upserting the contents of a <see cref="DataTable"/> into a PostgreSQL table.
    /// </summary>
    public static class PostgresBulkUpdate
    {
        /// <summary>
        /// Upserts every row of <paramref name="dataTable"/> into the database table named by
        /// <see cref="DataTable.TableName"/>. Rows whose primary key already exists in the target
        /// are updated; all other rows are inserted.
        /// </summary>
        /// <remarks>
        /// The rows are first streamed with binary COPY into a temporary table named
        /// <c>{TableName}_tmp</c>, with any '.' in the name replaced by '_'. They are then merged into
        /// the target table inside a READ COMMITTED transaction that holds an EXCLUSIVE lock on the
        /// target. The temporary table is dropped afterwards. If this method had to open the
        /// connection, it closes it again, even on failure.
        /// Table and column names are interpolated into SQL unquoted. They must come from trusted
        /// code, never from user input.
        /// </remarks>
        /// <param name="dataTable">Rows to write. <see cref="DataTable.PrimaryKey"/> must be set; its
        /// columns decide which existing rows are updated.</param>
        /// <param name="connection">Connection to use. Opened (and closed again) if it is not already open.</param>
        /// <param name="columnMap">Optional mapping from DataTable column name to database column name.
        /// Unmapped columns use their DataTable name. Applies to key columns as well.</param>
        /// <param name="columnTypes">Optional CLR type to <see cref="NpgsqlDbType"/> overrides used during
        /// COPY. Values whose type maps to <see cref="NpgsqlDbType.Json"/> are serialized with Json.NET first.</param>
        /// <exception cref="ArgumentNullException"><paramref name="dataTable"/> or <paramref name="connection"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="dataTable"/> has no primary key or no table name.</exception>
        public static void Update(this DataTable dataTable, NpgsqlConnection connection,
            Dictionary<string, string> columnMap = null, Dictionary<Type, NpgsqlDbType> columnTypes = null)
        {
            if (dataTable == null)
            {
                throw new ArgumentNullException(nameof(dataTable));
            }
            if (connection == null)
            {
                throw new ArgumentNullException(nameof(connection));
            }
            if (dataTable.PrimaryKey == null || dataTable.PrimaryKey.Length == 0)
            {
                throw new ArgumentException("No primary keys specified", nameof(dataTable));
            }
            if (string.IsNullOrWhiteSpace(dataTable.TableName))
            {
                throw new ArgumentException("DataTable.TableName must be set", nameof(dataTable));
            }

            columnMap = columnMap ?? new Dictionary<string, string>();
            columnTypes = columnTypes ?? new Dictionary<Type, NpgsqlDbType>();

            var tableName = dataTable.TableName;
            // A TEMP table cannot be created in a named schema, so "schema.table" becomes
            // "schema_table_tmp". Names without a '.' are unaffected.
            var tempTableName = tableName.Replace('.', '_') + "_tmp";
            var columnNames = dataTable.Columns.Cast<DataColumn>()
                .Select(c => MapColumn(columnMap, c.ColumnName))
                .ToList();
            // Key columns go through the same mapping as data columns, so a renamed key still matches.
            var keyNames = dataTable.PrimaryKey
                .Select(c => MapColumn(columnMap, c.ColumnName))
                .ToList();
            var allColumns = string.Join(",", columnNames);

            var wasClosed = connection.State != ConnectionState.Open;
            if (wasClosed)
            {
                connection.Open();
            }
            try
            {
                connection.Execute($"CREATE TEMP TABLE {tempTableName} AS SELECT {allColumns} FROM {tableName} LIMIT 0;");
                try
                {
                    CopyRows(dataTable, connection, tempTableName, allColumns, columnTypes);
                    MergeFromTemp(connection, tableName, tempTableName, columnNames, keyNames);
                }
                finally
                {
                    connection.Execute($"DROP TABLE {tempTableName}");
                }
            }
            finally
            {
                // Outer finally: runs even if CREATE or DROP throws, so a connection opened here never leaks.
                if (wasClosed)
                {
                    connection.Close();
                }
            }
        }

        /// <summary>Returns the database column name for a DataTable column, honouring <paramref name="columnMap"/>.</summary>
        private static string MapColumn(Dictionary<string, string> columnMap, string columnName)
        {
            string mapped;
            return columnMap.TryGetValue(columnName, out mapped) ? mapped : columnName;
        }

        /// <summary>Streams every row of <paramref name="dataTable"/> into <paramref name="tempTableName"/> with binary COPY.</summary>
        private static void CopyRows(DataTable dataTable, NpgsqlConnection connection, string tempTableName,
            string allColumns, Dictionary<Type, NpgsqlDbType> columnTypes)
        {
            // NOTE(review): Npgsql 4.0+ cancels a binary import that is disposed without writer.Complete().
            // If this project is on Npgsql 4.0 or later, call Complete() after the loop. Npgsql 3.x has
            // no Complete() and commits the import on Dispose.
            using (var writer = connection.BeginBinaryImport($"COPY {tempTableName}({allColumns}) FROM STDIN (FORMAT BINARY)"))
            {
                foreach (DataRow row in dataTable.Rows)
                {
                    writer.StartRow();
                    // ItemArray is in DataTable column order, which matches the COPY column list.
                    foreach (var value in row.ItemArray)
                    {
                        WriteValue(writer, value, columnTypes);
                    }
                }
            }
        }

        /// <summary>Writes one column value, applying any <paramref name="columnTypes"/> override for its CLR type.</summary>
        private static void WriteValue(NpgsqlBinaryImporter writer, object value, Dictionary<Type, NpgsqlDbType> columnTypes)
        {
            // DataRow stores SQL NULL as DBNull.Value; write it as an explicit NULL rather than relying on type inference.
            if (value == null || value is DBNull)
            {
                writer.WriteNull();
                return;
            }

            NpgsqlDbType dbType;
            if (!columnTypes.TryGetValue(value.GetType(), out dbType))
            {
                writer.Write(value);
            }
            else if (dbType == NpgsqlDbType.Json)
            {
                writer.Write(JsonConvert.SerializeObject(value), NpgsqlDbType.Json);
            }
            else
            {
                writer.Write(value, dbType);
            }
        }

        /// <summary>
        /// In one transaction, updates target rows whose key matches a temp-table row, then inserts
        /// the temp-table rows whose key is not yet present in the target.
        /// </summary>
        private static void MergeFromTemp(NpgsqlConnection connection, string tableName, string tempTableName,
            List<string> columnNames, List<string> keyNames)
        {
            var allColumns = string.Join(",", columnNames);
            var newColumns = string.Join(",", columnNames.Select(c => $"new.{c}"));
            // Use "col = new.col" pairs: they work for any column count, whereas "SET (a) = (new.a)"
            // is rejected by PostgreSQL 10+ when only one column is listed.
            var setClause = string.Join(",", columnNames.Select(c => $"{c} = new.{c}"));
            var whereClause = string.Join(" AND ", keyNames.Select(k => $"orig.{k} = new.{k}"));

            using (var trans = connection.BeginTransaction(IsolationLevel.ReadCommitted))
            {
                // EXCLUSIVE conflicts with the locks taken by other writers (and with itself). No other
                // session can insert a key between the UPDATE and the NOT EXISTS check below.
                connection.Execute($"LOCK TABLE {tableName} IN EXCLUSIVE MODE", transaction: trans);
                connection.Execute(
                    $"UPDATE {tableName} orig SET {setClause} FROM {tempTableName} \"new\" WHERE {whereClause}",
                    transaction: trans);
                connection.Execute(
                    $@"INSERT INTO {tableName} ({allColumns})
SELECT {newColumns}
FROM {tempTableName} ""new""
WHERE NOT EXISTS (
SELECT NULL
FROM {tableName} orig
WHERE {whereClause})",
                    transaction: trans);
                trans.Commit();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment