Elasticsearch Index Export
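A Python script that exports every document matching a query from an Elasticsearch index to a newline-delimited JSON file. It pages through the index with the scroll API and fans the writes out to a thread pool, each worker running async file appends via aiofiles. The Elastic Cloud ID and API key are read from the CLOUD_ID and API_KEY environment variables.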
from datetime import datetime
from elasticsearch import Elasticsearch
import json
import os
import logging
import threading
import time
import asyncio
import aiofiles
from concurrent.futures import ThreadPoolExecutor

# Logging:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Credentials come from the environment rather than being hard-coded.
es = Elasticsearch(cloud_id=os.environ.get('CLOUD_ID'), api_key=os.environ.get('API_KEY'))

# User inputs
index = "logs-*"
file_prefix = 'logs'
query = {"match_all": {}}  # an empty dict is not a valid query body; match_all exports everything

# Initialize scroll parameters
scroll = '15s'     # how long Elasticsearch keeps each scroll context alive
size = 10000       # documents fetched per scroll page
max_workers = 16   # Adjust based on your CPU cores and system capacity
batch_size = 250   # Adjust based on your memory capacity and document size

total_docs = es.count(index=index, body={"query": query})['count']
logging.info(f"Total documents to retrieve: {total_docs}")

# Write to an 'output' folder next to this script, one timestamped file per run.
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
script_dir = os.path.dirname(__file__)
output_folder = os.path.join(script_dir, 'output')
if not os.path.exists(output_folder):
    os.makedirs(output_folder)
filename = f"{file_prefix}_{timestamp}.json"
full_path = os.path.join(output_folder, filename)

# Open the scroll with the initial search; later pages come from es.scroll().
scroll_data = es.search(index=index, body={"query": query}, scroll=scroll, size=size)
sid = scroll_data['_scroll_id']

start_time = time.time()
processed_docs = 0
progress_lock = threading.Lock()  # guards the shared counter across worker threads

async def process_batch(hits):
    global processed_docs
    # Serialize the whole batch up front so it goes out in a single write,
    # which keeps lines from different worker threads from interleaving.
    payload = ''.join(json.dumps(hit['_source']) + '\n' for hit in hits)
    async with aiofiles.open(full_path, 'a', encoding='utf-8') as file:
        await file.write(payload)
    with progress_lock:
        processed_docs += len(hits)
        elapsed_time = time.time() - start_time
        remaining = total_docs - processed_docs
        percent_complete = (processed_docs / total_docs) * 100
        logging.info(f"Processed {processed_docs}/{total_docs} documents; {remaining} remaining; "
                     f"{percent_complete:.2f}% complete; Time elapsed: {elapsed_time:.2f} seconds.")

def process_hits_batch(hits):
    # Each worker thread runs its own event loop for the async file writes.
    asyncio.run(process_batch(hits))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    while True:
        hits = scroll_data['hits']['hits']
        if not hits:
            break
        # Fan the current page out to the pool in batch_size chunks.
        for i in range(0, len(hits), batch_size):
            batch = hits[i:i + batch_size]
            executor.submit(process_hits_batch, batch)
        scroll_data = es.scroll(scroll_id=sid, scroll=scroll)
        sid = scroll_data['_scroll_id']  # the scroll id can change between pages

# Exiting the with-block waits for all submitted batches to finish.
logging.info(f"All documents successfully retrieved and written to {full_path}")
es.clear_scroll(scroll_id=sid)
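Because each document lands as one JSON object per line, the export can be read back line by line. A minimal sketch, assuming a completed run (the filename below is an example; the script names files output/<file_prefix>_<timestamp>.json):

import json

# Hypothetical path: substitute the file your run actually produced.
with open('output/logs_20231116-162000.json', encoding='utf-8') as f:
    docs = [json.loads(line) for line in f]

print(f"Loaded {len(docs)} documents")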