|
#!/usr/bin/python3 |
|
|
|
""" |
|
Copyright (c) 2020 Fabian Große. |
|
This program is free software: you can redistribute it and/or modify |
|
it under the terms of the GNU General Public License as published by |
|
the Free Software Foundation, version 3. |
|
|
|
This program is distributed in the hope that it will be useful, but |
|
WITHOUT ANY WARRANTY; without even the implied warranty of |
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
General Public License for more details. |
|
|
|
You should have received a copy of the GNU General Public License |
|
along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
""" |
|
|
|
import json |
|
import sys, getopt |
|
import os, subprocess |
|
import base64 |
|
import shutil, time |
|
|
|
|
|
ITEM_TEMPLATE = { |
|
"folderId":None, |
|
"type":1, # 1-Website password | 2-Secure note | 3-Card | 4-Identity |
|
"name":"Item name", |
|
"notes":"Some notes about this item.", |
|
"login":None, # LOGIN_TEMPLATE |
|
"secureNote":None |
|
} |
|
ITEM_LOGIN_TEMPLATE = { |
|
"uris":[], # LOGIN_URI_TEMPLATE |
|
"username":"jdoe", |
|
"password":"myp@ssword123", |
|
"totp":None |
|
} |
|
ITEM_LOGIN_URI_TEMPLATE = { |
|
"match":None, # 0-Domain | 1-Host | 2-Starts with | 3-Exact | 4-RegExp | 5-Never |
|
"uri":"https://google.com" |
|
} |
|
ITEM_SECURENOTE_TEMPLATE = {"type":0} # 0-Generic |
|
ITEM_FIELD_TEMPLATE = { |
|
"name":"Field name", |
|
"value":"Some value", |
|
"type":0 # 0-Text | 1-Hidden | 2-Boolean |
|
} |
|
|
|
|
|
def create_login(item, has_uris=True): |
|
"""Create a new Bitwarden login entry. |
|
|
|
Args: |
|
- item (dict): Part of the Psono dict. |
|
- has_uris (bool, optional): Used to differ Psono's website and application passwords. True => website password, False => application password. Defaults to True. |
|
|
|
Returns: |
|
dict: Dictionary representing a login item in Bitwarden's JSON structure. |
|
""" |
|
|
|
prefix = "website_password_" if has_uris else "application_password_" |
|
# Populate the template with data |
|
d = { |
|
"name": item.get(prefix + "title", "Empty title"), |
|
"type": 1, |
|
"login": { |
|
"username": item.get(prefix + "username", ""), |
|
"password": item.get(prefix + "password", ""), |
|
"uris": [] |
|
}, |
|
"notes": item.get(prefix + "notes", None) |
|
} |
|
|
|
# Add URIs if item is website password |
|
if has_uris: |
|
# Reference frame is the 'login' sub-dict |
|
temp_d = {"uris": [{ |
|
"match": 0, |
|
"uri": item.get(prefix + "url", "") |
|
}]} |
|
d["login"].update(temp_d) |
|
|
|
return d |
|
|
|
|
|
def create_note(item, is_bookmark=False): |
|
"""Create a new Bitwarden secure note entry. |
|
|
|
Args: |
|
- item (dict): Part of the Psono dict. |
|
- is_bookmark (bool, optional): Used to differ Psono's normal note and bookmark. True => bookmark, False => normal note. Defaults to False. |
|
|
|
Returns: |
|
dict: Dictionary representing a secure note item in Bitwarden's JSON structure. |
|
""" |
|
|
|
prefix = "bookmark_" if is_bookmark else "note_" |
|
# Populate the template with data |
|
d = { |
|
"name": item.get(prefix + "title", "Empty title"), |
|
"type": 2, |
|
"secureNote": {"type":0}, |
|
"notes": item.get(prefix + "notes", "") |
|
} |
|
|
|
# Add custom URL field if item is bookmark |
|
if is_bookmark: |
|
temp_d = {"fields": [{ |
|
"name": "URL", |
|
"value": item.get(prefix + "url", "https://example.com"), |
|
"type": 0 |
|
}]} |
|
d.update(temp_d) |
|
|
|
return d |
|
|
|
|
|
def create_item(item, parent_folder=None): |
|
"""Create a new Bitwarden item. The Bitwarden CLI Tool is required. |
|
|
|
Args: |
|
- item (dict): Part of the Psono dict. Supported types are ```website_password, application_password, note and bookmark```. |
|
- parent_folder (str, optional): Folder ID of the parent folder. Defaults to None. |
|
""" |
|
|
|
d = {"folderId": parent_folder} |
|
# Find and call the appropriate function |
|
if item["type"] == "website_password": |
|
d.update(create_login(item)) |
|
elif item["type"] == "note": |
|
d.update(create_note(item)) |
|
elif item["type"] == "application_password": |
|
d.update(create_login(item, False)) |
|
elif item["type"] == "bookmark": |
|
d.update(create_note(item, True)) |
|
else: |
|
print("Not supported entry type '" + item["type"] + "'. Entry will be ignored.\n", file=sys.stderr) |
|
return |
|
|
|
# Dump dict to JSON string ind Base64-encode it |
|
json_str = json.dumps(d) |
|
enc_bytes = base64.b64encode(json_str.encode("utf-8")) |
|
enc_str = str(enc_bytes, "utf-8") |
|
|
|
# Call Bitwarden CLI Tool to finally create the item |
|
bw = subprocess.Popen(['bw', 'create', 'item', enc_str], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
stdout, stderr = bw.communicate() |
|
|
|
|
|
def create_folder(folder_name): |
|
"""Create a new Bitwarden folder called ```folder_name```. |
|
|
|
Args: |
|
folder_name (str): Name of the new folder. To create nested folders use a name like ```folder/sub/sub-sub```. |
|
|
|
Returns: |
|
str: Bitwarden folder GUID |
|
""" |
|
|
|
d = {"name": folder_name} |
|
# Dump dict to JSON string ind Base64-encode it |
|
json_str = json.dumps(d) |
|
enc_bytes = base64.b64encode(json_str.encode("utf-8")) |
|
enc_str = str(enc_bytes, "utf-8") |
|
|
|
# Call Bitwarden CLI Tool to create the folder |
|
bw = subprocess.Popen(['bw', 'create', 'folder', enc_str], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
response, _ = bw.communicate() |
|
|
|
# Get folder GUID from subprocess output |
|
bw_folder_id = (json.loads(response))["id"] |
|
return bw_folder_id |
|
|
|
|
|
def get_set_size(psono_obj): |
|
"""Get the size of the working set, aka. how many items need to be processed. |
|
|
|
Args: |
|
psono_obj (dict): The whole Psono dict. |
|
|
|
Returns: |
|
int: Size of the working set. |
|
""" |
|
|
|
count = 0 |
|
# if 'items' key is available in current context, add its length to the total |
|
if "items" in psono_obj: |
|
count += len(psono_obj["items"]) |
|
# if 'folders' key is available in current context, recurse through these folders |
|
if "folders" in psono_obj: |
|
for folder in psono_obj["folders"]: |
|
count += get_set_size(folder) |
|
|
|
return count |
|
|
|
|
|
def print_progress(step, total, prefix="Progress:", fill="█", length=80): |
|
"""Prints a progress bar in the console/terminal. |
|
|
|
Args: |
|
- step (int): Current working step out of the total. |
|
- total (int): Total number of working steps. |
|
- prefix (str, optional): Progress bar prefix, will be printed on the far left. Defaults to "Progress:". |
|
- fill (str, optional): Single character which is used to fill the bar and display the progress. Defaults to "█". |
|
- length (int, optional): Total length of the bar in characters, including prefix. Is fallback if dynamic sizing isn't available. Defaults to 80. |
|
""" |
|
|
|
# progress bar format |
|
styling = "%s |%s| %s/%s" % (prefix, fill, step, total) |
|
# manage dynamic terminal sizing |
|
cols, _ = shutil.get_terminal_size(fallback=(length,1)) |
|
length = cols - len(styling) |
|
# calculate bar filling |
|
filled_length = int(length * step // total) |
|
bar = fill * filled_length + "-" * (length - filled_length) |
|
|
|
print("\r%s" % styling.replace(fill, bar), end="\r") |
|
# New line on Complete |
|
if step == total: |
|
print() |
|
|
|
|
|
def traverse_tree(psono_obj, bw_folder_id=None, curr_folder_name=""): |
|
"""Traverses the Psono nested dict tree to acurately reproduce the folder structure in Bitwarden. |
|
|
|
Args: |
|
- psono_obj (dict): Part of the Psono dict. |
|
- bw_folder_id (str, optional): Bitwarden folder ID of the current folder. Defaults to None. |
|
- curr_folder_name (str, optional): Name of the current folder. Defaults to "". |
|
""" |
|
|
|
# If there is the key 'items' in current context... |
|
if "items" in psono_obj: |
|
# ...loop through all children of that key, aka. through all items in the current folder |
|
for item in psono_obj["items"]: |
|
time.sleep(1) |
|
create_item(item, bw_folder_id) # create equivalent Bitwarden item |
|
global progress |
|
progress += 1 # increase global progress tracker |
|
print_progress(progress, set_size) # update progress bar. |
|
|
|
# If there is the key 'folders' in current context... |
|
if "folders" in psono_obj: |
|
# get list of existing Bitwarden folders to check against later |
|
bw = subprocess.Popen(['bw', 'list', 'folders'], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
response, _ = bw.communicate() |
|
folder_list = json.loads(response) |
|
|
|
# ...loop through all children of that key, aka. through all sub-folders of the current folder |
|
for folder in psono_obj["folders"]: |
|
# compose the folder name of new sub-folder |
|
new_folder_name = curr_folder_name + "/" + folder["name"] if curr_folder_name else folder["name"] |
|
# Check if folder already exists |
|
for f in folder_list: |
|
# if exists, get its ID |
|
if f["name"] == new_folder_name: |
|
new_folder_id = f["id"] |
|
break |
|
else: # else create new folder |
|
new_folder_id = create_folder(new_folder_name) |
|
# print("\nDEBUG: New folder " + new_folder_name + "\n") |
|
# Recursively move deeper into the tree |
|
traverse_tree(folder, new_folder_id, new_folder_name) |
|
|
|
|
|
set_size = 0 # Global var, size of the current working set |
|
progress = 0 # Global var, tracks progress |
|
|
|
|
|
def main(argv): |
|
"""Main function. Is run, when script is launched from terminal. |
|
|
|
Args: |
|
argv ([type]): List of arguments passed from the command line. |
|
""" |
|
|
|
# Check if Bitwarden vault is open and usable |
|
bw = subprocess.Popen(['bw', 'status'], |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.STDOUT) |
|
response, _ = bw.communicate() |
|
status = (json.loads(response))["status"] |
|
if status != "unlocked": |
|
sys.exit("You need to login using the Bitwarden CLI client and/or unlock your vault before using this script.") |
|
|
|
# Sync vault before messing with it |
|
bw = subprocess.Popen(['bw', 'sync'], |
|
stdout=sys.stdout, |
|
stderr=sys.stderr) |
|
_,_ = bw.communicate() |
|
print() |
|
|
|
# Set up command line arguments |
|
try: |
|
opts, _ = getopt.getopt(argv, "hp:", ["help", "psonofile="]) |
|
except getopt.GetoptError: |
|
sys.exit("psono-importer [-p|--psonofile] <exported psono file>") |
|
# Loop through arguments |
|
for opt, arg in opts: |
|
if opt == "-h": # Help text |
|
print("You need to login using the Bitwarden CLI client and/or unlock your vault before using this script.") |
|
print("psono-importer -p <exported psono file>") |
|
sys.exit() |
|
elif opt in ("-p", "--psonofile"): # path to exported Psono file |
|
psonofile = arg |
|
# Try loading the file |
|
psono_dict = {} |
|
try: |
|
psono_dict = json.load(open(psonofile, 'r')) |
|
except: |
|
sys.exit("File does not exist or it's contents are faulty.") |
|
|
|
# Start of traversal |
|
global set_size |
|
set_size = get_set_size(psono_dict) |
|
print_progress(0, set_size) |
|
traverse_tree(psono_dict) |
|
|
|
# Sync vault again after messing with it |
|
bw = subprocess.Popen(['bw', 'sync'], |
|
stdout=sys.stdout, |
|
stderr=sys.stderr) |
|
_,_ = bw.communicate() |
|
print() |
|
else: |
|
sys.exit("psono-importer [-p|--psonofile] <exported psono file>") |
|
|
|
|
|
if __name__ == "__main__": |
|
main(sys.argv[1:]) |