@gfoidl
Last active December 8, 2023 17:29
PostgreSQL replication via WAL
version: '3.8'

services:
  db:
    container_name: db
    image: postgres
    restart: unless-stopped
    environment:
      POSTGRES_USER: root
      POSTGRES_PASSWORD: root
      POSTGRES_DB: test_db
      #PGDATA: /data/postgres
    volumes:
      - ./data:/data/postgres
    ports:
      - "5432:5432"
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"

  pgadmin:
    container_name: pg_admin
    image: dpage/pgadmin4
    restart: unless-stopped
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@test.at
      PGADMIN_DEFAULT_PASSWORD: root
      PGADMIN_CONFIG_CONSOLE_LOG_LEVEL: 30 # warning, cf. https://www.pgadmin.org/docs/pgadmin4/latest/config_py.html#config-py
    ports:
      - "5050:80"
    depends_on:
      - db
    logging:
      driver: none

using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;
using Npgsql.Replication.PgOutput.Messages;

const string ConnString = "host=10.0.0.20;username=root;password=root;database=test_db";

using CancellationTokenSource cts = new();
await using BloggingContext db = new(ConnString);

await EnsureDbInitialized(db, cts.Token);

Task replicationTask = RunReplication(ConnString, cts.Token);

Blog? blog = db.Blogs.Include(b => b.Posts).FirstOrDefault();

if (blog is null)
{
    blog = new()
    {
        Url = "https://blog.test.at",
        Posts = new List<Post>()
        {
            new Post()
            {
                Title = $"Post at {DateTimeOffset.Now}",
                Content = "Hello world"
            }
        }
    };

    db.Blogs.Add(blog);
}
else
{
    blog.Posts.Add(new Post()
    {
        Title = $"Post at {DateTimeOffset.Now}",
        Content = "Hello world"
    });
}

int rc = db.SaveChanges();
Console.WriteLine($"rc = {rc}, hit any key to continue");
Console.ReadKey();

cts.Cancel();
await replicationTask;

static async Task EnsureDbInitialized(BloggingContext db, CancellationToken cancellationToken)
{
    if (await db.Database.EnsureCreatedAsync(cancellationToken))
    {
        // Cf. https://www.npgsql.org/doc/replication.html
        await db.Database.ExecuteSqlRawAsync("create publication posts_pub for table posts;", cancellationToken);
        await db.Database.ExecuteSqlRawAsync("select * from pg_create_logical_replication_slot('posts_slot', 'pgoutput');", cancellationToken);
    }
}

static async Task RunReplication(string connString, CancellationToken cancellationToken)
{
    try
    {
        await using LogicalReplicationConnection connection = new(connString);
        await connection.Open(cancellationToken);

        PgOutputReplicationSlot slot = new("posts_slot");

        await foreach (PgOutputReplicationMessage message in connection.StartReplication(slot, new PgOutputReplicationOptions("posts_pub", 1), cancellationToken))
        {
            if (message is InsertMessage insertMessage)
            {
                await InsertMessageHandler.Handle(insertMessage, cancellationToken);
            }

            connection.SetReplicationStatus(message.WalEnd);

            // Force that status to be updated in the DB. See description of SetReplicationStatus
            // for further info.
            // For outbox-patterns this avoids that older messages are sent more than once.
            // Note: it's still at-least-once and not exactly-once.
            await connection.SendStatusUpdate(cancellationToken);
        }
    }
    catch (OperationCanceledException) { }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}

public static class InsertMessageHandler
{
    public static async Task Handle(InsertMessage insertMessage, CancellationToken cancellationToken)
    {
        int colNumber = 0;
        Post post = new();

        await foreach (ReplicationValue value in insertMessage.NewRow)
        {
            switch (colNumber)
            {
                case 0:
                {
                    int? tmp = await TryGetInt(value, cancellationToken);
                    if (tmp.HasValue)
                    {
                        post.PostId = tmp.Value;
                    }
                    break;
                }
                case 1: post.Title = await value.Get<string>(cancellationToken); break;
                case 2: post.Content = await value.Get<string>(cancellationToken); break;
                case 3:
                {
                    int? tmp = await TryGetInt(value, cancellationToken);
                    if (tmp.HasValue)
                    {
                        post.BlogId = tmp.Value;
                    }
                    break;
                }
                default: break;
            }

            colNumber++;
        }

        Console.WriteLine(JsonSerializer.Serialize(post, new JsonSerializerOptions { WriteIndented = true }));

        static async ValueTask<int?> TryGetInt(ReplicationValue value, CancellationToken cancellationToken)
        {
            if (value.Kind == TupleDataKind.TextValue && value.GetPostgresType().Name == "integer")
            {
                string tmp = await value.Get<string>(cancellationToken);
                if (int.TryParse(tmp, out int res))
                {
                    return res;
                }
            }

            return null;
        }
    }
}

public class Blog
{
    public int BlogId { get; set; }
    public string Url { get; set; } = null!;
    public List<Post> Posts { get; set; } = null!;
}

public class Post
{
    public int PostId { get; set; }
    public string Title { get; set; } = null!;
    public string Content { get; set; } = null!;
    public int BlogId { get; set; }
    public Blog Blog { get; set; } = null!;
}

public class BloggingContext : DbContext
{
    private readonly string _connString;

    public BloggingContext(string connString) => _connString = connString;

    public DbSet<Blog> Blogs { get; set; }
    public DbSet<Post> Posts { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder
            //.LogTo(Console.WriteLine)
            .EnableSensitiveDataLogging()
            .UseNpgsql(_connString)
            .UseSnakeCaseNamingConvention();
    }
}

gfoidl commented Dec 8, 2022

If one tries to start more than one replication on the same slot, it throws:

Npgsql.PostgresException (0x80004005): 55006: replication slot "posts_slot" is active for PID 91
   at Npgsql.Internal.NpgsqlConnector.<ReadMessage>g__ReadMessageLong|222_0(NpgsqlConnector connector, Boolean async, DataRowLoadingMode dataRowLoadingMode, Boolean readingNotifications, Boolean isReadingPrependedMessage)
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.ReplicationConnection.StartReplicationInternal(String command, Boolean bypassingStream, CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at Npgsql.Replication.Internal.LogicalReplicationConnectionExtensions.<StartLogicalReplication>g__StartLogicalReplicationInternal|1_0(LogicalReplicationConnection connection, LogicalReplicationSlot slot, CancellationToken cancellationToken, Nullable`1 walLocation, IEnumerable`1 options, Boolean bypassingStream)+MoveNext()
   at Npgsql.Replication.Internal.LogicalReplicationConnectionExtensions.<StartLogicalReplication>g__StartLogicalReplicationInternal|1_0(LogicalReplicationConnection connection, LogicalReplicationSlot slot, CancellationToken cancellationToken, Nullable`1 walLocation, IEnumerable`1 options, Boolean bypassingStream)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+MoveNext()
   at Npgsql.Replication.PgOutput.PgOutputAsyncEnumerable.StartReplicationInternal(CancellationToken cancellationToken)+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.GetResult()
   at Program.<<Main>$>g__RunReplication|0_2(String connString, Int32 workerIdx, CancellationToken cancellationToken) in D:\Spielwiese\PostgresTest\PostgresTest\Program.cs:line 71
   at Program.<<Main>$>g__RunReplication|0_2(String connString, Int32 workerIdx, CancellationToken cancellationToken) in D:\Spielwiese\PostgresTest\PostgresTest\Program.cs:line 71
   at Program.<<Main>$>g__RunReplication|0_2(String connString, Int32 workerIdx, CancellationToken cancellationToken) in D:\Spielwiese\PostgresTest\PostgresTest\Program.cs:line 71
  Exception data:
    Severity: ERROR
    SqlState: 55006
    MessageText: replication slot "posts_slot" is active for PID 91
    File: slot.c
    Line: 516
    Routine: ReplicationSlotAcquire

Well, that wouldn't make sense anyway, as replication isn't meant for "event streaming", only for 1:1 pub-sub: a slot can have only one active consumer at a time.
For streaming there are dedicated approaches available (Kafka, RabbitMQ, Azure Service Bus with topics, Azure Event Grid, etc.).
So the Postgres replication can be used to feed one of those.
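
As a rough illustration of "one reader feeds many consumers": the single replication loop could hand each message to a fan-out component instead of handling it directly. The sketch below is an in-process stand-in for a real broker, built on System.Threading.Channels. `FanOut<T>`, `Subscribe`, `PublishAsync` and `Complete` are hypothetical names, not part of Npgsql or of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: one producer (the replication loop) writes,
// every subscriber gets its own copy of each item.
// Assumption: all subscribers register before the first publish;
// the class is not thread-safe for concurrent Subscribe/PublishAsync calls.
public sealed class FanOut<T>
{
    private readonly List<Channel<T>> _subscribers = new();

    // Each subscriber reads from its own unbounded channel.
    public ChannelReader<T> Subscribe()
    {
        Channel<T> channel = Channel.CreateUnbounded<T>();
        _subscribers.Add(channel);
        return channel.Reader;
    }

    // Called from the single replication loop, once per message.
    public async ValueTask PublishAsync(T item, CancellationToken cancellationToken = default)
    {
        foreach (Channel<T> channel in _subscribers)
        {
            await channel.Writer.WriteAsync(item, cancellationToken);
        }
    }

    // Signals all subscribers that no further items will arrive.
    public void Complete()
    {
        foreach (Channel<T> channel in _subscribers)
        {
            channel.Writer.TryComplete();
        }
    }
}
```

In RunReplication one would then call something like `await fanOut.PublishAsync(post, cancellationToken)` in place of the direct handler call, and each consumer would loop over `reader.ReadAllAsync()`. Delivery stays at-least-once, so consumers should still tolerate duplicates.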

gfoidl commented Dec 29, 2022

Some notes

Publication

In the SQL above the publication is created for all actions (insert, update, delete, truncate). It can be more specific too, which would fit here, as the consumer only handles InsertMessages. For the demo it's kept as is.
See e.g. pgAdmin4 (screenshot).
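
A minimal sketch of the narrower variant, assuming the same `posts_pub` / `posts` names as in the setup code: PostgreSQL's `publish` parameter limits which operations a publication emits. The helper name `CreateInsertOnlyPublication` is made up for illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

// Hypothetical replacement for the "create publication" call in EnsureDbInitialized.
// With publish = 'insert' the publication emits only inserts, so updates,
// deletes and truncates never reach the replication consumer.
static async Task CreateInsertOnlyPublication(DbContext db, CancellationToken cancellationToken)
{
    await db.Database.ExecuteSqlRawAsync(
        "create publication posts_pub for table posts with (publish = 'insert');",
        cancellationToken);
}
```

If the publication already exists, `alter publication posts_pub set (publish = 'insert');` should change it in place.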

Info about publication / replication

| View | Description |
|---|---|
| pg_publication | All publications created in the database |
| pg_publication_tables | Mapping between publications and the tables they contain |
| pg_replication_slots | All replication slots that currently exist on the database |
| pg_stat_replication | Statistics: shows info about each replication client that is currently connected |
| pg_stat_replication_slots | Statistics: info about replication slots |
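
These views can be queried like any other table. A small sketch with plain Npgsql that lists the slots and whether a consumer is attached; the helper name `PrintReplicationSlots` is hypothetical, and the columns are cast to text to keep the reads simple.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

// Hypothetical diagnostic helper: prints one line per replication slot.
static async Task PrintReplicationSlots(string connString, CancellationToken cancellationToken)
{
    await using NpgsqlConnection conn = new(connString);
    await conn.OpenAsync(cancellationToken);

    await using NpgsqlCommand cmd = new(
        "select slot_name::text, plugin::text, active, confirmed_flush_lsn::text from pg_replication_slots;",
        conn);
    await using NpgsqlDataReader reader = await cmd.ExecuteReaderAsync(cancellationToken);

    while (await reader.ReadAsync(cancellationToken))
    {
        string slotName = reader.GetString(0);
        // plugin and confirmed_flush_lsn are null for physical slots.
        string plugin = reader.IsDBNull(1) ? "(none)" : reader.GetString(1);
        bool active = reader.GetBoolean(2);
        string lsn = reader.IsDBNull(3) ? "(none)" : reader.GetString(3);

        Console.WriteLine($"{slotName}: plugin={plugin}, active={active}, confirmed_flush_lsn={lsn}");
    }
}
```

While the demo's replication task is running, `posts_slot` should show up as active, which matches the "is active for PID" error above when a second consumer tries to attach.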
