Geo Indexing tests (clement-tourriere, August 28, 2015)
# --- proj/celery.py ---
from __future__ import absolute_import

from celery import Celery

# This file must be put in a subdirectory proj
app = Celery('tasks', broker='redis://localhost', backend='redis://localhost',
             include=['proj.tasks'])

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Paris',
    CELERY_ENABLE_UTC=True,
)

# app.control.purge()

### You can start a celery worker in the directory above proj with this command:
### celery -A proj worker --loglevel=info
### You can change the concurrency with the -c option
### You need a running redis with this configuration (the broker can be changed)

if __name__ == '__main__':
    app.start()
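
# A minimal sketch (not in the original gist) for checking that a worker is up
# and that the Redis broker is reachable before queuing work. app.control.ping()
# is standard Celery; the timeout value here is an arbitrary choice.
from proj.celery import app

replies = app.control.ping(timeout=1.0)
if not replies:
    print "No worker replied; is `celery -A proj worker` running?"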
# --- proj/tasks.py ---
from __future__ import absolute_import

import json

from elasticsearch import helpers
from elasticsearch.client import Elasticsearch
from elasticsearch.exceptions import ConnectionError
from elasticsearch.helpers import bulk

from proj.celery import app

# This file must be put in a subdirectory proj

es_client = Elasticsearch()


@app.task
def bulk_task(data):
    # Index one batch of bulk actions into Elasticsearch.
    try:
        es = Elasticsearch()
        bulk(es, data)
    except ConnectionError as ex:
        print "Could not connect to Elasticsearch: %s" % ex
    except Exception:
        raise


@app.task
def scroll_task(nb_query, index, query):
    # Scan the whole index with the given query and dump each hit to a file.
    with open("res_{}.json".format(nb_query), "w") as f:
        for res in helpers.scan(es_client, query, index=index, size=100):
            f.write(json.dumps(res))
            f.write('\n')
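
# A minimal usage sketch (not in the original gist): queue one small bulk batch
# and block on its result. The action dicts follow the elasticsearch-py bulk
# helper format used throughout this gist; the index and type names here are
# placeholders.
from proj.tasks import bulk_task

actions = [
    {"_index": "some_index", "_type": "some_type", "_source": {"toto": 42}},
    {"_index": "some_index", "_type": "some_type", "_source": {"toto": 7}},
]
result = bulk_task.delay(actions)
result.get(timeout=30)  # raises if the task failed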
# --- Test script 1: custom "test_external" field type ---
import random
import sys
import time

from elasticsearch.client import Elasticsearch

from proj.tasks import bulk_task, scroll_task

es_client = Elasticsearch()

test_index = "test_indice_external"
test_type = "test_type"

# Recreate the index from scratch. refresh_interval=-1 disables periodic
# refresh during bulk indexing; zero replicas keeps indexing cheap for the test.
if es_client.indices.exists(test_index):
    es_client.indices.delete(test_index)

if not es_client.indices.exists(test_index):
    es_client.indices.create(test_index, {
        "index": {
            "refresh_interval": -1,
            "number_of_shards": 5,
            "number_of_replicas": 0
        }
    })

# "test_external" is presumably a custom field type provided by the mapper
# plugin under test; it is not a stock Elasticsearch type.
es_client.indices.put_mapping(index=test_index, doc_type=test_type, body={
    test_type: {"properties": {
        "toto": {
            "type": "test_external"
        }
    }}
})
def generate_random_doc():
    return {"toto": random.randint(-1000, 100000)}


def docs_generator(nb_docs):
    for x in xrange(nb_docs):
        doc = generate_random_doc()
        yield {
            "_index": test_index,
            "_type": test_type,
            "_source": doc
        }
# res, errors = bulk(es_client, docs_generator(10000000))

tasks = []
t0 = time.time()
buf = []
buf_size = 0
for i, doc in enumerate(docs_generator(10000)):
    buf.append(doc)
    buf_size += sys.getsizeof(doc, 0)
    # Queue a bulk task once the buffer reaches roughly 0.5 MB.
    if buf_size >= 0.5 * 1000 * 1000:
        tasks.append(bulk_task.delay(buf))
        buf_size = 0
        buf = []
# Queue whatever is left in the buffer.
if buf:
    tasks.append(bulk_task.delay(buf))
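
# sys.getsizeof() above only measures the shallow size of each action dict, not
# the nested values or the serialized payload. An alternative sketch (not in
# the original gist): size batches by serialized JSON length, which tracks the
# actual bulk request size more closely.
import json

def batch_by_json_size(actions, max_bytes=500 * 1000):
    buf, size = [], 0
    for action in actions:
        buf.append(action)
        size += len(json.dumps(action))
        if size >= max_bytes:
            yield buf
            buf, size = [], 0
    if buf:
        yield buf

# for batch in batch_by_json_size(docs_generator(10000)):
#     tasks.append(bulk_task.delay(batch))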
print "Waiting tasks"
res = [t.get() for t in tasks]
print "All tasks done"
es_client.indices.refresh(index=test_index)
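
# Optional (not in the original gist): refresh_interval was set to -1 at index
# creation and is never restored; put_settings can re-enable periodic refresh
# once the bulk load is done.
es_client.indices.put_settings(index=test_index,
                               body={"index": {"refresh_interval": "1s"}})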
scroll_tasks = []
scroll_query = {
    "script_fields": {
        "lat": {
            "script": "doc['toto.point'].lat"
        },
        "lon": {
            "script": "doc['toto.point'].lon"
        }
    },
    "_source": True
}
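# Note (not in the original gist): on Elasticsearch 1.x these inline
# script_fields require dynamic scripting to be enabled (e.g.
# script.disable_dynamic: false in elasticsearch.yml on pre-1.6 releases);
# otherwise the scan requests will fail.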
for nb_req in xrange(10):
    scroll_tasks.append(scroll_task.delay(nb_req, test_index, scroll_query))

print "Waiting for scrolling tasks"
res2 = [t.get() for t in scroll_tasks]
print "All scrolling tasks done"
# --- Test script 2: stock "geo_point" field type ---
import random
import sys
import time

from elasticsearch.client import Elasticsearch

from proj.tasks import bulk_task, scroll_task

es_client = Elasticsearch()

test_index = "test_indice_point"
test_type = "test_type"

if es_client.indices.exists(test_index):
    es_client.indices.delete(test_index)

if not es_client.indices.exists(test_index):
    es_client.indices.create(test_index, {
        "index": {
            "refresh_interval": -1,
            "number_of_shards": 5,
            "number_of_replicas": 0
        }
    })

es_client.indices.put_mapping(index=test_index, doc_type=test_type, body={
    test_type: {"properties": {
        "toto": {
            "type": "geo_point",
            "doc_values": True,
            # "geohash_prefix": True,
            # "lat_lon": True
        }
    }}
})
def generate_random_doc():
    return {"toto": {"lon": random.randint(-180, 180), "lat": random.randint(-79, 79)}}


def docs_generator(nb_docs):
    for x in xrange(nb_docs):
        doc = generate_random_doc()
        yield {
            "_index": test_index,
            "_type": test_type,
            "_source": doc
        }
tasks = []
t0 = time.time()
buf = []
buf_size = 0
for i, doc in enumerate(docs_generator(100000)):
    buf.append(doc)
    buf_size += sys.getsizeof(doc, 0)
    # Queue a bulk task once the buffer reaches roughly 0.5 MB.
    if buf_size >= 0.5 * 1000 * 1000:
        tasks.append(bulk_task.delay(buf))
        buf_size = 0
        buf = []
# Queue whatever is left in the buffer.
if buf:
    tasks.append(bulk_task.delay(buf))
print "Waiting tasks"
res = [t.get() for t in tasks]
print "All tasks done"
es_client.indices.refresh(index=test_index)
scroll_tasks = []
scroll_query = {
    "script_fields": {
        "lat": {
            "script": "doc['toto'].lat"
        },
        "lon": {
            "script": "doc['toto'].lon"
        }
    },
    "_source": True
}
for nb_req in xrange(10):
    scroll_tasks.append(scroll_task.delay(nb_req, test_index, scroll_query))

print "Waiting for scrolling tasks"
res2 = [t.get() for t in scroll_tasks]
print "All scrolling tasks done"