Skip to content

Instantly share code, notes, and snippets.

@MarkPflug
Created January 5, 2024 16:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MarkPflug/2c3c4fc39b1716788fd18093d64ab9da to your computer and use it in GitHub Desktop.
Save MarkPflug/2c3c4fc39b1716788fd18093d64ab9da to your computer and use it in GitHub Desktop.
CsvPostgresBulkInsert
using Npgsql;
using NpgsqlTypes;
using Sylvan.Data.Csv;
using System.Collections.ObjectModel;
using System.Data.Common;
/// <summary>
/// Sample program that bulk-loads the rows of a CSV file into a PostgreSQL table
/// using Npgsql's binary COPY import and Sylvan's CSV data reader.
/// </summary>
class Program
{
    /// <summary>
    /// Connects to the database, reads the target table's column schema, applies that
    /// schema to the CSV reader so fields are strongly typed, and copies the rows.
    /// </summary>
    static async Task Main()
    {
        /*
        CREATE TABLE contacts
        (
        id serial primary key,
        firstname VARCHAR(40) not null,
        lastname varchar(40) not null,
        dateofbirth timestamp not null
        );
        */
        var tableName = "contacts";
        /*
        FirstName,LastName,DateOfBirth
        tony,eltigre,1973-01-01
        capn,crunch,1953-11-07
        */
        var path = "contacts.csv";

        // Dispose the connection when Main exits, including on failure.
        await using var conn = new NpgsqlConnection();
        conn.ConnectionString = new NpgsqlConnectionStringBuilder
        {
            Host = "localhost",
            Database = "test",
        }.ConnectionString;
        await conn.OpenAsync();

        ReadOnlyCollection<DbColumn> sqltableSchema;
        using (var cmd = conn.CreateCommand())
        {
            // limit 0. Don't need any data, just the schema.
            cmd.CommandText = $"select * from {tableName} limit 0";
            // The reader is disposed at the end of this block, before the
            // connection is reused for the COPY operation.
            await using var sqlreader = await cmd.ExecuteReaderAsync();
            // this must be called through the interface
            // because NpgsqlDataReader exposes a different GetColumnSchema method.
            sqltableSchema = ((IDbColumnSchemaGenerator)sqlreader).GetColumnSchema();
        }

        var csvReadOpt = new CsvDataReaderOptions
        {
            Schema = new CsvSchema(sqltableSchema)
        };
        using var edr = await CsvDataReader.CreateAsync(path, csvReadOpt);
        WriteData(conn, tableName, edr);
    }

    /// <summary>
    /// Streams every row of <paramref name="data"/> into <paramref name="tableName"/>
    /// through a binary COPY import.
    /// </summary>
    /// <param name="conn">An open connection with no other operation in progress.</param>
    /// <param name="tableName">
    /// Target table name. It is inserted into the COPY statement as-is (unquoted),
    /// so it must come from a trusted source.
    /// </param>
    /// <param name="data">Reader whose column schema names the target columns.</param>
    /// <returns>The value reported by the importer's <c>Complete()</c> call, cast to <see cref="long"/>.</returns>
    /// <exception cref="NotSupportedException">
    /// A non-null value is encountered in a column whose CLR type has no mapping.
    /// </exception>
    static long WriteData(NpgsqlConnection conn, string tableName, DbDataReader data)
    {
        var schema = data.GetColumnSchema();

        // Quote each column name, doubling embedded double quotes so the
        // identifier stays well-formed.
        var quotedColumns = new string[schema.Count];
        for (var i = 0; i < schema.Count; i++)
        {
            quotedColumns[i] = "\"" + schema[i].ColumnName.Replace("\"", "\"\"") + "\"";
        }
        var copyCommand = $"copy {tableName} ({string.Join(", ", quotedColumns)}) from stdin (format binary);";

        // Per-column type cache, filled lazily on the first non-null value so a
        // column that only ever holds nulls never needs a type mapping.
        var columnTypes = new NpgsqlDbType?[schema.Count];

        // Disposing the importer without reaching Complete() cancels the import,
        // so an exception while writing does not leave the connection in COPY mode.
        using var importer = conn.BeginBinaryImport(copyCommand);
        while (data.Read())
        {
            importer.StartRow();
            for (var i = 0; i < data.FieldCount; i++)
            {
                if (data.IsDBNull(i))
                {
                    importer.WriteNull();
                    continue;
                }

                NpgsqlDbType dbType = columnTypes[i] ??= GetType(schema[i].DataType);
                switch (dbType)
                {
                    case NpgsqlDbType.Text:
                    case NpgsqlDbType.Char:
                        importer.Write(data.GetString(i), dbType);
                        break;
                    case NpgsqlDbType.Smallint:
                        // TODO: need to figure out "tinyint" scenario. postgres doesn't support it.
                        importer.Write(data.GetInt16(i), dbType);
                        break;
                    case NpgsqlDbType.Integer:
                        importer.Write(data.GetInt32(i), dbType);
                        break;
                    case NpgsqlDbType.Bigint:
                        importer.Write(data.GetInt64(i), dbType);
                        break;
                    case NpgsqlDbType.Boolean:
                        importer.Write(data.GetBoolean(i), dbType);
                        break;
                    case NpgsqlDbType.Double:
                        importer.Write(data.GetDouble(i), dbType);
                        break;
                    case NpgsqlDbType.Real:
                        importer.Write(data.GetFloat(i), dbType);
                        break;
                    case NpgsqlDbType.Money:
                    case NpgsqlDbType.Numeric:
                        importer.Write(data.GetDecimal(i), dbType);
                        break;
                    case NpgsqlDbType.Timestamp:
                        importer.Write(data.GetDateTime(i), dbType);
                        break;
                    case NpgsqlDbType.Uuid:
                        importer.Write(data.GetGuid(i), dbType);
                        break;
                    case NpgsqlDbType.Bytea:
                        importer.Write((byte[])data.GetValue(i), dbType);
                        break;
                    default:
                        throw new NotSupportedException($"No binary writer for NpgsqlDbType '{dbType}'.");
                }
            }
        }
        return (long)importer.Complete();
    }

    /// <summary>
    /// Maps a CLR column type to the <see cref="NpgsqlDbType"/> used when writing it.
    /// </summary>
    /// <param name="type">The CLR type reported by the reader's column schema.</param>
    /// <returns>The matching <see cref="NpgsqlDbType"/>.</returns>
    /// <exception cref="NotSupportedException">The type has no mapping here.</exception>
    static NpgsqlDbType GetType(Type type)
    {
        if (type == typeof(string))
        {
            return NpgsqlDbType.Text;
        }
        // byte and short share Smallint.
        if (type == typeof(byte) || type == typeof(short))
        {
            return NpgsqlDbType.Smallint;
        }
        if (type == typeof(int))
        {
            return NpgsqlDbType.Integer;
        }
        if (type == typeof(long))
        {
            return NpgsqlDbType.Bigint;
        }
        if (type == typeof(bool))
        {
            return NpgsqlDbType.Boolean;
        }
        if (type == typeof(float))
        {
            return NpgsqlDbType.Real;
        }
        if (type == typeof(double))
        {
            return NpgsqlDbType.Double;
        }
        if (type == typeof(decimal))
        {
            return NpgsqlDbType.Numeric;
        }
        if (type == typeof(DateTime))
        {
            return NpgsqlDbType.Timestamp;
        }
        if (type == typeof(byte[]))
        {
            return NpgsqlDbType.Bytea;
        }
        if (type == typeof(Guid))
        {
            return NpgsqlDbType.Uuid;
        }
        throw new NotSupportedException($"No NpgsqlDbType mapping for CLR type '{type}'.");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment