Skip to content

Instantly share code, notes, and snippets.

@diogommartins
Last active November 4, 2018 20:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save diogommartins/b45d9cda932813a17834ba0516cb149c to your computer and use it in GitHub Desktop.
Save diogommartins/b45d9cda932813a17834ba0516cb149c to your computer and use it in GitHub Desktop.
from typing import List, Dict, Iterable, Generator
from aioelasticsearch import Elasticsearch
from asyncworker import App, Options
from asyncworker.rabbitmq import RabbitMQMessage
app = App(host="localhost", user="guest", password="guest", prefetch_count=512)
es = Elasticsearch()
def gen_bulk(docs: Iterable[Dict]) -> Generator[Dict, None, None]:
for doc in docs:
yield {"index": {"_index": 'words', "_type": "doc"}}
yield doc
@app.route(["words_to_index"], vhost="/", options={Options.BULK_SIZE: 256})
async def drain_handler(messages: List[RabbitMQMessage]):
await es.bulk(body=gen_bulk((msg.body for msg in messages)))
app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment