Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Super-simple MongoDB Apache Beam transform for Python
# Public Domain CC0 license. https://creativecommons.org/publicdomain/zero/1.0/
from pymongo import MongoClient
from apache_beam.transforms import PTransform, ParDo, DoFn, Create
__all__ = ['ReadFromMongo']


class ReadFromMongo(PTransform):
    """A ``PTransform`` for reading documents from a MongoDB collection.

    The query is executed inside a single ``DoFn`` call on a one-element
    ``PCollection``, so the whole result set is read by one worker; the read
    is not split across workers.

    Args:
        connection_string: Connection string passed to
            ``pymongo.MongoClient``. ``None`` lets pymongo use its defaults.
        db: Name of the database to read from.
        collection: Name of the collection to read from.
        query: Optional filter passed to ``Collection.find``. ``None``
            matches every document.
    """

    def __init__(self, connection_string, db, collection, query=None):
        super(ReadFromMongo, self).__init__()
        self._connection_string = connection_string
        self._db = db
        self._collection = collection
        self._query = query

    def expand(self, pcoll):
        # Seed the pipeline with one element describing the query; ReadFn
        # then runs the query and emits each matching document.
        docs = (pcoll.pipeline
                | 'DB Singleton' >> Create([(self._db, self._collection, self._query)])
                | 'Run Query' >> ParDo(ReadFromMongo.ReadFn(self._connection_string)))
        return docs

    def display_data(self):
        # Beam display data only accepts simple values (str, int, ...). The
        # query is a dict or None, so it is rendered as a string.
        return {
            'db': self._db,
            'collection': self._collection,
            'query': str(self._query),
        }

    class ReadFn(DoFn):
        """Runs a ``find`` query and emits every returned document.

        Each input element is a ``(db_name, collection_name, query)`` tuple.
        """

        def __init__(self, connection_string):
            super(ReadFromMongo.ReadFn, self).__init__()
            self._connection_string = connection_string
            self._client = None

        def start_bundle(self):
            self._client = MongoClient(self._connection_string)

        def finish_bundle(self):
            # Release the client opened in start_bundle; without this every
            # bundle leaked a MongoClient and its connections.
            if self._client is not None:
                self._client.close()
                self._client = None

        def process(self, conf, *args, **kwargs):
            db_name, coll_name, query = conf
            collection = self._client[db_name][coll_name]
            # Drain the cursor here so every document is emitted before
            # finish_bundle closes the client.
            for doc in collection.find(query):
                yield doc
# Public Domain CC0 license. https://creativecommons.org/publicdomain/zero/1.0/
from mongodbio import ReadFromMongo
import apache_beam as beam
def run(connection_string=None, db='mydb', collection='mycollection',
        output='./documents.txt'):
    """Read every document from a MongoDB collection and write it as text.

    Runs on the local ``DirectRunner``. The defaults reproduce the original
    example: pymongo's default connection (``connection_string=None``),
    database ``mydb``, collection ``mycollection``, and ``./documents.txt``
    as the file path prefix given to ``beam.io.WriteToText``.
    """
    # Leaving the ``with`` block runs the pipeline and waits for it to finish,
    # so run() does not return while the pipeline is still executing.
    with beam.Pipeline(runner='DirectRunner') as p:
        (p
         | 'read' >> ReadFromMongo(connection_string, db, collection)
         | 'save' >> beam.io.WriteToText(output))


if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment