Created
November 25, 2022 21:27
-
-
Save lduboeuf/13f308435de731167916f2f7690a2ebd to your computer and use it in GitHub Desktop.
account migration history db
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import re | |
import sqlite3 | |
import shutil | |
def migrate(con, fromAccount, toAccount):
    """Re-assign every history record of ``fromAccount`` to ``toAccount``.

    The ``accountId`` column of all history tables is rewritten inside one
    explicit transaction, so an error on any table rolls back the whole
    migration and leaves the database untouched.

    Args:
        con: open ``sqlite3`` connection to the history database. Its
            ``isolation_level`` is set to ``None`` so the transaction can be
            driven with explicit BEGIN/COMMIT/ROLLBACK statements; it is left
            in that mode on return.
        fromAccount: account id currently stored in the ``accountId`` columns.
        toAccount: account id to store instead.

    Database errors raised by the updates or the commit are not propagated:
    they are reported on stdout and the transaction is rolled back.
    """
    tables = (
        "threads",
        "thread_participants",
        "text_event_attachments",
        "text_events",
        "voice_events",
        "chat_room_info",
    )

    print(f'Start to migrate {fromAccount} to {toAccount}')
    # Autocommit mode: sqlite3 must not open implicit transactions, since the
    # transaction boundaries are issued by hand below.
    con.isolation_level = None
    cursor = con.cursor()
    try:
        cursor.execute("begin")
        try:
            for table in tables:
                # Table names come from the constant tuple above; the account
                # ids are always passed as bound parameters.
                cursor.execute(
                    f"UPDATE {table} SET accountId = ? WHERE accountId = ?",
                    (toAccount, fromAccount),
                )
            cursor.execute("commit")
        except con.Error as err:
            print(f"failed! {err}")
            cursor.execute("rollback")
            # Do not fall through to the success message below.
            return
    finally:
        cursor.close()
    print(f'Finished to migrate {fromAccount} to {toAccount}')
# Script body: back up the history database, find the account ids previously
# used in it for each SIM slot, and re-assign their records to the current
# account ids via migrate().
print("Start migration")

# NOTE(review): the real account discovery below is commented out in favour of
# a hard-coded list - presumably a testing leftover; confirm before running.
# tmpAccounts = os.popen("mc-tool list | sort").read().splitlines()
tmpAccounts = ['ofono/ofono/ril_0', 'ofono/ofono/ril_1', 'tartuffe']
if not tmpAccounts:
    print('no accounts found, nothing to do')
    raise SystemExit

# filter out possible fake accounts (e.g /phonesim): keep only ids that end
# with "_<digit>"
p = re.compile(r'^.*_[0-9]$')
accounts = [s for s in tmpAccounts if p.match(s)]

dbPath = os.path.join(
    os.path.expanduser("~"), '.local/share/history-service/history.sqlite'
)

# backup in case
shutil.copy(dbPath, dbPath + '.backup')
print(f'use database {dbPath} and backup to {dbPath}.backup')

con = sqlite3.connect(dbPath)
try:
    # Pair each old account with its new account at discovery time. Building
    # a separate list of old accounts and re-pairing by position would shift
    # the pairing whenever one slot has no history rows.
    migrations = []
    for account in accounts:
        # Match on the account's own trailing digit (guaranteed by the filter
        # above) rather than on its position in the list.
        row = con.execute(
            "SELECT accountId FROM threads WHERE accountId LIKE ? GROUP BY accountId",
            ('%' + account[-1],),
        ).fetchone()
        if row:
            migrations.append((row[0], account))

    oldAccounts = [old for old, _ in migrations]
    print(f' found oldAccounts: {oldAccounts}, new accounts:{accounts}')

    for oldAccount, newAccount in migrations:
        # Nothing to do when the history already uses the current account id.
        if oldAccount != newAccount:
            migrate(con, oldAccount, newAccount)
finally:
    # Release the database even if a query or migration raised.
    con.close()
print("Done")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment