ElasticSearch Index Export
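Exports every document from an Elasticsearch index to a newline-delimited JSON file, paging through results with the scroll API and writing batches on a thread pool.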
from datetime import datetime
from elasticsearch import Elasticsearch
import json
import os
import logging
import time
import asyncio
import aiofiles
import threading
from concurrent.futures import ThreadPoolExecutor
# Logging:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
es = Elasticsearch(cloud_id=os.environ.get('CLOUD_ID'), api_key=os.environ.get('API_KEY'))
# User inputs
index = "logs-*"
file_prefix = 'logs'
query = {"match_all": {}}  # match every document in the index; an empty {} is not a valid query clause
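# If you only need a slice of the index, a bounded query can be substituted above.
# A hedged example (assumes a standard @timestamp field; adjust to your mapping):
# query = {"range": {"@timestamp": {"gte": "now-24h"}}}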
# Initialize scroll parameters
scroll = '15s'
size = 10000
max_workers = 16 # Adjust based on your CPU cores and system capacity
batch_size = 250 # Adjust based on your memory capacity and document size
total_docs = es.count(index=index, body={"query": query})['count']
logging.info(f"Total documents to retrieve: {total_docs}")
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
script_dir = os.path.dirname(__file__)
output_folder = os.path.join(script_dir, 'output')
os.makedirs(output_folder, exist_ok=True)
filename = f"{file_prefix}_{timestamp}.json"
full_path = os.path.join(output_folder, filename)
scroll_data = es.search(index=index, body={"query": query}, scroll=scroll, size=size)
sid = scroll_data['_scroll_id']
start_time = time.time()
processed_docs = 0
# Guard the shared progress counter; batches complete on multiple worker threads.
progress_lock = threading.Lock()

async def process_batch(hits):
    global processed_docs
    # Append each document as one line of newline-delimited JSON
    # (each write is a single full line, so concurrent appends stay line-aligned).
    async with aiofiles.open(full_path, 'a', encoding='utf-8') as file:
        for hit in hits:
            await file.write(json.dumps(hit['_source']) + '\n')
    with progress_lock:
        processed_docs += len(hits)
        elapsed_time = time.time() - start_time
        remaining = total_docs - processed_docs
        percent_complete = (processed_docs / total_docs) * 100
    logging.info(f"Processed {processed_docs}/{total_docs} documents; {remaining} remaining; "
                 f"{percent_complete:.2f}% complete; Time elapsed: {elapsed_time:.2f} seconds.")
def process_hits_batch(hits):
    # Each worker thread runs its own short-lived event loop for the async file writes.
    asyncio.run(process_batch(hits))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    while True:
        hits = scroll_data['hits']['hits']
        if not hits:
            break
        # Fan batches out to worker threads while the main thread keeps scrolling.
        for i in range(0, len(hits), batch_size):
            batch = hits[i:i + batch_size]
            executor.submit(process_hits_batch, batch)
        scroll_data = es.scroll(scroll_id=sid, scroll=scroll)
        sid = scroll_data['_scroll_id']  # the scroll ID can change between pages
logging.info(f"All documents successfully retrieved and written to {full_path}")
es.clear_scroll(scroll_id=sid)
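
# For reference, a minimal single-threaded alternative using the
# elasticsearch.helpers.scan helper, which manages the scroll loop and
# scroll-ID bookkeeping automatically. A sketch only, reusing the `es`,
# `index`, `query`, `scroll`, and `full_path` values defined above:
#
#     from elasticsearch.helpers import scan
#
#     with open(full_path, 'a', encoding='utf-8') as f:
#         for hit in scan(es, index=index, query={"query": query}, scroll=scroll, size=size):
#             f.write(json.dumps(hit['_source']) + '\n')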