Skip to content

Instantly share code, notes, and snippets.

@cmacrander
Created September 17, 2015 17:00
Show Gist options
  • Save cmacrander/5f4b46bffadf4891e15b to your computer and use it in GitHub Desktop.
Save cmacrander/5f4b46bffadf4891e15b to your computer and use it in GitHub Desktop.
Dead-end attempt to use MapReduce to convert App Engine backup files in GCS to JSON.
from google.appengine.api import datastore # for reading backup files
from google.appengine.api.files import records # for reading backup files
from google.appengine.datastore import entity_pb # for reading backup files
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import shuffler
import json
import logging
import util
def backups_to_json_mapper(file_handle):
    """Map one datastore backup file to ``(kind, entity)`` pairs.

    Each record read from the file is decoded into a datastore entity and
    emitted under a key taken from the text before the first underscore of
    the entity's 'id' property.

    Args:
        file_handle: gcs file handle from cloudstorage.open()

    Yields:
        (kind, entity) tuples, one per record in the backup file.
    """
    logging.info("mapper got file contents: {}".format(file_handle))

    for raw_record in records.RecordsReader(file_handle):
        # Decode the record's bytes as an EntityProto, then wrap the proto
        # in a datastore Entity so properties can be read by name.
        entity_proto = entity_pb.EntityProto(contents=raw_record)
        entity = datastore.Entity.FromPb(entity_proto)

        # NOTE(review): assumes every entity carries a string 'id' property
        # shaped like '<Kind>_<rest>' -- confirm against the stored data.
        kind, _, _ = entity['id'].partition('_')
        yield (kind, entity)
def backups_to_json_reduce(kind, entities):
    """Serialize all values grouped under one kind into a single JSON string.

    Args:
        kind: the key the values were grouped under.
        entities: the values collected for that key.

    Yields:
        One JSON string encoding ``entities``; values json cannot encode
        natively are passed to util.json_dumps_default.
    """
    logging.info("reduce got kind: {}".format(kind))
    logging.info("reduce got entities: {}".format(entities))

    # Only the JSON payload is emitted; the kind is logged above but is not
    # part of the yielded output.
    yield json.dumps(entities, default=util.json_dumps_default)
class BackupsToJsonPipeline(base_handler.PipelineBase):
    """Pipeline that runs the backups-to-JSON map reduce job on one bucket."""

    def run(self, bucket):
        """Yield the map reduce job that reads from and writes to ``bucket``.

        Args:
            bucket: name of the bucket; it is used for both the input
                reader and the output writer, and appended to the job name.

        Yields:
            A single MapreducePipeline wired to the mapper and reducer
            named by the "map.backups_to_json_*" handler specs.
        """
        logging.info("Running pipeline with bucket {}".format(bucket))

        job_name = "backups_to_json." + bucket

        # Input side: objects in the bucket matching the backup pattern.
        reader_config = {
            'bucket_name': bucket,
            'objects': ['datastore_backup_*'],
        }
        # Output side: written back to the same bucket as JSON content.
        writer_config = {
            'bucket_name': bucket,
            'content_type': 'application/json',
        }

        yield mapreduce_pipeline.MapreducePipeline(
            job_name,
            "map.backups_to_json_mapper",
            "map.backups_to_json_reduce",
            "mapreduce.input_readers.GoogleCloudStorageInputReader",
            "mapreduce.output_writers.GoogleCloudStorageConsistentOutputWriter",
            mapper_params={"input_reader": reader_config},
            reducer_params={"output_writer": writer_config},
        )
# Build the backups-to-JSON pipeline for the target bucket and start it.
# NOTE(review): `bucket` is not defined anywhere in the visible source; it
# must be supplied by surrounding code before these lines run -- confirm.
pipeline = BackupsToJsonPipeline(bucket)
pipeline.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment