Example of executing multiple queries using ambra-sdk
"""
This is example of executing big number of queries using SDK.
Suppose we have big number of studies.
For each study we need execute Study.set query.
"""
from ambra_sdk.api import Api

# url, username and password are placeholders for your Ambra credentials.
api = Api.with_creds(
    url,
    username,
    password,
)
# Later we will use the sid directly.
sid = api.sid
# Get all studies and save them to a list.
# We can use the .only() method because we only need the study uuid.
from ambra_sdk.models import Study
all_studies = list(api.Study.list().only(Study.uuid).all())
# The service API has a bundle call.
# Using this method we can execute a number of queries
# in one call.
#
# But the Python SDK does not support this method (at least for now).
# So let's define a batch function.
def batch(queries, sid):
    """Execute a batch of queries in one bundle call.

    :param queries: list of queries
    :param sid: sid
    :return: bundle response
    """
    print('Start batch', len(queries))
    batch_json = []
    for query in queries:
        # In new SDK versions (not released yet)
        # you need to use the query.url method.
        url = query._url
        # In new SDK versions use query.request_data.
        query_request_data = query._request_data
        request = {
            "URL": url,
            "sid": sid,
        }
        request.update(query_request_data)
        # Drop empty fields from the request.
        request = {i: j for i, j in request.items() if j is not None}
        batch_json.append(request)
    return api.service_post(
        url='/bundle',
        required_sid=False,
        json=batch_json,
    )
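# For illustration, one entry of batch_json might look roughly like the
# dict below (the exact field names beyond URL and sid depend on the query
# and are an assumption here, not taken from the SDK docs):
# {"URL": "/study/set", "sid": "...", "study_id": "...", "patientid": "123"}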
# Now we can execute several queries in one batch.
# This is nice, but if we have a lot of queries
# it is better to divide them into pieces.
# Let's write a supporting chunks function.
# It takes a list and splits it into chunks.
def chunks(lst, n):
    """Yield successive n-sized chunks from lst.

    :param lst: list of data
    :param n: size of chunk
    :yields: sublists of length at most n
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
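# For example:
# list(chunks([1, 2, 3, 4, 5], 2))  # -> [[1, 2], [3, 4], [5]]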
# In the second step, let's prepare our queries.
# In this example we want to set patientid=123 for all studies.
def query_for_study(study):
    """Prepare a query for a study.

    :param study: study object
    :return: query
    """
    return api.Study.set(study_id=study.uuid, patientid=123)
queries = [query_for_study(study) for study in all_studies]
# Then split all queries into chunks.
# You need to experiment with the chunk size.
# On the one hand, the larger the chunk size the better.
# On the other hand, as the value increases,
# the server may stop responding.
# I would start with 500 or 1000...
chunk_size = 3  # A small value for demonstration purposes.
chunk_queries = chunks(queries, chunk_size)
# Now we can execute all our queries:
#
# for chunk in chunk_queries:
#     result = batch(chunk, sid)
#
# Let's do the same thing using threads:
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# This parameter should also be chosen experimentally.
# I think 3-10 is ok.
max_workers = 5
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    results = executor.map(
        partial(batch, sid=sid),
        chunk_queries,
    )
    results = list(results)
# Then check that all queries succeeded.
for result in results:
    assert result.status_code == 200
    for query in result.json():
        assert query['status'] == 'OK'
# In this example we assume that all studies fit in memory.
# If they do not, we can use a queue: fetch some number of studies
# and put them in the queue, while another thread takes jobs from
# the queue and starts a bundle query.
# Let me know if you want this approach.
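# A minimal sketch of that queue-based approach (an assumption, not part of
# the original gist): a producer thread iterates over studies lazily and
# enqueues query chunks, while a consumer thread executes bundle calls.
# It reuses api, sid, chunk_size, batch and query_for_study from above.
import queue
import threading

job_queue = queue.Queue(maxsize=10)

def producer():
    """Lazily iterate over studies and enqueue query chunks."""
    chunk = []
    for study in api.Study.list().only(Study.uuid).all():
        chunk.append(query_for_study(study))
        if len(chunk) == chunk_size:
            job_queue.put(chunk)
            chunk = []
    if chunk:
        job_queue.put(chunk)
    job_queue.put(None)  # Sentinel: no more jobs.

def consumer():
    """Take query chunks from the queue and execute bundle calls."""
    while True:
        chunk = job_queue.get()
        if chunk is None:
            break
        batch(chunk, sid)

producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()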