Last active
December 8, 2023 17:29
-
-
Save gfoidl/7ee4643859f28d0825293e945e70d959 to your computer and use it in GitHub Desktop.
PostgreSQL replication via WAL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Docker Compose setup for the demo: a PostgreSQL server with logical replication
# enabled, plus pgAdmin for inspecting publications and replication slots.
version: '3.8'

services:
  db:
    container_name: db
    image: postgres
    restart: unless-stopped
    environment:
      # Demo credentials only.
      POSTGRES_USER: root
      POSTGRES_PASSWORD: root
      POSTGRES_DB: test_db
      # NOTE(review): while PGDATA stays commented out, the ./data bind mount below is
      # presumably not where the server stores its data -- confirm, and enable this line
      # if the data is meant to live in ./data.
      #PGDATA: /data/postgres
    volumes:
      - ./data:/data/postgres
    ports:
      - "5432:5432"
    # Start the server with wal_level=logical, which the pgoutput replication slot relies on.
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"

  pgadmin:
    container_name: pg_admin
    image: dpage/pgadmin4
    restart: unless-stopped
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@test.at
      PGADMIN_DEFAULT_PASSWORD: root
      PGADMIN_CONFIG_CONSOLE_LOG_LEVEL: 30 # warning, cf. https://www.pgadmin.org/docs/pgadmin4/latest/config_py.html#config-py
    ports:
      - "5050:80"
    depends_on:
      - db
    # Suppress the container's log output.
    logging:
      driver: none
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Text.Json; | |
using Microsoft.EntityFrameworkCore; | |
using Npgsql.Replication; | |
using Npgsql.Replication.PgOutput; | |
using Npgsql.Replication.PgOutput.Messages; | |
// Demo entry point: makes sure schema, publication and replication slot exist, starts the
// replication listener in the background, then inserts a post so that the listener has an
// insert message to print.
// Demo credentials only -- do not keep real secrets in source.
const string ConnString = "host=10.0.0.20;username=root;password=root;database=test_db";

using CancellationTokenSource cts = new();
await using BloggingContext db = new(ConnString);

await EnsureDbInitialized(db, cts.Token);

// Started but deliberately not awaited yet: it has to listen while the insert below happens.
Task replicationTask = RunReplication(ConnString, cts.Token);

try
{
    Blog? blog = await db.Blogs.Include(b => b.Posts).FirstOrDefaultAsync(cts.Token);

    Post post = new()
    {
        Title = $"Post at {DateTimeOffset.Now}",
        Content = "Hello world"
    };

    if (blog is null)
    {
        // First run: create the blog together with its first post.
        blog = new()
        {
            Url = "https://blog.test.at",
            Posts = new List<Post>() { post }
        };

        db.Blogs.Add(blog);
    }
    else
    {
        blog.Posts.Add(post);
    }

    int rc = await db.SaveChangesAsync(cts.Token);

    Console.WriteLine($"rc = {rc}, hit any key to continue");
    Console.ReadKey();
}
finally
{
    // Stop the listener even when the insert failed, so that the replication
    // connection is disposed before the process ends.
    cts.Cancel();
    await replicationTask;
}
// Ensures the database schema exists. Only when EnsureCreatedAsync reports that it had to
// create it, the publication and the logical replication slot used by the listener are
// created as well.
// Cf. https://www.npgsql.org/doc/replication.html
static async Task EnsureDbInitialized(BloggingContext db, CancellationToken cancellationToken)
{
    const string CreatePublicationSql = "create publication posts_pub for table posts;";
    const string CreateSlotSql = "select * from pg_create_logical_replication_slot('posts_slot', 'pgoutput');";

    bool schemaWasCreated = await db.Database.EnsureCreatedAsync(cancellationToken);

    if (!schemaWasCreated)
    {
        // Nothing was created, so publication and slot are not touched either.
        return;
    }

    await db.Database.ExecuteSqlRawAsync(CreatePublicationSql, cancellationToken);
    await db.Database.ExecuteSqlRawAsync(CreateSlotSql, cancellationToken);
}
// Consumes the "posts_slot" replication slot for the "posts_pub" publication until the token
// is cancelled. Insert messages are passed to InsertMessageHandler; every message is
// acknowledged afterwards. Cancellation ends the method silently, any other exception is
// written to the console.
static async Task RunReplication(string connString, CancellationToken cancellationToken)
{
    try
    {
        await using LogicalReplicationConnection connection = new(connString);
        await connection.Open(cancellationToken);

        PgOutputReplicationSlot slot = new("posts_slot");
        PgOutputReplicationOptions options = new("posts_pub", 1);

        IAsyncEnumerable<PgOutputReplicationMessage> messages =
            connection.StartReplication(slot, options, cancellationToken);

        await foreach (PgOutputReplicationMessage message in messages)
        {
            if (message is InsertMessage insertMessage)
            {
                await InsertMessageHandler.Handle(insertMessage, cancellationToken);
            }

            connection.SetReplicationStatus(message.WalEnd);

            // Push the status to the server right away instead of waiting for the next
            // periodic update (see the description of SetReplicationStatus).
            // For outbox patterns this avoids that older messages are sent more than once.
            // Note: delivery is still at-least-once, not exactly-once.
            await connection.SendStatusUpdate(cancellationToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when the caller cancels the token; nothing to report.
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
}
/// <summary>
/// Turns a replicated insert into a <see cref="Post"/> and prints it as indented JSON.
/// </summary>
public static class InsertMessageHandler
{
    // Created once and reused: allocating new serializer options for every message
    // defeats the serializer's metadata caching.
    private static readonly JsonSerializerOptions s_jsonOptions = new() { WriteIndented = true };

    /// <summary>
    /// Reads the columns of the inserted row by position (0 = post id, 1 = title,
    /// 2 = content, 3 = blog id) and writes the resulting <see cref="Post"/> to the console.
    /// </summary>
    /// <param name="insertMessage">The insert message whose new row is read.</param>
    /// <param name="cancellationToken">Token passed on to the column reads.</param>
    public static async Task Handle(InsertMessage insertMessage, CancellationToken cancellationToken)
    {
        Post post = new();
        int colNumber = 0;

        // The values are streamed, so they are consumed strictly in column order.
        await foreach (ReplicationValue value in insertMessage.NewRow)
        {
            switch (colNumber)
            {
                case 0:
                {
                    if (await TryGetInt(value, cancellationToken) is int postId)
                    {
                        post.PostId = postId;
                    }
                    break;
                }
                case 1:
                    post.Title = await value.Get<string>(cancellationToken);
                    break;
                case 2:
                    post.Content = await value.Get<string>(cancellationToken);
                    break;
                case 3:
                {
                    if (await TryGetInt(value, cancellationToken) is int blogId)
                    {
                        post.BlogId = blogId;
                    }
                    break;
                }
                default:
                    // Further columns are not mapped.
                    break;
            }

            colNumber++;
        }

        Console.WriteLine(JsonSerializer.Serialize(post, s_jsonOptions));
    }

    /// <summary>
    /// Reads <paramref name="value"/> as an integer when it is a text value of PostgreSQL
    /// type <c>integer</c>.
    /// </summary>
    /// <returns>The parsed number, or <c>null</c> when the value has another kind or type or cannot be parsed.</returns>
    private static async ValueTask<int?> TryGetInt(ReplicationValue value, CancellationToken cancellationToken)
    {
        if (value.Kind != TupleDataKind.TextValue || value.GetPostgresType().Name != "integer")
        {
            return null;
        }

        string text = await value.Get<string>(cancellationToken);

        // The text comes from the server, not from a user, so it is parsed culture-invariantly.
        return int.TryParse(
            text,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out int parsed)
            ? parsed
            : null;
    }
}
/// <summary>
/// Blog entity; owns a list of <see cref="Post"/>s.
/// </summary>
public class Blog
{
    /// <summary>Identifier of the blog.</summary>
    public int BlogId { get; set; }

    /// <summary>Address of the blog. Not initialized by the type itself.</summary>
    public string Url { get; set; } = null!;

    /// <summary>Posts of this blog. Not initialized by the type itself.</summary>
    public List<Post> Posts { get; set; } = null!;
}
/// <summary>
/// Post entity; belongs to one <see cref="Blog"/>.
/// </summary>
public class Post
{
    /// <summary>Identifier of the post.</summary>
    public int PostId { get; set; }

    /// <summary>Title of the post. Not initialized by the type itself.</summary>
    public string Title { get; set; } = null!;

    /// <summary>Body text of the post. Not initialized by the type itself.</summary>
    public string Content { get; set; } = null!;

    /// <summary>Identifier of the blog this post belongs to.</summary>
    public int BlogId { get; set; }

    /// <summary>The blog this post belongs to. Not initialized by the type itself.</summary>
    public Blog Blog { get; set; } = null!;
}
/// <summary>
/// EF Core context for the demo, exposing <see cref="Blog"/>s and <see cref="Post"/>s
/// through Npgsql with the snake-case naming convention.
/// </summary>
public class BloggingContext : DbContext
{
    private readonly string _connString;

    /// <summary>Creates a context that connects with the given connection string.</summary>
    /// <param name="connString">Connection string handed to Npgsql.</param>
    public BloggingContext(string connString)
    {
        _connString = connString;
    }

    /// <summary>Set of blogs.</summary>
    public DbSet<Blog> Blogs { get; set; }

    /// <summary>Set of posts.</summary>
    public DbSet<Post> Posts { get; set; }

    /// <summary>
    /// Enables sensitive data logging and configures Npgsql with the stored connection
    /// string and the snake-case naming convention.
    /// </summary>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        // To see what EF Core does, additionally call: optionsBuilder.LogTo(Console.WriteLine);
        optionsBuilder.EnableSensitiveDataLogging();
        optionsBuilder.UseNpgsql(_connString);
        optionsBuilder.UseSnakeCaseNamingConvention();
    }
}
Some notes
Publication
The SQL above creates the publication for all actions. It could be more specific, e.g. restricted to inserts, because the code here only handles `InsertMessage`s. For the demo it's kept as is.
See e.g. pgAdmin4
Infos about publication / replication
View | Description |
---|---|
pg_publication | all publications created in the database |
pg_publication_tables | information about the mapping between publications and information of tables they contain |
pg_replication_slots | listing of all replication slots that currently exist on the database |
pg_stat_replication | Statistics: when a client for replication is connected, then this view shows some info about it. |
pg_stat_replication_slots | Statistics: info about replication slots |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If one tries to start more than one replication at the same time, it throws:
Well, that doesn't make sense anyway, as the replication isn't meant for "event streaming", only for 1:1 pub-sub.
For streaming there are dedicated approaches available (Kafka, RabbitMQ, Azure Service Bus with topics, Azure Event Grid, etc.).
So the Postgres replication can be used to feed one of those.