Skip to content

Instantly share code, notes, and snippets.

@apeckham
Created December 8, 2023 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apeckham/e803cf1ee8f64e470324458e1d152a05 to your computer and use it in GitHub Desktop.
Save apeckham/e803cf1ee8f64e470324458e1d152a05 to your computer and use it in GitHub Desktop.
firestore import in parallel
from google.cloud import firestore
from datetime import datetime
import json
import concurrent.futures
from tqdm import tqdm
import math
client = firestore.Client(database="xxxxx", project="xxxxx")
def process_chunk(chunk, pbar):
batch = client.batch()
for line in chunk:
doc = json.loads(line)
doc_id = doc["key"]
doc_data = doc["fields"]
doc_data["now"] = datetime.utcnow()
batch.set(client.collection("xxxxxx").document(doc_id), doc_data)
batch.commit()
pbar.update(len(chunk))
def read_file_in_chunks(file, chunk_size=500):
chunk = []
for line in file:
chunk.append(line)
if len(chunk) == chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
def parallel_import(filename, num_threads):
with open(filename, "r") as file:
total_chunks = sum(1 for _ in file)
file.seek(0)
with tqdm(total=total_chunks) as pbar:
with concurrent.futures.ThreadPoolExecutor(
max_workers=num_threads
) as executor:
for chunk in read_file_in_chunks(file):
executor.submit(process_chunk, chunk, pbar)
if __name__ == "__main__":
parallel_import("xxxxx.json", 8)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment