Skip to content

Instantly share code, notes, and snippets.

Last active July 15, 2022 22:20
  • Star 13 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
Super-simple MongoDB Apache Beam transform for Python
# Public Domain CC0 license.
"""MongoDB Apache Beam IO utilities.

Tested with google-cloud-dataflow package version 2.0.0.
"""
__all__ = ['ReadFromMongo']
import datetime
import logging
import re

from apache_beam.io import iobase, range_trackers
from apache_beam.transforms import PTransform, ParDo, DoFn, Create
from pymongo import MongoClient
logger = logging.getLogger(__name__)


def mongo_connection_string(url):
    """Extract the MongoDB connection string given MongoDB url.

    If the string contains a Cloud Storage url (``gs://bucket/path``), the
    connection string is assumed to be the first line of the text file at
    that location. Any other value is returned unchanged.

    Args:
        url: Either a MongoDB connection string or a ``gs://`` url pointing
            at a text file whose first line is the connection string.

    Returns:
        The MongoDB connection string.

    Raises:
        ValueError: If the Cloud Storage object does not exist or its first
            line is empty.
    """
    if 'gs://' not in url:
        logger.info('Using connection string from CLI options')
        return url

    # Imported lazily so the Cloud Storage client library is only required
    # when a gs:// url is actually used.
    from google.cloud import storage

    logger.info('Fetching connection string from Cloud Storage %s', url)

    # Split gs://my-bucket/my-file.txt into my-bucket and my-file.txt separately.
    _, _, location = url.partition('gs://')
    bucket_name, _, blob_path = location.partition('/')

    # Fetch the file. get_blob() returns None when the object is missing, so
    # check before downloading instead of failing with an AttributeError.
    blob = storage.Client().get_bucket(bucket_name).get_blob(blob_path)
    if blob is None:
        raise ValueError('No Cloud Storage object found at {}'.format(url))

    content = blob.download_as_string()
    if not isinstance(content, str):
        # The download is bytes on Python 3; decode so a text connection
        # string is returned.
        content = content.decode('utf-8')

    # Assume the connection string is on the first line.
    lines = content.splitlines()
    connection_string = lines[0].strip() if lines else ''
    if not connection_string:
        raise ValueError(
            'Cloud Storage object {} does not contain a connection string'.format(url))
    return connection_string
# Matches the leading "YYYY-MM-DDTHH:MM:SS" part of an ISO-8601 timestamp.
iso_match = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')

try:
    _STRING_TYPES = (basestring,)  # Python 2: covers both str and unicode.
except NameError:
    _STRING_TYPES = (str,)  # Python 3.


def _clean_value(val):
    """Clean one query value, recursing into nested dicts and lists."""
    if isinstance(val, dict):
        return clean_query(val)
    if isinstance(val, list):
        # Operators such as $or / $and / $in carry their operands in lists.
        return [_clean_value(item) for item in val]
    if isinstance(val, _STRING_TYPES):
        try:
            val = str(val)  # Because unicode and 2.7 :-(
        except UnicodeEncodeError:
            # Non-ASCII text on Python 2: keep the unicode value as-is.
            pass
        # If the string is an ISO date, turn it into a real datetime object
        # so pymongo can understand it. Only the first 19 characters are
        # parsed, so fractional seconds and timezone suffixes are dropped.
        if iso_match.match(val):
            return datetime.datetime.strptime(val[0:19], '%Y-%m-%dT%H:%M:%S')
    return val


def clean_query(query):
    """Return a copy of a MongoDB query dict prepared for pymongo.

    Keys are converted with ``str()``; string values that start with an ISO
    timestamp (``YYYY-MM-DDTHH:MM:SS``) become naive ``datetime`` objects;
    nested dicts and lists are cleaned recursively. The input is not
    modified.

    Args:
        query: A dict describing a MongoDB query.

    Returns:
        A new dict with cleaned keys and values.

    Raises:
        ValueError: If a string looks like an ISO timestamp but holds an
            invalid date or time.
    """
    return {str(key): _clean_value(val) for key, val in query.items()}
class _MongoSource(iobase.BoundedSource):
    """A :class:`~apache_beam.io.iobase.BoundedSource` for reading from MongoDB.

    The collection is read sequentially through a single, unsplittable
    bundle.
    """

    def __init__(self, connection_string, db, collection, query=None, fields=None):
        """Initializes :class:`_MongoSource`.

        Args:
            connection_string: MongoDB connection string, or a ``gs://`` url
                that is resolved by :func:`mongo_connection_string` when the
                client is first needed.
            db: Name of the database to read from.
            collection: Name of the collection to read from.
            query: Optional query dict; an empty query is used when falsy.
            fields: Optional projection passed to ``find``.
        """
        self._connection_string = connection_string
        self._db = db
        self._collection = collection
        self._fields = fields
        self._client = None

        # Prepare query
        logger.info('Raw query: {}'.format(query))
        self._query = clean_query(query or {})
        logger.info('Cleaned query: {}'.format(self._query))

    def __getstate__(self):
        """Drop the cached client when pickling; it is re-created on demand."""
        state = dict(self.__dict__)
        state['_client'] = None
        return state

    @property
    def client(self):
        """Returns a PyMongo client, creating it on first access.

        The client is not picklable, so it is created lazily instead of in
        ``__init__`` and is excluded from the pickled state.
        """
        if self._client is not None:
            logger.info('Reusing existing PyMongo client')
            return self._client

        # Prepare client, assumes a full connection string.
        logger.info('Preparing new PyMongo client')
        real_connection_string = mongo_connection_string(self._connection_string)
        self._client = MongoClient(real_connection_string)
        return self._client

    def estimate_size(self):
        """Implements :class:`~apache_beam.io.iobase.BoundedSource.estimate_size`

        Returns the number of documents matching the query.
        NOTE(review): Beam documents this hook as a size in bytes; the
        document count is only a rough proxy - confirm it is acceptable.
        """
        coll = self.client[self._db][self._collection]
        # Collection.count() was removed in PyMongo 4. Look the method up on
        # the class: attribute access on a Collection *instance* yields a
        # sub-collection for unknown names, so hasattr(coll, ...) is always
        # true.
        if hasattr(type(coll), 'count_documents'):
            return coll.count_documents(self._query)
        return coll.count(self._query)

    def get_range_tracker(self, start_position, stop_position):
        """Implements :class:`~apache_beam.io.iobase.BoundedSource.get_range_tracker`"""
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY

        # Use an unsplittable range tracker. This means that a collection can
        # only be read sequentially for now.
        range_tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
        range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
        return range_tracker

    def read(self, range_tracker):
        """Implements :class:`~apache_beam.io.iobase.BoundedSource.read`

        Yields every document matching the query. ``range_tracker`` is not
        consulted: the whole cursor is read sequentially.
        """
        coll = self.client[self._db][self._collection]
        for doc in coll.find(self._query, projection=self._fields):
            yield doc

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        """Implements :class:`~apache_beam.io.iobase.BoundedSource.split`

        Because the source is unsplittable (for now), ``desired_bundle_size``
        is ignored and a single bundle covering the whole range is yielded.
        """
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY

        # Because the source is unsplittable (for now), only a single source is
        # returned.
        # NOTE(review): the arguments of this call were truncated in the
        # original text and are reconstructed here - confirm weight=1.
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)
class ReadFromMongo(PTransform):
    """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading
    from MongoDB.
    """

    def __init__(self, connection_string, db, collection, query=None, fields=None):
        """Initializes :class:`ReadFromMongo`

        Uses source :class:`_MongoSource`

        Args:
            connection_string: MongoDB connection string, passed through to
                :class:`_MongoSource`.
            db: Name of the database to read from.
            collection: Name of the collection to read from.
            query: Optional query dict.
            fields: Optional projection for the returned documents.
        """
        super(ReadFromMongo, self).__init__()
        self._connection_string = connection_string
        self._db = db
        self._collection = collection
        self._query = query
        self._fields = fields
        self._source = _MongoSource(
            self._connection_string,
            self._db,
            self._collection,
            query=self._query,
            fields=self._fields)

    def expand(self, pcoll):
        """Implements :class:`~apache_beam.transforms.ptransform.PTransform.expand`"""
        logger.info('Starting MongoDB read from {}.{} with query {}'
                    .format(self._db, self._collection, self._query))
        return pcoll | iobase.Read(self._source)

    def display_data(self):
        """Returns the display data for this transform: the wrapped source."""
        return {'source_dd': self._source}
#!/usr/bin/env python
"""A simple example of how to use the MongoDB reader.

If you like, you can test it out with these commands (requires Docker and
virtualenv for python2):

$ virtualenv venv
$ source venv/bin/activate
$ pip install google-cloud-dataflow pymongo
$ docker run -p 27017:27017 --name dataflowtest --rm mongo:3.2
$ docker exec -it dataflowtest mongo
> use mydb
> db.mycollection.insert({ _id: ObjectId() })
> exit
$ python example.py  # use whatever name this script was saved under
"""
import logging
from mongodbio import ReadFromMongo
import apache_beam as beam
__licence__ = 'Public Domain CC0 license.'
def transform_doc(document):
    """Reduce a MongoDB document to a dict holding only its stringified id.

    Args:
        document: A mapping with an ``'_id'`` entry.

    Returns:
        ``{'_id': str(document['_id'])}``.

    Raises:
        KeyError: If the document has no ``'_id'`` entry.
    """
    return {'_id': str(document['_id'])}
def run():
    """Read ``mydb.mycollection`` from a local MongoDB and write the ids to text.

    Runs on the DirectRunner; each document is reduced by
    :func:`transform_doc` and the results are written with the output prefix
    ``./documents.txt``.
    """
    connection_string = 'mongodb://localhost:27017'
    # Can also fetch a connection string from a Google Cloud Storage file.
    # This might be preferable to avoid pickling the mongodb connection string.
    # E.g.
    # connection_string = 'gs://my-bucket/mongo_connection_string.txt'
    # where "mongo_connection_string.txt" contains a single line with the connection string.

    with beam.Pipeline(runner='DirectRunner') as pipeline:
        (pipeline
         | 'read' >> ReadFromMongo(connection_string, 'mydb', 'mycollection', query={}, fields=['_id'])
         | 'transform' >> beam.Map(transform_doc)
         | 'save' >> beam.io.WriteToText('./documents.txt'))
if __name__ == '__main__':
    # Show the reader's INFO-level log messages while the example runs.
    # NOTE(review): the body of this guard was missing from the original
    # text; the log-level line is a reconstruction - confirm.
    logging.getLogger().setLevel(logging.INFO)
    run()
Copy link

Please create a MongoDB writer as well.

Copy link

(y) <3

Copy link

Took a shot at writing a MongoDB sink, still WIP though

Copy link

MeTaNoV commented Aug 15, 2018

@dlebech and @sandboxws, FYI, I formalized a package (beam-extended) available on PyPi with the following GH repo: based on your first implementation and giving you of course credit.

Copy link

MeTaNoV commented Aug 15, 2018

I also raised this Issue on Apache Beam tracker:

Copy link

dlebech commented Oct 18, 2018

Thanks @MeTaNoV and @sandboxws! 💯 That's going to be very useful for the people that need Mongo support in Beam (I haven't used it for more than a year now myself). Hopefully it will move into Beam core sometime in the future :)

Copy link

dlebech commented Jul 22, 2020

It looks like there is an official MongoDB IO reader and writer now (the `apache_beam.io.mongodbio` module):

That is probably the preferred method, but as of this writing, it seems to have experimental status:

Everything in this module is experimental.

Copy link

AlexandrePieroux commented Sep 18, 2020

This is still relevant for me. I ran into the following error with Cosmos DB on Azure:
[...] File "/opt/venv/lib/python3.6/site-packages/apache_beam/transforms/", line 322, in split_and_size for part in self.split(element, restriction): File "/opt/venv/lib/python3.6/site-packages/apache_beam/io/", line 1488, in split estimated_size = self._source.estimate_size() File "/opt/venv/lib/python3.6/site-packages/apache_beam/io/", line 167, in estimate_size return client[self.db].command('collstats', self.coll).get('size') [...] pymongo.errors.OperationFailure: Required element collStats missing

(This is not specific to Beam IO: even plain pymongo triggers this error when issuing this command against Cosmos DB.)

It seems that the commands 'collStats' and 'splitVector', which the official reader uses in its split method, are not supported by Cosmos DB (version 3.6). I don't know whether this is a bug on the Cosmos DB side, and I did not find any information about it. So I decided to use your solution as the base for an in-house implementation (more control over the code, and open to our customizations).

Thanks for sharing !!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment