Skip to content

Instantly share code, notes, and snippets.

@joachimtingvold
Last active August 7, 2023 09:03
Show Gist options
  • Save joachimtingvold/fef17f2772ebd623b1b1ea4135e1e7f5 to your computer and use it in GitHub Desktop.
Save joachimtingvold/fef17f2772ebd623b1b1ea4135e1e7f5 to your computer and use it in GitHub Desktop.
Royal TSX Dynamic Folder 1Password with support for multiple vaults
from __future__ import print_function
from functools import partial
from sys import platform as _platform
from subprocess import Popen, PIPE
import sys
import json
import subprocess
import os
import base64
op_path_windows = r"$CustomProperty.OPPathWindows$"
op_path_macOS = r"$CustomProperty.OPPathmacOS$"
filter_account = r"$CustomProperty.Account$"
filter_vault = r"$CustomProperty.Vault$"
item_id = r"$DynamicCredential.EffectiveID$"
class RoyalUtils:
@staticmethod
def is_macOS():
plat = _platform.lower()
return plat.startswith("darwin")
@staticmethod
def exit_with_error(message, exception=None):
printError = partial(print, file=sys.stderr) # python2 compatibility
exception_message = str(exception) if exception else "N/A"
full_message = message + exception_message
printError(full_message)
sys.exit(1)
@staticmethod
def to_json(obj, pretty=False):
return json.dumps(obj, indent=4) if pretty else json.dumps(obj)
@staticmethod
def decode_to_utf8_string(potential_bytes):
if isinstance(potential_bytes, str):
return potential_bytes
else:
return potential_bytes.decode("utf-8")
class OnePassword:
op_path = ""
account = ""
vault = ""
vaults = []
unknown_error_string = "An unknown error occurred."
def __init__(self, op_path, account="", vault=""):
self.op_path = op_path
self.account = account.strip()
# use an array of vault IDs instead of a single ID
#self.vault = vault.strip()
self.vaults = vault.replace(' ','').split(',')
def get_items(self):
items = []
failed = False
# foreach vault, grab all of the items in it
for vault in self.vaults:
cmd_list_items = [
self.op_path,
"item", "list",
"--format=json",
]
if self.account:
cmd_list_items.append("--account")
cmd_list_items.append(self.account)
if self.vaults:
cmd_list_items.append("--vault")
cmd_list_items.append(vault)
op = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = op.communicate()
exit_code = op.wait()
success = exit_code == 0
if success:
# not sure why this needs to be done like this but F.I.
# from what I understand, op.exe returns the data in JSON format, then we need to decode it to a UTF8 string for RoyalTS, and then back into JSON (necessary so we can merge multiple vaults later anyway)
items.append(json.loads(RoyalUtils.decode_to_utf8_string(output)))
else:
failed = True
if not failed:
merged_output = []
for output in items:
merged_output.extend(output)
return merged_output
else:
if not err:
err = self.unknown_error_string
else:
err = RoyalUtils.decode_to_utf8_string(err)
raise Exception(err)
def get_item_details(self, item_id):
cmd_get_item = [
self.op_path,
"item", "get", item_id,
"--format=json",
]
if self.account:
cmd_get_item.append("--account")
cmd_get_item.append(self.account)
op = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = op.communicate()
exit_code = op.wait()
success = exit_code == 0
if success:
output = RoyalUtils.decode_to_utf8_string(output)
item = json.loads(output)
return item
else:
if not err:
err = self.unknown_error_string
else:
err = RoyalUtils.decode_to_utf8_string(err)
raise Exception(err)
class Converter:
@staticmethod
def convert_items(items):
objects = []
for item in items:
id = item.get("id", "")
title = item.get("title", "N/A")
primary_url = ""
if "urls" in item:
for url in item["urls"]:
if url.get("primary", None):
primary_url = url["href"]
vault_name = ""
if item.get("vault", None):
vault_name = item["vault"].get("name", "")
cred_type = "DynamicCredential"
cred = {
"Type": cred_type,
"ID": id,
"Name": title,
"Path": vault_name
}
if primary_url != "":
cred["URL"] = primary_url
objects.append(cred)
objects = sorted(objects, key = lambda i: (i["Path"], i["Name"]))
store = {
"Objects": objects
}
return store
@staticmethod
def convert_item(item_details):
username = None
password = None
fields = item_details.get("fields")
for field in fields:
field_id = field.get("id", None)
field_label = field.get("label", None)
field_value = field.get("value", None)
if field_id == "username" or field_label == "username":
username = field_value
if field_id == "password" or field_label == "password":
password = field_value
cred = { }
if username is not None:
cred["Username"] = username
if password is not None:
cred["Password"] = password
return cred
class Coordinator:
op_path = ""
account = ""
vault = ""
error_message_get_items = "Error while getting items: "
error_message_get_item_details = "Error while getting item details: "
def __init__(self, op_path_windows, op_path_macOS, account, vault):
self.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows
self.account = account
self.vault = vault
def get_items(self):
op = OnePassword(self.op_path, self.account, self.vault)
items = None
try:
items = op.get_items()
except Exception as e:
RoyalUtils.exit_with_error(self.error_message_get_items, e)
items_details = [ ]
store = Converter.convert_items(items)
store_json = RoyalUtils.to_json(store, True)
print(store_json)
def get_item_details(self, item_id):
op = OnePassword(self.op_path, self.account, self.vault)
item_details = None
try:
item_details = op.get_item_details(item_id)
except Exception as e:
RoyalUtils.exit_with_error(self.error_message_get_item_details, e)
store = Converter.convert_item(item_details)
store_json = RoyalUtils.to_json(store, True)
print(store_json)
coordinator = Coordinator(op_path_windows, op_path_macOS, filter_account, filter_vault)
#coordinator.get_items()
coordinator.get_item_details(item_id)
from __future__ import print_function
from functools import partial
from sys import platform as _platform
from subprocess import Popen, PIPE
import sys
import json
import subprocess
import os
import base64
op_path_windows = r"$CustomProperty.OPPathWindows$"
op_path_macOS = r"$CustomProperty.OPPathmacOS$"
filter_account = r"$CustomProperty.Account$"
filter_vault = r"$CustomProperty.Vault$"
#item_id = r"$DynamicCredential.EffectiveID$"
class RoyalUtils:
@staticmethod
def is_macOS():
plat = _platform.lower()
return plat.startswith("darwin")
@staticmethod
def exit_with_error(message, exception=None):
printError = partial(print, file=sys.stderr) # python2 compatibility
exception_message = str(exception) if exception else "N/A"
full_message = message + exception_message
printError(full_message)
sys.exit(1)
@staticmethod
def to_json(obj, pretty=False):
return json.dumps(obj, indent=4) if pretty else json.dumps(obj)
@staticmethod
def decode_to_utf8_string(potential_bytes):
if isinstance(potential_bytes, str):
return potential_bytes
else:
return potential_bytes.decode("utf-8")
class OnePassword:
op_path = ""
account = ""
vault = ""
vaults = []
unknown_error_string = "An unknown error occurred."
def __init__(self, op_path, account="", vault=""):
self.op_path = op_path
self.account = account.strip()
# use an array of vault IDs instead of a single ID
#self.vault = vault.strip()
self.vaults = vault.replace(' ','').split(',')
def get_items(self):
items = []
failed = False
# foreach vault, grab all of the items in it
for vault in self.vaults:
cmd_list_items = [
self.op_path,
"item", "list",
"--format=json",
]
if self.account:
cmd_list_items.append("--account")
cmd_list_items.append(self.account)
if self.vaults:
cmd_list_items.append("--vault")
cmd_list_items.append(vault)
op = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = op.communicate()
exit_code = op.wait()
success = exit_code == 0
if success:
# not sure why this needs to be done like this but F.I.
# from what I understand, op.exe returns the data in JSON format, then we need to decode it to a UTF8 string for RoyalTS, and then back into JSON (necessary so we can merge multiple vaults later anyway)
items.append(json.loads(RoyalUtils.decode_to_utf8_string(output)))
else:
failed = True
if not failed:
merged_output = []
for output in items:
merged_output.extend(output)
return merged_output
else:
if not err:
err = self.unknown_error_string
else:
err = RoyalUtils.decode_to_utf8_string(err)
raise Exception(err)
def get_item_details(self, item_id):
cmd_get_item = [
self.op_path,
"item", "get", item_id,
"--format=json",
]
if self.account:
cmd_get_item.append("--account")
cmd_get_item.append(self.account)
op = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = op.communicate()
exit_code = op.wait()
success = exit_code == 0
if success:
output = RoyalUtils.decode_to_utf8_string(output)
item = json.loads(output)
return item
else:
if not err:
err = self.unknown_error_string
else:
err = RoyalUtils.decode_to_utf8_string(err)
raise Exception(err)
class Converter:
@staticmethod
def convert_items(items):
objects = []
for item in items:
id = item.get("id", "")
title = item.get("title", "N/A")
primary_url = ""
if "urls" in item:
for url in item["urls"]:
if url.get("primary", None):
primary_url = url["href"]
vault_name = ""
if item.get("vault", None):
vault_name = item["vault"].get("name", "")
cred_type = "DynamicCredential"
cred = {
"Type": cred_type,
"ID": id,
"Name": title,
"Path": vault_name
}
if primary_url != "":
cred["URL"] = primary_url
objects.append(cred)
objects = sorted(objects, key = lambda i: (i["Path"], i["Name"]))
store = {
"Objects": objects
}
return store
@staticmethod
def convert_item(item_details):
username = None
password = None
fields = item_details.get("fields")
for field in fields:
field_id = field.get("id", None)
field_label = field.get("label", None)
field_value = field.get("value", None)
if field_id == "username" or field_label == "username":
username = field_value
if field_id == "password" or field_label == "password":
password = field_value
cred = { }
if username is not None:
cred["Username"] = username
if password is not None:
cred["Password"] = password
return cred
class Coordinator:
op_path = ""
account = ""
vault = ""
error_message_get_items = "Error while getting items: "
error_message_get_item_details = "Error while getting item details: "
def __init__(self, op_path_windows, op_path_macOS, account, vault):
self.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows
self.account = account
self.vault = vault
def get_items(self):
op = OnePassword(self.op_path, self.account, self.vault)
items = None
try:
items = op.get_items()
except Exception as e:
RoyalUtils.exit_with_error(self.error_message_get_items, e)
items_details = [ ]
store = Converter.convert_items(items)
store_json = RoyalUtils.to_json(store, True)
print(store_json)
def get_item_details(self, item_id):
op = OnePassword(self.op_path, self.account, self.vault)
item_details = None
try:
item_details = op.get_item_details(item_id)
except Exception as e:
RoyalUtils.exit_with_error(self.error_message_get_item_details, e)
store = Converter.convert_item(item_details)
store_json = RoyalUtils.to_json(store, True)
print(store_json)
coordinator = Coordinator(op_path_windows, op_path_macOS, filter_account, filter_vault)
coordinator.get_items()
# coordinator.get_item_details(item_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment