Skip to content

Instantly share code, notes, and snippets.

@mpalet
Forked from mohamedadaly/bw_export_kp.py
Last active March 31, 2020 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mpalet/5dfb2bcdce069fbc0f920ae88f074906 to your computer and use it in GitHub Desktop.
Save mpalet/5dfb2bcdce069fbc0f920ae88f074906 to your computer and use it in GitHub Desktop.
Export Bitwarden to KeePass 2 XML format
#!python
"""
Exports a Bitwarden database into a KeePass file (kdbx) including file attachments, custom fields and folder structure.
It can also export an unencrypted XML KeePass file conforming to KeePass 2 XML format.
It requires keepassxc-cli; if that is not available, it can still export KeePass 2 XML.
- https://github.com/keepassxreboot/keepassxc
Usage: bw_export_kp.py [-h] [-x] [-d] [-bw-password None] [-kee-password None]
bw_user output_file
positional arguments:
bw_user
output_file
optional arguments:
-h, --help show this help message and exit
-x, --xml-output saves an UNENCRYPTED KeePass 2 XML file
-d, --diff-pass different passwords for Bitwarden and KeePass file
-bw-password None Bitwarden password (prompted if not provided)
-kee-password None KeePass password (prompted if not provided)
References:
- Bitwarden CLI: https://help.bitwarden.com/article/cli/
- KeePass 2 XML: https://github.com/keepassxreboot/keepassxc-specs/blob/master/kdbx-xml/rfc.txt
"""
from __future__ import print_function
import sys
def eprint(*args, **kwargs):
    """Print to standard error.

    Takes the same positional and keyword arguments as the built-in
    ``print``. The ``file`` keyword is supplied by this helper, so callers
    must not pass it themselves.
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
import os
import errno
import shutil
import contextlib
import tempfile
import base64
import subprocess
import json
import uuid
import xmltodict
import getpass
def is_tool(name):
    """Report whether an executable called `name` can be located on PATH."""
    # Local import, as in the original; `which` yields None when nothing
    # executable with that name is found.
    from shutil import which

    location = which(name)
    return location is not None
def static_vars(**kwargs):
    """
    Decorator factory that attaches "static" variables to a function.

    Each keyword argument becomes an attribute of the decorated function,
    initialised to the given value. The function object itself is returned
    unchanged otherwise (no wrapper is created).

    Usage:
        @static_vars(var1=value, var2=value, ...)
        def func(...):
            ...
    """
    def decorate(func):
        for attr_name, initial in kwargs.items():
            setattr(func, attr_name, initial)
        return func

    return decorate
@contextlib.contextmanager
def write_open(filename=None):
    """
    Context manager yielding a text handle to write output to.

    * If `filename` is falsy or ``'-'``, ``sys.stdout`` is yielded and is
      left open on exit.
    * Otherwise the data is written to a temporary file created with
      ``tempfile.mkstemp`` in the destination directory. The temporary file
      is moved onto `filename` only if the ``with`` body finishes without
      raising. On any exception the temporary file is removed and the
      exception is re-raised, so a failed write never replaces an existing
      destination with partial content.
    """
    if not filename or filename == '-':
        yield sys.stdout
        return

    dirname, basename = os.path.split(filename)
    fd, temp_path = tempfile.mkstemp(prefix=basename, dir=dirname)
    fh = open(fd, 'w')
    try:
        # `with fh` closes the handle on both success and failure; closing a
        # handle the caller already closed is harmless.
        with fh:
            yield fh
    except BaseException:
        # Do not leave the partial temporary file behind.
        with contextlib.suppress(OSError):
            os.unlink(temp_path)
        raise
    shutil.move(temp_path, filename)
def safe_move(src, dst):
    """Move the file ``src`` to ``dst``, also across filesystems.

    A plain ``os.rename()`` is attempted first. If it fails with ``EXDEV``
    (source and destination are on different filesystems), the file is
    copied next to the destination under a unique temporary name, renamed
    onto ``dst``, and the source is then removed. Any other ``OSError`` is
    re-raised unchanged.
    """
    try:
        os.rename(src, dst)
    except OSError as err:
        if err.errno != errno.EXDEV:
            raise
        # The cross-filesystem copy itself may not be atomic, so it goes to
        # `<dst>.<random uuid>.tmp` first; the random part keeps concurrent
        # movers from sharing a staging file.
        staging = "%s.%s.tmp" % (dst, uuid.uuid4())
        shutil.copyfile(src, staging)
        # The staging file is in the destination's directory, so this
        # rename does not cross filesystems.
        os.rename(staging, dst)
        os.unlink(src)
def get_uuid(name):
    """
    Returns a deterministic, base64-encoded UUID string for `name`.

    The UUID is a version-5 UUID of `name` in the DNS namespace; its 16 raw
    bytes are base64-encoded, as used for UUID elements in KeePass XML:
    https://github.com/keepassxreboot/keepassxc-specs/blob/master/kdbx-xml/rfc.txt
    """
    raw = uuid.uuid5(uuid.NAMESPACE_DNS, name).bytes
    encoded = base64.b64encode(raw)
    return encoded.decode("utf-8")
def get_folder(f):
    """
    Builds the KeePass group dict for a Bitwarden folder.

    `f` is a folder mapping with a 'name' key; the result holds the folder
    name and a UUID derived from that name.
    """
    folder_name = f['name']
    return {'UUID': get_uuid(folder_name), 'Name': folder_name}
def get_protected_value(v):
    """
    Wraps `v` as a Value element flagged ProtectInMemory="True", i.e. one
    KeePass treats as memory protected (used for passwords and sensitive
    custom fields/strings).
    """
    wrapped = {'#text': v}
    wrapped['@ProtectInMemory'] = 'True'
    return wrapped
def get_fields(subitem, protected=()):
    """
    Returns the key/value pairs of `subitem` as a list of field dicts.

    Args:
        subitem: mapping of field name to value.
        protected: collection of field names whose values must be wrapped
            with get_protected_value(). Defaults to an empty tuple rather
            than a shared mutable list.

    Returns:
        A list of ``{'Key': name, 'Value': value}`` dicts in the iteration
        order of `subitem`.
    """
    return [
        dict(Key=k, Value=get_protected_value(v) if k in protected else v)
        for k, v in subitem.items()
    ]
@static_vars(binary_id=0, binaries=[])
def get_entry(e):
    """
    Returns a KeePass entry dict for the Bitwarden item `e`.

    Parses the title, username, password, urls, notes, custom fields,
    card/identity data and attachments. Sections that are missing or null
    in the item are skipped instead of raising.

    Attachments are downloaded with get_bw_attachment(). Their base64
    payloads are accumulated in the function attribute `get_entry.binaries`,
    and `get_entry.binary_id` is the running reference counter shared by all
    calls.

    The entry UUID is derived from the item's 'id' when it has one, so that
    two items sharing a name do not end up with the same UUID; the name is
    only used as a fallback.
    """
    # Custom fields, protecting as necessary
    fields = []
    for f in e.get('fields') or []:
        if f['name'] is None:
            continue
        value = f['value']
        # type 1 is exported as a protected value
        # (presumably Bitwarden's "hidden" field type -- TODO confirm)
        if f['type'] == 1:
            value = get_protected_value(value)
        fields.append(dict(Key=f['name'], Value=value))

    # default values
    urls = ''
    username, password = '', ''
    notes = e.get('notes') or ''

    # read username, password, and url if a login item
    login = e.get('login')
    if login:
        uris = [u.get('uri') for u in login.get('uris') or []]
        # null URIs cannot be joined, so they are dropped
        urls = ','.join(uri for uri in uris if uri)
        username = login.get('username') or ''
        password = login.get('password') or ''
        # add totp to fields as protected, but only when one is set
        totp = login.get('totp')
        if totp is not None:
            fields.append(dict(Key='totp', Value=get_protected_value(totp)))

    # Card items: the number becomes a protected field
    card = e.get('card')
    if card:
        fields.extend(get_fields(card, protected=['number']))

    # Identity items
    identity = e.get('identity')
    if identity:
        fields.extend(get_fields(identity))

    # Attachments
    attachments = []
    for a in e.get('attachments') or []:
        if a['id'] is None:
            continue
        ref = get_entry.binary_id
        # reference from the entry to the binary stored in the Meta element
        attachments.append(dict(Key=a['fileName'], Value={'@Ref': ref}))
        # add binary data to the function's static list and bump the counter
        payload = get_bw_attachment(a['id'], e['id'])
        get_entry.binaries.append(
            {'@ID': ref, '@Compressed': 'False', '#text': payload}
        )
        get_entry.binary_id += 1

    # assemble the entry into a dict with a UUID
    entry = dict(
        UUID=get_uuid(e.get('id') or e['name']),
        String=[
            dict(Key='Title', Value=e['name']),
            dict(Key='UserName', Value=username),
            dict(Key='Password', Value=get_protected_value(password)),
            dict(Key='URL', Value=urls),
            dict(Key='Notes', Value=notes),
        ] + fields,
    )
    if attachments:
        entry['Binary'] = attachments
    return entry
def get_cmd_output(cmd):
    """
    Runs `cmd` through the shell and returns its output.

    Args:
        cmd: the command line, as a single string.

    Returns:
        The command's output as returned by subprocess.getstatusoutput().

    Raises:
        RuntimeError: if the command exits with a non-zero status. The
            message carries the exit status and the command's output, so
            the failure can be diagnosed from the traceback.
    """
    status, output = subprocess.getstatusoutput(cmd)
    if status != 0:
        eprint("Error running command:", cmd)
        raise RuntimeError(
            "command exited with status %d: %s" % (status, output)
        )
    return output
def bw_logout():
    """
    Logs out of the Bitwarden CLI.

    The command's output is captured and discarded, and its exit status is
    not checked.
    """
    subprocess.run('bw logout --raw', shell=True, capture_output=True)
def get_bw_data():
    """
    Fetches the folders and items from the Bitwarden CLI.

    Uses the session key stored in `main.bw_session` and returns a
    `(folders, items)` tuple of the parsed JSON listings.
    """
    session = main.bw_session

    # folders first, then items
    raw_folders = get_cmd_output('bw list folders --session ' + session)
    folders = json.loads(raw_folders)

    raw_items = get_cmd_output('bw list items --session ' + session)
    items = json.loads(raw_items)

    return folders, items
def secure_delete(path, passes=1):
    """
    Delete a file after overwriting its contents with random data.

    Args:
        path: file to delete. A path that does not exist is treated as
            already deleted.
        passes: number of times the whole file is overwritten.

    The file is opened in update mode ("r+b"), not append mode: in append
    mode every write lands at the end of the file regardless of seek(), so
    the original bytes would never be overwritten.

    NOTE(review): this is best effort -- whether the overwrite reaches the
    same physical blocks depends on the filesystem and storage device.
    """
    try:
        delfile = open(path, "r+b")
    except FileNotFoundError:
        return
    with delfile:
        length = delfile.seek(0, os.SEEK_END)
        for _ in range(passes):
            delfile.seek(0)
            delfile.write(os.urandom(length))
            # push each pass out to the device before the next one / removal
            delfile.flush()
            os.fsync(delfile.fileno())
    os.remove(path)
def get_bw_attachment(id, itemid):
    """
    Downloads one attachment through the Bitwarden CLI.

    The attachment `id` of item `itemid` is saved into a temporary
    directory, read back, and returned as a base64 string. The downloaded
    file is wiped with secure_delete() before the directory is removed.
    Raises Exception when the path printed by the CLI is not a file.
    """
    with tempfile.TemporaryDirectory() as workdir:
        pieces = [
            'bw get attachment --itemid ', itemid, ' ', id,
            ' --output ', workdir, '/ --raw --session ', main.bw_session,
        ]
        # the command's output is used as the path of the downloaded file
        path = get_cmd_output(''.join(pieces))
        if os.path.isfile(path):
            with open(path, "rb") as f:
                encoded_file = base64.b64encode(f.read()).decode("utf-8")
            secure_delete(path)
            return encoded_file
        eprint("Error downloading attachment:", id)
        raise Exception
@static_vars(bw_session='')
def main(bw_user, output_file,
         xml_output: ('saves an UNENCRYPTED KeePass 2 XML file', 'flag', 'x'),
         diff_pass: ('different passwords for Bitwarden and KeePass file', 'flag', 'd'),
         bw_password: ('Bitwarden password (prompted if not provided)', 'option')=None,
         kee_password: ('KeePass password (prompted if not provided)', 'option')=None
         ):
    """
    Export the Bitwarden vault of bw_user to output_file.

    Logs in with the Bitwarden CLI, converts folders and items to the
    KeePass 2 XML structure, and writes either the unencrypted XML
    (xml_output) or a KeePass database created by keepassxc-cli.
    Ends by raising SystemExit: status 0 on success, 1 on login or
    export failure.
    """
    if not is_tool('bw'):
        eprint("Bitwarden cli not found")
        raise Exception("Bitwarden cli not found")

    # Log out any existing session
    bw_logout()

    # Bitwarden login; the password is fed through stdin
    if bw_password is None:
        bw_password = getpass.getpass(prompt='Bitwarden password: ')
    cmd = 'bw --raw login '+str(bw_user)
    res = subprocess.run(cmd, input=bytearray(bw_password, 'utf-8'), shell=True, capture_output=True)
    if res.returncode != 0:
        eprint("Wrong password")
        # show what the CLI itself reported, if anything
        detail = res.stderr.decode(errors='replace').strip()
        if detail:
            eprint(detail)
        raise SystemExit(1)
    # the session key is later appended to shell command lines, so drop any
    # surrounding whitespace
    main.bw_session = res.stdout.decode().strip()
    del res  # drop the result object, which still holds the session key

    # set keepass password
    if kee_password is None:
        if diff_pass:
            kee_password = getpass.getpass(prompt='KeePass password: ')
        else:
            kee_password = bw_password
    del bw_password

    # get data from bw
    bw_folders, bw_items = get_bw_data()

    # parse all entries and group them by folder id in a single pass
    entries = [get_entry(e) for e in bw_items]
    entries_by_folder = {}
    for entry, item in zip(entries, bw_items):
        entries_by_folder.setdefault(item.get('folderId'), []).append(entry)

    # loop over folders
    folders = []
    root_entries = []
    for f in bw_folders:
        folder = get_folder(f)
        folder_id = f['id']
        folder_entries = entries_by_folder.get(folder_id, [])
        if folder_id is None:
            # NoFolder (with None id): its entries go directly under Root
            root_entries = folder_entries
        else:
            # Normal folder
            if folder_entries:
                folder['Entry'] = folder_entries
            folders.append(folder)

    # Root group
    root_group = get_folder(dict(name='Root'))
    root_group['Group'] = folders
    if root_entries:
        root_group['Entry'] = root_entries

    # Root element
    root = dict(Group=root_group)

    # Meta element
    meta = dict(Generator='bitwarden export', MasterKeyChangeForce=-1, MasterKeyChangeRec=-1)
    # add binary files from attachments
    if get_entry.binaries:
        meta.update(dict(Binaries=dict(Binary=get_entry.binaries)))

    # xml document contents
    xml = dict(KeePassFile=dict(Meta=meta, Root=root))

    if xml_output:
        # export unencrypted XML; the context manager closes the handle
        # (and leaves sys.stdout open when the output is '-')
        with write_open(output_file) as out:
            out.write(xmltodict.unparse(xml, pretty=True))
        raise SystemExit(0)

    if not is_tool('keepassxc-cli'):
        eprint("keepassxc-cli not found")
        raise Exception("keepassxc-cli not found")

    # write XML and export to keepass
    with tempfile.NamedTemporaryFile() as xml_out:
        xml_out.write(bytearray(xmltodict.unparse(xml, pretty=True), 'utf-8'))
        xml_out.flush()
        with tempfile.TemporaryDirectory() as tmpdir:
            output_tempfile = tmpdir + '/tmp'
            cmd = 'keepassxc-cli import '+xml_out.name+' '+output_tempfile
            res = subprocess.run(cmd, input=bytearray(kee_password, 'utf-8'), shell=True, capture_output=True)
            if res.returncode != 0:
                eprint("Error exporting KeePass file")
                detail = res.stderr.decode(errors='replace').strip()
                if detail:
                    eprint(detail)
                raise SystemExit(1)
            del res
            # must happen before the temporary directory is removed
            safe_move(output_tempfile, output_file)

    # cleanup and exit
    del kee_password
    bw_logout()
    sys.exit(0)
if __name__ == "__main__":
    import plac
    import traceback

    # Script entry point: run main() through plac, always log out of
    # Bitwarden afterwards, and report the outcome via the exit status.
    exit_code = 0
    try:
        plac.call(main)
    except SystemExit as exc:
        # Keep the status that was requested (None counts as success)
        # instead of turning every exit into 0.
        exit_code = exc.code
    except BaseException:
        # Diagnostics go to stderr: stdout may be carrying the XML export.
        eprint("Unexpected error:", sys.exc_info())
        eprint(traceback.format_exc())
        exit_code = 1
    finally:
        bw_logout()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment