Skip to content

Instantly share code, notes, and snippets.

@gfoidl
Last active December 8, 2023 17:29
Show Gist options
  • Save gfoidl/7ee4643859f28d0825293e945e70d959 to your computer and use it in GitHub Desktop.
Save gfoidl/7ee4643859f28d0825293e945e70d959 to your computer and use it in GitHub Desktop.
PostgreSQL replication via WAL
version: '3.8'
services:
db:
container_name: db
image: postgres
restart: unless-stopped
environment:
POSTGRES_USER: root
POSTGRES_PASSWORD: root
POSTGRES_DB: test_db
#PGDATA: /data/postgres
volumes:
- ./data:/data/postgres
ports:
- "5432:5432"
command:
- "postgres"
- "-c"
- "wal_level=logical"
pgadmin:
container_name: pg_admin
image: dpage/pgadmin4
restart: unless-stopped
environment:
PGADMIN_DEFAULT_EMAIL: admin@test.at
PGADMIN_DEFAULT_PASSWORD: root
PGADMIN_CONFIG_CONSOLE_LOG_LEVEL: 30 # warning, cf. https://www.pgadmin.org/docs/pgadmin4/latest/config_py.html#config-py
ports:
- "5050:80"
depends_on:
- db
logging:
driver: none
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using Npgsql.Replication;
using Npgsql.Replication.PgOutput;
using Npgsql.Replication.PgOutput.Messages;
const string ConnString = "host=10.0.0.20;username=root;password=root;database=test_db";
using CancellationTokenSource cts = new();
await using BloggingContext db = new(ConnString);
await EnsureDbInitialized(db, cts.Token);
Task replicationTask = RunReplication(ConnString, cts.Token);
Blog? blog = db.Blogs.Include(b => b.Posts).FirstOrDefault();
if (blog is null)
{
blog = new()
{
Url = "https://blog.test.at",
Posts = new List<Post>()
{
new Post()
{
Title = $"Post at {DateTimeOffset.Now}",
Content = "Hello world"
}
}
};
db.Blogs.Add(blog);
}
else
{
blog.Posts.Add(new Post()
{
Title = $"Post at {DateTimeOffset.Now}",
Content = "Hello world"
});
}
int rc = db.SaveChanges();
Console.WriteLine($"rc = {rc}, hit any key to continue");
Console.ReadKey();
cts.Cancel();
await replicationTask;
static async Task EnsureDbInitialized(BloggingContext db, CancellationToken cancellationToken)
{
if (await db.Database.EnsureCreatedAsync(cancellationToken))
{
// Cf. https://www.npgsql.org/doc/replication.html
await db.Database.ExecuteSqlRawAsync("create publication posts_pub for table posts;", cancellationToken);
await db.Database.ExecuteSqlRawAsync("select * from pg_create_logical_replication_slot('posts_slot', 'pgoutput');", cancellationToken);
}
}
static async Task RunReplication(string connString, CancellationToken cancellationToken)
{
try
{
await using LogicalReplicationConnection connection = new(connString);
await connection.Open(cancellationToken);
PgOutputReplicationSlot slot = new("posts_slot");
await foreach (PgOutputReplicationMessage message in connection.StartReplication(slot, new PgOutputReplicationOptions("posts_pub", 1), cancellationToken))
{
if (message is InsertMessage insertMessage)
{
await InsertMessageHandler.Handle(insertMessage, cancellationToken);
}
connection.SetReplicationStatus(message.WalEnd);
// Force that status to be updated in the DB. See description of SetReplicationStatus
// for further info.
// For outbox-patterns this avoids that older messages are sent more than once.
// Note: it's still at-least-once and not exactly-once.
await connection.SendStatusUpdate(cancellationToken);
}
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
public static class InsertMessageHandler
{
public static async Task Handle(InsertMessage insertMessage, CancellationToken cancellationToken)
{
int colNumber = 0;
Post post = new();
await foreach (ReplicationValue value in insertMessage.NewRow)
{
switch (colNumber)
{
case 0:
{
int? tmp = await TryGetInt(value, cancellationToken);
if (tmp.HasValue)
{
post.PostId = tmp.Value;
}
break;
}
case 1: post.Title = await value.Get<string>(cancellationToken); break;
case 2: post.Content = await value.Get<string>(cancellationToken); break;
case 3:
{
int? tmp = await TryGetInt(value, cancellationToken);
if (tmp.HasValue)
{
post.BlogId = tmp.Value;
}
break;
}
default: break;
}
colNumber++;
static async ValueTask<int?> TryGetInt(ReplicationValue value, CancellationToken cancellationToken)
{
if (value.Kind == TupleDataKind.TextValue && value.GetPostgresType().Name == "integer")
{
string tmp = await value.Get<string>(cancellationToken);
if (int.TryParse(tmp, out int res))
{
return res;
}
}
return null;
}
}
Console.WriteLine(JsonSerializer.Serialize(post, new JsonSerializerOptions { WriteIndented = true }));
}
}
public class Blog
{
public int BlogId { get; set; }
public string Url { get; set; } = null!;
public List<Post> Posts { get; set; } = null!;
}
public class Post
{
public int PostId { get; set; }
public string Title { get; set; } = null!;
public string Content { get; set; } = null!;
public int BlogId { get; set; }
public Blog Blog { get; set; } = null!;
}
public class BloggingContext : DbContext
{
private readonly string _connString;
public BloggingContext(string connString) => _connString = connString;
public DbSet<Blog> Blogs { get; set; }
public DbSet<Post> Posts { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder
//.LogTo(Console.WriteLine)
.EnableSensitiveDataLogging()
.UseNpgsql(_connString)
.UseSnakeCaseNamingConvention();
}
}
@gfoidl
Copy link
Author

gfoidl commented Dec 29, 2022

Some notes

Publication

In the above SQL the publication is created for all actions, but they can be more specific too, like here where it's filtered to handle only InsertMessages. But for demo it's kept as is.
See e.g. pgAdmin4
grafik

Infos about publication / replication

View Description
pg_publication all publications created in the database
pg_publication_tables information about the mapping between publications and information of tables they contain
pg_replication_slots listing of all replication slots that currently exist on the database
pg_stat_replication Statistics: when a client for replication is connected, then this view shows some info about it.
pg_stat_replication_slots Statistics: info about replication slots

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment