Skip to content

Instantly share code, notes, and snippets.

Last active August 25, 2020 15:34
Show Gist options
  • Save dyens/488a9c0e2f150865e1658e393121bc52 to your computer and use it in GitHub Desktop.
Save dyens/488a9c0e2f150865e1658e393121bc52 to your computer and use it in GitHub Desktop.
Example of mutple queries execution using ambra-sdk
This is example of executing big number of queries using SDK.
Suppose we have big number of studies.
For each study we need execute Study.set query.
from ambra_sdk.api import Api
api = Api.with_creds(
# In future we use sid directly.
sid = api.sid
# Get all studies and save it to list.
# We can use only method if we use only study uuid
from ambra_sdk.models import Study
all_studies = list(api.Study.list().only(Study.uuid).all())
# In service api we have bundle call.
# Using this method we can execute some number of queries
# in one call.
# But python sdk does not support this method (anyway for now).
# So, lets define batch function.
def batch(queries, sid):
"""Execute batch of queries in one bundle call.
:param queries: list of queries
:param sid: sid
:return: bundle response
print('Start batch', len(queries))
batch_json = []
for query in queries:
# In new sdk versions (it is not released yet)
# you need to use query.url method
url = query._url
# In new sdk version use query.request_data
query_request_data = query._request_data
request = {
"URL": url,
"sid": sid,
request = {i: j for i, j in request.items() if j is not None}
return api.service_post(
# Now we can execute some queries in one batch.
# This is cool, but if we have a lot of queries
# better devide its on a pieces.
# Lets write supportying chunks functions.
# Its get list and split it on chunks.
def chunks(lst, n):
"""Yield successive n-sized chunks from lst.
:param lst: list of data
:param n: size of chunk:
:yields: splitted lists
for i in range(0, len(lst), n):
yield lst[i:i + n]
# In the second step lets prepare our queries.
# In this example we want to set patientid=123 for all studies.
def query_for_study(study):
"""Prepare query for study.
:param study: study obj
:return: query
return api.Study.set(study_id=study.uuid, patientid=123)
queries = [query_for_study(study) for study in all_studies]
# Then split all queries on chunks
# Need experiment with chunk size.
# On the one side the larger chunk size the better.
# On the other side as the value increases,
# server may stop responding.
# I would start with 500 or 1000...
chunk_size = 3
chunk_queries = chunks(queries, chunk_size)
# Now we can execute all our queries:
# for chunk in chunk_queries:
# result = batch(chunk, sid)
# Lets do same thing using threads:
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# This parameter should also be selected experimentally.
# I think 3-10 is ok..
max_workers = 5
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results =
partial(batch, sid=sid),
results = [r for r in results]
# Then check, that all queries is success.
for result in results:
assert result.status_code == 200
for query in result.json():
assert query['status'] == 'OK'
# In this example we assume that we can get all studies in memory.
# If we can not do this, we can use some queue. Get some number of study, put it in a queue.
# Then another thread get jobs from this queue and start bundle query.
# Let me know if you want this approach.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment