Skip to content

Instantly share code, notes, and snippets.

@helpsystems-mushkevych
Created January 22, 2024 17:28
Show Gist options
  • Save helpsystems-mushkevych/0c21964e5efc0b0c3d97c02adc2afb1f to your computer and use it in GitHub Desktop.
Save helpsystems-mushkevych/0c21964e5efc0b0c3d97c02adc2afb1f to your computer and use it in GitHub Desktop.
information_assistant_delete_file
"""
Command-line tool that removes files, and the search index documents derived from them, from Information Assistant
"""
import argparse
import base64
import traceback
from azure.search.documents._generated.models import IndexingResult
from azure.storage.blob import BlobServiceClient
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential
UPLOAD_CONTAINER_NAME = "upload"
CONTENT_CONTAINER_NAME = "content"
ID_FIELD = "id"
def parse_arguments():
    """
    Build the command line parser and parse the process arguments.

    Every option is mandatory. ``--file_names`` accepts one or more values,
    all other options take exactly one.
    Note that extract_env must be run before this script is invoked.
    """
    # (flag, help text) for every option that takes a single value.
    single_value_options = (
        ("--storage_account_connection_str",
         "Storage account connection string (set in extract-env)"),
        ("--search_service_endpoint",
         "Azure Search Endpoint"),
        ("--search_index",
         "Azure Search Index"),
        ("--search_key",
         "Azure Search Key"),
        ("--blob_folder_name",
         "Folder *xxx* in Azure Storage Account: /infoasststore${ia_name}/upload/xxx where the ${file_name} has been uploaded"),
    )

    cli = argparse.ArgumentParser()
    for flag, description in single_value_options:
        cli.add_argument(flag, required=True, help=description)

    # The only multi-valued option: at least one file name is required.
    cli.add_argument(
        "--file_names",
        required=True,
        nargs="+",
        help="Space-delimited file names (including extension) to be removed from the Informational Assistant storage/database",
    )
    return cli.parse_args()
def encode_document_id(document_id):
    """
    Encode a path/file name as URL-safe base64 text.

    The result contains no characters that are unsafe in an id
    (the original author targeted a cosmos db id).
    """
    raw_bytes = document_id.encode()
    return base64.urlsafe_b64encode(raw_bytes).decode()
def to_str_list(index_results: list[IndexingResult]) -> list[str]:
    """
    Render each indexing result as a single human-readable line.

    :param index_results: results to describe
    :return: one string per result listing its ``succeeded``, ``status_code``,
        ``key`` and ``error_message`` attributes, in input order
    """
    return [
        f"succeeded={entry.succeeded} "
        f"status_code={entry.status_code} "
        f"key={entry.key} "
        f"error_message={entry.error_message} "
        for entry in index_results
    ]
def get_keys_to_delete(search_client: SearchClient, blob_folder_name: str, file_name: str) -> list[str]:
    """
    Look up the keys of the search index documents associated with a file.

    Runs a ``simple`` query for ``file_name`` against the ``file_name`` field,
    restricted by a filter to documents whose ``folder`` equals ``blob_folder_name``.

    :param search_client: client bound to the index to query
    :param blob_folder_name: value the ``folder`` field must be equal to
    :param file_name: text searched for in the ``file_name`` field
    :return: the ``id`` field of every document returned by the query
    """
    # OData string literals are single-quoted and an embedded quote has to be doubled;
    # without this a folder such as "bob's docs" produces an invalid filter expression.
    escaped_folder = blob_folder_name.replace("'", "''")

    # NOTE(review): the file name is matched by a text search rather than an exact
    # comparison, so documents of similarly named files in the same folder may be
    # returned as well - confirm against the index schema before relying on it.
    results = search_client.search(
        query_type="simple",
        search_text=file_name,
        search_fields=["file_name"],
        select=[ID_FIELD, "file_name", "chunk_file", "file_class", "folder", "pages"],
        filter=f"folder eq '{escaped_folder}'",
    )
    return [result[ID_FIELD] for result in results]
def delete_db_embeddings(search_client: SearchClient, blob_folder_name: str, file_name: str) -> None:
    """
    Delete the search index documents found for a file.

    The keys are obtained from ``get_keys_to_delete``; their count is printed,
    and a notice is printed instead when nothing was found.

    :param search_client: client bound to the index to clean up
    :param blob_folder_name: folder the file was uploaded to
    :param file_name: name of the file whose documents are removed
    """
    keys_to_delete = get_keys_to_delete(search_client, blob_folder_name, file_name)
    print(f"#keys_to_delete={len(keys_to_delete)}")
    if not keys_to_delete:
        print(f"No Embedding documents found for deletion for {blob_folder_name}/{file_name}.")
        return

    # Azure AI Search limits a single indexing batch to 1000 actions, so a file
    # with more documents than that is deleted in several bounded requests.
    max_batch_size = 1000
    for start in range(0, len(keys_to_delete), max_batch_size):
        batch = [{ID_FIELD: key} for key in keys_to_delete[start:start + max_batch_size]]
        search_client.delete_documents(documents=batch)
def _purge_content_blob(content_container_client, search_client: SearchClient, search_index: str, blob) -> None:
    """
    Delete one blob of the "content" container and its search index document.

    Only a notice is printed when the blob no longer exists. A failure of either
    deletion is printed and not re-raised. The index document key is the
    URL-safe base64 encoding of the blob name.
    """
    if not content_container_client.get_blob_client(blob.name).exists():
        print(f"Blob not found: {blob.name}")
        return

    try:
        content_container_client.delete_blob(blob.name)
        print(f"Deleted blob: {blob.name}")
    except Exception as blob_error:
        print(f"Failed to delete blob: {blob.name}. Error: {blob_error}")

    try:
        # Cleanup search index
        print(f"Removing vector document from {search_index} index: {blob.name} "
              f": id : {encode_document_id(blob.name)}")
        outcome = search_client.delete_documents(documents=[{ID_FIELD: encode_document_id(blob.name)}])
        print(to_str_list(outcome))
    except Exception as index_error:
        print(f"Failed to remove vector document from {search_index} index: {blob.name} "
              f": id : {encode_document_id(blob.name)}. Error: {index_error}")


def delete_file(
        blob_service_client: BlobServiceClient,
        search_service_endpoint: str,
        search_index: str,
        search_key: str,
        blob_folder_name: str,
        file_names: list[str]
) -> None:
    """
    Delete the given files from the Storage Account and their embeddings from the search index.

    For every file name, in order:
      1. the blob ``<blob_folder_name>/<file_name>`` is deleted from the "upload" container;
      2. every blob of the "content" container whose name starts with that path is
         deleted, together with its search index document;
      3. remaining index documents are removed through ``delete_db_embeddings``.

    Each step is best effort: a failure is printed and processing continues.

    :param blob_service_client: client of the storage account holding both containers
    :param search_service_endpoint: endpoint of the search service
    :param search_index: name of the index to clean up
    :param search_key: key used to authenticate against the search service
    :param blob_folder_name: folder inside the containers the files live in
    :param file_names: names of the files to delete
    """
    print(f"Deleting files {blob_folder_name}/{file_names}")

    uploads = blob_service_client.get_container_client(UPLOAD_CONTAINER_NAME)
    contents = blob_service_client.get_container_client(CONTENT_CONTAINER_NAME)
    search_client = SearchClient(
        endpoint=search_service_endpoint,
        index_name=search_index,
        credential=AzureKeyCredential(search_key),
    )

    for file_name in file_names:
        blob_path = f"{blob_folder_name}/{file_name}"

        try:
            print(f"Deleting blob: {UPLOAD_CONTAINER_NAME}/{blob_folder_name}/{file_name}")
            uploads.delete_blob(blob_path)
        except Exception as upload_error:
            print(f"Unable to delete blob for {blob_folder_name}/{file_name} due to: {upload_error}.")

        # Cleanup "content" blob container
        for blob in contents.list_blobs(name_starts_with=blob_path):
            _purge_content_blob(contents, search_client, search_index, blob)

        try:
            print(f"Deleting remaining embeddings: {UPLOAD_CONTAINER_NAME}/{blob_folder_name}/{file_name}")
            delete_db_embeddings(search_client, blob_folder_name, file_name)
        except Exception as embeddings_error:
            print(f"Unable to delete embeddings for {blob_folder_name}/{file_name} due to {embeddings_error}.")

        print(f"Finished deletion of {blob_folder_name}/{file_name}\n")
if __name__ == '__main__':
    # Script entry point: parse the command line, connect to the storage
    # account and delete the requested files.
    args = parse_arguments()
    try:
        storage_blob_service_client = BlobServiceClient.from_connection_string(
            args.storage_account_connection_str
        )
        delete_file(
            storage_blob_service_client,
            args.search_service_endpoint,
            args.search_index,
            args.search_key,
            args.blob_folder_name,
            args.file_names
        )
    except Exception:
        traceback.print_exc()
        # Report the failure through the exit status as well; otherwise the process
        # ends with status 0 and a calling shell script cannot detect the error.
        raise SystemExit(1)
#!/usr/bin/env bash
# Loads the environment configuration, activates the virtual environment and
# runs delete_document.py for the given folder and file names.
#
# Usage: <this script> <blob_folder_name> <file_name> [<file_name> ...]
#set -x
set -e

# Both a folder and at least one file name are required; fail early with a
# clear message instead of handing empty values to the Python script.
if [ "$#" -lt 2 ]; then
    echo "Usage: $0 <blob_folder_name> <file_name> [<file_name> ...]" >&2
    exit 2
fi

# Set BASE_PATH as {PROJECT_ROOT}/scripts
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
BASE_PATH=$(realpath "$DIR/..")
echo "BASE_PATH=${BASE_PATH}"

# Get env vars for workspace from Terraform outputs
source "${BASE_PATH}/environments/infrastructure.env"

if [ -z "$ENVIRONMENT_NAME" ]; then
    export ENVIRONMENT_NAME="local"
fi
echo "Environment set: $ENVIRONMENT_NAME."

# ENV_DIR is not assigned anywhere in this script. Keep a value provided by the
# caller or by infrastructure.env, otherwise fall back to BASE_PATH so the
# per-environment file is looked up next to infrastructure.env.
ENV_DIR="${ENV_DIR:-${BASE_PATH}}"
if [ -f "$ENV_DIR/environments/$ENVIRONMENT_NAME.env" ]; then
    echo "Loading environment variables for $ENVIRONMENT_NAME."
    source "$ENV_DIR/environments/$ENVIRONMENT_NAME.env"
fi

# activate Virtual Environment
source "${BASE_PATH}/../.venv/bin/activate"

# delete file
# NOTE(review): delete_document.py is resolved relative to the current working
# directory, not to this script's location - confirm where it is expected to live.
python delete_document.py \
    --storage_account_connection_str "${BLOB_CONNECTION_STRING}" \
    --search_service_endpoint "${AZURE_SEARCH_SERVICE_ENDPOINT}" \
    --search_index "${AZURE_SEARCH_INDEX}" \
    --search_key "${AZURE_SEARCH_SERVICE_KEY}" \
    --blob_folder_name "${1}" \
    --file_names "${@:2}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment