Created
November 7, 2021 14:22
-
-
Save bytefish/dbd2da81267990e9326044661be1c480 to your computer and use it in GitHub Desktop.
Bulk Upserts with Npgsql and Postgres Composite Types
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Npgsql; | |
using NUnit.Framework; | |
using System; | |
using System.Linq; | |
using System.Threading.Tasks; | |
namespace NpgsqlTypeMappings.Example | |
{ | |
/// <summary> | |
/// Maps to the Postgres "measurement_type" type. | |
/// </summary> | |
public class Measurement
{
    /// <summary>
    /// Identifier of the device the value was recorded for.
    /// </summary>
    public int DeviceId { get; set; }

    /// <summary>
    /// Identifier of the parameter the value was recorded for.
    /// </summary>
    public int ParameterId { get; set; }

    /// <summary>
    /// Point in time the value belongs to.
    /// </summary>
    public DateTime Timestamp { get; set; }

    /// <summary>
    /// The recorded value.
    /// </summary>
    public double Value { get; set; }
}
/// <summary>
/// Integration tests for bulk upserts through a Postgres composite type array.
/// Requires a reachable database in which the "sample" schema, the
/// "sample.measurement_type" type and the
/// "sample.insert_or_update_measurements" procedure have been created.
/// </summary>
public class Tests
{
    private static readonly string ConnectionString = @"Host=localhost;Port=5432;Database=sampledb;Pooling=false;User Id=philipp;Password=test_pwd;";

    /// <summary>
    /// Sends a batch of generated measurements to the upsert procedure in a single
    /// call and verifies that exactly one row per generated measurement is stored.
    /// </summary>
    [Test]
    public async Task BulkInsertMeasurements()
    {
        const int measurementCount = 10000;

        var startDate = new DateTime(2013, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        var measurements = Enumerable.Range(0, measurementCount) // We start with 10.000 Measurements ...
            // ... transform them into fake measurements ...
            .Select(idx => new Measurement
            {
                DeviceId = 1,
                ParameterId = 1,
                Timestamp = startDate.AddSeconds(idx),
                Value = idx
            })
            // ... and finally evaluate them:
            .ToArray();

        // Create the Parameter:
        var p_measurements = new NpgsqlParameter
        {
            ParameterName = "p_measurements",
            DataTypeName = "sample.measurement_type[]",
            Value = measurements
        };

        // Configure the Mappings (has to happen before the connection is opened):
        NpgsqlConnection.GlobalTypeMapper.MapComposite<Measurement>("sample.measurement_type");

        using (var connection = new NpgsqlConnection(ConnectionString))
        {
            await connection.OpenAsync();

            // Execute the Insert or Update procedure:
            using (var cmd = new NpgsqlCommand("CALL sample.insert_or_update_measurements(@p_measurements)", connection))
            {
                cmd.Parameters.Add(p_measurements);

                await cmd.ExecuteNonQueryAsync();
            }

            // Verify the result. The upsert leaves one row per (device, parameter, timestamp),
            // so the generated one-second range must hold exactly measurementCount rows,
            // whether this is the first run (inserts) or a repeated run (updates).
            using (var cmd = new NpgsqlCommand(
                "SELECT COUNT(*) FROM sample.measurements " +
                "WHERE device_id = 1 AND parameter_id = 1 AND timestamp >= @p_start AND timestamp < @p_end",
                connection))
            {
                cmd.Parameters.Add(new NpgsqlParameter("p_start", NpgsqlTypes.NpgsqlDbType.TimestampTz) { Value = startDate });
                cmd.Parameters.Add(new NpgsqlParameter("p_end", NpgsqlTypes.NpgsqlDbType.TimestampTz) { Value = startDate.AddSeconds(measurementCount) });

                // COUNT(*) is returned as bigint.
                var storedCount = Convert.ToInt64(await cmd.ExecuteScalarAsync());

                Assert.That(storedCount, Is.EqualTo(measurementCount));
            }
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Sets up everything the upsert procedure needs: the "sample" schema, the
-- "measurements" table with its unique index, and the composite type
-- "measurement_type" used to pass rows in bulk. The block can be executed
-- repeatedly.
DO $$
BEGIN

    -- Create the Schema:
    IF NOT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'sample') THEN
        CREATE SCHEMA sample;
    END IF;

    -- Create the Tables:
    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.tables
        WHERE table_schema = 'sample' AND table_name = 'measurements'
    ) THEN

        CREATE TABLE sample.measurements
        (
            device_id int,
            parameter_id int,
            timestamp timestamp with time zone,
            value double precision
        );

    END IF;

    -- Create the Index (it is the arbiter for the ON CONFLICT clause of the upsert):
    DROP INDEX IF EXISTS sample.unique_measurement;

    CREATE UNIQUE INDEX unique_measurement ON sample.measurements(device_id, parameter_id, timestamp);

    -- Create the Types:
    --
    -- The type is only created when it is missing. Dropping and re-creating it
    -- would fail on every run after the first one, because the procedure
    -- sample.insert_or_update_measurements(sample.measurement_type[]) depends on
    -- it and DROP TYPE defaults to RESTRICT. To change the type definition, drop
    -- the procedure and the type manually first.
    IF NOT EXISTS (
        SELECT 1
        FROM pg_catalog.pg_type t
            INNER JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
        WHERE n.nspname = 'sample' AND t.typname = 'measurement_type'
    ) THEN

        CREATE TYPE "sample"."measurement_type" AS (
            device_id int,
            parameter_id int,
            timestamp timestamp with time zone,
            value double precision
        );

    END IF;

END;
$$;
-- Create the Procedure:
-- Upserts a batch of measurements: every array element is inserted as a row,
-- and when a row with the same (device_id, parameter_id, timestamp) already
-- exists, only its value is overwritten.
CREATE OR REPLACE PROCEDURE sample.insert_or_update_measurements(p_measurements sample.measurement_type[])
LANGUAGE SQL
AS $$
    INSERT INTO sample.measurements (device_id, parameter_id, "timestamp", value)
    SELECT
        m.device_id,
        m.parameter_id,
        m."timestamp",
        m.value
    FROM UNNEST(p_measurements) AS m
    ON CONFLICT (device_id, parameter_id, "timestamp")
    DO UPDATE
        SET value = EXCLUDED.value;
$$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@johnjaylward Is this a question or an answer? I think you need to register the type, like I did in the example, to make it work.