-
-
Save helpsystems-mushkevych/0c21964e5efc0b0c3d97c02adc2afb1f to your computer and use it in GitHub Desktop.
information_assistant_delete_file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Command-line tool that removes files from Information Assistant storage and its search index.
""" | |
import argparse | |
import base64 | |
import traceback | |
from azure.search.documents._generated.models import IndexingResult | |
from azure.storage.blob import BlobServiceClient | |
from azure.search.documents import SearchClient | |
from azure.core.credentials import AzureKeyCredential | |
# Blob container holding the originally uploaded files (<folder>/<file_name>).
UPLOAD_CONTAINER_NAME = "upload"
# Blob container whose blobs are listed by "<folder>/<file_name>" prefix and removed.
CONTENT_CONTAINER_NAME = "content"
# Name of the field used as the document key when deleting from the search index.
ID_FIELD = "id"
def parse_arguments(argv=None):
    """
    Parse command line arguments.

    Note that extract_env must be run before this script is invoked.

    Args:
        argv: Optional list of argument strings to parse. The default
            ``None`` makes argparse read ``sys.argv[1:]``, i.e. the
            regular command-line behaviour.

    Returns:
        argparse.Namespace with the attributes
        ``storage_account_connection_str``, ``search_service_endpoint``,
        ``search_index``, ``search_key``, ``blob_folder_name`` and
        ``file_names`` (a list with at least one entry).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--storage_account_connection_str",
        required=True,
        help="Storage account connection string (set in extract-env)")
    parser.add_argument(
        "--search_service_endpoint",
        required=True,
        help="Azure Search Endpoint")
    parser.add_argument(
        "--search_index",
        required=True,
        help="Azure Search Index")
    parser.add_argument(
        "--search_key",
        required=True,
        help="Azure Search Key")
    parser.add_argument(
        "--blob_folder_name",
        required=True,
        help="Folder *xxx* in Azure Storage Account: /infoasststore${ia_name}/upload/xxx where the ${file_name} has been uploaded")
    parser.add_argument(
        "--file_names",
        required=True,
        nargs="+",
        help="Space-delimited file names (including extension) to be removed from the Information Assistant storage/database")
    return parser.parse_args(argv)
def encode_document_id(document_id: str) -> str:
    """
    Encode a path/file name with URL-safe Base64 so it contains no unsafe characters.

    In this file the result is used as the key of a search index document.

    Args:
        document_id: Path or file name to encode; it is encoded to bytes
            with the default ``str.encode()`` codec first.

    Returns:
        The URL-safe Base64 text ('-' and '_' instead of '+' and '/'),
        including any '=' padding.
    """
    return base64.urlsafe_b64encode(document_id.encode()).decode()
def to_str_list(index_results: list[IndexingResult]) -> list[str]:
    """
    Render indexing results as printable one-line summaries.

    Args:
        index_results: Results whose ``succeeded``, ``status_code``, ``key``
            and ``error_message`` attributes are read.

    Returns:
        One string per result, in input order, of the form
        ``"succeeded=... status_code=... key=... error_message=... "``
        (the trailing space is part of the format).
    """
    return [
        f"succeeded={i_result.succeeded} "
        f"status_code={i_result.status_code} "
        f"key={i_result.key} "
        f"error_message={i_result.error_message} "
        for i_result in index_results
    ]
def get_keys_to_delete(search_client: SearchClient, blob_folder_name: str, file_name: str) -> list[str]:
    """
    Look up the search index keys of the documents that belong to a file.

    Runs a "simple" query for ``file_name`` against the ``file_name`` field,
    restricted to documents whose ``folder`` equals ``blob_folder_name``.

    Args:
        search_client: Client of the search index to query.
        blob_folder_name: Folder the file was uploaded to.
        file_name: File name (including extension) to search for.

    Returns:
        The ``ID_FIELD`` value of every document returned by the query.
    """
    # OData string literals are single-quoted; an embedded single quote must
    # be doubled, otherwise a folder name such as "o'brien" breaks the filter.
    escaped_folder = blob_folder_name.replace("'", "''")

    # NOTE(review): this is a full-text match on file_name, not an exact
    # comparison -- confirm it cannot select documents of similarly named files.
    results = search_client.search(
        query_type="simple",
        search_text=file_name,
        search_fields=["file_name"],
        select=[ID_FIELD, "file_name", "chunk_file", "file_class", "folder", "pages"],
        filter=f"folder eq '{escaped_folder}'",
    )
    return [result[ID_FIELD] for result in results]
def delete_db_embeddings(search_client: SearchClient, blob_folder_name: str, file_name: str) -> None:
    """
    Delete every search index document found for the given folder/file.

    The keys are obtained from ``get_keys_to_delete`` and removed in batches.
    Documents the service reports as not deleted are printed.

    Args:
        search_client: Client of the search index to delete from.
        blob_folder_name: Folder the file was uploaded to.
        file_name: File name (including extension) whose documents are removed.
    """
    keys_to_delete = get_keys_to_delete(search_client, blob_folder_name, file_name)
    print(f"#keys_to_delete={len(keys_to_delete)}")
    if not keys_to_delete:
        print(f"No Embedding documents found for deletion for {blob_folder_name}/{file_name}.")
        return

    # A single indexing request accepts at most 1000 actions, so a file with
    # more chunks than that must be deleted in several requests.
    max_batch_size = 1000
    for start in range(0, len(keys_to_delete), max_batch_size):
        batch_keys = keys_to_delete[start:start + max_batch_size]
        documents_to_delete = [{ID_FIELD: key} for key in batch_keys]
        index_results = search_client.delete_documents(documents=documents_to_delete) or []
        # The call can succeed as a whole while individual documents fail.
        failed = [i_result for i_result in index_results if not i_result.succeeded]
        if failed:
            print(f"Failed to delete {len(failed)} embedding document(s): {to_str_list(failed)}")
def _delete_uploaded_blob(upload_container_client, blob_folder_name: str, file_name: str) -> None:
    """Best-effort removal of ``<folder>/<file_name>`` from the upload container."""
    try:
        print(f"Deleting blob: {UPLOAD_CONTAINER_NAME}/{blob_folder_name}/{file_name}")
        upload_container_client.delete_blob(f"{blob_folder_name}/{file_name}")
    except Exception as e:
        # Deliberately best-effort: the remaining cleanup steps still run.
        print(f"Unable to delete blob for {blob_folder_name}/{file_name} due to: {e}.")


def _delete_content_blobs(
        content_container_client,
        search_client: SearchClient,
        search_index: str,
        blob_folder_name: str,
        file_name: str
) -> None:
    """Remove the content-container blobs of a file and the index document keyed by each blob name."""
    # NOTE(review): this is a plain prefix match, so blobs of another file whose
    # name merely starts with the same characters would also be listed -- confirm.
    blobs = content_container_client.list_blobs(name_starts_with=f"{blob_folder_name}/{file_name}")
    for blob in blobs:
        if not content_container_client.get_blob_client(blob.name).exists():
            print(f"Blob not found: {blob.name}")
            continue

        try:
            content_container_client.delete_blob(blob.name)
            print(f"Deleted blob: {blob.name}")
        except Exception as ex:
            print(f"Failed to delete blob: {blob.name}. Error: {ex}")

        # The index document key is the URL-safe Base64 of the blob name;
        # compute it once for the log lines and the delete call.
        document_id = encode_document_id(blob.name)
        try:
            # Cleanup search index
            print(f"Removing vector document from {search_index} index: {blob.name} "
                  f": id : {document_id}")
            index_results = search_client.delete_documents(documents=[{ID_FIELD: document_id}])
            print(to_str_list(index_results))
        except Exception as ex:
            print(f"Failed to remove vector document from {search_index} index: {blob.name} "
                  f": id : {document_id}. Error: {ex}")


def delete_file(
        blob_service_client: BlobServiceClient,
        search_service_endpoint: str,
        search_index: str,
        search_key: str,
        blob_folder_name: str,
        file_names: list[str]
) -> None:
    """
    Delete the given files from the Storage Account and their embeddings from the search index.

    For every file name, in order:
      1. delete ``<folder>/<file_name>`` from the upload container;
      2. delete every content-container blob whose name starts with
         ``<folder>/<file_name>`` together with the index document keyed by
         that blob name;
      3. delete any remaining index documents found for the folder/file.

    Failures of individual delete calls are printed and do not stop the
    processing of the remaining steps or files.

    Args:
        blob_service_client: Client of the storage account holding both containers.
        search_service_endpoint: Endpoint of the search service.
        search_index: Name of the search index to clean up.
        search_key: Key used to authenticate against the search service.
        blob_folder_name: Folder the files were uploaded to.
        file_names: File names (including extension) to delete.
    """
    print(f"Deleting files {blob_folder_name}/{file_names}")
    upload_container_client = blob_service_client.get_container_client(UPLOAD_CONTAINER_NAME)
    content_container_client = blob_service_client.get_container_client(CONTENT_CONTAINER_NAME)

    search_client = SearchClient(
        endpoint=search_service_endpoint,
        index_name=search_index,
        credential=AzureKeyCredential(search_key),
    )

    for file_name in file_names:
        _delete_uploaded_blob(upload_container_client, blob_folder_name, file_name)

        # Cleanup "content" blob container
        _delete_content_blobs(
            content_container_client, search_client, search_index, blob_folder_name, file_name
        )

        try:
            print(f"Deleting remaining embeddings: {UPLOAD_CONTAINER_NAME}/{blob_folder_name}/{file_name}")
            delete_db_embeddings(search_client, blob_folder_name, file_name)
        except Exception as e:
            print(f"Unable to delete embeddings for {blob_folder_name}/{file_name} due to {e}.")

        print(f"Finished deletion of {blob_folder_name}/{file_name}\n")
if __name__ == '__main__':
    # Script entry point: parse the CLI arguments, connect to the storage
    # account and delete the requested files.
    args = parse_arguments()
    try:
        storage_blob_service_client = BlobServiceClient.from_connection_string(
            args.storage_account_connection_str
        )
        delete_file(
            storage_blob_service_client,
            args.search_service_endpoint,
            args.search_index,
            args.search_key,
            args.blob_folder_name,
            args.file_names
        )
    except Exception:
        traceback.print_exc()
        # Exit non-zero so that callers (e.g. a shell wrapper running with
        # `set -e`) can detect the failure instead of seeing a silent success.
        raise SystemExit(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
#set -x
set -e

# Usage: <script> <blob_folder_name> <file_name> [<file_name> ...]
# The first argument is the folder, all remaining arguments are file names.
if [ "$#" -lt 2 ]; then
    echo "Usage: $0 <blob_folder_name> <file_name> [<file_name> ...]" >&2
    exit 1
fi

# Set BASE_PATH as {PROJECT_ROOT}/scripts
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
BASE_PATH=$(realpath "$DIR/..")
echo "BASE_PATH=${BASE_PATH}"

# Get env vars for workspace from Terraform outputs
source "${BASE_PATH}/environments/infrastructure.env"

if [ -z "$ENVIRONMENT_NAME" ]; then
    export ENVIRONMENT_NAME="local"
fi
echo "Environment set: $ENVIRONMENT_NAME."

# ENV_DIR is not assigned anywhere in this script. Honour a value provided by
# the caller or by infrastructure.env; otherwise fall back to BASE_PATH, the
# directory whose environments/ folder is already used above.
# NOTE(review): confirm that <ENVIRONMENT_NAME>.env lives next to infrastructure.env.
ENV_DIR="${ENV_DIR:-${BASE_PATH}}"

if [ -f "$ENV_DIR/environments/$ENVIRONMENT_NAME.env" ]; then
    echo "Loading environment variables for $ENVIRONMENT_NAME."
    source "$ENV_DIR/environments/$ENVIRONMENT_NAME.env"
fi

# activate Virtual Environment
source "${BASE_PATH}/../.venv/bin/activate"

# delete file
# NOTE(review): delete_document.py is resolved relative to the current working
# directory, so the script must be started from the directory that contains it.
python delete_document.py \
    --storage_account_connection_str "${BLOB_CONNECTION_STRING}" \
    --search_service_endpoint "${AZURE_SEARCH_SERVICE_ENDPOINT}" \
    --search_index "${AZURE_SEARCH_INDEX}" \
    --search_key "${AZURE_SEARCH_SERVICE_KEY}" \
    --blob_folder_name "${1}" \
    --file_names "${@:2}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment