Skip to content

Instantly share code, notes, and snippets.

@danbarua
Created June 18, 2015 16:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danbarua/710e77de8dd5117d843c to your computer and use it in GitHub Desktop.
Save danbarua/710e77de8dd5117d843c to your computer and use it in GitHub Desktop.
Spike: inserting events into PostgreSQL using the new binary COPY API in Npgsql v3
// Handles an append where the caller expects the stream not to exist yet:
// inserts the stream row, then bulk-loads the events through a binary COPY,
// all inside one serializable transaction.
if (expectedVersion == ExpectedVersion.NoStream)
{
    using (var tx = _connection.BeginTransaction(IsolationLevel.Serializable))
    {
        int streamIdInternal;

        // The command must be bound to the connection and enlisted in the
        // transaction, otherwise it has nothing to execute against.
        using (var command = new NpgsqlCommand(
            "INSERT INTO streams(id, id_original) VALUES (:stream_id, :stream_id_original) RETURNING id_internal;",
            _connection,
            tx))
        {
            // Parameter names have to match the placeholders used in the SQL text.
            command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId);
            command.Parameters.AddWithValue(":stream_id_original", streamIdInfo.StreamIdOriginal);

            // ExecuteScalar yields the first column of the first row, i.e. the
            // RETURNING id_internal value; ExecuteNonQuery would only yield the
            // affected-row count.
            // NOTE(review): the cast assumes id_internal is a 32-bit integer column,
            // which matches how it is written to the COPY stream below - confirm.
            object insertedId = await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext();
            streamIdInternal = (int)insertedId;
        }

        // NOTE(review): with Npgsql 3.x, disposing the importer is presumably what
        // finishes the COPY; later Npgsql versions require an explicit Complete()
        // call before dispose - confirm against the Npgsql version in use.
        using (var writer = _connection.BeginBinaryImport("COPY events (stream_id_internal, stream_version, id, created, type, json_data, json_metadata) FROM STDIN BINARY"))
        {
            int version = 0;
            foreach (var @event in events)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    // Abort the COPY and stop immediately instead of writing more rows
                    // to a cancelled importer; leaving the using blocks without
                    // reaching Commit() leaves the transaction uncommitted.
                    writer.Cancel();
                    cancellationToken.ThrowIfCancellationRequested();
                }

                // Column order must match the column list in the COPY statement.
                writer.StartRow();
                writer.Write(streamIdInternal, NpgsqlDbType.Integer);
                writer.Write(++version, NpgsqlDbType.Integer); // stream versions start at 1
                writer.Write(@event.EventId, NpgsqlDbType.Uuid);
                writer.Write(SystemClock.GetUtcNow(), NpgsqlDbType.TimestampTZ);
                writer.Write(@event.Type);
                writer.Write(@event.JsonData, NpgsqlDbType.Json);
                writer.Write(@event.JsonMetadata, NpgsqlDbType.Json);
            }
        }

        tx.Commit();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment