Skip to content

Instantly share code, notes, and snippets.

@kakarukeys
Created July 7, 2022 03:21
Show Gist options
  • Save kakarukeys/07e50177c4919d1b24ab16b623b6aa14 to your computer and use it in GitHub Desktop.
robust_parallel_bulk.py
import logging
from enum import IntEnum
from datetime import datetime
from typing import List, Iterable, Dict, Any, Tuple, Literal, NamedTuple, Type
import tenacity as tn
from elasticsearch.helpers import parallel_bulk
from app.elasticsearch.client import es_client
from app.db.models import BaseModel
from app.settings import settings
def robust_parallel_bulk(es_ops: List["EsOp"], index: str) -> None:
    """Perform the Elasticsearch operations in *es_ops* on *index*.

    The operations are sent with ``elasticsearch.helpers.parallel_bulk``.
    If a connection error is raised, the whole batch is retried. There are
    at most 3 attempts in total, 90 seconds apart. Any other exception
    propagates immediately. If every attempt fails, the last connection
    error is re-raised.

    :param es_ops: bulk actions to perform. This must be re-iterable (for
        example a list), because every retry iterates it again from the
        start.
    :param index: default target index, passed through to ``parallel_bulk``.
    :raises ConnectionError: the builtin or Elasticsearch's variant, once
        all attempts are exhausted.
    """
    import collections

    # Elasticsearch's ConnectionError does not derive from the builtin
    # ConnectionError, so both are listed for the retry predicate to match.
    from elasticsearch import ConnectionError as EsConnectionError

    for attempt in tn.Retrying(
        reraise=True,
        stop=tn.stop_after_attempt(3),
        wait=tn.wait_fixed(90),
        retry=tn.retry_if_exception_type((ConnectionError, EsConnectionError)),
    ):
        with attempt:
            # NOTE(review): a retry re-sends every operation, including ones
            # already applied by a partially successful earlier attempt.
            # This is presumably fine for index/delete ops; confirm for "create".
            results = parallel_bulk(
                es_client.es_client,
                es_ops,
                thread_count=settings.bulk_indexing_thread_count,
                queue_size=settings.bulk_indexing_thread_count,
                chunk_size=500,
                max_chunk_bytes=104857600,  # 100 MiB
                index=index,
            )
            # parallel_bulk is a lazy generator: nothing is sent until it is
            # iterated. Drain it here so the operations actually run and any
            # errors are raised inside the retry context.
            collections.deque(results, maxlen=0)
@kakarukeys
Copy link
Author

kakarukeys commented Jul 7, 2022

Example test:

from unittest.mock import patch, MagicMock
from workers.contact_indexer import ContactIndexer


def test_process_messages():
    """ContactIndexer emits a "did_index" event only for contacts that synced.

    All three incoming leadbook ids are handed to ``sync_contacts`` in a
    single call. Only the ids the (mocked) sync reports back appear in the
    output.
    """
    fake_sync = MagicMock(return_value=iter(["L-2", "L-3"]))
    indexer = ContactIndexer()

    incoming = [
        ("lb_contact_will_index", lead_id, {"leadbook_id": lead_id})
        for lead_id in ("L-1", "L-2", "L-3")
    ]
    expected = [
        ("lb_contact_did_index", lead_id, {"leadbook_id": lead_id})
        for lead_id in ("L-2", "L-3")
    ]

    with patch("workers.contact_indexer.sync_contacts", fake_sync):
        produced = list(indexer.process_messages(incoming))

    fake_sync.assert_called_once_with(["L-1", "L-2", "L-3"])
    assert produced == expected

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.