Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
GAE MapReduce Datastore to Cloud Storage
import json

import webapp2

from google.appengine.ext import ndb
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
PROJECT_ID = 'your-project-name'
GS_BUCKET = 'your-project-bucket-name'
ENTITY_KIND = 'your.Entity'
PIPELINE_NAME = 'some-name'
NUM_SHARDS = 16
def datastore_map(entity):
yield("%s\n" % 'format-your-entity-here')
class DataBackupPipeline(base_handler.PipelineBase):
    """Root pipeline for the Datastore-to-Cloud-Storage backup.

    Stage 1 runs a MapperPipeline that reads every entity of ENTITY_KIND
    and writes the mapped output to the GS_BUCKET bucket. Stage 2 hands
    the resulting file list to CloudStorageWriter for bookkeeping.
    """

    def run(self):
        # Kick off the mapper stage; its (future) result is the list of
        # Cloud Storage files the output writer produced.
        file_list = yield mapreduce_pipeline.MapperPipeline(
            PIPELINE_NAME,
            "backup.datastore_map",
            "mapreduce.input_readers.DatastoreInputReader",
            output_writer_spec="mapreduce.output_writers.GoogleCloudStorageConsistentOutputWriter",
            params={
                "input_reader": {
                    "entity_kind": ENTITY_KIND,
                },
                "output_writer": {
                    "filesystem": "gs",
                    "bucket_name": GS_BUCKET,
                    "output_sharding": "input",
                },
            },
            shards=NUM_SHARDS)
        # Stage 2: record the produced file names in Datastore.
        yield CloudStorageWriter(file_list)
class ResultFile(ndb.Model):
    """Datastore record tracking one Cloud Storage backup output file."""

    # Full 'gs://bucket/...' path of the exported file.
    file_name = ndb.StringProperty()
    # Set automatically to the creation time on first put().
    date = ndb.DateTimeProperty(auto_now_add=True)
class CloudStorageWriter(base_handler.PipelineBase):
    """Pipeline stage that records each mapper output file in Datastore."""

    def run(self, csv_output):
        # The mapper reports paths in the legacy '/gs/bucket/...' form;
        # rewrite each to the canonical 'gs://bucket/...' URI and persist
        # a ResultFile entry per file, in the order they were reported.
        for raw_path in csv_output:
            gs_uri = str(raw_path.replace('/gs/', 'gs://'))
            ResultFile(file_name=gs_uri).put()
class BackupHandler(webapp2.RequestHandler):
    """HTTP handler that launches the backup job on GET."""

    def get(self):
        # Start the MapReduce pipeline, then send the client to the
        # pipeline status UI so progress can be watched.
        job = DataBackupPipeline()
        job.start()
        status_url = "%s/status?root=%s" % (job.base_path, job.pipeline_id)
        self.redirect(status_url)

I am trying to back up my ndb Datastore entities with this, but I am getting a "model is not JSON serializable" error when I serialize the entity in the map function. How do I resolve it?

harlan commented Feb 22, 2017

The yield statement on line 35 appears to have broken indentation -- no?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment