Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Mongo Dask Bag
import logging

import dask
import dask.bag
import pymongo
class MongoDaskBag:
    """Expose a MongoDB collection as a Dask bag read in ranged partitions.

    The collection's ``_id`` values are listed once, cut into contiguous
    chunks, and each chunk is then fetched by its own Dask task through a
    ``$gte``/``$lte`` range query.
    """

    def __init__(self, db_name, collection_name):
        """Remember which database and collection to read from.

        Args:
            db_name: Name of the MongoDB database.
            collection_name: Name of the collection inside that database.
        """
        self.db_name = db_name
        self.collection_name = collection_name

    @staticmethod
    def _adjust_partition_size(size, partitions_num, partition_size):
        """Resolve a consistent ``(partition_size, partitions_num)`` pair.

        ``partition_size`` always carries a default in ``bag``, so an
        explicitly supplied ``partitions_num`` is treated as the caller's
        intent and takes priority over it.

        Args:
            size: Total number of items to split.
            partitions_num: Requested number of partitions, or ``None``.
            partition_size: Requested items per partition, or ``None``.

        Returns:
            Tuple ``(partition_size, partitions_num)`` where
            ``partitions_num`` is the number of chunks actually produced.

        Raises:
            ValueError: If the value that is used is missing or below 1.
        """
        if partitions_num is not None:
            if partitions_num < 1:
                raise ValueError("partitions_num must be a positive integer")
            # Integer ceiling division; never let the range step drop to 0.
            partition_size = max(1, -(-size // partitions_num))
        elif partition_size is None or partition_size < 1:
            raise ValueError("partition_size must be a positive integer")
        return partition_size, -(-size // partition_size)

    @staticmethod
    def _partition_bounds(all_ids, partition_size):
        """Return inclusive ``(first_id, last_id)`` pairs, one per chunk."""
        total = len(all_ids)
        return [
            (all_ids[start], all_ids[min(start + partition_size, total) - 1])
            for start in range(0, total, partition_size)
        ]

    def bag(self, partition_size: int = 1000, partitions_num: int = None):
        """Build a Dask bag containing the documents of the collection.

        Args:
            partition_size: Number of ids per partition; used when
                ``partitions_num`` is not given.
            partitions_num: Desired number of partitions; overrides
                ``partition_size`` when given.

        Returns:
            A ``dask.bag`` whose elements are the documents returned by
            ``read_datetime_interval_from_collection`` for every partition.
            An empty collection yields an empty bag.
        """
        with pymongo.MongoClient() as mongo_client:
            collection = mongo_client[self.db_name][self.collection_name]
            # Empty filter + projection: fetch only the ``_id`` of every
            # document. Sorting makes the positional bounds real ranges.
            cursor = collection.find({}, {'_id': 1}).sort('_id', pymongo.ASCENDING)
            all_ids = [doc['_id'] for doc in cursor]

        size = len(all_ids)
        if size:
            partition_size, partitions_num = self._adjust_partition_size(
                size, partitions_num, partition_size
            )
            partitions_requests = self._partition_bounds(all_ids, partition_size)
        else:
            # Nothing to partition; avoid indexing into an empty id list.
            partitions_requests = []
        logging.info(f"Data partitioned: {partition_size}x{len(partitions_requests)}")

        return (
            dask.bag.from_sequence(partitions_requests)
            .map(self.read_datetime_interval_from_collection)
            .flatten()
        )

    def read_datetime_interval_from_collection(self, args):
        """Fetch every document whose ``ts`` lies in an inclusive interval.

        Args:
            args: Pair ``(start_ts, end_ts)`` with the inclusive bounds.

        Returns:
            List of matching documents.
        """
        start_ts, end_ts = args
        # NOTE(review): ``bag`` derives these bounds from ``_id`` values but
        # the query filters on ``ts`` -- presumably both hold the same
        # timestamp in the target collection; confirm against the schema.
        # A fresh client per call, since this runs inside a Dask task.
        with pymongo.MongoClient() as mongo_client:
            collection = mongo_client[self.db_name][self.collection_name]
            return list(collection.find({'ts': {'$gte': start_ts, '$lte': end_ts}}))
@priyanka666

This comment has been minimized.

Copy link

@priyanka666 priyanka666 commented Jul 24, 2019

Hi, what is `adjust_partition_size` in the code?

@Sklavit

This comment has been minimized.

Copy link
Owner Author

@Sklavit Sklavit commented Jul 24, 2019

The signature states that `partitions_num` and `partition_size` are optional, but we need real values for both of them.
So this function computes whichever of `partitions_num` and `partition_size` has been omitted from the other one.

Unfortunately, I am no longer able to recover the code of this function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment