|
#!/usr/bin/env python3 |
|
""" |
|
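Set the owner (author) of codi notes.

    $ codi-set-author (--owner | --page-uuids | --page-names) new-owner from-list...

new-owner can be a uuid, an email or a profile id.

With --owner, from-list is a single current owner (uuid, email or profile id)
and every note they own is reassigned. With --page-uuids, from-list is one or
more note uuids (plain or url-safe base64). With --page-names, from-list is
one or more note aliases.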
|
""" |
|
|
|
import argparse |
|
import base64 |
|
import datetime |
|
import io |
|
import logging |
|
import os |
|
import sys |
|
import textwrap |
|
import psycopg2 |
|
import uuid |
|
|
|
def is_uuid(u):
    """Return True if *u* is a string that parses as a UUID, else False.

    Any textual form accepted by ``uuid.UUID`` counts (hyphenated, bare hex,
    braces, ``urn:uuid:`` prefix). Non-string input is reported as not a uuid
    instead of raising.
    """
    if not isinstance(u, str):
        return False
    try:
        uuid.UUID(u)
    except ValueError:
        return False
    # Explicit success value: falling off the end would return None, which
    # every caller's truth test would read as "not a uuid".
    return True
|
|
|
# def uuid2b64(u): |
|
# # codi strips off trailing '='. |
|
# return base64.urlsafe_b64encode(uuid.UUID(u).bytes).decode('utf-8').rstrip("=") |
|
# |
|
# |
|
|
|
def b642uuid(b):
    """Decode a url-safe base64 uuid into canonical hyphenated uuid text.

    *b* is the base64 text, with or without its trailing '=' padding.
    Raises ValueError when the decoded payload is not a 16-byte uuid.
    """
    # Base64 text has to be a multiple of four characters long. Appending
    # one to four '=' always supplies enough padding; the decoder ignores
    # whatever it does not need.
    pad_len = 4 - len(b) % 4
    padded = b + '=' * pad_len
    raw = base64.urlsafe_b64decode(padded)
    return str(uuid.UUID(bytes=raw))
|
|
|
def get_user_by_input(info, cursor):
    """Look up exactly one row of "Users" from a uuid, email or profile id.

    Args:
        info: identifier text. When it parses as a uuid it is matched against
            "Users".id; otherwise against "Users".email or "Users".profileid.
        cursor: open database cursor used to run the query.

    Returns:
        The single matching row as (id, email, profileid).

    Exits the process with status 1, after logging every match, unless
    exactly one user matched.
    """
    try:
        # Send the canonical text form rather than a uuid.UUID object:
        # psycopg2 only adapts UUID objects after
        # psycopg2.extras.register_uuid() has been called, which this script
        # never does.
        user_id = str(uuid.UUID(info))
    except ValueError:
        user_id = None

    if user_id is not None:
        cursor.execute(textwrap.dedent("""
            SELECT "Users".id, "Users".email, "Users".profileid
            FROM "Users" WHERE "Users".id = %s"""),
            (user_id,))
    else:
        cursor.execute(textwrap.dedent("""
            SELECT "Users".id, "Users".email, "Users".profileid
            FROM "Users"
            WHERE ("Users".email = %s OR "Users".profileid = %s)"""),
            (info, info))

    if cursor.rowcount != 1:
        logging.error("Found %d matches for user '%s':", cursor.rowcount, info)
        # List whatever did match so an ambiguous identifier can be narrowed.
        for (cuuid, email, profileid) in cursor:
            logging.error("uuid=%s email=%s profileid=%s", cuuid, email, profileid)
        sys.exit(1)
    return cursor.fetchone()
|
|
|
|
|
def _build_parser():
    """Build the command-line parser for the ownership-change tool."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='count', default=0)
    parser.add_argument('--dry-run', '-n', action='store_true')

    parser.add_argument(
        '-u', '--user', help='username to connect to postgres',
        default="codimd")
    parser.add_argument(
        '-P', '--password', help='password for user',
        default=os.environ.get('PGPASSWORD'))
    parser.add_argument(
        '-d', '--dbname', help='The name of the database',
        default="codimd")
    parser.add_argument(
        '-H', '--hostname', help='connect to db on host HOSTNAME',
        default="localhost")

    # Exactly one of these flags says how the from_list positionals are read.
    from_group = parser.add_mutually_exclusive_group(required=True)
    from_group.add_argument(
        '--owner', default=False, action='store_true',
        help='owner as email or uuid')
    from_group.add_argument(
        '--page-uuids', default=False, action='store_true',
        help='from-list are page uuids (or base64 encoded uuids)')
    from_group.add_argument(
        '--page-names', default=False, action='store_true',
        help='from-list are page names (aliases)')

    parser.add_argument('new_owner', help='new owner as email or uuid')
    parser.add_argument('from_list', help='pages/notes or owner',
                        nargs="+")
    return parser


def _select_page_uuids(args, cur):
    """Return the ids of the notes selected by the parsed arguments.

    Depending on the flag given, args.from_list is read as a single current
    owner, a list of note uuids (plain or base64) or a list of note aliases.
    Logs an error and exits with status 1 when the selection is invalid or
    does not match the database.
    """
    if args.owner:
        # from-list is an owner by uuid or email.
        if len(args.from_list) > 1:
            logging.error("only a single owner supported (%s)", args.from_list)
            sys.exit(1)

        owner_uuid = get_user_by_input(args.from_list[0], cur)[0]

        cur.execute(textwrap.dedent("""
            SELECT "Users".id, "Notes".id
            FROM "Users" INNER JOIN "Notes"
            ON "Notes"."ownerId" = "Users"."id"
            WHERE "Users".id = %s"""),
            (owner_uuid,))

        page_uuids = [page_uuid for (_, page_uuid) in cur]

        logging.info("Found %d pages owned by %s (uuid=%s)",
                     len(page_uuids), args.from_list[0], owner_uuid)
        return page_uuids

    if args.page_uuids:
        # pages are base64 uuids or uuids
        page_uuids = []
        for page in args.from_list:
            if is_uuid(page):
                page_uuids.append(page)
                continue
            try:
                page_uuids.append(b642uuid(page.rstrip("#")))
            except ValueError:
                logging.error("%s is not a uuid or base64 uuid", page)
                sys.exit(1)

        # Query with canonical text so differently spelled uuids still match.
        uuids = tuple(str(uuid.UUID(p)) for p in page_uuids)
        cur.execute(textwrap.dedent("""
            SELECT "Notes".id
            FROM "Notes"
            WHERE "Notes".id in %s"""),
            (uuids,))

        if cur.rowcount != len(page_uuids):
            logging.error("you gave %d input ids, but %d exist as notes",
                          len(page_uuids), cur.rowcount)
            sys.exit(1)
        return page_uuids

    # --page-names (aliases)
    cur.execute(textwrap.dedent("""
        SELECT "Notes".id, "Notes".alias
        FROM "Notes"
        WHERE "Notes".alias in %s"""),
        (tuple(args.from_list),))

    if cur.rowcount != len(args.from_list):
        logging.error("you gave %d pages but %d exist",
                      len(args.from_list), cur.rowcount)
        logging.error("Existing were:")
        for (cuuid, alias) in cur:
            logging.error("%s %s", cuuid, alias)
        sys.exit(1)

    return [row[0] for row in cur]


def main():
    """Reassign ownership of the selected notes to a new user.

    Parses the command line, connects to postgres, resolves the new owner
    and the set of notes, then updates "Notes"."ownerId" in one transaction.
    The transaction is rolled back on --dry-run or when the number of
    updated rows differs from the number of selected notes. Always ends via
    sys.exit: status 0 on success (including "nothing to do"), 1 on error.
    """
    args = _build_parser().parse_args()

    # NOTE: the index is verbose + 1, so the default level is INFO and a
    # single -v gives DEBUG; the ERROR entry is never selected.
    level = (logging.ERROR, logging.INFO, logging.DEBUG)[
        min(args.verbose + 1, 2)]
    logging.basicConfig(level=level)

    conn = psycopg2.connect(
        host=args.hostname, database=args.dbname, user=args.user,
        password=args.password)

    # The finally clause closes the connection on every path, including the
    # sys.exit() calls below and in the helpers.
    try:
        cur = conn.cursor()

        (new_uuid, new_email, new_profileid) = get_user_by_input(args.new_owner, cur)

        page_uuids = _select_page_uuids(args, cur)

        # Stop before the UPDATE: an empty tuple cannot be used with "in %s".
        if len(page_uuids) == 0:
            logging.info("Nothing to do, 0 relevant page ids")
            sys.exit(0)

        owner_friendly = new_profileid if new_profileid else new_email
        logging.info("Will change ownership of %d pages to %s (%s)",
                     len(page_uuids), owner_friendly, new_uuid)

        cur.execute(textwrap.dedent("""
            UPDATE "Notes" set "ownerId" = %s
            WHERE id in %s
            """), (new_uuid, tuple(page_uuids),))

        if cur.rowcount != len(page_uuids):
            logging.error("Updated %d pages, but expected to update %d",
                          cur.rowcount, len(page_uuids))
            # Rollback belongs to the connection; cursors have no rollback().
            conn.rollback()
            sys.exit(1)

        logging.info("Updated %d pages, setting owner to %s (%s)",
                     cur.rowcount, owner_friendly, new_uuid)

        if args.dry_run:
            logging.info("Dry run, rolling back\n")
            conn.rollback()
        else:
            conn.commit()
    finally:
        conn.close()

    sys.exit(0)
|
|
|
|
|
if __name__ == "__main__":
    # Run the tool only when executed as a script, not when imported.
    main()