Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Bulk Upserts with Npgsql and Postgres Composite Types
using Npgsql;
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading.Tasks;
namespace NpgsqlTypeMappings.Example
{
/// <summary>
/// Maps to the Postgres "measurement_type" type.
/// </summary>
public class Measurement
{
public int DeviceId { get; set; }
public int ParameterId { get; set; }
public DateTime Timestamp { get; set; }
public double Value { get; set; }
}
public class Tests
{
private static readonly string ConnectionString = @"Host=localhost;Port=5432;Database=sampledb;Pooling=false;User Id=philipp;Password=test_pwd;";
[Test]
public async Task BulkInsertMeasurements()
{
var startDate = new DateTime(2013, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var measurements = Enumerable.Range(0, 10000) // We start with 10.000 Measurements ...
// ... transform them into fake measurements ...
.Select(idx => new Measurement
{
DeviceId = 1,
ParameterId = 1,
Timestamp = startDate.AddSeconds(idx),
Value = idx
})
// ... and finally evaluate them:
.ToArray();
// Create the Parameter:
var p_measurements = new NpgsqlParameter
{
ParameterName = "p_measurements",
DataTypeName = "sample.measurement_type[]",
Value = measurements
};
// Configure the Mappings:
NpgsqlConnection.GlobalTypeMapper.MapComposite<Measurement>("sample.measurement_type");
using (var connection = new NpgsqlConnection(ConnectionString))
{
await connection.OpenAsync();
// Execute the Insert or Update Function:
using(var cmd = new NpgsqlCommand("CALL sample.insert_or_update_measurements(@p_measurements)", connection))
{
cmd.Parameters.Add(p_measurements);
await cmd.ExecuteNonQueryAsync();
}
}
}
}
}
DO $$
BEGIN
-- Create the Schema:
IF NOT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'sample') THEN
CREATE SCHEMA sample;
END IF;
-- Create the Tables:
IF NOT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'sample' AND table_name = 'measurements'
) THEN
CREATE TABLE sample.measurements
(
device_id int,
parameter_id int,
timestamp timestamp with time zone,
value double precision
);
END IF;
-- Create the Index:
DROP INDEX IF EXISTS sample.unique_measurement;
CREATE UNIQUE INDEX unique_measurement ON sample.measurements(device_id, parameter_id, timestamp);
-- Create the Types:
DROP TYPE IF EXISTS "sample"."measurement_type";
CREATE TYPE "sample"."measurement_type" AS (
device_id int,
parameter_id int,
timestamp timestamp with time zone,
value double precision
);
END;
$$;
-- Create the Function:
CREATE OR REPLACE PROCEDURE sample.insert_or_update_measurements(p_measurements sample.measurement_type[])
LANGUAGE SQL
AS $$
INSERT INTO sample.measurements(device_id, parameter_id, timestamp, value)
SELECT * FROM UNNEST(p_measurements)
ON CONFLICT (device_id, parameter_id, timestamp)
DO UPDATE SET value = EXCLUDED.value;
$$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment