Skip to content

Instantly share code, notes, and snippets.

@BenLubar
Created August 20, 2017 22:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BenLubar/8d43f7aac304c42e2e5c6fc728038a51 to your computer and use it in GitHub Desktop.
Save BenLubar/8d43f7aac304c42e2e5c6fc728038a51 to your computer and use it in GitHub Desktop.
psql_mongo
#!/bin/bash
# Builds the psql_mongo migration tool and runs it against the MongoDB and
# PostgreSQL containers attached to the "wtdwtf" docker network.
#
# Usage: ./psql_mongo.sh [steps]
#   steps  optional comma-separated list forwarded to -steps; when omitted the
#          tool's built-in default step list is used.

# Stop immediately if the build (or a docker lookup) fails, so a stale binary
# is never run against the databases.
set -e

go build

mongo_ip="$(docker inspect -f '{{.NetworkSettings.Networks.wtdwtf.IPAddress}}' wtdwtf-mongo)"
postgres_ip="$(docker inspect -f '{{.NetworkSettings.Networks.wtdwtf.IPAddress}}' wtdwtf-nodebb-postgres)"

# Only forward -steps when the caller supplied one; passing an empty string
# makes the tool fail with "unknown step: ".
steps_args=()
if [ -n "${1:-}" ]; then
	steps_args=(-steps "$1")
fi

./psql_mongo -mongo-host "$mongo_ip" -mongo-db 0 -postgres "postgres://postgres@${postgres_ip}/nodebb?sslmode=disable" "${steps_args[@]}"
package main
import (
"database/sql"
"flag"
"log"
"strings"
"github.com/davecgh/go-spew/spew"
_ "github.com/lib/pq"
mgo "gopkg.in/mgo.v2"
)
// main parses the command-line flags, connects to MongoDB and PostgreSQL and
// runs the requested migration steps in the order given by -steps.
//
// Any failure (bad flag value, connection error, failing step) panics.
func main() {
	flagMongoHost := flag.String("mongo-host", "127.0.0.1", "MongoDB hostname")
	flagMongoDatabase := flag.String("mongo-db", "nodebb", "MongoDB database")
	flagPostgres := flag.String("postgres", "postgres://postgres@127.0.0.1/nodebb?sslmode=disable", "PostgreSQL URL")
	// "users" is part of the default list because the "foreignkeys" step adds
	// constraints that reference the "users" table.
	flagSteps := flag.String("steps", "init,categories,topics,posts,users,foreignkeys", "comma-separated list of migration steps to run, in order")
	flag.Parse()

	// Resolve every step name before touching either database so that a typo
	// is reported up front instead of after earlier steps have been committed.
	names := strings.Split(*flagSteps, ",")
	selected := make([]func(*sql.DB, *mgo.Collection), 0, len(names))
	for _, name := range names {
		name = strings.TrimSpace(name)
		s, ok := steps[name]
		if !ok {
			panic("unknown step: " + name)
		}
		selected = append(selected, s)
	}

	minfo, err := mgo.ParseURL(*flagMongoHost)
	ifError(err)
	minfo.Timeout = 0
	minfo.FailFast = true

	mongo, err := mgo.DialWithInfo(minfo)
	ifError(err)
	defer mongo.Close()

	objects := mongo.DB(*flagMongoDatabase).C("objects")

	db, err := sql.Open("postgres", *flagPostgres)
	ifError(err)
	defer db.Close()

	for _, s := range selected {
		s(db, objects)
	}
}
// steps maps each name accepted by the -steps flag to the function that
// performs that migration step. Every step except "init" is wrapped by
// makeStep so that it runs inside its own transaction.
var steps = map[string]func(*sql.DB, *mgo.Collection){
	// Copies the MongoDB collection into the PostgreSQL "objects" table.
	"init": step1,

	// Extract typed tables out of "objects".
	"categories": makeStep(step2Categories, "categories"),
	"topics":     makeStep(step2Topics, "topics"),
	"posts":      makeStep(step2Posts, "posts"),
	"users":      makeStep(step2Users, "users"),

	// Adds foreign key constraints between the extracted tables.
	"foreignkeys": makeStep(step3ForeignKeys, "foreignkeys"),
}
// makeStep adapts a transaction-scoped step f to the signature used by the
// steps table. The returned function begins a transaction on db, runs f with
// it and commits; the MongoDB collection argument is ignored. name only
// appears in the log messages.
//
// If f panics, the deferred Rollback discards the transaction. After a
// successful Commit the deferred Rollback's result is ignored.
func makeStep(f func(*sql.Tx), name string) func(*sql.DB, *mgo.Collection) {
	startMsg := "starting transaction (" + name + ")"
	commitMsg := "committing transaction (" + name + ")"

	return func(db *sql.DB, _ *mgo.Collection) {
		log.Println(startMsg)
		tx, err := db.Begin()
		ifError(err)
		defer tx.Rollback()

		f(tx)

		log.Println(commitMsg)
		err = tx.Commit()
		ifError(err)
	}
}
// ifError does nothing when err is nil. Otherwise it dumps the extra values
// followed by err using spew (with ContinueOnMethod enabled) and then panics
// with err.
func ifError(err error, extra ...interface{}) {
	if err == nil {
		return
	}

	dumper := spew.NewDefaultConfig()
	dumper.ContinueOnMethod = true
	dumper.Dump(append(extra, err)...)

	panic(err)
}
package main
import (
"database/sql"
"encoding/json"
"log"
"math"
"reflect"
"strings"
"time"
"github.com/lib/pq"
mgo "gopkg.in/mgo.v2"
)
// step1 is the "init" step: it creates the PostgreSQL "objects" table, bulk
// copies every MongoDB document that has a "_key" field into it as JSONB, and
// then creates the indexes, the expireAt cleanup trigger and the clustering
// on that table.
//
// Everything, including the CREATE TABLE, happens in a single transaction, so
// a failure at any point leaves the database untouched and the step can simply
// be run again. Errors are reported through ifError, which panics.
func step1(db *sql.DB, objects *mgo.Collection) {
	log.Println("starting transaction (init)")
	tx, err := db.Begin()
	ifError(err)
	defer tx.Rollback()

	log.Println("creating objects table")
	_, err = tx.Exec(`CREATE TABLE "objects" ("data" JSONB NOT NULL CHECK("data" ? '_key'));`)
	ifError(err)

	log.Println("preparing to copy data")
	stmt, err := tx.Prepare(pq.CopyIn("objects", "data"))
	ifError(err)

	it := objects.Find(nil).Iter()
	defer it.Close()

	var count int64
	for {
		var obj map[string]interface{}
		if !it.Next(&obj) {
			break
		}

		// Documents without a "_key" would violate the table's CHECK
		// constraint; skip them.
		if _, ok := obj["_key"]; !ok {
			continue
		}

		count++
		if count%10000 == 0 {
			log.Println("copying from MongoDB to PostgreSQL...", count)
		}

		delete(obj, "_id")
		delete(obj, "undefined")

		v := removeInvalid(reflect.ValueOf(obj))
		b, err := json.Marshal(v.Interface())
		ifError(err, obj)

		_, err = stmt.Exec(string(b))
		ifError(err, obj, string(b))
	}

	// Next also returns false when the iteration failed; bail out before
	// flushing a partial copy.
	ifError(it.Err())

	if count%10000 != 0 {
		log.Println("copying from MongoDB to PostgreSQL...", count)
	}

	// An Exec without arguments flushes the buffered COPY data.
	_, err = stmt.Exec()
	ifError(err)
	log.Println("finished copying")
	ifError(stmt.Close())

	// Statements that finish the table, run in order once the data is loaded.
	finish := []struct{ msg, query string }{
		{"creating index on (key, score)", `CREATE INDEX "idx__objects__key__score" ON "objects"(("data"->>'_key') ASC, (("data"->>'score')::numeric) DESC);`},
		{"creating unique index on (key)", `CREATE UNIQUE INDEX "uniq__objects__key" ON "objects"(("data"->>'_key')) WHERE NOT ("data" ? 'score');`},
		{"creating unique index on (key, value)", `CREATE UNIQUE INDEX "uniq__objects__key__value" ON "objects"(("data"->>'_key') ASC, ("data"->>'value') DESC);`},
		{"creating index on (expireAt)", `CREATE INDEX "idx__objects__expireAt" ON "objects"((("data"->>'expireAt')::numeric) ASC) WHERE "data" ? 'expireAt';`},
		{"creating function for expireAt", `CREATE FUNCTION "fun__objects__expireAt"() RETURNS TRIGGER AS $$ BEGIN DELETE FROM "objects" WHERE "data" ? 'expireAt' AND ("data"->>'expireAt')::numeric < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000; RETURN NEW; END; $$ LANGUAGE plpgsql;`},
		{"creating trigger for expireAt", `CREATE TRIGGER "tr__objects__expireAt" AFTER INSERT OR UPDATE ON "objects" EXECUTE PROCEDURE "fun__objects__expireAt"();`},
		{"clustering for improved performance", `CLUSTER "objects" USING "uniq__objects__key__value";`},
	}
	for _, s := range finish {
		log.Println(s.msg)
		_, err = tx.Exec(s.query)
		ifError(err)
	}

	log.Println("committing transaction (init)")
	ifError(tx.Commit())
}
// nullReplacer rewrites NUL bytes in strings to the literal text "x00".
var nullReplacer = strings.NewReplacer("\x00", "x00")

// removeInvalid walks v and replaces values that cannot be stored as JSON
// with a storable equivalent. Maps and slices are modified in place; the
// (possibly replaced) value is returned.
//
//   - time.Time becomes a float64 number of milliseconds since the Unix epoch.
//   - NaN, +Inf and -Inf floats become the strings "NaN", "Infinity" and
//     "-Infinity" (encoding/json refuses to marshal those floats).
//   - NUL bytes in strings are rewritten by nullReplacer.
//   - Interface values are unwrapped and their contents processed.
//   - Map entries whose value is a nil interface are removed from the map.
//   - Nil elements inside slices are left untouched.
//
// Map keys are not inspected.
func removeInvalid(v reflect.Value) reflect.Value {
	switch v.Kind() {
	case reflect.Struct:
		if t, ok := v.Interface().(time.Time); ok {
			return reflect.ValueOf(float64(t.UnixNano()) / float64(time.Millisecond))
		}

	case reflect.Interface:
		// For a nil interface Elem returns the zero Value, which the
		// recursive call hands straight back.
		return removeInvalid(v.Elem())

	case reflect.Slice:
		for i, n := 0, v.Len(); i < n; i++ {
			elem := v.Index(i)
			// A nil element yields the zero Value; calling Set with it
			// would panic, so such elements are kept as they are.
			if cleaned := removeInvalid(elem); cleaned.IsValid() {
				elem.Set(cleaned)
			}
		}

	case reflect.Map:
		for _, k := range v.MapKeys() {
			// SetMapIndex with the zero Value deletes the key, so entries
			// holding a nil interface are dropped from the map.
			v.SetMapIndex(k, removeInvalid(v.MapIndex(k)))
		}

	case reflect.Float32, reflect.Float64:
		switch f := v.Float(); {
		case math.IsNaN(f):
			return reflect.ValueOf("NaN")
		case math.IsInf(f, 1):
			return reflect.ValueOf("Infinity")
		case math.IsInf(f, -1):
			return reflect.ValueOf("-Infinity")
		}

	case reflect.String:
		return reflect.ValueOf(nullReplacer.Replace(v.String()))
	}

	return v
}
package main
import (
"database/sql"
"log"
)
// step2Categories creates the "categories" table, fills it from the
// "category:<cid>" documents in "objects" (one per member of the
// "categories:cid" set), restarts the cid sequence from the stored
// "nextCid" counter and creates the secondary indexes.
//
// Keys that map onto a column are removed from the JSON kept in "data".
// Errors are reported through ifError, which panics.
func step2Categories(tx *sql.Tx) {
	log.Println("creating categories table")
	_, err := tx.Exec(`CREATE TABLE "categories" (
"cid" bigserial NOT NULL PRIMARY KEY,
"parentCid" bigint DEFAULT NULL,
"name" text NOT NULL,
"slug" text NOT NULL CONSTRAINT "ck__categories__slug__cid" CHECK (char_length("slug") - char_length(replace("slug", '/', '')) = 1 AND split_part("slug", '/', 1) = "cid"::text),
"description" text NOT NULL DEFAULT '',
"link" text DEFAULT NULL,
"icon" text,
"class" text,
"color" text,
"bgColor" text,
"imageClass" text,
"order" int NOT NULL,
"disabled" boolean NOT NULL DEFAULT false,
"post_count" bigint NOT NULL DEFAULT 0,
"topic_count" bigint NOT NULL DEFAULT 0,
"numRecentReplies" int NOT NULL DEFAULT 1,
"data" jsonb NOT NULL DEFAULT '{}'
);`)
	ifError(err)

	log.Println("clustering table for improved performance")
	_, err = tx.Exec(`CLUSTER "categories" USING "categories_pkey";`)
	ifError(err)

	// An INSERT ... SELECT that supplies an explicit NULL does not fall back
	// to the column DEFAULT, so "description" and "numRecentReplies" are
	// COALESCEd to the same values as their column defaults; otherwise a
	// category missing either key would violate NOT NULL.
	log.Println("extracting categories")
	_, err = tx.Exec(`INSERT INTO "categories" SELECT
NULLIF(NULLIF(c."data"->>'cid', ''), '0')::bigint "cid",
NULLIF(NULLIF(c."data"->>'parentCid', ''), '0')::bigint "parentCid",
c."data"->>'name' "name",
c."data"->>'slug' "slug",
COALESCE(c."data"->>'description', '') "description",
NULLIF(c."data"->>'link', '') "link",
c."data"->>'icon' "icon",
c."data"->>'class' "class",
c."data"->>'color' "color",
c."data"->>'bgColor' "bgColor",
c."data"->>'imageClass' "imageClass",
(c."data"->>'order')::int "order",
COALESCE(c."data"->>'disabled', '') = '1' "disabled",
COALESCE(NULLIF(c."data"->>'post_count', ''), '0')::bigint "post_count",
COALESCE(NULLIF(c."data"->>'topic_count', ''), '0')::bigint "topic_count",
COALESCE(NULLIF(c."data"->>'numRecentReplies', ''), '1')::int "numRecentReplies",
c."data" - '_key' - 'cid' - 'parentCid' - 'name' - 'slug' - 'description' - 'link' - 'icon' - 'class' - 'color' - 'bgColor' - 'imageClass' - 'order' - 'disabled' - 'post_count' - 'topic_count' - 'numRecentReplies' "data"
FROM "objects" i
INNER JOIN "objects" c
ON c."data"->>'_key' = 'category:' || (i."data"->>'value')
WHERE i."data"->>'_key' = 'categories:cid';`)
	ifError(err)

	log.Println("setting next category ID")
	_, err = tx.Exec(`DO $$
DECLARE
"nextCid" BIGINT;
BEGIN
SELECT "data"->>'nextCid' INTO "nextCid"
FROM "objects"
WHERE "data"->>'_key' = 'global';
EXECUTE 'ALTER SEQUENCE "categories_cid_seq" RESTART WITH ' || ("nextCid" + 1) || ';';
END;
$$ language plpgsql;`)
	ifError(err)

	log.Println("creating index on (order)")
	_, err = tx.Exec(`CREATE INDEX "idx__categories__order" ON "categories"("order");`)
	ifError(err)

	log.Println("creating index on (parentCid)")
	_, err = tx.Exec(`CREATE INDEX "idx__categories__parentCid" ON "categories"("parentCid");`)
	ifError(err)
}
package main
import (
"database/sql"
"log"
)
// step2Posts creates the "posts" table, fills it from the "post:<pid>"
// documents in "objects" (one per member of the "posts:pid" set), restarts
// the pid sequence from the stored "nextPid" counter and creates the
// secondary and full-text indexes.
//
// Keys that map onto a column are removed from the JSON kept in "data".
// Errors are reported through ifError, which panics.
func step2Posts(tx *sql.Tx) {
	log.Println("creating posts table")
	_, err := tx.Exec(`CREATE TABLE "posts" (
"pid" bigserial NOT NULL PRIMARY KEY,
"uid" bigint,
"tid" bigint NOT NULL,
"toPid" bigint DEFAULT NULL,
"timestamp" timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
"ip" inet[] DEFAULT NULL,
"content" text NOT NULL,
"votes" bigint NOT NULL DEFAULT 0,
"upvotes" bigint NOT NULL DEFAULT 0,
"downvotes" bigint NOT NULL DEFAULT 0,
"replies" bigint NOT NULL DEFAULT 0,
"deleted" boolean NOT NULL DEFAULT false,
"deleterUid" bigint DEFAULT NULL,
"edited" timestamptz DEFAULT NULL,
"editor" bigint DEFAULT NULL,
"data" jsonb NOT NULL DEFAULT '{}'
);`)
	ifError(err)

	log.Println("clustering for improved performance")
	_, err = tx.Exec(`CLUSTER "posts" USING "posts_pkey";`)
	ifError(err)

	// "edited" treats both '' and '0' as "never edited" (NULL), matching how
	// "editor" and the other optional references are handled; without the
	// '0' check an unedited post would be stamped 1970-01-01.
	log.Println("extracting posts")
	_, err = tx.Exec(`INSERT INTO "posts" SELECT
NULLIF(NULLIF(p."data"->>'pid', ''), '0')::bigint "pid",
NULLIF(NULLIF(p."data"->>'uid', ''), '0')::bigint "uid",
NULLIF(NULLIF(p."data"->>'tid', ''), '0')::bigint "tid",
NULLIF(NULLIF(p."data"->>'toPid', ''), '0')::bigint "toPid",
to_timestamp(NULLIF(p."data"->>'timestamp', '')::double precision / 1000) "timestamp",
string_to_array(NULLIF(p."data"->>'ip', ''), ', ')::inet[] "ip",
p."data"->>'content' "content",
COALESCE(NULLIF(p."data"->>'votes', ''), '0')::bigint "votes",
COALESCE(NULLIF(p."data"->>'upvotes', ''), '0')::bigint "upvotes",
COALESCE(NULLIF(p."data"->>'downvotes', ''), '0')::bigint "downvotes",
COALESCE(NULLIF(p."data"->>'replies', ''), '0')::bigint "replies",
COALESCE(p."data"->>'deleted', '0') = '1' "deleted",
NULLIF(NULLIF(p."data"->>'deleterUid', ''), '0')::bigint "deleterUid",
to_timestamp(NULLIF(NULLIF(p."data"->>'edited', ''), '0')::double precision / 1000) "edited",
NULLIF(NULLIF(p."data"->>'editor', ''), '0')::bigint "editor",
p."data" - '_key' - 'pid' - 'uid' - 'tid' - 'toPid' - 'timestamp' - 'ip' - 'content' - 'votes' - 'upvotes' - 'downvotes' - 'replies' - 'deleted' - 'deleterUid' - 'edited' - 'editor' "data"
FROM "objects" i
INNER JOIN "objects" p
ON p."data"->>'_key' = 'post:' || (i."data"->>'value')
WHERE i."data"->>'_key' = 'posts:pid';`)
	ifError(err)

	log.Println("setting next post ID")
	_, err = tx.Exec(`DO $$
DECLARE
"nextPid" BIGINT;
BEGIN
SELECT "data"->>'nextPid' INTO "nextPid"
FROM "objects"
WHERE "data"->>'_key' = 'global';
EXECUTE 'ALTER SEQUENCE "posts_pid_seq" RESTART WITH ' || ("nextPid" + 1) || ';';
END;
$$ language plpgsql;`)
	ifError(err)

	log.Println("creating index on (tid)")
	_, err = tx.Exec(`CREATE INDEX "idx__posts__tid" ON "posts"("tid");`)
	ifError(err)

	log.Println("creating index on (timestamp)")
	_, err = tx.Exec(`CREATE INDEX "idx__posts__timestamp" ON "posts"("timestamp");`)
	ifError(err)

	log.Println("creating index on (uid)")
	_, err = tx.Exec(`CREATE INDEX "idx__posts__uid" ON "posts"("uid");`)
	ifError(err)

	log.Println("creating index on (toPid)")
	_, err = tx.Exec(`CREATE INDEX "idx__posts__toPid" ON "posts"("toPid");`)
	ifError(err)

	log.Println("creating index on (ip)")
	_, err = tx.Exec(`CREATE INDEX "idx__posts__ip" ON "posts" USING GIN ("ip");`)
	ifError(err)

	log.Println("creating full text index for posts")
	_, err = tx.Exec(`CREATE INDEX "idx__posts__content" ON "posts" USING GIN ((to_tsvector('english', "content")));`)
	ifError(err)
}
package main
import (
"database/sql"
"log"
)
// step2Topics creates the "topics" table, fills it from the "topic:<tid>"
// documents in "objects" (one per member of the "topics:tid" set), restarts
// the tid sequence from the stored "nextTid" counter and creates the
// secondary and full-text indexes.
//
// Keys that map onto a column are removed from the JSON kept in "data".
// Errors are reported through ifError, which panics.
func step2Topics(tx *sql.Tx) {
	log.Println("creating topics table")
	_, err := tx.Exec(`CREATE TABLE "topics" (
"tid" bigserial NOT NULL PRIMARY KEY,
"cid" bigint NOT NULL,
"oldCid" bigint DEFAULT NULL,
"uid" bigint,
"title" text NOT NULL,
"slug" text NOT NULL,
"timestamp" timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
"deleted" boolean NOT NULL DEFAULT false,
"deletedTimestamp" timestamptz DEFAULT NULL,
"deleterUid" bigint DEFAULT NULL,
"lastposttime" timestamptz DEFAULT NULL,
"locked" boolean NOT NULL DEFAULT false,
"pinned" boolean NOT NULL DEFAULT false,
"mainPid" bigint,
"teaserPid" bigint DEFAULT NULL,
"thumb" text DEFAULT NULL,
"viewcount" bigint NOT NULL DEFAULT 0,
"postcount" bigint NOT NULL DEFAULT 0,
"data" jsonb NOT NULL DEFAULT '{}'
);`)
	ifError(err)

	log.Println("clustering for improved performance")
	_, err = tx.Exec(`CLUSTER "topics" USING "topics_pkey";`)
	ifError(err)

	log.Println("extracting topics")
	_, err = tx.Exec(`INSERT INTO "topics" SELECT
NULLIF(NULLIF(t."data"->>'tid', ''), '0')::bigint "tid",
NULLIF(NULLIF(t."data"->>'cid', ''), '0')::bigint "cid",
NULLIF(NULLIF(t."data"->>'oldCid', ''), '0')::bigint "oldCid",
NULLIF(NULLIF(t."data"->>'uid', ''), '0')::bigint "uid",
t."data"->>'title' "title",
t."data"->>'slug' "slug",
to_timestamp(NULLIF(t."data"->>'timestamp', '')::double precision / 1000) "timestamp",
COALESCE(t."data"->>'deleted', '') = '1' "deleted",
to_timestamp(NULLIF(t."data"->>'deletedTimestamp', '')::double precision / 1000) "deletedTimestamp",
NULLIF(NULLIF(t."data"->>'deleterUid', ''), '0')::bigint "deleterUid",
to_timestamp(NULLIF(t."data"->>'lastposttime', '')::double precision / 1000) "lastposttime",
COALESCE(t."data"->>'locked', '') = '1' "locked",
COALESCE(t."data"->>'pinned', '') = '1' "pinned",
NULLIF(NULLIF(t."data"->>'mainPid', ''), '0')::bigint "mainPid",
NULLIF(NULLIF(t."data"->>'teaserPid', ''), '0')::bigint "teaserPid",
NULLIF(t."data"->>'thumb', '') "thumb",
COALESCE(NULLIF(t."data"->>'viewcount', ''), '0')::bigint "viewcount",
COALESCE(NULLIF(t."data"->>'postcount', ''), '0')::bigint "postcount",
t."data" - '_key' - 'tid' - 'cid' - 'oldCid' - 'uid' - 'title' - 'slug' - 'timestamp' - 'deleted' - 'deletedTimestamp' - 'deleterUid' - 'lastposttime' - 'locked' - 'pinned' - 'mainPid' - 'teaserPid' - 'thumb' - 'viewcount' - 'postcount' "data"
FROM "objects" i
INNER JOIN "objects" t
ON t."data"->>'_key' = 'topic:' || (i."data"->>'value')
WHERE i."data"->>'_key' = 'topics:tid';`)
	ifError(err)

	log.Println("setting next topic ID")
	_, err = tx.Exec(`DO $$
DECLARE
"nextTid" BIGINT;
BEGIN
SELECT "data"->>'nextTid' INTO "nextTid"
FROM "objects"
WHERE "data"->>'_key' = 'global';
EXECUTE 'ALTER SEQUENCE "topics_tid_seq" RESTART WITH ' || ("nextTid" + 1) || ';';
END;
$$ language plpgsql;`)
	ifError(err)

	log.Println("creating index on (cid, pinned)")
	_, err = tx.Exec(`CREATE INDEX "idx__topics__cid__pinned" ON "topics"("cid", "pinned");`)
	ifError(err)

	log.Println("creating index on (uid)")
	_, err = tx.Exec(`CREATE INDEX "idx__topics__uid" ON "topics"("uid");`)
	ifError(err)

	log.Println("creating index on (timestamp)")
	_, err = tx.Exec(`CREATE INDEX "idx__topics__timestamp" ON "topics"("timestamp");`)
	ifError(err)

	// The index lives on "topics", so it carries the idx__topics__ prefix
	// like its siblings (it was previously named idx__posts__lastposttime).
	log.Println("creating index on (lastposttime)")
	_, err = tx.Exec(`CREATE INDEX "idx__topics__lastposttime" ON "topics"("lastposttime");`)
	ifError(err)

	log.Println("creating full text index for topics")
	_, err = tx.Exec(`CREATE INDEX "idx__topics__title" ON "topics" USING GIN ((to_tsvector('english', "title")));`)
	ifError(err)
}
package main
import (
"database/sql"
"log"
)
// step2Users creates the "users" table, fills it from the "user:<uid>"
// documents in "objects" (looked up through the scores of the
// "username:uid" set) and restarts the uid sequence from the stored
// "nextUid" counter.
//
// Birthdays that cannot be parsed as a date are stored as NULL via a
// temporary helper function. Keys that map onto a column are removed from the
// JSON kept in "data". Errors are reported through ifError, which panics.
func step2Users(tx *sql.Tx) {
	// run logs what is about to happen, executes the statement(s) and aborts
	// via ifError on failure.
	run := func(what, query string) {
		log.Println(what)
		_, execErr := tx.Exec(query)
		ifError(execErr)
	}

	run("creating users table", `CREATE TABLE "users" (
"uid" bigserial NOT NULL PRIMARY KEY,
"username" text NOT NULL,
"userslug" text NOT NULL,
"fullname" text DEFAULT NULL,
"email" text,
"email:confirmed" boolean NOT NULL DEFAULT false,
"banned" boolean NOT NULL DEFAULT false,
"banned:expire" timestamptz DEFAULT NULL,
"birthday" date DEFAULT NULL,
"password" text,
"passwordExpiry" timestamptz DEFAULT NULL,
"aboutme" text DEFAULT NULL,
"location" text DEFAULT NULL,
"website" text DEFAULT NULL,
"groupTitle" text DEFAULT NULL,
"signature" text DEFAULT NULL,
"status" text NOT NULL DEFAULT 'online',
"joindate" timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
"lastonline" timestamptz DEFAULT NULL,
"lastposttime" timestamptz DEFAULT NULL,
"moderationNote" text NOT NULL DEFAULT '',
"rss_token" text DEFAULT NULL,
"showemail" boolean NOT NULL,
"cover:position" text NOT NULL DEFAULT '50% 50%',
"cover:url" text DEFAULT NULL,
"picture" text DEFAULT NULL,
"uploadedpicture" text DEFAULT NULL,
"flags" bigint NOT NULL DEFAULT 0,
"followerCount" bigint NOT NULL DEFAULT 0,
"followingCount" bigint NOT NULL DEFAULT 0,
"postcount" bigint NOT NULL DEFAULT 0,
"profileviews" bigint NOT NULL DEFAULT 0,
"topiccount" bigint NOT NULL DEFAULT 0,
"reputation" bigint NOT NULL DEFAULT 0,
"data" jsonb NOT NULL DEFAULT '{}'
);`)

	run("clustering for improved performance", `CLUSTER "users" USING "users_pkey";`)

	run("extracting users", `CREATE FUNCTION pg_temp.try_parse_date(text) RETURNS date LANGUAGE plpgsql IMMUTABLE AS $$
BEGIN
RETURN $1::date;
EXCEPTION WHEN OTHERS THEN
RETURN NULL;
END;
$$;
INSERT INTO "users" SELECT
NULLIF(NULLIF(u."data"->>'uid', ''), '0')::bigint "uid",
u."data"->>'username' "username",
u."data"->>'userslug' "userslug",
NULLIF(u."data"->>'fullname', '') "fullname",
NULLIF(u."data"->>'email', '') "email",
COALESCE(u."data"->>'email:confirmed', '0') = '1' "email:confirmed",
COALESCE(u."data"->>'banned', '0') = '1' "banned",
to_timestamp(NULLIF(NULLIF(u."data"->>'banned:expire', ''), '0')::double precision / 1000) "banned:expire",
pg_temp.try_parse_date(NULLIF(u."data"->>'birthday', '')) "birthday",
NULLIF(u."data"->>'password', '') "password",
to_timestamp(NULLIF(NULLIF(u."data"->>'passwordExpiry', ''), '0')::double precision / 1000) "passwordExpiry",
NULLIF(u."data"->>'aboutme', '') "aboutme",
NULLIF(u."data"->>'location', '') "location",
NULLIF(u."data"->>'website', '') "website",
NULLIF(u."data"->>'groupTitle', '') "groupTitle",
NULLIF(u."data"->>'signature', '') "signature",
COALESCE(NULLIF(u."data"->>'status', ''), 'online') "status",
to_timestamp(NULLIF(NULLIF(u."data"->>'joindate', ''), '0')::double precision / 1000) "joindate",
to_timestamp(NULLIF(NULLIF(u."data"->>'lastonline', ''), '0')::double precision / 1000) "lastonline",
to_timestamp(NULLIF(NULLIF(u."data"->>'lastposttime', ''), '0')::double precision / 1000) "lastposttime",
COALESCE(u."data"->>'moderationNote', '') "moderationNote",
NULLIF(u."data"->>'rss_token', '') "rss_token",
COALESCE(u."data"->>'showemail', '0') = '1' "showemail",
COALESCE(NULLIF(u."data"->>'cover:position', ''), '50% 50%') "cover:position",
NULLIF(u."data"->>'cover:url', '') "cover:url",
NULLIF(u."data"->>'picture', '') "picture",
NULLIF(u."data"->>'uploadedpicture', '') "uploadedpicture",
COALESCE(NULLIF(u."data"->>'flags', ''), '0')::bigint "flags",
COALESCE(NULLIF(u."data"->>'followerCount', ''), '0')::bigint "followerCount",
COALESCE(NULLIF(u."data"->>'followingCount', ''), '0')::bigint "followingCount",
COALESCE(NULLIF(u."data"->>'postcount', ''), '0')::bigint "postcount",
COALESCE(NULLIF(u."data"->>'profileviews', ''), '0')::bigint "profileviews",
COALESCE(NULLIF(u."data"->>'topiccount', ''), '0')::bigint "topiccount",
COALESCE(NULLIF(u."data"->>'reputation', ''), '0')::bigint "reputation",
u."data" - '_key' - 'uid' - 'username' - 'userslug' - 'fullname' - 'email' - 'email:confirmed' - 'banned' - 'banned:expire' - 'birthday' - 'password' - 'passwordExpiry' - 'aboutme' - 'location' - 'website' - 'groupTitle' - 'signature' - 'status' - 'joindate' - 'lastonline' - 'lastposttime' - 'moderationNote' - 'reputation' - 'rss_token' - 'showemail' - 'cover:position' - 'cover:url' - 'picture' - 'uploadedpicture' - 'flags' - 'followerCount' - 'followingCount' - 'postcount' - 'profileviews' - 'topiccount' "data"
FROM "objects" i
INNER JOIN "objects" u
ON u."data"->>'_key' = 'user:' || (i."data"->>'score')
WHERE i."data"->>'_key' = 'username:uid';
DROP FUNCTION pg_temp.try_parse_date(text);`)

	run("setting next user ID", `DO $$
DECLARE
"nextUid" BIGINT;
BEGIN
SELECT "data"->>'nextUid' INTO "nextUid"
FROM "objects"
WHERE "data"->>'_key' = 'global';
EXECUTE 'ALTER SEQUENCE "users_uid_seq" RESTART WITH ' || ("nextUid" + 1) || ';';
END;
$$ language plpgsql;`)

	// TODO: indexes
}
package main
import (
"database/sql"
"log"
)
// step3ForeignKeys adds the foreign key constraints between the extracted
// tables. Each entry first repairs rows whose reference points at a missing
// row (by nulling the column or by deleting the row, depending on the
// function used) and then adds the constraint. Entries are applied in the
// order listed.
func step3ForeignKeys(tx *sql.Tx) {
	constraints := []struct {
		apply                              func(*sql.Tx, string, string, string, string, string)
		table, pkey, field, fTable, fField string
	}{
		{foreignKeySetNull, "categories", "cid", "parentCid", "categories", "cid"},

		{foreignKeySetNull, "posts", "pid", "uid", "users", "uid"},
		{foreignKeyCascade, "posts", "pid", "tid", "topics", "tid"},
		{foreignKeySetNull, "posts", "pid", "toPid", "posts", "pid"},
		{foreignKeySetNull, "posts", "pid", "deleterUid", "users", "uid"},
		{foreignKeySetNull, "posts", "pid", "editor", "users", "uid"},

		{foreignKeyCascade, "topics", "tid", "cid", "categories", "cid"},
		{foreignKeySetNull, "topics", "tid", "oldCid", "categories", "cid"},
		{foreignKeySetNull, "topics", "tid", "uid", "users", "uid"},
		{foreignKeySetNull, "topics", "tid", "deleterUid", "users", "uid"},
		{foreignKeySetNull, "topics", "tid", "mainPid", "posts", "pid"},
		{foreignKeySetNull, "topics", "tid", "teaserPid", "posts", "pid"},
	}

	for _, c := range constraints {
		c.apply(tx, c.table, c.pkey, c.field, c.fTable, c.fField)
	}
}
// orphanCleanupSQL builds the multi-statement script shared by
// foreignKeySetNull and foreignKeyCascade. The script snapshots the referenced
// column (fTable.fField) and the non-NULL references (table.field, keyed by
// pkey) into temporary tables, collects in CTE "p" the pkey of every row whose
// reference has no matching target, applies action to exactly those rows and
// finally drops the temporary tables.
//
// action is the leading part of the modifying statement (for example
// `DELETE FROM "posts"`); the WHERE clause restricting it to the rows in "p"
// is appended here. Identifiers are wrapped in double quotes but not escaped,
// so they must come from trusted, hard-coded callers.
func orphanCleanupSQL(table, pkey, field, fTable, fField, action string) string {
	return `CREATE TEMPORARY TABLE "temp_1" AS SELECT "` + fField + `" FROM "` + fTable + `" ORDER BY "` + fField + `";
CREATE TEMPORARY TABLE "temp_2" AS SELECT "` + pkey + `", "` + field + `" FROM "` + table + `" WHERE "` + field + `" IS NOT NULL ORDER BY "` + pkey + `";
ANALYZE "temp_1";
ANALYZE "temp_2";
WITH p AS (
SELECT t2."` + pkey + `"
FROM "temp_2" t2
LEFT OUTER JOIN "temp_1" t1
ON t1."` + fField + `" = t2."` + field + `"
WHERE t1."` + fField + `" IS NULL
)
` + action + `
WHERE "` + pkey + `" IN (SELECT "` + pkey + `" FROM p);
DROP TABLE "temp_1", "temp_2";`
}

// foreignKeySetNull sets table.field to NULL on every row whose value has no
// matching fTable.fField, then adds a foreign key on table.field referencing
// fTable.fField with ON DELETE SET NULL. pkey is the primary key column of
// table, used to identify the rows to update. Errors are reported through
// ifError, which panics.
func foreignKeySetNull(tx *sql.Tx, table, pkey, field, fTable, fField string) {
	log.Printf("clearing invalid foreign keys %q(%q) -> %q(%q)", table, field, fTable, fField)

	action := `UPDATE "` + table + `"
SET "` + field + `" = NULL`
	_, err := tx.Exec(orphanCleanupSQL(table, pkey, field, fTable, fField, action))
	ifError(err)

	foreignKey(tx, table, field, fTable, fField, ` ON DELETE SET NULL`)
}

// foreignKeyCascade deletes every row of table whose field value has no
// matching fTable.fField, then adds a foreign key on table.field referencing
// fTable.fField with ON DELETE CASCADE. pkey is the primary key column of
// table, used to identify the rows to delete. Errors are reported through
// ifError, which panics.
func foreignKeyCascade(tx *sql.Tx, table, pkey, field, fTable, fField string) {
	log.Printf("deleting invalid foreign keys %q(%q) -> %q(%q)", table, field, fTable, fField)

	action := `DELETE FROM "` + table + `"`
	_, err := tx.Exec(orphanCleanupSQL(table, pkey, field, fTable, fField, action))
	ifError(err)

	foreignKey(tx, table, field, fTable, fField, ` ON DELETE CASCADE`)
}
// foreignKey adds a constraint named "fk__<table>__<field>" that makes
// table.field reference fTable.fField. extra is appended verbatim after the
// REFERENCES clause (the callers pass an ON DELETE action). Identifiers are
// double-quoted but not escaped. Errors are reported through ifError, which
// panics.
func foreignKey(tx *sql.Tx, table, field, fTable, fField, extra string) {
	log.Printf("creating foreign key on %q(%q) -> %q(%q)", table, field, fTable, fField)

	constraint := `"fk__` + table + `__` + field + `"`
	target := `"` + fTable + `"("` + fField + `")`
	stmt := `ALTER TABLE "` + table + `" ADD CONSTRAINT ` + constraint +
		` FOREIGN KEY ("` + field + `") REFERENCES ` + target + extra + `;`

	_, err := tx.Exec(stmt)
	ifError(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment