Skip to content

Instantly share code, notes, and snippets.

@BenLubar
Last active July 11, 2016 15:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BenLubar/98fe068162c5ae9b11659b7029da3a76 to your computer and use it in GitHub Desktop.
Save BenLubar/98fe068162c5ae9b11659b7029da3a76 to your computer and use it in GitHub Desktop.
wtdwtf-science
package main
import (
"database/sql"
"database/sql/driver"
"fmt"
"log"
"math"
"net"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
_ "github.com/denisenkom/go-mssqldb"
"github.com/lib/pq"
"github.com/pkg/errors"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
var db, cs, dc *sql.DB
var objects *mgo.Collection
func main() {
_ = exec.Command("docker", "stop", "wtdwtf-nodebb-postgres").Run()
_ = exec.Command("docker", "rm", "-v", "wtdwtf-nodebb-postgres").Run()
err := exec.Command("docker", "run", "-d", "--name", "wtdwtf-nodebb-postgres", "--restart", "unless-stopped", "--net", "wtdwtf", "benlubar/webscale:postgres").Run()
if err != nil {
log.Fatalf("%+v", err)
}
ip, err := DockerIP("wtdwtf-nodebb-postgres")
if err != nil {
log.Fatalf("%+v", err)
}
db, err = sql.Open("postgres", "user=postgres sslmode=disable host="+ip.String())
if err != nil {
log.Fatalf("%+v", err)
}
defer db.Close()
for {
err = db.Ping()
if err == nil {
break
}
log.Println("waiting for db:", err)
time.Sleep(time.Second)
}
cs, err = sql.Open("mssql", "server=192.168.1.100; user id=NodeBB; password=wtdwtf; database=TheDailyWtf")
if err != nil {
log.Fatalf("%+v", err)
}
defer cs.Close()
if err = cs.Ping(); err != nil {
log.Fatalf("%+v", err)
}
ip, err = DockerIP("wtdwtf-postgres")
if err != nil {
log.Fatalf("%+v", err)
}
dc, err = sql.Open("postgres", "user=postgres dbname=discourse sslmode=disable host="+ip.String())
if err != nil {
log.Fatalf("%+v", err)
}
defer dc.Close()
if err = dc.Ping(); err != nil {
log.Fatalf("%+v", err)
}
ip, err = DockerIP("wtdwtf-mongo")
if err != nil {
log.Fatalf("%+v", err)
}
mongo, err := mgo.Dial(ip.String())
if err != nil {
log.Fatalf("%+v", err)
}
defer mongo.Close()
mongo.SetSyncTimeout(0)
mongo.SetSocketTimeout(0)
mongo.SetCursorTimeout(0)
objects = mongo.DB("0").C("objects")
if err := InitDB(); err != nil {
log.Fatalf("%+v", err)
}
if err := ForSortedSet("users:joindate", HandleUser); err != nil {
log.Fatalf("%+v", err)
}
if err := ForSortedSet("users:banned:expire", UpdateBanExpirations); err != nil {
log.Fatalf("%+v", err)
}
if err := ForSortedSet("groups:createtime", HandleGroup); err != nil {
log.Fatalf("%+v", err)
}
if err := ForSortedSet("categories:cid", HandleCategory); err != nil {
log.Fatalf("%+v", err)
}
if err := ForSortedSet("topics:tid", HandleTopic); err != nil {
log.Fatalf("%+v", err)
}
if err := HandlePosts(); err != nil {
log.Fatalf("%+v", err)
}
if err := HandleImportedUsers(); err != nil {
log.Fatalf("%+v", err)
}
if err := HandleImportedTopics(); err != nil {
log.Fatalf("%+v", err)
}
if err := HandleImportedPosts(); err != nil {
log.Fatalf("%+v", err)
}
if err := FinishDB(); err != nil {
log.Fatalf("%+v", err)
}
if err := FindOrphanedCommunityServerPostAuthors(); err != nil {
log.Fatalf("%+v", err)
}
if err := FindIncorrectCommunityServerUserNames(); err != nil {
log.Fatalf("%+v", err)
}
if err := FindUnusedUserAccounts(); err != nil {
log.Fatalf("%+v", err)
}
}
func DockerIP(containerName string) (net.IP, error) {
b, err := exec.Command("docker", "inspect", "-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", containerName).Output()
if ee, ok := err.(*exec.ExitError); ok {
return nil, errors.Wrapf(err, "cannot get IP of container %q: stderr: %s", containerName, ee.Stderr)
}
s := strings.TrimSpace(string(b))
ip := net.ParseIP(s)
if ip == nil {
return nil, errors.Errorf("cannot get IP of container %q: invalid IP: %q", containerName, s)
}
return ip, nil
}
var insertUser, insertUserIP, insertUserIgnoredCategory, insertUserIgnoredTopic, insertUserFollowedTopic, insertUserFavouritePost, insertUserFollowedUser, insertUserUpvotedPost, insertUserDownvotedPost, updateUserBanExpiration, insertGroup, insertUserGroup, updateUserGroupOwner, insertCategory, insertTopic, insertTopicTag, insertUserTopicBookmark, insertPost, insertImportedUserCommunityServer, insertImportedUserDiscourse, selectImportedUserDiscourse, insertImportedTopicCommunityServer, insertImportedTopicDiscourse, selectImportedTopicDiscourse, insertImportedPostCommunityServer, insertImportedPostDiscourse, selectImportedPostDiscourse *sql.Stmt
func InitDB() error {
// adapted from #webscale
_, err := db.Exec(`
create extension citext;
create extension pgcrypto;
-- http://stackoverflow.com/a/20102665/2664560
create operator class _citext_ops default
for type _citext using gin as
operator 1 &&(anyarray, anyarray),
operator 2 @>(anyarray, anyarray),
operator 3 <@(anyarray, anyarray),
operator 4 =(anyarray, anyarray),
function 1 citext_cmp(citext, citext),
function 2 ginarrayextract(anyarray, internal, internal),
function 3 ginqueryarrayextract(anyarray, internal, smallint, internal, internal, internal, internal),
function 4 ginarrayconsistent(internal, smallint, anyarray, integer, internal, internal, internal, internal),
storage citext;
create language plperlu;
create function slugify(text) returns text as $$
use Unicode::Normalize;
use utf8;
use feature 'unicode_strings';
$_ = NFKD($_[0]);
s/[\pZ\-]/ /g;
s/[^\pL\pN_ ]//g;
lc;
s/^[ ]+|[ ]+$//g;
s/ /-/g;
return $_;
$$ language plperlu immutable;
create function make_slug() returns trigger as $$
begin
new.slug = slugify(new.name);
return new;
end;
$$ language plpgsql;`)
if err != nil {
return errors.Wrap(err, "db make helpers")
}
_, err = db.Exec(`create table users (
id bigserial primary key,
name varchar(255) not null,
slug varchar(255) not null constraint users_slug_exists check (slug <> ''),
password varchar(60),
full_name text,
email citext,
email_confirmed boolean not null default false,
banned boolean not null default false,
ban_expiration timestamp with time zone,
join_date timestamp with time zone not null default now(),
last_online timestamp with time zone,
picture varchar(2048),
uploaded_picture varchar(2048),
cover_url varchar(2048),
cover_position double precision[2],
group_title text,
website varchar(2048),
location text,
signature text,
about_me text,
birthday date,
profile_views bigint not null default 0
);
create unique index users_slug on users (slug);
create index users_join_date on users (join_date);
create unique index users_email on users (email);
create trigger users_make_slug
before insert or update of name, slug on users
for each row
execute procedure make_slug();`)
if err != nil {
return errors.Wrap(err, "db create table users")
}
_, err = db.Exec(`create table groups (
name text not null primary key,
slug text not null constraint groups_slug_exists check (slug <> ''),
create_time timestamp with time zone not null default now(),
user_title text,
description text not null,
icon varchar(255),
label_color varchar(255),
cover_url varchar(2048),
cover_thumb_url varchar(2048),
cover_position double precision[2],
hidden boolean not null default false,
system boolean not null default false,
private boolean not null default true,
disable_join_requests boolean not null default false,
user_title_enabled boolean not null default false
);
create unique index groups_slug on groups (slug);
create index groups_create_time on groups (create_time);
create trigger groups_make_slug
before insert or update of name, slug on groups
for each row
execute procedure make_slug();`)
if err != nil {
return errors.Wrap(err, "db create table groups")
}
_, err = db.Exec(`create table user_groups (
user_id bigint not null references users (id) on update cascade on delete cascade,
group_name text not null references groups (name) on update cascade on delete cascade,
is_owner boolean not null default false,
join_date timestamp with time zone not null default now(),
primary key (user_id, group_name)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_groups")
}
_, err = db.Exec(`create table user_ips (
user_id bigint not null references users (id) on update cascade on delete cascade,
ip inet not null,
last_seen timestamp with time zone not null default now(),
primary key (user_id, ip)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_ips")
}
_, err = db.Exec(`create table user_watched_users (
user_id bigint not null references users (id) on update cascade on delete cascade,
target_user_id bigint not null,
watched bool,
last_changed timestamp with time zone not null default now(),
primary key (user_id, target_user_id)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_watched_users")
}
_, err = db.Exec(`create table user_watched_categories (
user_id bigint not null references users (id) on update cascade on delete cascade,
category_id bigint not null,
watched bool,
last_changed timestamp with time zone not null default now(),
primary key (user_id, category_id)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_watched_categories")
}
_, err = db.Exec(`create table user_watched_topics (
user_id bigint not null references users (id) on update cascade on delete cascade,
topic_id bigint not null,
watched bool,
last_changed timestamp with time zone not null default now(),
primary key (user_id, topic_id)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_watched_topics")
}
_, err = db.Exec(`create table user_favorite_posts (
user_id bigint not null references users (id) on update cascade on delete cascade,
post_id bigint not null,
favorited_at timestamp with time zone not null default now(),
primary key (user_id, post_id)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_favorite_posts")
}
_, err = db.Exec(`create table user_post_votes (
user_id bigint not null references users (id) on update cascade on delete cascade,
post_id bigint not null,
up bool,
last_changed timestamp with time zone not null default now(),
primary key (user_id, post_id)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_post_votes")
}
_, err = db.Exec(`create table categories (
id bigserial primary key,
name text not null,
slug text not null constraint categories_slug_exists check (slug <> ''),
parent_category_id bigint,
position int,
description text,
link varchar(2048),
icon varchar(255),
fg_color varchar(255),
bg_color varchar(255),
recent_replies int,
disabled boolean not null default false
);
create unique index categories_slug on categories (slug);
create trigger categories_make_slug
before insert or update of name, slug on categories
for each row
execute procedure make_slug();`)
if err != nil {
return errors.Wrap(err, "db create table categories")
}
_, err = db.Exec(`create table topics (
id bigserial primary key,
name text not null,
slug text not null, -- intentionally non-required
category_id bigint not null references categories (id) on update cascade on delete restrict,
author_id bigint references users (id) on update cascade on delete set null,
main_post_id bigint,
created_at timestamp with time zone not null default now(),
view_count bigint not null default 0,
locked boolean not null default false,
deleted boolean not null default false,
pinned boolean not null default false
);
create index topics_slug on topics (slug); -- intentionally non-unique
create trigger topics_make_slug
before insert or update of name, slug on topics
for each row
execute procedure make_slug();`)
if err != nil {
return errors.Wrap(err, "db create table topics")
}
_, err = db.Exec(`create table topic_tags (
topic_id bigint not null references topics (id) on update cascade on delete cascade,
tag text not null,
primary key (topic_id, tag)
);`)
if err != nil {
return errors.Wrap(err, "db create table topic_tags")
}
_, err = db.Exec(`create table user_topic_bookmarks (
user_id bigint not null references users (id) on update cascade on delete cascade,
topic_id bigint not null references topics (id) on update cascade on delete cascade,
position bigint not null,
primary key (user_id, topic_id)
);`)
if err != nil {
return errors.Wrap(err, "db create table user_topic_bookmarks")
}
_, err = db.Exec(`create table posts (
id bigserial primary key,
topic_id bigint not null references topics (id) on update cascade on delete cascade,
author_id bigint references users (id) on update cascade on delete set null,
author_name varchar(255),
parent_post_id bigint,
content text not null,
created_at timestamp with time zone not null default now(),
ip inet[],
editor_id bigint references users (id) on update cascade on delete set null,
edited_at timestamp with time zone default null,
deleted boolean not null default false
);`)
if err != nil {
return errors.Wrap(err, "db create table posts")
}
_, err = db.Exec(`create table imported_users_communityserver (
user_id bigint not null primary key references users (id) on update cascade on delete cascade,
communityserver_user_id bigint not null
);`)
if err != nil {
return errors.Wrap(err, "db create table imported_users_communityserver")
}
_, err = db.Exec(`create table imported_users_discourse (
user_id bigint not null primary key references users (id) on update cascade on delete cascade,
discourse_user_id bigint not null
);`)
if err != nil {
return errors.Wrap(err, "db create table imported_users_discourse")
}
_, err = db.Exec(`create table imported_topics_communityserver (
topic_id bigint not null primary key references topics (id) on update cascade on delete cascade,
communityserver_topic_id bigint not null
);`)
if err != nil {
return errors.Wrap(err, "db create table imported_topics_communityserver")
}
_, err = db.Exec(`create table imported_topics_discourse (
topic_id bigint not null primary key references topics (id) on update cascade on delete cascade,
discourse_topic_id bigint not null
);`)
if err != nil {
return errors.Wrap(err, "db create table imported_topics_discourse")
}
_, err = db.Exec(`create table imported_posts_communityserver (
post_id bigint not null primary key references posts (id) on update cascade on delete cascade,
communityserver_post_id bigint not null
);`)
if err != nil {
return errors.Wrap(err, "db create table imported_posts_communityserver")
}
_, err = db.Exec(`create table imported_posts_discourse (
post_id bigint not null primary key references posts (id) on update cascade on delete cascade,
discourse_post_id bigint not null
);`)
if err != nil {
return errors.Wrap(err, "db create table imported_posts_discourse")
}
_, err = dc.Exec(`create index if not exists index_topic_custom_fields_for_wtdwtf_science on topic_custom_fields ((value::bigint)) where name = 'import_id';`)
if err != nil {
return errors.Wrap(err, "db index cs->discourse imported topics")
}
_, err = dc.Exec(`create index if not exists index_post_custom_fields_for_wtdwtf_science on post_custom_fields ((value::bigint)) where name = 'import_id';`)
if err != nil {
return errors.Wrap(err, "db index cs->discourse imported posts")
}
insertUser, err = db.Prepare(`insert into users (id, name, password, full_name, email, email_confirmed, banned, ban_expiration, join_date, last_online, picture, uploaded_picture, cover_url, cover_position, group_title, website, location, signature, about_me, birthday, profile_views) values($1, $2, $3, $4, $5, $6, $7, null, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUser")
}
insertUserIP, err = db.Prepare(`insert into user_ips (user_id, ip, last_seen) values ($1, $2, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserIP")
}
insertUserIgnoredCategory, err = db.Prepare(`insert into user_watched_categories (user_id, category_id, watched, last_changed) values ($1, $2, false, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserIgnoredCategory")
}
insertUserIgnoredTopic, err = db.Prepare(`insert into user_watched_topics (user_id, topic_id, watched, last_changed) values ($1, $2, false, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserIgnoredTopic")
}
insertUserFollowedTopic, err = db.Prepare(`insert into user_watched_topics (user_id, topic_id, watched, last_changed) values ($1, $2, true, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserFollowedTopic")
}
insertUserFavouritePost, err = db.Prepare(`insert into user_favorite_posts (user_id, post_id, favorited_at) values ($1, $2, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserFavouritePost")
}
insertUserFollowedUser, err = db.Prepare(`insert into user_watched_users (user_id, target_user_id, watched, last_changed) values ($1, $2, true, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserFollowedUser")
}
insertUserUpvotedPost, err = db.Prepare(`insert into user_post_votes (user_id, post_id, up, last_changed) values ($1, $2, true, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserUpvotedPost")
}
insertUserDownvotedPost, err = db.Prepare(`insert into user_post_votes (user_id, post_id, up, last_changed) values ($1, $2, false, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserDownvotedPost")
}
updateUserBanExpiration, err = db.Prepare(`update users set ban_expiration = $2 where id = $1 and banned;`)
if err != nil {
return errors.Wrap(err, "db prepare updateUserBanExpiration")
}
insertGroup, err = db.Prepare(`insert into groups (name, create_time, user_title, description, icon, label_color, cover_url, cover_thumb_url, cover_position, hidden, system, private, disable_join_requests, user_title_enabled) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14);`)
if err != nil {
return errors.Wrap(err, "db prepare insertGroup")
}
insertUserGroup, err = db.Prepare(`insert into user_groups (user_id, group_name, join_date) values ($2, $1, $3);`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserGroup")
}
updateUserGroupOwner, err = db.Prepare(`update user_groups set is_owner = true where user_id = $2 and group_name = $1;`)
if err != nil {
return errors.Wrap(err, "db prepare updateUserGroupOwner")
}
insertCategory, err = db.Prepare(`insert into categories (id, name, parent_category_id, position, description, link, icon, fg_color, bg_color, recent_replies, disabled) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`)
if err != nil {
return errors.Wrap(err, "db prepare insertCategory")
}
insertTopic, err = db.Prepare(`insert into topics (id, name, category_id, author_id, main_post_id, created_at, view_count, locked, deleted, pinned) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`)
if err != nil {
return errors.Wrap(err, "db prepare insertTopic")
}
insertTopicTag, err = db.Prepare(`insert into topic_tags (topic_id, tag) values ($1, $2) on conflict do nothing;`)
if err != nil {
return errors.Wrap(err, "db prepare insertTopicTag")
}
insertUserTopicBookmark, err = db.Prepare(`insert into user_topic_bookmarks select u.id user_id, $2::bigint topic_id, $3::bigint "position" from users u where u.id = $1;`)
if err != nil {
return errors.Wrap(err, "db prepare insertUserTopicBookmark")
}
insertPost, err = db.Prepare(`insert into posts (id, topic_id, author_id, author_name, parent_post_id, content, created_at, ip, editor_id, edited_at, deleted) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11);`)
if err != nil {
return errors.Wrap(err, "db prepare insertPost")
}
insertImportedUserCommunityServer, err = db.Prepare(`insert into imported_users_communityserver select id user_id, $2::bigint communityserver_user_id from users where id = $1::bigint and not exists (select 1 from imported_users_communityserver where user_id = $1::bigint and communityserver_user_id = $2::bigint);`)
if err != nil {
return errors.Wrap(err, "db prepare insertImportedUserCommunityServer")
}
insertImportedUserDiscourse, err = db.Prepare(`insert into imported_users_discourse select id user_id, $2::bigint discourse_user_id from users where id = $1::bigint and not exists (select 1 from imported_users_discourse where user_id = $1::bigint and discourse_user_id = $2::bigint);`)
if err != nil {
return errors.Wrap(err, "db prepare insertImportedUserDiscourse")
}
selectImportedUserDiscourse, err = dc.Prepare(`select value::bigint from user_custom_fields where user_id = $1::bigint and name = 'import_id';`)
if err != nil {
return errors.Wrap(err, "db prepare selectImportedUserDiscourse")
}
insertImportedTopicCommunityServer, err = db.Prepare(`insert into imported_topics_communityserver select id topic_id, $2::bigint communityserver_topic_id from topics where id = $1::bigint and not exists (select 1 from imported_topics_communityserver where topic_id = $1::bigint and communityserver_topic_id = $2::bigint);`)
if err != nil {
return errors.Wrap(err, "db prepare insertImportedTopicCommunityServer")
}
insertImportedTopicDiscourse, err = db.Prepare(`insert into imported_topics_discourse select id topic_id, $2::bigint discourse_topic_id from topics where id = $1::bigint and not exists (select 1 from imported_topics_discourse where topic_id = $1::bigint and discourse_topic_id = $2::bigint);`)
if err != nil {
return errors.Wrap(err, "db prepare insertImportedTopicDiscourse")
}
selectImportedTopicDiscourse, err = dc.Prepare(`select topic_id::bigint from topic_custom_fields where value::bigint = $1::bigint and name = 'import_id';`)
if err != nil {
return errors.Wrap(err, "db prepare selectImportedTopicDiscourse")
}
insertImportedPostCommunityServer, err = db.Prepare(`insert into imported_posts_communityserver select id post_id, $2::bigint communityserver_post_id from posts where id = $1::bigint and not exists (select 1 from imported_posts_communityserver where post_id = $1::bigint and communityserver_post_id = $2::bigint);`)
if err != nil {
return errors.Wrap(err, "db prepare insertImportedPostCommunityServer")
}
insertImportedPostDiscourse, err = db.Prepare(`insert into imported_posts_discourse select id post_id, $2::bigint discourse_post_id from posts where id = $1::bigint and not exists (select 1 from imported_posts_discourse where post_id = $1::bigint and discourse_post_id = $2::bigint);`)
if err != nil {
return errors.Wrap(err, "db prepare insertImportedPostDiscourse")
}
selectImportedPostDiscourse, err = dc.Prepare(`select post_id::bigint from post_custom_fields where value::bigint = $1::bigint and name = 'import_id';`)
if err != nil {
return errors.Wrap(err, "db prepare selectImportedPostDiscourse")
}
return nil
}
func FinishDB() error {
_, err := db.Exec(`update users set group_title = null where group_title not in (select name from groups);
alter table users add foreign key (group_title) references groups (name) on update cascade on delete set null;`)
if err != nil {
return errors.Wrap(err, "db add fk on users.group_title")
}
_, err = db.Exec(`delete from user_watched_users where target_user_id not in (select id from users);
alter table user_watched_users add foreign key (target_user_id) references users (id) on update cascade on delete cascade;`)
if err != nil {
return errors.Wrap(err, "db add fk on user_watched_users.target_user_id")
}
_, err = db.Exec(`delete from user_watched_categories where category_id not in (select id from categories);
alter table user_watched_categories add foreign key (category_id) references categories (id) on update cascade on delete cascade;`)
if err != nil {
return errors.Wrap(err, "db add fk on user_watched_categories.category_id")
}
_, err = db.Exec(`update categories set parent_category_id = null where parent_category_id not in (select id from categories);
alter table categories add foreign key (parent_category_id) references categories (id) on update cascade on delete set null;`)
if err != nil {
return errors.Wrap(err, "db add fk on categories.parent_category_id")
}
_, err = db.Exec(`delete from user_watched_topics where topic_id not in (select id from topics);
alter table user_watched_topics add foreign key (topic_id) references topics (id) on update cascade on delete cascade;`)
if err != nil {
return errors.Wrap(err, "db add fk on user_watched_topics.topic_id")
}
_, err = db.Exec(`update topics set main_post_id = null where main_post_id not in (select id from posts);
alter table topics add foreign key (main_post_id) references posts (id) on update cascade on delete set null;`)
if err != nil {
return errors.Wrap(err, "db add fk on topics.main_post_id")
}
_, err = db.Exec(`delete from user_post_votes where post_id in (select post_id from user_post_votes left join posts on id = post_id where id is null);
alter table user_post_votes add foreign key (post_id) references posts (id) on update cascade on delete cascade;`)
if err != nil {
return errors.Wrap(err, "db add fk on user_post_votes.post_id")
}
_, err = db.Exec(`delete from user_favorite_posts where post_id in (select post_id from user_favorite_posts left join posts on id = post_id where id is null);
alter table user_favorite_posts add foreign key (post_id) references posts (id) on update cascade on delete cascade;`)
if err != nil {
return errors.Wrap(err, "db add fk on user_favorite_posts.post_id")
}
_, err = db.Exec(`update posts set parent_post_id = null where id in (select a.id from posts a left join posts b on a.parent_post_id = b.id where a.parent_post_id is not null and b.id is null);
alter table posts add foreign key (parent_post_id) references posts (id) on update cascade on delete set null;`)
if err != nil {
return errors.Wrap(err, "db add fk on posts.parent_post_id")
}
return nil
}
// IPs provides support for the PostgreSQL type inet[].
type IPs []net.IP
// Scan implements sql.Scanner.
func (ips *IPs) Scan(value interface{}) error {
var src string
if b, ok := value.([]byte); ok {
src = string(b)
} else {
return errors.Errorf("unexpected IPs type %T", value)
}
if len(src) < 2 || src[0] != '{' || src[len(src)-1] != '}' {
return errors.Errorf("invalid array: %q", src)
}
s := strings.Split(src[1:len(src)-1], ",")
addrs := make(IPs, len(s))
for i, ip := range s {
addr := net.ParseIP(ip)
if addr == nil {
return errors.Errorf("invalid IP: %q", ip)
}
addrs[i] = addr
}
*ips = addrs
return nil
}
// Value implements driver.Valuer.
func (ips IPs) Value() (driver.Value, error) {
s := make([]string, len(ips))
for i, ip := range ips {
s[i] = ip.String()
}
return "{" + strings.Join(s, ",") + "}", nil
}
func HandleUser(uidString string, joinTS float64) error {
tx, err := db.Begin()
if err != nil {
return errors.Wrap(err, "begin transaction")
}
defer tx.Rollback()
var user struct {
ID int64
Name sql.NullString
FullName sql.NullString
Password []byte
Email sql.NullString
EmailConfirmed bool
Banned bool
LastOnline pq.NullTime
JoinDate pq.NullTime
Picture sql.NullString
UploadedPicture sql.NullString
CoverURL sql.NullString
CoverPosition NullPosition
GroupTitle sql.NullString
Website sql.NullString
Location sql.NullString
Signature sql.NullString
AboutMe sql.NullString
Birthday pq.NullTime
ProfileViews sql.NullInt64
Settings struct {
TopicsPerPage sql.NullInt64
PostsPerPage sql.NullInt64
UpvoteNotificationLevel sql.NullString
UserLang sql.NullString
HomePageRoute sql.NullString
DailyDigestFreq sql.NullString
ShowEmail bool
ShowFullName bool
ScrollToMyPost bool
UsePagination bool
NotificationSounds bool
SendChatNotifications bool
SendPostNotifications bool
FollowTopicsOnCreate bool
FollowTopicsOnReply bool
OpenOutgoingLinksInNewTab bool
RestrictChat bool
}
}
user.ID, err = strconv.ParseInt(uidString, 10, 64)
if err != nil {
return errors.Wrap(err, "parsing user ID")
}
var data bson.M
err = ByKey("user:"+uidString, &data)
if err != nil {
return err
}
for k, v := range data {
switch k {
case "_id", "_key", "uid", "status", "passwordExpiry", "showemail":
// ignore
case "followerCount", "followingCount", "postcount", "topiccount", "userslug", "lastposttime", "reputation":
// ignore, derived field
case "username":
user.Name, err = String(v)
case "fullname":
user.FullName, err = String(v)
case "password":
user.Password, err = Bytes(v)
case "email":
user.Email, err = String(v)
case "email:confirmed":
user.EmailConfirmed, err = Bool(v)
case "banned":
user.Banned, err = Bool(v)
case "gplusid":
// TODO
case "githubid":
// TODO
case "fbid":
// TODO
case "fbaccesstoken":
// TODO
case "fbrefreshtoken":
// TODO
case "twid":
// TODO
case "lastonline":
user.LastOnline, err = Time(v)
case "joindate":
user.JoinDate, err = Time(v)
case "picture":
user.Picture, err = String(v)
case "uploadedpicture":
user.UploadedPicture, err = String(v)
case "cover:url":
user.CoverURL, err = String(v)
case "cover:position":
user.CoverPosition, err = Position(v)
case "groupTitle":
user.GroupTitle, err = String(v)
case "website":
user.Website, err = String(v)
case "location":
user.Location, err = String(v)
case "signature":
user.Signature, err = String(v)
case "aboutme":
user.AboutMe, err = String(v)
case "birthday":
user.Birthday, err = Date(v)
case "profileviews":
user.ProfileViews, err = Int64(v)
default:
if !strings.HasPrefix(k, "_imported_") {
err = errors.Errorf("unknown user field: %q %T %#v", k, v, v)
}
}
if err != nil {
return errors.Wrapf(err, "for field %q", k)
}
}
_, err = tx.Stmt(insertUser).Exec(&user.ID, &user.Name, &user.Password, &user.FullName, &user.Email, &user.EmailConfirmed, &user.Banned, &user.JoinDate, &user.LastOnline, &user.Picture, &user.UploadedPicture, &user.CoverURL, &user.CoverPosition, &user.GroupTitle, &user.Website, &user.Location, &user.Signature, &user.AboutMe, &user.Birthday, &user.ProfileViews)
if err != nil {
return errors.Wrap(err, "insert user into database")
}
data = nil
err = ByKey("user:"+uidString+":settings", &data)
if errors.Cause(err) != mgo.ErrNotFound {
if err != nil {
return err
}
for k, v := range data {
switch k {
case "_id", "_key", "topicSearchEnabled", "delayImageLoading", "groupTitle", "topicPostSort", "bootswatchSkin", "categoryTopicSort":
// ignore
case "topicsPerPage":
user.Settings.TopicsPerPage, err = Int64(v)
case "postsPerPage":
user.Settings.PostsPerPage, err = Int64(v)
case "upvoteNotificationLevel":
user.Settings.UpvoteNotificationLevel, err = String(v)
case "userLang":
user.Settings.UserLang, err = String(v)
case "homePageRoute":
user.Settings.HomePageRoute, err = String(v)
case "dailyDigestFreq":
user.Settings.DailyDigestFreq, err = String(v)
case "showemail":
user.Settings.ShowEmail, err = Bool(v)
case "showfullname":
user.Settings.ShowFullName, err = Bool(v)
case "scrollToMyPost":
user.Settings.ScrollToMyPost, err = Bool(v)
case "usePagination":
user.Settings.UsePagination, err = Bool(v)
case "notificationSounds":
user.Settings.NotificationSounds, err = Bool(v)
case "sendChatNotifications":
user.Settings.SendChatNotifications, err = Bool(v)
case "sendPostNotifications":
user.Settings.SendPostNotifications, err = Bool(v)
case "followTopicsOnCreate":
user.Settings.FollowTopicsOnCreate, err = Bool(v)
case "followTopicsOnReply":
user.Settings.FollowTopicsOnReply, err = Bool(v)
case "openOutgoingLinksInNewTab":
user.Settings.OpenOutgoingLinksInNewTab, err = Bool(v)
case "restrictChat":
user.Settings.RestrictChat, err = Bool(v)
default:
if !strings.HasPrefix(k, "_imported_") {
err = errors.Errorf("unknown user settings field: %q %T %#v", k, v, v)
}
}
if err != nil {
return errors.Wrapf(err, "for settings field %q", k)
}
}
}
if err := ForSortedSet("uid:"+uidString+":ip", func(ipString string, lastSeenTS float64) error {
if ipString == "Unknown" {
return nil
}
ip := net.ParseIP(ipString)
if ip == nil {
return errors.New("invalid IP format")
}
lastSeen, err := Time(lastSeenTS)
if err == nil && !lastSeen.Valid {
err = errors.New("invalid timestamp")
}
if err != nil {
return errors.Wrap(err, "cannot parse last seen time")
}
_, err = tx.Stmt(insertUserIP).Exec(&user.ID, ip.String(), &lastSeen)
if err != nil {
return errors.Wrap(err, "insert user IP into database")
}
return nil
}); err != nil {
return errors.Wrap(err, "getting IPs")
}
if err := SortedSetIDs("uid:"+uidString+":ignored:cids", tx.Stmt(insertUserIgnoredCategory), &user.ID); err != nil {
return errors.Wrap(err, "getting ignored categories")
}
if err := SortedSetIDs("uid:"+uidString+":ignored_tids", tx.Stmt(insertUserIgnoredTopic), &user.ID); err != nil {
return errors.Wrap(err, "getting ignored topics")
}
if err := SortedSetIDs("uid:"+uidString+":followed_tids", tx.Stmt(insertUserFollowedTopic), &user.ID); err != nil {
return errors.Wrap(err, "getting followed topics")
}
if err := SortedSetIDs("uid:"+uidString+":favourites", tx.Stmt(insertUserFavouritePost), &user.ID); err != nil {
return errors.Wrap(err, "getting favourited posts")
}
if err := SortedSetIDs("following:"+uidString, tx.Stmt(insertUserFollowedUser), &user.ID); err != nil {
return errors.Wrap(err, "getting followed users")
}
if err := SortedSetIDs("uid:"+uidString+":upvote", tx.Stmt(insertUserUpvotedPost), &user.ID); err != nil {
return errors.Wrap(err, "getting upvoted posts")
}
if err := SortedSetIDs("uid:"+uidString+":downvote", tx.Stmt(insertUserDownvotedPost), &user.ID); err != nil {
return errors.Wrap(err, "getting downvoted posts")
}
log.Printf("user:%d:%q", user.ID, user.Name.String)
return errors.Wrap(tx.Commit(), "commit transaction")
}
func UpdateBanExpirations(uidString string, expirationTS float64) error {
uid, err := strconv.ParseInt(uidString, 10, 64)
if err != nil {
return errors.Wrap(err, "invalid user ID")
}
expiration, err := Time(expirationTS)
if err == nil && !expiration.Valid {
err = errors.New("invalid expiration timestamp")
}
if err != nil {
return errors.Wrap(err, "invalid expiration")
}
_, err = updateUserBanExpiration.Exec(&uid, &expiration)
return errors.Wrap(err, "update users.ban_expiration in database")
}
var privilegeGroup = regexp.MustCompile(`\Acid:([0-9]+):privileges(:groups)?:(.*)\z`)
func HandleGroup(name string, createdTS float64) error {
if priv := privilegeGroup.FindStringSubmatch(name); priv != nil {
return nil
}
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
var group struct {
CreateTime pq.NullTime
UserTitle sql.NullString
Description sql.NullString
Icon sql.NullString
LabelColor sql.NullString
CoverURL sql.NullString
CoverThumbURL sql.NullString
CoverPosition NullPosition
Hidden bool
System bool
Private bool
DisableJoinRequests bool
UserTitleEnabled bool
}
var data bson.M
if err := ByKey("group:"+name, &data); err != nil {
return err
}
for k, v := range data {
switch k {
case "_id", "_key", "name", "slug", "memberCount", "ownerUid":
// ignore
case "deleted":
var deleted bool
if deleted, err = Bool(v); err == nil && deleted {
return nil
}
case "createtime":
group.CreateTime, err = Time(v)
case "userTitle":
group.UserTitle, err = String(v)
case "description":
group.Description, err = String(v)
case "icon":
group.Icon, err = String(v)
case "labelColor":
group.LabelColor, err = String(v)
case "cover:url":
group.CoverURL, err = String(v)
case "cover:thumb:url":
group.CoverThumbURL, err = String(v)
case "cover:position":
group.CoverPosition, err = Position(v)
case "hidden":
group.Hidden, err = Bool(v)
case "system":
group.System, err = Bool(v)
case "private":
group.Private, err = Bool(v)
case "disableJoinRequests":
group.DisableJoinRequests, err = Bool(v)
case "userTitleEnabled":
group.UserTitleEnabled, err = Bool(v)
default:
if !strings.HasPrefix(k, "_imported_") {
err = errors.Errorf("unknown group field: %q %T %#v", k, v, v)
}
}
if err != nil {
return errors.Wrapf(err, "for field %q", k)
}
}
if _, err := tx.Stmt(insertGroup).Exec(&name, &group.CreateTime, &group.UserTitle, &group.Description.String, &group.Icon, &group.LabelColor, &group.CoverURL, &group.CoverThumbURL, &group.CoverPosition, &group.Hidden, &group.System, &group.Private, &group.DisableJoinRequests, &group.UserTitleEnabled); err != nil {
return errors.Wrap(err, "inserting group into database")
}
if err := SortedSetIDs("group:"+name+":members", tx.Stmt(insertUserGroup), &name); err != nil {
return errors.Wrap(err, "getting group members")
}
if err := UnsortedSetIDs("group:"+name+":owners", tx.Stmt(updateUserGroupOwner), &name); err != nil {
return errors.Wrap(err, "getting group owners")
}
log.Printf("group:%q", name)
return errors.Wrap(tx.Commit(), "commit transaction")
}
func HandleCategory(cidString string, position float64) error {
var category struct {
ID int64
Name sql.NullString
ParentCategoryID sql.NullInt64
Position sql.NullInt64
Description sql.NullString
Link sql.NullString
Icon sql.NullString
FgColor sql.NullString
BgColor sql.NullString
RecentReplies sql.NullInt64
Disabled bool
}
var err error
category.ID, err = strconv.ParseInt(cidString, 10, 64)
if err != nil {
return errors.Wrap(err, "parsing category ID")
}
var data bson.M
if err := ByKey("category:"+cidString, &data); err != nil {
return err
}
for k, v := range data {
switch k {
case "_id", "_key", "cid", "slug", "topic_count", "post_count", "undefined", "imageClass", "class", "descriptionParsed", "timesClicked":
// ignore
case "name":
category.Name, err = String(v)
case "parentCid":
category.ParentCategoryID, err = ID(v)
case "order":
category.Position, err = Int64(v)
case "description":
category.Description, err = String(v)
case "link":
category.Link, err = String(v)
case "icon":
category.Icon, err = String(v)
case "color":
category.FgColor, err = String(v)
case "bgColor":
category.BgColor, err = String(v)
case "disabled":
category.Disabled, err = Bool(v)
case "numRecentReplies":
category.RecentReplies, err = Int64(v)
default:
if !strings.HasPrefix(k, "_imported_") {
err = errors.Errorf("unknown category field: %q %T %#v", k, v, v)
}
}
if err != nil {
return errors.Wrapf(err, "for field %q", k)
}
}
if _, err := insertCategory.Exec(&category.ID, &category.Name, &category.ParentCategoryID, &category.Position, &category.Description, &category.Link, &category.Icon, &category.FgColor, &category.BgColor, &category.RecentReplies, &category.Disabled); err != nil {
return errors.Wrap(err, "insert category into database")
}
log.Printf("category:%d:%q", category.ID, category.Name.String)
return nil
}
func HandleTopic(tidString string, createdTS float64) error {
var topic struct {
ID int64
Name sql.NullString
CategoryID sql.NullInt64
AuthorID sql.NullInt64
MainPostID sql.NullInt64
CreatedAt pq.NullTime
ViewCount sql.NullInt64
Locked bool
Deleted bool
Pinned bool
}
var err error
topic.ID, err = strconv.ParseInt(tidString, 10, 64)
if err != nil {
return errors.Wrap(err, "parsing topic ID")
}
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
var data bson.M
if err := ByKey("topic:"+tidString, &data); err != nil {
return err
}
for k, v := range data {
switch k {
case "_id", "_key", "tid", "slug", "postcount", "teaserPid", "lastposttime", "thumb", "isQuestion", "isSolved", "solvedPid":
// ignore
case "title":
topic.Name, err = String(v)
case "cid":
topic.CategoryID, err = ID(v)
case "uid":
topic.AuthorID, err = ID(v)
case "mainPid":
topic.MainPostID, err = ID(v)
case "timestamp":
topic.CreatedAt, err = Time(v)
case "viewcount":
topic.ViewCount, err = Int64(v)
case "locked":
topic.Locked, err = Bool(v)
case "deleted":
topic.Deleted, err = Bool(v)
case "pinned":
topic.Pinned, err = Bool(v)
default:
if !strings.HasPrefix(k, "_imported_") {
err = errors.Errorf("unknown topic field: %q %T %#v", k, v, v)
}
}
if err != nil {
return errors.Wrapf(err, "for field %q", k)
}
}
if _, err := tx.Stmt(insertTopic).Exec(&topic.ID, &topic.Name, &topic.CategoryID, &topic.AuthorID, &topic.MainPostID, &topic.CreatedAt, &topic.ViewCount, &topic.Locked, &topic.Deleted, &topic.Pinned); err != nil {
return errors.Wrap(err, "insert topic into database")
}
var tags struct {
Members []string `bson:"members"`
}
if err := ByKey("topic:"+tidString+":tags", &tags); err == nil {
tag := tx.Stmt(insertTopicTag)
for _, t := range tags.Members {
s, err := String(t)
if err != nil || !s.Valid {
continue
}
if _, err := tag.Exec(&topic.ID, &s.String); err != nil {
return errors.Wrapf(err, "inserting tag %q", s.String)
}
}
} else if errors.Cause(err) == mgo.ErrNotFound {
// no tags
} else {
return errors.Wrap(err, "find topic tags")
}
bookmark := tx.Stmt(insertUserTopicBookmark)
if err := ForSortedSet("tid:"+tidString+":bookmarks", func(uidString string, positionFloat float64) error {
uid, err := ID(uidString)
if err != nil {
return errors.Wrap(err, "user ID")
}
position, err := Int64(positionFloat)
if err != nil {
return errors.Wrap(err, "position")
}
_, err = bookmark.Exec(&uid, &topic.ID, &position)
return errors.Wrap(err, "inserting bookmark")
}); err != nil {
return errors.Wrap(err, "bookmarks")
}
log.Printf("topic:%d:%q", topic.ID, topic.Name.String)
return errors.Wrap(tx.Commit(), "commit transaction")
}
var insertPostTx *sql.Stmt
func HandlePosts() error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
insertPostTx = tx.Stmt(insertPost)
if err := ForSortedSet("posts:pid", HandlePost); err != nil {
return errors.Wrap(err, "handle posts")
}
return errors.Wrap(tx.Commit(), "commit transaction")
}
func HandlePost(pidString string, createdTS float64) error {
var post struct {
ID int64
TopicID sql.NullInt64
AuthorID sql.NullInt64
AuthorName sql.NullString
ParentPostID sql.NullInt64
Content sql.NullString
CreatedAt pq.NullTime
IP IPs
EditorID sql.NullInt64
EditedAt pq.NullTime
Deleted bool
}
var err error
post.ID, err = strconv.ParseInt(pidString, 10, 64)
if err != nil {
return errors.Wrap(err, "parsing post ID")
}
var data bson.M
if err := ByKey("post:"+pidString, &data); err != nil {
return err
}
for k, v := range data {
switch k {
case "_id", "_key", "pid", "reputation", "votes", "upvotes", "downvotes", "flags":
// ignore
case "tid":
post.TopicID, err = ID(v)
case "uid":
post.AuthorID, err = ID(v)
case "handle":
post.AuthorName, err = String(v)
case "toPid":
post.ParentPostID, err = ID(v)
case "content":
post.Content, err = String(v)
case "timestamp":
post.CreatedAt, err = Time(v)
case "ip":
post.IP, err = ParseIPs(v)
case "editor":
post.EditorID, err = ID(v)
case "edited":
post.EditedAt, err = Time(v)
case "deleted":
post.Deleted, err = Bool(v)
default:
if !strings.HasPrefix(k, "_imported_") {
err = errors.Errorf("unknown post field: %q %T %#v", k, v, v)
}
}
if err != nil {
return errors.Wrapf(err, "for field %q", k)
}
}
if _, err := insertPostTx.Exec(&post.ID, &post.TopicID, &post.AuthorID, &post.AuthorName, &post.ParentPostID, &post.Content.String, &post.CreatedAt, &post.IP, &post.EditorID, &post.EditedAt, &post.Deleted); err != nil {
return errors.Wrap(err, "insert post into database")
}
log.Printf("post:%d", post.ID)
return nil
}
var insertImportedUserDiscourseTx *sql.Stmt
var insertImportedUserCommunityServerTx *sql.Stmt
func HandleImportedUsers() error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
insertImportedUserDiscourseTx = tx.Stmt(insertImportedUserDiscourse)
insertImportedUserCommunityServerTx = tx.Stmt(insertImportedUserCommunityServer)
if err := ForSortedSet("_imported:_users", HandleImportedUser); err != nil {
return errors.Wrap(err, "handle imported users")
}
return errors.Wrap(tx.Commit(), "commit transaction")
}
func HandleImportedUser(discourseIDString string, nodeBBIDFloat float64) error {
discourseID, err := ID(discourseIDString)
if err != nil {
return errors.Wrap(err, "Discourse ID")
}
nodeBBID, err := ID(nodeBBIDFloat)
if err != nil {
return errors.Wrap(err, "NodeBB ID")
}
_, err = insertImportedUserDiscourseTx.Exec(&nodeBBID, &discourseID)
if err != nil {
return errors.Wrap(err, "insert imported Discourse user ID mapping")
}
log.Printf("imported_user:%d:Discourse:%d", nodeBBID.Int64, discourseID.Int64)
var communityServerID int64
err = selectImportedUserDiscourse.QueryRow(&discourseID).Scan(&communityServerID)
if err == sql.ErrNoRows {
return nil
}
if err != nil {
return errors.Wrap(err, "get imported Community Server user ID from Discourse")
}
_, err = insertImportedUserCommunityServerTx.Exec(&nodeBBID, &communityServerID)
log.Printf("imported_user:%d:Community Server:%d", nodeBBID.Int64, communityServerID)
return errors.Wrap(err, "insert imported Community Server user ID mapping")
}
var insertImportedTopicDiscourseTx *sql.Stmt
var insertImportedTopicCommunityServerTx *sql.Stmt
func HandleImportedTopics() error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
insertImportedTopicDiscourseTx = tx.Stmt(insertImportedTopicDiscourse)
insertImportedTopicCommunityServerTx = tx.Stmt(insertImportedTopicCommunityServer)
if err := ForSortedSet("_imported:_topics", HandleImportedTopic); err != nil {
return errors.Wrap(err, "handle imported topics")
}
return errors.Wrap(tx.Commit(), "commit transaction")
}
func HandleImportedTopic(importedIDString string, nodeBBIDFloat float64) error {
importedID, err := ID(importedIDString)
if err != nil {
return errors.Wrap(err, "imported ID")
}
nodeBBID, err := ID(nodeBBIDFloat)
if err != nil {
return errors.Wrap(err, "NodeBB ID")
}
if !importedID.Valid {
return errors.New("missing imported ID")
}
kind := importedID.Int64 % 2
importedID.Int64 /= 2
switch kind {
case 0:
_, err = insertImportedTopicCommunityServerTx.Exec(&nodeBBID, &importedID)
log.Printf("imported_topic:%d:Community Server:%d", nodeBBID.Int64, importedID.Int64)
if err == nil {
var discourseID int64
err = selectImportedTopicDiscourse.QueryRow(&importedID).Scan(&discourseID)
if err == sql.ErrNoRows {
return nil
}
if err != nil {
return errors.Wrap(err, "get imported Discourse topic ID")
}
_, err = insertImportedTopicDiscourseTx.Exec(&nodeBBID, &discourseID)
log.Printf("imported_topic:%d:Discourse:%d", nodeBBID.Int64, discourseID)
}
case 1:
_, err = insertImportedTopicDiscourseTx.Exec(&nodeBBID, &importedID)
log.Printf("imported_topic:%d:Discourse:%d", nodeBBID.Int64, importedID.Int64)
default:
panic("impossible")
}
return errors.Wrap(err, "insert imported topic ID mapping")
}
var insertImportedPostDiscourseTx *sql.Stmt
var insertImportedPostCommunityServerTx *sql.Stmt
func HandleImportedPosts() error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
insertImportedPostDiscourseTx = tx.Stmt(insertImportedPostDiscourse)
insertImportedPostCommunityServerTx = tx.Stmt(insertImportedPostCommunityServer)
if err := ForSortedSet("_imported:_posts", HandleImportedPost); err != nil {
return errors.Wrap(err, "handle imported posts")
}
return errors.Wrap(tx.Commit(), "commit transaction")
}
func HandleImportedPost(importedIDString string, nodeBBIDFloat float64) error {
importedID, err := ID(importedIDString)
if err != nil {
return errors.Wrap(err, "imported ID")
}
nodeBBID, err := ID(nodeBBIDFloat)
if err != nil {
return errors.Wrap(err, "NodeBB ID")
}
if !importedID.Valid {
return errors.New("missing imported ID")
}
kind := importedID.Int64 % 2
importedID.Int64 /= 2
switch kind {
case 0:
_, err = insertImportedPostCommunityServerTx.Exec(&nodeBBID, &importedID)
log.Printf("imported_post:%d:Community Server:%d", nodeBBID.Int64, importedID.Int64)
if err == nil {
var discourseID int64
err = selectImportedPostDiscourse.QueryRow(&importedID).Scan(&discourseID)
if err == sql.ErrNoRows {
return nil
}
if err != nil {
return errors.Wrap(err, "get imported Discourse post ID")
}
_, err = insertImportedPostDiscourseTx.Exec(&nodeBBID, &discourseID)
log.Printf("imported_post:%d:Discourse:%d", nodeBBID.Int64, discourseID)
}
case 1:
_, err = insertImportedPostDiscourseTx.Exec(&nodeBBID, &importedID)
log.Printf("imported_post:%d:Discourse:%d", nodeBBID.Int64, importedID.Int64)
default:
panic("impossible")
}
return errors.Wrap(err, "insert imported post ID mapping")
}
var deleteNulls = strings.NewReplacer("\x00", "")
func String(v interface{}) (sql.NullString, error) {
if v == nil {
return sql.NullString{}, nil
}
if s, ok := v.(string); ok {
s = deleteNulls.Replace(s)
return sql.NullString{String: s, Valid: s != ""}, nil
}
return sql.NullString{}, errors.Errorf("expected string, but got %T: %#v", v, v)
}
func ParseIPs(v interface{}) (IPs, error) {
s, err := String(v)
if err != nil || !s.Valid {
return nil, errors.Wrap(err, "cannot convert to IPs")
}
parts := strings.Split(s.String, ", ")
ips := make(IPs, len(parts))
for i, p := range parts {
ips[i] = net.ParseIP(p)
if err == nil && ips[i] == nil {
err = errors.Errorf("invalid IP address %q", p)
}
}
return ips, errors.Wrapf(err, "for %q", s.String)
}
func Bytes(v interface{}) ([]byte, error) {
s, err := String(v)
if err != nil {
return nil, errors.Wrap(err, "cannot convert to []byte")
}
if s.Valid {
return []byte(s.String), nil
}
return nil, nil
}
type NullPosition struct {
Valid bool
X, Y float64
}
func (pos *NullPosition) Scan(v interface{}) error {
if v == nil {
return nil
}
b, ok := v.([]byte)
if !ok {
return errors.Errorf("unexpected SQL type for NullPosition: %T", v)
}
_, err := fmt.Sscanf(string(b), "{%f,%f}", &pos.X, &pos.Y)
if err != nil {
pos.Valid = true
}
return errors.Wrapf(err, "for NullPosition %q", b)
}
func (pos NullPosition) Value() (driver.Value, error) {
if !pos.Valid {
return nil, nil
}
return []byte(fmt.Sprintf("{%g,%g}", pos.X, pos.Y)), nil
}
func Position(v interface{}) (NullPosition, error) {
var f NullPosition
s, err := String(v)
if err != nil || !s.Valid {
return f, errors.Wrap(err, "cannot convert to position")
}
_, err = fmt.Sscanf(s.String, "%f%% %f%%", &f.X, &f.Y)
if err == nil {
f.Valid = true
}
return f, errors.Wrapf(err, "invalid format for position: %q", s)
}
func Int64(v interface{}) (sql.NullInt64, error) {
if v == nil {
return sql.NullInt64{}, nil
}
if s, ok := v.(string); ok {
i, err := strconv.ParseInt(s, 10, 64)
return sql.NullInt64{Int64: i, Valid: true}, errors.Wrapf(err, "invalid format for int64: %q", s)
}
if i, ok := v.(int64); ok {
return sql.NullInt64{Int64: i, Valid: true}, nil
}
if i, ok := v.(int); ok {
return sql.NullInt64{Int64: int64(i), Valid: true}, nil
}
if f, ok := v.(float64); ok {
if i, frac := math.Modf(f); frac != 0 || i > 2<<53 || i < -2<<53 {
return sql.NullInt64{}, errors.Errorf("float64 cannot be converted to int64 without losing precision: %v", f)
}
return sql.NullInt64{Int64: int64(f), Valid: true}, nil
}
return sql.NullInt64{}, errors.Errorf("expected int64, but got %T: %#v", v, v)
}
func ID(v interface{}) (sql.NullInt64, error) {
if s, ok := v.(string); ok && s == "" {
return sql.NullInt64{}, nil
}
i, err := Int64(v)
i.Valid = i.Valid && i.Int64 != 0
return i, err
}
func Bool(v interface{}) (bool, error) {
i, err := Int64(v)
if err != nil || !i.Valid {
return false, errors.Wrap(err, "cannot convert to bool")
}
switch i.Int64 {
case 0:
return false, nil
case 1:
return true, nil
default:
return false, errors.Errorf("unexpected value for bool: %d", i.Int64)
}
}
func Time(v interface{}) (pq.NullTime, error) {
i, err := Int64(v)
if err != nil || !i.Valid || i.Int64 == 0 {
return pq.NullTime{}, errors.Wrap(err, "cannot convert to time.Time")
}
if i.Int64 < 0 {
return pq.NullTime{}, errors.Errorf("unsupported negative timestamp %d", i.Int64)
}
return pq.NullTime{Time: time.Unix(i.Int64/1000, (i.Int64%1000)*int64(time.Millisecond)), Valid: true}, nil
}
func Date(v interface{}) (pq.NullTime, error) {
s, err := String(v)
if err != nil || !s.Valid {
return pq.NullTime{}, errors.Wrap(err, "cannot parse date")
}
t, err := time.Parse("1/2/2006", s.String)
return pq.NullTime{Time: t, Valid: true}, errors.Wrapf(err, "for date %q", s.String)
}
func ByKey(key string, data interface{}) error {
return errors.Wrapf(objects.Find(bson.M{"_key": key}).One(data), "ByKey(%q)", key)
}
func ForSortedSet(key string, f func(string, float64) error) error {
var el struct {
Value string `bson:"value"`
Score float64 `bson:"score"`
}
var err error
it := objects.
Find(bson.M{"_key": key}).
Sort("score").
Select(bson.M{"_id": 0, "value": 1, "score": 1}).
Iter()
for err == nil && it.Next(&el) {
if math.IsNaN(el.Score) {
log.Printf("warning: sorted set entry %q[%q] has a NaN score", key, el.Value)
continue
}
err = errors.Wrapf(f(el.Value, el.Score), "ForSortedSet(%q, (%q, %v))", key, el.Value, el.Score)
}
if e := errors.Wrapf(it.Close(), "closing ForSortedSet(%q)", key); err == nil {
err = e
}
return err
}
func SortedSetIDs(key string, stmt *sql.Stmt, args ...interface{}) error {
args = append(args[:len(args):len(args)], nil, nil)[:len(args)]
return ForSortedSet(key, func(idString string, ts float64) error {
id, err := strconv.ParseInt(idString, 10, 64)
if err != nil {
return errors.Wrap(err, "parsing ID")
}
t, err := Time(ts)
if err == nil && !t.Valid {
err = errors.New("invalid timestamp")
}
if err != nil {
return errors.Wrap(err, "parsing timestamp")
}
_, err = stmt.Exec(append(args, &id, &t.Time)...)
return errors.Wrapf(err, "inserting %q into database", key)
})
}
func UnsortedSetIDs(key string, stmt *sql.Stmt, args ...interface{}) error {
args = append(args[:len(args):len(args)], nil)[:len(args)]
var data struct {
Members []string `bson:"members"`
}
if err := ByKey(key, &data); err != nil {
if errors.Cause(err) == mgo.ErrNotFound {
return nil
}
return err
}
for _, idString := range data.Members {
id, err := strconv.ParseInt(idString, 10, 64)
if err != nil {
return errors.Wrapf(err, "parsing ID %q", idString)
}
_, err = stmt.Exec(append(args, &id)...)
return errors.Wrapf(err, "inserting %q into database for id %d", key, id)
}
return nil
}
func FindOrphanedCommunityServerPostAuthors() error {
selectCommunityServerPostAuthor, err := cs.Prepare(`SELECT [UserID], [PostAuthor] FROM [cs_Posts] WHERE [PostID] = ? AND [UserID] <> 1001;`)
if err != nil {
return errors.Wrap(err, "db prepare selectCommunityServerPostAuthor")
}
defer selectCommunityServerPostAuthor.Close()
selectCommunityServerImportedUser, err := db.Prepare(`select u.id, u.name from users u inner join imported_users_communityserver iucs on u.id = iucs.user_id where iucs.communityserver_user_id = $1;`)
if err != nil {
return errors.Wrap(err, "db prepare selectCommunityServerImportedUser")
}
defer selectCommunityServerImportedUser.Close()
rows, err := db.Query(`select ipcs.post_id, ipcs.communityserver_post_id from imported_posts_communityserver ipcs inner join posts p on p.id = ipcs.post_id where p.author_id is null;`)
if err != nil {
return errors.Wrap(err, "select guest posts imported from Community Server")
}
defer rows.Close()
for rows.Next() {
var nodeBBPostID int64
var communityServerPostID int64
err = rows.Scan(&nodeBBPostID, &communityServerPostID)
if err != nil {
return errors.Wrap(err, "select guest posts imported from Community Server")
}
var communityServerUserID int64
var communityServerUserName string
err = selectCommunityServerPostAuthor.QueryRow(&communityServerPostID).Scan(&communityServerUserID, &communityServerUserName)
if err == sql.ErrNoRows {
continue
}
if err != nil {
return errors.Wrap(err, "select Community Server post author")
}
var nodeBBUserID int64
var nodeBBUserName string
err = selectCommunityServerImportedUser.QueryRow(&communityServerUserID).Scan(&nodeBBUserID, &nodeBBUserName)
if err == sql.ErrNoRows {
fmt.Printf("Imported /post/%d is missing author (cs: %d %q)\n", nodeBBPostID, communityServerUserID, communityServerUserName)
continue
}
if err != nil {
return errors.Wrap(err, "select NodeBB post author")
}
fmt.Printf("Imported /post/%d is missing author /uid/%d %q (cs: %d %q)\n", nodeBBPostID, nodeBBUserID, nodeBBUserName, communityServerUserID, communityServerUserName)
}
return errors.Wrap(rows.Close(), "select guest posts imported from Community Server")
}
func FindIncorrectCommunityServerUserNames() error {
selectCommunityServerUserName, err := cs.Prepare(`SELECT [UserName] FROM [cs_Users] WHERE [UserID] = ?;`)
if err != nil {
return errors.Wrap(err, "db prepare selectCommunityServerUserName")
}
defer selectCommunityServerUserName.Close()
rows, err := db.Query(`select u.id, iucs.communityserver_user_id, u.name from users u inner join imported_users_communityserver iucs on u.id = iucs.user_id;`)
if err != nil {
return errors.Wrap(err, "select imported Community Server users")
}
defer rows.Close()
for rows.Next() {
var nodeBBID int64
var communityServerID int64
var nodeBBName string
err = rows.Scan(&nodeBBID, &communityServerID, &nodeBBName)
if err != nil {
return errors.Wrap(err, "select imported Community Server users")
}
var communityServerName string
err = selectCommunityServerUserName.QueryRow(&communityServerID).Scan(&communityServerName)
if err != nil {
return errors.Wrapf(err, "get real Community Server username for %d %q", nodeBBID, nodeBBName)
}
if nodeBBName != communityServerName {
fmt.Printf("Imported /uid/%d has name %q on NodeBB (cs: %d %q)\n", nodeBBID, nodeBBName, communityServerID, communityServerName)
}
}
return errors.Wrap(rows.Close(), "select imported Community Server users")
}
func FindUnusedUserAccounts() error {
rows, err := db.Query(`select u.id, u.name from users u left join posts p on p.author_id = u.id left join user_post_votes upv on upv.user_id = u.id left join user_topic_bookmarks utb on utb.user_id = u.id where p.id is null and upv.post_id is null and utb.topic_id is null and (u.last_online is null or u.last_online < now() - interval '1 year') order by u.id asc;`)
if err != nil {
return errors.Wrap(err, "select unused user accounts")
}
defer rows.Close()
for rows.Next() {
var id int64
var name string
if err = rows.Scan(&id, &name); err != nil {
return errors.Wrap(err, "select unused user accounts")
}
fmt.Printf("Possibly unused user account /uid/%d %q\n", id, name)
}
return errors.Wrap(rows.Close(), "select unused user accounts")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment