Created
July 26, 2023 00:40
-
-
Save jvwam/bd7ebeec596507cb1149a3561c8d43ff to your computer and use it in GitHub Desktop.
Royal TS Dynamic Folder 1Password multiple vaules
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
from functools import partial | |
from sys import platform as _platform | |
from subprocess import Popen, PIPE | |
import sys | |
import json | |
import subprocess | |
import os | |
import base64 | |
op_path_windows = r"$CustomProperty.OPPathWindows$" | |
op_path_macOS = r"$CustomProperty.OPPathmacOS$" | |
filter_account = r"$CustomProperty.Account$" | |
filter_vault = r"$CustomProperty.Vault$" | |
item_id = r"$DynamicCredential.EffectiveID$" | |
class RoyalUtils: | |
@staticmethod | |
def is_macOS(): | |
plat = _platform.lower() | |
return plat.startswith("darwin") | |
@staticmethod | |
def exit_with_error(message, exception=None): | |
printError = partial(print, file=sys.stderr) # python2 compatibility | |
exception_message = str(exception) if exception else "N/A" | |
full_message = message + exception_message | |
printError(full_message) | |
sys.exit(1) | |
@staticmethod | |
def to_json(obj, pretty=False): | |
return json.dumps(obj, indent=4) if pretty else json.dumps(obj) | |
@staticmethod | |
def decode_to_utf8_string(potential_bytes): | |
if isinstance(potential_bytes, str): | |
return potential_bytes | |
else: | |
return potential_bytes.decode("utf-8") | |
class OnePassword: | |
op_path = "" | |
account = "" | |
vault = "" | |
vaults = [] | |
unknown_error_string = "An unknown error occurred." | |
def __init__(self, op_path, account="", vault=""): | |
self.op_path = op_path | |
self.account = account.strip() | |
# use an array of vault IDs instead of a single ID | |
#self.vault = vault.strip() | |
self.vaults = vault.replace(' ','').split(',') | |
def get_items(self): | |
items = [] | |
failed = False | |
# foreach vault, grab all of the items in it | |
for vault in self.vaults: | |
cmd_list_items = [ | |
self.op_path, | |
"item", "list", | |
"--format=json", | |
] | |
if self.account: | |
cmd_list_items.append("--account") | |
cmd_list_items.append(self.account) | |
if self.vaults: | |
cmd_list_items.append("--vault") | |
cmd_list_items.append(vault) | |
op = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
(output, err) = op.communicate() | |
exit_code = op.wait() | |
success = exit_code == 0 | |
if success: | |
# not sure why this needs to be done like this but F.I. | |
# from what I understand, op.exe returns the data in JSON format, then we need to decode it to a UTF8 string for RoyalTS, and then back into JSON (necessary so we can merge multiple vaults later anyway) | |
items.append(json.loads(RoyalUtils.decode_to_utf8_string(output))) | |
else: | |
failed = True | |
if not failed: | |
merged_output = [] | |
for output in items: | |
merged_output.extend(output) | |
return merged_output | |
else: | |
if not err: | |
err = self.unknown_error_string | |
else: | |
err = RoyalUtils.decode_to_utf8_string(err) | |
raise Exception(err) | |
def get_item_details(self, item_id): | |
cmd_get_item = [ | |
self.op_path, | |
"item", "get", item_id, | |
"--format=json", | |
] | |
if self.account: | |
cmd_get_item.append("--account") | |
cmd_get_item.append(self.account) | |
op = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
(output, err) = op.communicate() | |
exit_code = op.wait() | |
success = exit_code == 0 | |
if success: | |
output = RoyalUtils.decode_to_utf8_string(output) | |
item = json.loads(output) | |
return item | |
else: | |
if not err: | |
err = self.unknown_error_string | |
else: | |
err = RoyalUtils.decode_to_utf8_string(err) | |
raise Exception(err) | |
class Converter: | |
@staticmethod | |
def convert_items(items): | |
objects = [] | |
for item in items: | |
id = item.get("id", "") | |
title = item.get("title", "N/A") | |
primary_url = "" | |
if "urls" in item: | |
for url in item["urls"]: | |
if url.get("primary", None): | |
primary_url = url["href"] | |
vault_name = "" | |
if item.get("vault", None): | |
vault_name = item["vault"].get("name", "") | |
cred_type = "DynamicCredential" | |
cred = { | |
"Type": cred_type, | |
"ID": id, | |
"Name": title, | |
"Path": vault_name | |
} | |
if primary_url != "": | |
cred["URL"] = primary_url | |
objects.append(cred) | |
objects = sorted(objects, key = lambda i: (i["Path"], i["Name"])) | |
store = { | |
"Objects": objects | |
} | |
return store | |
@staticmethod | |
def convert_item(item_details): | |
username = None | |
password = None | |
fields = item_details.get("fields") | |
for field in fields: | |
field_id = field.get("id", None) | |
field_label = field.get("label", None) | |
field_value = field.get("value", None) | |
if field_id == "username" or field_label == "username": | |
username = field_value | |
if field_id == "password" or field_label == "password": | |
password = field_value | |
cred = { } | |
if username is not None: | |
cred["Username"] = username | |
if password is not None: | |
cred["Password"] = password | |
return cred | |
class Coordinator: | |
op_path = "" | |
account = "" | |
vault = "" | |
error_message_get_items = "Error while getting items: " | |
error_message_get_item_details = "Error while getting item details: " | |
def __init__(self, op_path_windows, op_path_macOS, account, vault): | |
self.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows | |
self.account = account | |
self.vault = vault | |
def get_items(self): | |
op = OnePassword(self.op_path, self.account, self.vault) | |
items = None | |
try: | |
items = op.get_items() | |
except Exception as e: | |
RoyalUtils.exit_with_error(self.error_message_get_items, e) | |
items_details = [ ] | |
store = Converter.convert_items(items) | |
store_json = RoyalUtils.to_json(store, True) | |
print(store_json) | |
def get_item_details(self, item_id): | |
op = OnePassword(self.op_path, self.account, self.vault) | |
item_details = None | |
try: | |
item_details = op.get_item_details(item_id) | |
except Exception as e: | |
RoyalUtils.exit_with_error(self.error_message_get_item_details, e) | |
store = Converter.convert_item(item_details) | |
store_json = RoyalUtils.to_json(store, True) | |
print(store_json) | |
coordinator = Coordinator(op_path_windows, op_path_macOS, filter_account, filter_vault) | |
#coordinator.get_items() | |
coordinator.get_item_details(item_id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
from functools import partial | |
from sys import platform as _platform | |
from subprocess import Popen, PIPE | |
import sys | |
import json | |
import subprocess | |
import os | |
import base64 | |
op_path_windows = r"$CustomProperty.OPPathWindows$" | |
op_path_macOS = r"$CustomProperty.OPPathmacOS$" | |
filter_account = r"$CustomProperty.Account$" | |
filter_vault = r"$CustomProperty.Vault$" | |
#item_id = r"$DynamicCredential.EffectiveID$" | |
class RoyalUtils: | |
@staticmethod | |
def is_macOS(): | |
plat = _platform.lower() | |
return plat.startswith("darwin") | |
@staticmethod | |
def exit_with_error(message, exception=None): | |
printError = partial(print, file=sys.stderr) # python2 compatibility | |
exception_message = str(exception) if exception else "N/A" | |
full_message = message + exception_message | |
printError(full_message) | |
sys.exit(1) | |
@staticmethod | |
def to_json(obj, pretty=False): | |
return json.dumps(obj, indent=4) if pretty else json.dumps(obj) | |
@staticmethod | |
def decode_to_utf8_string(potential_bytes): | |
if isinstance(potential_bytes, str): | |
return potential_bytes | |
else: | |
return potential_bytes.decode("utf-8") | |
class OnePassword: | |
op_path = "" | |
account = "" | |
vault = "" | |
vaults = [] | |
unknown_error_string = "An unknown error occurred." | |
def __init__(self, op_path, account="", vault=""): | |
self.op_path = op_path | |
self.account = account.strip() | |
# use an array of vault IDs instead of a single ID | |
#self.vault = vault.strip() | |
self.vaults = vault.replace(' ','').split(',') | |
def get_items(self): | |
items = [] | |
failed = False | |
# foreach vault, grab all of the items in it | |
for vault in self.vaults: | |
cmd_list_items = [ | |
self.op_path, | |
"item", "list", | |
"--format=json", | |
] | |
if self.account: | |
cmd_list_items.append("--account") | |
cmd_list_items.append(self.account) | |
if self.vaults: | |
cmd_list_items.append("--vault") | |
cmd_list_items.append(vault) | |
op = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
(output, err) = op.communicate() | |
exit_code = op.wait() | |
success = exit_code == 0 | |
if success: | |
# not sure why this needs to be done like this but F.I. | |
# from what I understand, op.exe returns the data in JSON format, then we need to decode it to a UTF8 string for RoyalTS, and then back into JSON (necessary so we can merge multiple vaults later anyway) | |
items.append(json.loads(RoyalUtils.decode_to_utf8_string(output))) | |
else: | |
failed = True | |
if not failed: | |
merged_output = [] | |
for output in items: | |
merged_output.extend(output) | |
return merged_output | |
else: | |
if not err: | |
err = self.unknown_error_string | |
else: | |
err = RoyalUtils.decode_to_utf8_string(err) | |
raise Exception(err) | |
def get_item_details(self, item_id): | |
cmd_get_item = [ | |
self.op_path, | |
"item", "get", item_id, | |
"--format=json", | |
] | |
if self.account: | |
cmd_get_item.append("--account") | |
cmd_get_item.append(self.account) | |
op = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
(output, err) = op.communicate() | |
exit_code = op.wait() | |
success = exit_code == 0 | |
if success: | |
output = RoyalUtils.decode_to_utf8_string(output) | |
item = json.loads(output) | |
return item | |
else: | |
if not err: | |
err = self.unknown_error_string | |
else: | |
err = RoyalUtils.decode_to_utf8_string(err) | |
raise Exception(err) | |
class Converter: | |
@staticmethod | |
def convert_items(items): | |
objects = [] | |
for item in items: | |
id = item.get("id", "") | |
title = item.get("title", "N/A") | |
primary_url = "" | |
if "urls" in item: | |
for url in item["urls"]: | |
if url.get("primary", None): | |
primary_url = url["href"] | |
vault_name = "" | |
if item.get("vault", None): | |
vault_name = item["vault"].get("name", "") | |
cred_type = "DynamicCredential" | |
cred = { | |
"Type": cred_type, | |
"ID": id, | |
"Name": title, | |
"Path": vault_name | |
} | |
if primary_url != "": | |
cred["URL"] = primary_url | |
objects.append(cred) | |
objects = sorted(objects, key = lambda i: (i["Path"], i["Name"])) | |
store = { | |
"Objects": objects | |
} | |
return store | |
@staticmethod | |
def convert_item(item_details): | |
username = None | |
password = None | |
fields = item_details.get("fields") | |
for field in fields: | |
field_id = field.get("id", None) | |
field_label = field.get("label", None) | |
field_value = field.get("value", None) | |
if field_id == "username" or field_label == "username": | |
username = field_value | |
if field_id == "password" or field_label == "password": | |
password = field_value | |
cred = { } | |
if username is not None: | |
cred["Username"] = username | |
if password is not None: | |
cred["Password"] = password | |
return cred | |
class Coordinator: | |
op_path = "" | |
account = "" | |
vault = "" | |
error_message_get_items = "Error while getting items: " | |
error_message_get_item_details = "Error while getting item details: " | |
def __init__(self, op_path_windows, op_path_macOS, account, vault): | |
self.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows | |
self.account = account | |
self.vault = vault | |
def get_items(self): | |
op = OnePassword(self.op_path, self.account, self.vault) | |
items = None | |
try: | |
items = op.get_items() | |
except Exception as e: | |
RoyalUtils.exit_with_error(self.error_message_get_items, e) | |
items_details = [ ] | |
store = Converter.convert_items(items) | |
store_json = RoyalUtils.to_json(store, True) | |
print(store_json) | |
def get_item_details(self, item_id): | |
op = OnePassword(self.op_path, self.account, self.vault) | |
item_details = None | |
try: | |
item_details = op.get_item_details(item_id) | |
except Exception as e: | |
RoyalUtils.exit_with_error(self.error_message_get_item_details, e) | |
store = Converter.convert_item(item_details) | |
store_json = RoyalUtils.to_json(store, True) | |
print(store_json) | |
coordinator = Coordinator(op_path_windows, op_path_macOS, filter_account, filter_vault) | |
coordinator.get_items() | |
# coordinator.get_item_details(item_id) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment