/// Tencent Cloud Postgres data recovery helper
/// requirements:
///   [zig](https://ziglang.org/download/)
///   [docker](https://docker.com/) or [podman](https://podman.io/)
/// usage:
///   1) set `recovery` to the downloaded .tar.zst path and configure `tables` to export
///   2) `zig run txcloud_pgrecovery.zig`
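///
/// overall flow (a sketch derived from the code below; the example path is illustrative):
///   zig run txcloud_pgrecovery.zig
///     1. extracts the .tar.zst beside itself, e.g. into .../automatic-20240428030826/
///     2. comments out cloud-only settings in postgresql.conf and appends local port/socket settings
///     3. runs postgres:<PG_VERSION>-alpine via docker (or podman) with that directory mounted at /recovery
///     4. exports each configured table to <table>.tsv and, via the callback, writes <table>.sql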
const std = @import("std");
const print = std.debug.print;
const path = std.fs.path;
const Child = std.process.Child;
const ArrayList = std.ArrayList;
const pga = std.heap.page_allocator;
// table configs used to generate the tsv exports and update sql
const Table = struct {
    db: []const u8,
    name: []const u8,
    fields: []const u8,
    types: []const u8,
    conds: []const u8,
    cache: bool,
    call: ?(*const fn ([]const u8, Table) anyerror!void),
};
// full path of the auto backup downloaded from https://console.cloud.tencent.com/postgres
const recovery = "/Users/mindon/Downloads/automatic-20240428030826.tar.zst";
// configure the data to export and the update sql to generate
const tables: [1]Table = .{
    .{
        .db = "hello", // database name
        .name = "world", // table name
        .fields = "id,a,b,c", // comma-separated field list
        .types = ",n,,", // per-field type: s or empty = quoted string, jsonb = ::jsonb cast, n (or anything else) = unquoted
        .conds = "", // conditions: [where] [order by] [limit]
        .cache = false, // when true, use the existing tsv without querying
        .call = &sqlUpdate, // callback to handle the tsv contents, or null
    },
};
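// with the sample config above, the container would run a COPY like:
//   psql -d hello -c "COPY (SELECT id,a,b,c FROM world ) TO '/recovery/world.tsv' WITH DELIMITER E'\t' CSV HEADER;"
// and sqlUpdate would turn each tsv row into an UPDATE keyed on the first field, e.g. (illustrative values):
//   UPDATE world SET a=1,b='x',c='y' WHERE id='42';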
/// auto backup recovery based on the guide: https://cloud.tencent.com/document/product/409/11642
pub fn main() !void {
    var ret: u8 = 0;
    // check that the backup archive exists (ls also shows its size)
    const term, const stdout, const stderr = shell(pga, &[_][]const u8{
        "ls",
        "-lh",
        recovery,
    }) catch |err| .{ Child.Term{ .Exited = 9 }, "", @errorName(err) };
    defer {
        if (term.Exited != 9) {
            pga.free(stdout);
            pga.free(stderr);
        }
    }
    ret = term.Exited;
    print("{s} - err: {s}\n", .{ stdout, stderr });
    // prepare recovery directory
    const dir = path.dirname(recovery);
    var redir: []u8 = undefined;
    if (dir) |v| {
        const name = path.basename(recovery);
        const i = std.mem.indexOf(u8, name, ".");
        redir = try path.resolve(pga, &[_][]const u8{ v, if (i) |n| name[0..n] else name });
    } else {
        redir = try pga.dupe(u8, "./recovery");
    }
    defer pga.free(redir);
    print("{s}\n", .{redir});
    var target_dir = std.fs.cwd().openDir(redir, .{}) catch blk: {
        if (ret != 0) {
            return error.FileNotFound;
        }
        try std.fs.cwd().makePath(redir);
        break :blk try std.fs.cwd().openDir(redir, .{});
    };
    target_dir.close();
    // prepare postgresql.conf for local usage (extracting the archive first if needed)
    const flpath = try path.resolve(pga, &[_][]const u8{ redir, "postgresql.conf" });
    const fd = std.fs.openFileAbsolute(flpath, .{ .mode = .read_write }) catch blk: {
        if (ret != 0) {
            return error.FileNotFound;
        }
        dezstd(recovery, redir) catch {};
        break :blk std.fs.openFileAbsolute(flpath, .{ .mode = .read_write }) catch null;
    };
    if (fd) |fp| {
        try modify(fp);
        fp.close();
        try take(redir);
    } else {
        return error.FileNotFound;
    }
}
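// layout the code above expects inside the extracted backup (a sketch; only the first two
// entries are actually read by this tool, the rest is what a stock postgres data directory
// typically contains):
//   automatic-20240428030826/
//     PG_VERSION        -- read by take() to pick the postgres:<ver>-alpine image
//     postgresql.conf   -- rewritten by modify() for local startup
//     base/ global/ ... -- regular data-directory contents, started via pg_ctl in the container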
/// take data from the recovered postgres
fn take(redir: []const u8) !void {
    // detect pg version
    const flver = try path.resolve(pga, &[_][]const u8{ redir, "PG_VERSION" });
    defer pga.free(flver);
    const fdver = try std.fs.openFileAbsolute(flver, .{ .mode = .read_only });
    defer fdver.close();
    const sizever = if (fdver.stat() catch null) |s| s.size else 0;
    var ver: []const u8 = "9.5";
    if (sizever > 0) {
        const buffer = try pga.alloc(u8, sizever);
        defer pga.free(buffer);
        try fdver.reader().readNoEof(buffer);
        ver = try pga.dupe(u8, std.mem.trim(u8, buffer, " \r\n"));
    }
    const postgre_img = try std.fmt.allocPrint(pga, "postgres:{s}-alpine", .{ver});
    defer pga.free(postgre_img);
    if (sizever > 0) pga.free(ver);
    print("Postgres image: {s}\n", .{postgre_img});
    // detect docker; if unavailable or failing, fall back to podman
    var cmd: []const u8 = "docker";
    const term, const stdout_, const stderr_ = shell(pga, &[_][]const u8{
        cmd,
        "-v",
    }) catch |err| .{ Child.Term{ .Exited = 9 }, "", @errorName(err) };
    if (term.Exited != 0) {
        cmd = "podman";
        const term__, const stdout__, const stderr__ = try shell(pga, &[_][]const u8{
            cmd,
            "-v",
        });
        print("{s}\n", .{if (term__.Exited != 0) stderr__ else stdout__});
        pga.free(stdout__);
        pga.free(stderr__);
    } else {
        print("{s}\n", .{stdout_});
    }
    if (term.Exited != 9) {
        pga.free(stdout_);
        pga.free(stderr_);
    }
    const fldata = try path.resolve(pga, &[_][]const u8{ redir, "data.sh" });
    var fddata = try std.fs.cwd().createFile(fldata, .{});
    defer fddata.close();
    var datalines = std.ArrayList(u8).init(pga);
    defer datalines.deinit();
    try datalines.appendSlice("pg_ctl start -D /recovery -w\n");
    var n: u32 = 0;
    for (tables) |t| {
        const flname = try std.fmt.allocPrint(pga, "{s}.tsv", .{t.name});
        const flpath = try path.resolve(pga, &[_][]const u8{ redir, flname });
        const fd = std.fs.openFileAbsolute(flpath, .{ .mode = .read_only }) catch null;
        var size: u64 = 0;
        if (fd) |fp| {
            size = if (fp.stat() catch null) |s| s.size else 0;
            fp.close();
        }
        if (size == 0 and !t.cache) {
            const line = try std.fmt.allocPrint(pga, "psql -d {s} -c \"COPY (SELECT {s} FROM {s} {s}) TO '/recovery/{s}.tsv' WITH DELIMITER E'\\t' CSV HEADER;\"\n", .{
                t.db,
                t.fields,
                t.name,
                t.conds,
                t.name,
            });
            defer pga.free(line);
            try datalines.appendSlice(line);
            n += 1;
        } else {
            print("[SKIP] -- {s} exists\n", .{flname});
        }
    }
    try fddata.writeAll(datalines.items);
    if (n > 0) {
        const flcmd = try path.resolve(pga, &[_][]const u8{ redir, "recover.sh" });
        var fdcmd = try std.fs.cwd().createFile(flcmd, .{});
        defer fdcmd.close();
        const cmdlines =
            \\#!/bin/bash
            \\chmod 0700 /recovery
            \\chown postgres:postgres /recovery
            \\su postgres -c "sh /recovery/data.sh"
        ;
        try fdcmd.writeAll(cmdlines);
        _, const out, const err = try shell(pga, &[_][]const u8{
            "chmod",
            "+x",
            flcmd,
        });
        pga.free(out);
        pga.free(err);
        // run recover.sh
        const mapping_dir = try std.fmt.allocPrint(pga, "{s}:/recovery", .{redir});
        defer pga.free(mapping_dir);
        _, const stdout, const stderr = try shell(pga, &[_][]const u8{
            cmd,
            "run",
            "--rm",
            "-v",
            mapping_dir,
            postgre_img,
            "/recovery/recover.sh",
        });
        print("{s} - err: {s}\n", .{ stdout, stderr });
        pga.free(stdout);
        pga.free(stderr);
    }
    for (tables) |t| {
        if (t.call) |cb| {
            const flpath = try path.resolve(pga, &[_][]const u8{ redir, t.name });
            try cb(flpath, t);
        }
    }
}
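// the container invocation assembled above boils down to (the image tag depends on PG_VERSION,
// shown here with an assumed value):
//   docker run --rm -v /path/to/automatic-20240428030826:/recovery postgres:10-alpine /recovery/recover.sh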
// generate update sql from the exported tsv
fn sqlUpdate(flpath: []const u8, t: Table) !void {
    const fltsv = try std.fmt.allocPrint(pga, "{s}.tsv", .{flpath});
    defer pga.free(fltsv);
    const fd = try std.fs.openFileAbsolute(fltsv, .{ .mode = .read_only });
    defer fd.close();
    const size = if (fd.stat() catch null) |s| s.size else 0;
    print("{d} - {s}\n", .{ size, fltsv });
    if (size == 0) {
        return error.EmptyFile;
    }
    const buffer = try pga.alloc(u8, size);
    defer pga.free(buffer);
    try fd.reader().readNoEof(buffer);
    var lines = std.mem.split(u8, buffer, "\n");
    var sqls = std.ArrayList(u8).init(pga);
    defer sqls.deinit();
    var n: u32 = 0;
    var fields = std.mem.split(u8, t.fields, ",");
    var types = std.mem.split(u8, t.types, ",");
    while (lines.next()) |line| {
        var d = std.mem.split(u8, line, "\t");
        if (n == 0) {
            // skip the tsv header line
            n += 1;
            continue;
        }
        fields.reset();
        types.reset();
        var i: u32 = 0;
        var identify: []const u8 = "";
        var data = std.ArrayList(u8).init(pga);
        while (fields.next()) |part| {
            if (d.next()) |value| {
                const vt = if (types.next()) |tv| tv else "s";
                var v = value;
                var free = false;
                if (std.mem.startsWith(u8, v, "\"") and std.mem.endsWith(u8, v, "\"")) {
                    // unescape csv quoting: strip the outer quotes and collapse "" to "
                    const output = try pga.alloc(u8, std.mem.replacementSize(u8, v, "\"\"", "\""));
                    _ = std.mem.replace(u8, v, "\"\"", "\"", output);
                    v = try pga.dupe(u8, output[1..(output.len - 1)]);
                    free = true;
                    pga.free(output);
                }
                // form the update sql fragment for this field
                if (std.mem.eql(u8, vt, "jsonb") or (std.mem.startsWith(u8, v, "{") and std.mem.endsWith(u8, v, "}"))) {
                    const s = try std.fmt.allocPrint(pga, "{s}='{s}'::jsonb", .{ part, v });
                    if (i == 0) {
                        identify = s;
                    } else {
                        try data.appendSlice(",");
                        try data.appendSlice(s);
                    }
                    // print("{d}: {s}='{s}'::jsonb\n", .{ i, part, v });
                } else {
                    const s = if (vt.len == 0 or std.mem.eql(u8, vt, "s"))
                        try std.fmt.allocPrint(pga, "{s}='{s}'", .{ part, std.mem.trim(u8, v, " ") })
                    else
                        try std.fmt.allocPrint(pga, "{s}={s}", .{ part, std.mem.trim(u8, v, " ") });
                    if (i == 0) {
                        identify = s;
                    } else {
                        try data.appendSlice(",");
                        try data.appendSlice(s);
                    }
                    // print("{d}: {s}='{s}'\n", .{ i, part, std.mem.trim(u8, v, " ") });
                }
                if (free) {
                    pga.free(v);
                }
                i += 1;
            }
        }
        if (data.items.len > 1) {
            // the first field is the WHERE key; the rest (minus the leading comma) forms the SET list
            const sql = try std.fmt.allocPrint(pga, "UPDATE {s} SET {s} WHERE {s};\n", .{ path.basename(flpath), data.items[1..], identify });
            try sqls.appendSlice(sql);
        }
        data.deinit();
        n += 1;
    }
    const flsql = try std.fmt.allocPrint(pga, "{s}.sql", .{flpath});
    const fdsql = try std.fs.cwd().createFile(flsql, .{});
    defer fdsql.close();
    try fdsql.writeAll(sqls.items);
    print("[SQL] -- {s}\n\n", .{flsql});
}
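// worked example for the sample `tables` entry (illustrative values, tabs shown as <TAB>):
//   world.tsv:  id<TAB>a<TAB>b<TAB>c
//               42<TAB>1<TAB>"say ""hi"""<TAB>{"k":true}
//   world.sql:  UPDATE world SET a=1,b='say "hi"',c='{"k":true}'::jsonb WHERE id='42';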
/// run a command and capture its output; caller owns (and frees) the returned stdout/stderr
fn shell(allocator: std.mem.Allocator, argv: []const []const u8) !std.meta.Tuple(&.{ Child.Term, []u8, []u8 }) {
    var arena = std.heap.ArenaAllocator.init(allocator);
    defer arena.deinit();
    const alloc_arena = arena.allocator();
    var child = Child.init(argv, alloc_arena);
    child.stdout_behavior = .Pipe;
    child.stderr_behavior = .Pipe;
    var stdout = ArrayList(u8).init(alloc_arena);
    var stderr = ArrayList(u8).init(alloc_arena);
    defer {
        stdout.deinit();
        stderr.deinit();
    }
    try child.spawn();
    try child.collectOutput(&stdout, &stderr, 1024);
    const term = try child.wait();
    const a = std.mem.trim(u8, stdout.items, " \t\n\r");
    const out = try allocator.alloc(u8, a.len);
    if (a.len > 0) {
        @memcpy(out, a);
    }
    const b = std.mem.trim(u8, stderr.items, " \t\n\r");
    const err = try allocator.alloc(u8, b.len);
    if (b.len > 0) {
        @memcpy(err, b);
    }
    return .{
        term,
        out,
        err,
    };
}
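// usage sketch (hypothetical call, just to show that the caller owns the trimmed output):
//   const term, const out, const err = try shell(pga, &[_][]const u8{ "uname", "-a" });
//   defer pga.free(out);
//   defer pga.free(err);
//   if (term.Exited == 0) print("{s}\n", .{out});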
/// decompress the .tar.zst backup into the recovery directory
fn dezstd(flpath: []const u8, redir: []const u8) !void {
    _, const stdout, const stderr = try shell(pga, &[_][]const u8{
        "tar",
        "--zstd",
        "-C",
        redir,
        "-xf",
        flpath,
    });
    print("{s} {s}\n", .{ stdout, stderr });
    pga.free(stdout);
    pga.free(stderr);
    {
        // remove the backup_label left over from the base backup (per the recovery guide)
        const labels = try path.resolve(pga, &[_][]const u8{ redir, "backup_label" });
        _, const out, const err = try shell(pga, &[_][]const u8{
            "rm",
            "-rf",
            labels,
        });
        pga.free(out);
        pga.free(err);
    }
}
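// dezstd is roughly equivalent to running (paths illustrative):
//   tar --zstd -C .../automatic-20240428030826 -xf .../automatic-20240428030826.tar.zst
//   rm -rf .../automatic-20240428030826/backup_label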
/// comment out cloud-specific lines in postgresql.conf so the backup can start locally
fn modify(fp: std.fs.File) !void {
    const clues =
        \\shared_preload_libraries
        \\local_preload_libraries
        \\pg_stat_statements.max
        \\pg_stat_statements.track
        \\archive_mode
        \\archive_command
        \\synchronous_commit
        \\synchronous_standby_names
        \\port =
        \\unix_socket_directories
        \\include = 'standby.conf'
    ;
    var comments = std.mem.split(u8, clues, "\n");
    const size = if (fp.stat() catch null) |s| s.size else 0;
    // print("{d}\n", .{size});
    if (size == 0) {
        return error.EmptyFile;
    }
    const buffer = try pga.alloc(u8, size);
    defer pga.free(buffer);
    try fp.reader().readNoEof(buffer);
    var lines = std.mem.split(u8, buffer, "\n");
    var updated = std.ArrayList(u8).init(pga);
    defer updated.deinit();
    var i: u32 = 0;
    var fixed = false;
    while (lines.next()) |line| {
        // print("{s}\n", .{line});
        if (i > 0) {
            try updated.appendSlice("\n");
        }
        if (std.mem.startsWith(u8, line, "port = '5432'")) {
            // the local settings below were already appended on a previous run
            fixed = true;
        }
        if (!fixed) {
            comments.reset();
            while (comments.next()) |comment| {
                if (std.mem.startsWith(u8, line, comment)) {
                    try updated.appendSlice("# ");
                    break;
                }
            }
        }
        try updated.appendSlice(line);
        i += 1;
    }
    if (fixed) {
        return;
    }
    const fixing =
        \\
        \\port = '5432'
        \\unix_socket_directories = '/var/run/postgresql/'
        \\synchronous_commit = local
        \\synchronous_standby_names = ''
        \\
    ;
    try updated.appendSlice(fixing);
    try fp.seekTo(0);
    try fp.writer().writeAll(updated.items);
}
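// effect on postgresql.conf, sketched (the left-hand values are illustrative):
//   shared_preload_libraries = '...'  ->  # shared_preload_libraries = '...'
//   archive_mode = on                 ->  # archive_mode = on
// plus the appended local-startup block:
//   port = '5432'
//   unix_socket_directories = '/var/run/postgresql/'
//   synchronous_commit = local
//   synchronous_standby_names = ''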