#!/usr/bin/env python
# encoding: utf-8
"""Export data to external systems.

Created by Maximillian Dornseif on 2011-05-01.
Copyright (c) 2011 HUDORA. All rights reserved.
"""
from __future__ import with_statement
import config
config.imported = True
from ablage.models import Akte, Dokument, DokumentFile
from boto.s3.connection import Location
from gaetk import webapp2
from gaetk.handler import BasicHandler
from google.appengine.api import taskqueue
from google.appengine.ext import db
from huTools import hujson
import StringIO
import boto.exception
import boto.s3
import datetime
import logging
import tarfile
import time
## Disaster Recovery on AppEngine and off-site Replication
# In the following paragraphs we consider several disaster scenarios and how
# to guard against them. Here we are only considering Safety (availability)
# issues, not Security (confidentiality) issues. Our data is hosted on Google
# AppEngine servers, which seem to be exclusively controlled by Google Inc. and
# exclusively hosted in the United States. This contributes to some disaster
# recovery scenarios.
# 1. Due to some programming or administration error on our side data is wiped
# out.
# 2. Due to some programming or administration error on Google's side data is
# wiped out. Data may or may not
# be restored by Google after some time (see "[unapplied writes][1]" in
# 2010 or the AWS EBS outage in 2011).
# 3. Due to some third-party software or hardware involvement data is wiped out.
# Think of student or coordinated physical attacks on datacenters.
# 4. Due to some contractual problems (e.g. we don't pay) data is deliberately
# wiped out by Google.
# 5. The US government or US court system decides to deny us access to our data.
# 6. A disgruntled Admin decides to delete our data.
# In addition there are some desirable properties the replication should have:
# 1. One copy of the data must be stored within the EU. This is a requirement
# for tax record keeping.
# 2. One copy of the data should be stored within Germany. This makes tax
# record keeping easier.
# 3. One copy of the data should be stored on site. This would ensure
# availability even if our company can't pay any third parties for storage
# for some time.
# 4. The replication should involve only minimal administrative resources.
# We always keep this image in mind when designing that stuff:
# In particular we want to avoid cronjobs on Unix machines which need
# additional monitoring, patching, upgrading, disks, backups, etc.
# If possible everything should run on AppEngine.
# One thing the replicas don't need to provide is immediate access. As long as
# the data and metadata are replicated somewhere and can be loaded into a
# (possibly yet to be written) application within a reasonable timeframe
# we are fine. Several of the scenarios above imply that we would not have
# access to AppEngine infrastructure and must rewrite our software anyhow.
# So direct restore from the replicas is not needed.
# We decided not to use the [bulkloader][3] for backups. While the bulkloader
# is a fine piece of software it seems that it can't be used for
# incremental backups. Also we are reluctant to enable the `remote_api` because
# technically this would enable every developer with admin permissions on the
# AppEngine to download our complete dataset "for testing". And then a laptop
# gets lost/stolen ...
# I also would argue that an application with enabled `remote_api` can't
# comply with any serious audit/bookkeeping standards. So we don't use it.
# Currently we have no objects bigger than 1 MB. This will change when we use
# the blobstore. Replicating entities bigger than 1 MB will be challenging
# since the `urlfetch` API only allows 1 MB per upload.
# Options for storage we considered:
### Amazon S3 (Simple Storage Service)
# This was our first choice. Provides storage in the EU, is well known and
# regarded and comes with a rich ecosystem. With [S3 multipart upload][4] it
# would be possible to generate big files but unfortunately the part size must be
# 5 MB or more while with the urlfetch API we can write only 1 MB or less. So
# this doesn't work. But for objects < 1 MB Amazon S3 is a fine choice.
### GS (Google Storage) and the AppEngine blobstore
# Neither service guards against most of the disaster scenarios described
# above, but both have some interesting properties which might make them
# desirable as an intermediate step to generating replicas.
# With [resumable uploads][5] Google Storage provides the ability to generate
# very large files while still being bound to the 1 MB upload limit of the
# urlfetch API. In theory I should also be able to use the new
# [file-like blobstore access][6] to write large files to the blobstore. But
# there is a [30 second time limit][7] on keeping the file open. One can get
# around it by reopening the file in append mode.
# For now we don't use Google Storage and the AppEngine blobstore because they
# don't guard against most of our disaster scenarios.
### Dropbox and
# We use Dropbox extensively and have plenty of storage with a "Team" account.
# We already use Dropbox for file input and output to AppEngine applications.
# Dropbox provides not only online storage but also syncing to local machines.
# Installing a Dropbox client on a local machine would provide us with on-site
# storage with minimal administrative hassle. Off-site storage within the EU
# would be more work.
# Unfortunately the [public Dropbox API][8] does not provide an append-to-file
# operation or anything else to create files bigger than 1 MB from AppEngine.
# The ecosystem for Dropbox Python libraries seems somewhat immature.
# I haven't looked too closely into it, but [the upload API][9]
# seems to have more or less the same limitations as Dropbox.
# I also took a quick look at Windows Azure Storage but I didn't understand
# if and how I can use only the storage service.
### Rackspace Cloudfiles
# Cloudfiles is a storage offering provided by Rackspace in the United States
# but also [in the United Kingdom][10]. And the United
# Kingdom is (mostly) in the EU. Rackspace is pushing the Cloudfiles API with
# the "OpenStack" initiative but there still seems to be no extensive
# ecosystem around the API. What is strange is the fact that Rackspace US and
# Rackspace UK seem to have no unified billing and the like.
# The API would allow creating large files via "[Large Object Creation /
# segmented upload][11]". To my understanding the `PUT` method together with
# byte ranges would provide a way to append to a file and the Python library
# for cloudfiles already [provides generator based upload][12] but it seems to
# me the current implementation would not work this way on AppEngine.
### WebDAV (self hosted)
# WebDAV would allow us to use `PUT` with byte ranges and therefore to
# generate arbitrarily large output files. But we found no ready-to-use Python
# library supporting that and no hosted WebDAV provider offering Cloud Scale
# and Cloud pricing. We want to avoid self-hosted servers.
## Choice of technology
# Currently the only services we felt comfortable with based on technology and
# pricing were Amazon S3 and Rackspace Cloudfiles. Cloudfiles has the better
# API for our requirements and would allow us to create very large files.
# Amazon S3 has a much richer environment of desktop and browser based
# utilities, FUSE filesystems etc. Therefore we decided for now to focus on
# output smaller than 1 MB and start with using Amazon S3. We would use one of
# the many desktop utilities to regularly sync data from Amazon S3 to local
# on-site storage. (one Unix cronjob to monitor `:-(` )
# This approach will guard against all the described disaster scenarios above.
# It should also guard against most data corruption scenarios because most of
# our data structures are designed to be immutable: data is never rewritten,
# instead a new version of the objects is created in the datastore. The old
# version is kept for audit purposes.
# [1]:
# [3]:
# [4]:
# [5]:
# [6]:
# [7]:
# [8]:
# [9]:
# [10]:
# [11]:
# [12]:
## Implementation
# We want incremental replication running in fixed intervals (e.g. every 15
# minutes). All data changed since the last replication should be written to
# the external storage. All of our datastore Entities have an `updated_at`
# property that is set on entity update and creation. We use this to select
# entities for replication.
# Data is stored as PDF and JSON to Amazon S3.
# It is packaged into "TAR-Archives" conforming to _IEEE Std 1003.1-2001_.
# The `boto` library has the tendency to flood the AppEngine log with
# usually uninteresting stuff, therefore we suppress logging.
# To remember which entity was the last one replicated we use a separate
# simple model storing only a timestamp.
class DateTimeConfig(db.Model):
    """Store Configuration as DateTime."""
    data = db.DateTimeProperty()


# We also use a Model for storing AWS access credentials.
# You need to set the Credentials before first use like this:
#     StrConfig.get_or_insert('aws_key_%s' % tenant, data='*key*')
#     StrConfig.get_or_insert('aws_secret_%s' % tenant, data='*secret*')
class StrConfig(db.Model):
    """Store Configuration as String."""
    data = db.StringProperty(default='')
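
# The incremental selection described at the start of this section (select
# everything with `updated_at >=` the last replicated timestamp, then remember
# the newest timestamp seen) can be sketched without the datastore. This is
# only an illustration in plain Python 3: `select_batch`, `replicate_once` and
# the dict based entities are hypothetical stand-ins, not part of this module.

```python
import datetime


def select_batch(entities, watermark, limit=300):
    """Return up to `limit` entities with updated_at >= watermark, oldest first."""
    newer = [e for e in entities if e["updated_at"] >= watermark]
    newer.sort(key=lambda e: e["updated_at"])
    return newer[:limit]


def replicate_once(entities, watermark, limit=300):
    """Process one batch and return (replicated ids, new watermark)."""
    batch = select_batch(entities, watermark, limit)
    for entity in batch:
        watermark = max(watermark, entity["updated_at"])
    return [e["id"] for e in batch], watermark


t0 = datetime.datetime(2011, 5, 1, 12, 0, 0)
t1 = t0 + datetime.timedelta(minutes=1)
t2 = t0 + datetime.timedelta(minutes=2)
entities = [
    {"id": "a", "updated_at": t0},
    {"id": "b", "updated_at": t1},
    {"id": "c", "updated_at": t1},  # same timestamp as "b"
    {"id": "d", "updated_at": t2},
]

# Because of `>=`, the second run repeats "b" and "c" but never skips one.
first_ids, mark = replicate_once(entities, t0, limit=3)
second_ids, mark2 = replicate_once(entities, mark, limit=3)

# With a batch limit not larger than the number of entities sharing one
# timestamp, the watermark cannot advance any more.
stalled_ids, stalled_mark = replicate_once(entities, t1, limit=2)
```

# The last call shows why a batch limit that is too small relative to the
# number of identical timestamps would stall the replication.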
# The replication is working incrementally and expected to be called at
# regular intervals. Here we are only replicating Document entities and
# related PDFs for a single tenant.
# The replication stops after 500 seconds by default. This is to make sure
# that there are no race conditions between a long running replication job
# and the next job started by cron. Such a race condition should not result
# in data loss but is a waste of resources.
# We start off at the `updated_at` timestamp of the last entity successfully
# replicated, taken from `DateTimeConfig`. Alternatively the caller can
# provide a start timestamp. During the first run we start by default at
# 2010-11-01.
def replicate_documents_s3(tenant, timelimit=500, startfrom=None):
    """Replication of Document Entities."""
    if not startfrom:
        startfrom = DateTimeConfig.get_or_insert('export_latest_document_%s' % tenant,
                                                 data=datetime.datetime(2010, 11, 1)).data
    starttime = time.time()
    maxdate = startfrom
    # The connection to S3 is set up based on credentials found in the
    # datastore. If not set we use default values which can be updated via
    # the datastore admin.
    # We assume that using SSL/TLS when connecting from GAE to AWS
    # is of little use.
    s3connection = boto.connect_s3(
        StrConfig.get_or_insert('aws_key_%s' % tenant, data='*key*').data,
        StrConfig.get_or_insert('aws_secret_%s' % tenant, data='*secret*').data,
        is_secure=False)
    # We create one bucket per month per tenant for replication. While there
    # seems to be no practical limit on how many keys can be stored in a S3
    # bucket, most frontend tools get very slow with more than a few thousand
    # keys per bucket. We currently have less than 50.000 entities per month.
    # If we had more entities we would probably be better off with creating
    # a S3 bucket per day.
    bucket_name = 'XXX.%s-%s' % (tenant, startfrom.strftime('%Y-%m'))
    try:
        s3connection.create_bucket(bucket_name, location=Location.EU)
    except boto.exception.S3CreateError:
        logging.info("S3 bucket %s already exists", bucket_name)
    # We start replicating from the `updated_at` timestamp of the last entity
    # replicated. This results in the last entity being replicated twice.
    # While this is a certain waste of resources it ensures that the system
    # reliably replicates even if two entities have exactly the same
    # timestamp (which should never happen, due to the sub-millisecond
    # resolution of the timestamp, but you never know).
    logging.info("archiving starting from %s to %s" % (startfrom, bucket_name))
    docs = Dokument.all().filter('tenant =', tenant).filter(
        'updated_at >=', startfrom).order('updated_at')
    # The JSON data will be encapsulated in a tar-gz stream to reduce the
    # number of small files in S3. `Tarstream` offers a sane interface for
    # generating tarfiles.
    tarf = Tarstream()
    # The first version of this code used iterator access to loop over the
    # documents. Unfortunately we saw timeouts and strange errors after about
    # 80 seconds. Using `fetch()` removed that issue and also nicely limits
    # the number of documents archived per call.
    # This approach would fail if we had 300 or more entities with exactly
    # the same timestamp. We just hope this will not happen.
    docs = docs.fetch(300)
    if docs:
        logging.debug("processing %d documents" % len(docs))
    for doc in docs:
        # Prepare filenames.
        # We want to have no slashes and colons in filenames. Unfortunately
        # we can't use `strftime()` because this would lose microseconds.
        # Therefore we use `isoformat()` and `replace()`.
        # Finally we ensure the filename is not Unicode.
        # Since we use the updated_at property in the filename, rewritten
        # versions of the entity will have a different filename.
        akte_name = '-'.join(doc.akte.key().id_or_name().split('-')[1:])
        updated_at = doc.updated_at.isoformat().replace('-', '').replace(':', '')
        fname = "d-%s-%s-%s" % (updated_at, akte_name, doc.designator)
        fname = fname.encode('ascii', 'replace')
        # Serialize the entity as JSON. We put a separate file per entity
        # into the tar file.
        jsondata = hujson.dumps(doc.as_dict())
        tarf.writestr(fname + '.json', jsondata)
        # For every entity we have a separate entity containing a PDF. Retrieve
        # that and store it to S3.
        # When we reused the same s3bucket instance that is used for writing
        # the JSON, files got mixed up on the server. Creating a separate
        # bucket instance solved that issue.
        # Since we use the designator in the filename and the designator is
        # in fact the SHA-1 of the PDF, rewritten PDFs will have a different
        # filename.
        pdf = DokumentFile.get_by_key_name("%s-%s" % (tenant, doc.designator)).data
        logging.debug("%s is %d kb", doc.designator, len(pdf) / 1024)
        s3bucket = s3connection.get_bucket(bucket_name)
        s3bucket.new_key(fname + '.pdf').set_contents_from_string(pdf)
        # Remember the date of the newest updated_at value.
        maxdate = max([maxdate, doc.updated_at])
        # If we have been running for more than `timelimit` seconds, stop
        # replication.
        if time.time() - starttime > timelimit:
            break
        # If the tar file is getting too big, stop
        # processing.
        if len(tarf) > 2000000:
            break
    logging.debug("done writing files, writing tgz")
    tarname = 'dokumente_%s_%s.tgz' % (str(startfrom).replace(' ', 'T'), str(maxdate).replace(' ', 'T'))
    s3bucket = s3connection.get_bucket(bucket_name)
    s3bucket.new_key(tarname).set_contents_from_string(tarf.getvalue())
    logging.info("written %s", tarname)
    # Finally store `maxdate` into the datastore so we know where we should
    # continue next time we are called.
    DateTimeConfig(key_name='export_latest_document_%s' % tenant, data=maxdate).put()
    return maxdate
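
# The filename scheme used in `replicate_documents_s3()` can be tried out in
# isolation. The following is a minimal Python 3 sketch; `export_filename` is
# a hypothetical helper, and the final `.decode()` exists only because
# Python 3 separates bytes and text, which the Python 2 code above does not.

```python
import datetime


def export_filename(updated_at, akte_name, designator):
    """Build a name without slashes or colons that keeps the microseconds."""
    # isoformat() keeps microseconds, which strftime() based approaches in
    # Python 2 could not provide. Dashes and colons are stripped afterwards.
    stamp = updated_at.isoformat().replace('-', '').replace(':', '')
    fname = "d-%s-%s-%s" % (stamp, akte_name, designator)
    # Replace everything outside ASCII with '?', as the original code does.
    return fname.encode('ascii', 'replace').decode('ascii')


stamp = datetime.datetime(2011, 5, 1, 12, 30, 45, 123456)
plain = export_filename(stamp, '4711', 'abc')
umlaut = export_filename(stamp, 'M\u00fcller', 'abc')
```

# Note that `isoformat()` omits the fractional part when the microseconds are
# exactly zero, so names are not guaranteed to have a fixed length.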
# HTTP-Request Handler to be called via cron or via a taskqueue.
# Being called via a regular request would impose the 30 second request
# runtime limit which is undesirable. Running from a Task Queue handler
# or from cron would give us 10 minutes runtime.
# Currently we use a hardcoded tenant and call this handler every 10 minutes
# via cron.
class ReplicateDokumenteHandler(BasicHandler):
    """Handler for Replication of Dokument Entities."""
    def get(self):
        """No parameters are expected."""
        tenant = ''
        maxdate = replicate_documents_s3(tenant)
        logging.info("processed up to %s", maxdate)
        self.response.out.write(str(maxdate) + '\n')
    post = get


# HTTP-Request Handler to trigger replication via taskqueue. For testing purposes.
class TestReplicateDokumenteHandler(BasicHandler):
    """Initiate Replication of Dokument Entities via TaskQueue."""
    def get(self):
        """No parameters are expected."""
        taskqueue.add(url='/automation/replication/cron/dokumente', method='GET')
# The replication of Akte Entities follows the replication of Document
# entities. But since we only handle relatively small JSON data bits
# this function can tar around 2500 entities in about 30 seconds.
# The datastore iterator/query interface starts acting up when
# requesting much more data, so we limit ourselves to 30 seconds of processing.
def replicate_akten_s3(tenant, timelimit=30, startfrom=None):
    """Replication of Akte Entities."""
    if not startfrom:
        startfrom = DateTimeConfig.get_or_insert('export_latest_akte_%s' % tenant,
                                                 data=datetime.datetime(2010, 11, 1)).data
    starttime = time.time()
    maxdate = startfrom
    s3connection = boto.connect_s3(
        StrConfig.get_or_insert('aws_key_%s' % tenant, data='*key*').data,
        StrConfig.get_or_insert('aws_secret_%s' % tenant, data='*secret*').data,
        is_secure=False)
    bucket_name = 'XXX.%s-%s' % (tenant, startfrom.strftime('%Y-%m'))
    try:
        s3connection.create_bucket(bucket_name, location=Location.EU)
    except boto.exception.S3CreateError:
        logging.info("S3 bucket %s already exists", bucket_name)
    logging.info("archiving starting from %s to %s" % (startfrom, bucket_name))
    akten = Akte.all().filter('tenant =', tenant).filter(
        'updated_at >=', startfrom).order('updated_at')
    tarf = Tarstream()
    for akte in akten:
        akte_name = '-'.join(akte.key().id_or_name().split('-')[1:])
        updated_at = akte.updated_at.isoformat().replace('-', '').replace(':', '')
        fname = "%s-a-%s" % (akte_name, str(updated_at).replace(' ', 'T'))
        fname = fname.encode('ascii', 'replace')
        jsondata = hujson.dumps(akte.as_dict())
        tarf.writestr(fname + '.json', jsondata)
        maxdate = max([maxdate, akte.updated_at])
        if time.time() - starttime > timelimit:
            break
        if len(tarf) > 2000000:
            # `len(tarf)` is the uncompressed data size. If we assume a 4 bits/byte
            # compression ratio, with 2 MB raw data we would still be under the
            # 1 MB upload limit.
            # In reality we observe compression ratios of about 0.35 bits/byte.
            break
    if len(tarf):
        s3bucket = s3connection.get_bucket(bucket_name)
        tarname = 'akten_%s_%s.tgz' % (str(startfrom).replace(' ', 'T'), str(maxdate).replace(' ', 'T'))
        s3bucket.new_key(tarname).set_contents_from_string(tarf.getvalue())
        logging.info("written %s", tarname)
    DateTimeConfig(key_name='export_latest_akte_%s' % tenant, data=maxdate).put()
    return maxdate
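
# The size reasoning in `replicate_akten_s3()` (2 MB of raw data against a
# 1 MB upload limit) can be checked with a little arithmetic. This sketch
# assumes that "1 MB" means 2**20 bytes; `compressed_bytes` is a hypothetical
# helper and the ratios are the ones quoted in the comment above.

```python
UPLOAD_LIMIT = 2 ** 20   # assumed meaning of the 1 MB urlfetch limit, in bytes
RAW_LIMIT = 2000000      # threshold used in the `len(tarf)` check


def compressed_bytes(raw_bytes, bits_per_byte):
    """Estimate the compressed size given output bits per input byte."""
    return raw_bytes * bits_per_byte / 8.0


# Pessimistic assumption: compression only halves the data (4 bits/byte).
pessimistic = compressed_bytes(RAW_LIMIT, 4)

# Ratio reported as observed in practice (0.35 bits/byte).
observed = compressed_bytes(RAW_LIMIT, 0.35)

fits = pessimistic < UPLOAD_LIMIT
```

# Under the pessimistic ratio the archive ends up at 1,000,000 bytes, which is
# just below the assumed limit, so the 2,000,000 byte threshold leaves very
# little headroom if the data ever compresses worse than that.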
# HTTP-Request Handler to be called via cron or via a taskqueue.
class ReplicateAktenHandler(BasicHandler):
    """Handler for Replication of Akte Entities."""
    def get(self):
        """No parameters are expected."""
        tenant = ''
        maxdate = replicate_akten_s3(tenant)
        logging.info("processed up to %s", maxdate)
        self.response.out.write(str(maxdate) + '\n')


# HTTP-Request Handler to trigger replication via taskqueue.
class TestReplicateAktenHandler(BasicHandler):
    """Initiate Replication of Akte Entities via TaskQueue."""
    def get(self):
        """No parameters are expected."""
        taskqueue.add(url='/automation/replication/cron/akten', method='GET')
## Evaluation
# We assume that the ReplicateDokumenteHandler is called every 10 minutes.
# We also assume that no more than 250 Documents in 10 minutes are added to
# the application.
# The setup will ensure that as long as Google AppEngine and Amazon S3 are
# available all data older than 20 minutes will exist on the S3 replica
# within the EU. Should Google AppEngine or Amazon S3 become unavailable
# for some period of time and then become available again, replication will
# automatically "catch up". No data is ever deleted by the replication
# process so that data loss in the primary system will not result in data
# loss in the replica. Amazon states that S3 is designed for 99.999999999%
# data durability.
# Regular mirroring of the S3 data to an on-site local storage will ensure
# a third replica within Germany.
# From this third replica copies are created to removable storage media at
# regular intervals. This media is stored at a different office inside a safe.
# This strategy will protect against all threat scenarios outlined above.
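
# How long "catching up" takes after an outage can be estimated with a toy
# model. The sketch below is an assumption, not a measurement: it takes the
# 300 entity `fetch()` limit and the 250 documents per 10 minutes stated
# above, and ignores the time and tar size limits as well as the one entity
# that is replicated twice per run. `runs_to_catch_up` is a hypothetical name.

```python
def runs_to_catch_up(backlog, arrivals_per_run=250, batch_size=300):
    """Count cron runs until a backlog of unreplicated documents is gone."""
    if batch_size <= arrivals_per_run:
        raise ValueError("the backlog can never shrink with these numbers")
    runs = 0
    while backlog > 0:
        # New documents arrive during the interval, one batch is replicated.
        backlog = max(0, backlog + arrivals_per_run - batch_size)
        runs += 1
    return runs


# A backlog of 1000 documents shrinks by 50 per run at the assumed peak load.
runs_needed = runs_to_catch_up(1000)
minutes_needed = runs_needed * 10
```

# With these numbers a backlog of 1000 documents needs 20 runs, roughly 200
# minutes at a 10 minute cron interval. In quieter periods the backlog would
# shrink faster.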
## Helper functionality
class Tarstream(object):
    """Class to allow in-memory generation of tar files - similar to zipfile."""
    def __init__(self):
        self.tf = StringIO.StringIO()
        # The `|` symbol in the mode string instructs tarfile to switch
        # to streaming mode.
        self.tar = tarfile.open(fileobj=self.tf, mode="w|gz")

    def __len__(self):
        # Uncompressed size of the tar stream written so far, as assumed
        # by the size checks in the replication functions.
        return self.tar.offset

    def writestr(self, name, data):
        """Create a tar file entry `name` and write `data` into it."""
        ti = tarfile.TarInfo(name)
        ti.size = len(data)
        ti.mtime = time.time()
        self.tar.addfile(ti, StringIO.StringIO(data))

    def getvalue(self):
        """Close tarfile and return the bytestream of the tar file."""
        self.tar.close()
        return self.tf.getvalue()
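
# The idea behind `Tarstream` (build a gzipped tar archive entirely in memory
# and hand it out as one string) can be shown with the Python 3 standard
# library. This is a sketch, not the class above: `InMemoryTar` and its
# `raw_size` counter are hypothetical, and `io.BytesIO` replaces `StringIO`.

```python
import io
import tarfile
import time


class InMemoryTar(object):
    """Collect small files into a gzipped tar archive held in memory."""

    def __init__(self):
        self.buffer = io.BytesIO()
        # "w|gz" selects streaming mode: tarfile only ever writes forward.
        self.tar = tarfile.open(fileobj=self.buffer, mode="w|gz")
        self.raw_size = 0  # uncompressed payload bytes added so far

    def writestr(self, name, data):
        """Add an entry `name` with the bytes `data`."""
        info = tarfile.TarInfo(name)
        info.size = len(data)
        info.mtime = int(time.time())
        self.tar.addfile(info, io.BytesIO(data))
        self.raw_size += len(data)

    def getvalue(self):
        """Close the archive and return it as bytes."""
        self.tar.close()
        return self.buffer.getvalue()


archive = InMemoryTar()
archive.writestr('a.json', b'{"x": 1}')
archive.writestr('b.json', b'{"y": 2}')
blob = archive.getvalue()

# Read the archive back to check that it is a valid tar.gz.
with tarfile.open(fileobj=io.BytesIO(blob), mode="r:gz") as check:
    names = check.getnames()
    first = check.extractfile('a.json').read()
```

# Tracking `raw_size` separately makes it explicit that a size limit is
# applied to the uncompressed payload and not to the compressed output.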
def main():
    application = webapp2.WSGIApplication([
        ('/automation/replication/test/akten', TestReplicateAktenHandler),
        ('/automation/replication/cron/akten', ReplicateAktenHandler),
        ('/automation/replication/test/dokumente', TestReplicateDokumenteHandler),
        ('/automation/replication/cron/dokumente', ReplicateDokumenteHandler),
    ])
    application.run()


if __name__ == '__main__':
    main()
import datetime
import zipfile
import time
import zlib
import binascii
import struct
class blobstoreZipFile(zipfile.ZipFile):
    """Class to create a PKZIP file.

    This is based on Python's zipfile module but is stripped down to work
    with file like objects which only have a write() and possibly no
    other methods. It is tested with the AppEngine blobstore API.
    """

    def __init__(self, file):
        """Open the ZIP file with mode read "r", write "w" or append "a"."""
        self.filelist = []  # List of ZipInfo instances for archive
        self.fp = file
        self.pos = 0
        self.compression = zipfile.ZIP_DEFLATED
        self.mode = None
        self.__didModify = None
        self._filePassed = True

    def write(self, data):
        """Write `data` and track the position ourselves, since `fp` may lack tell()."""
        self.pos += len(data)
        return self.fp.write(data)

    def tell(self):
        return self.pos

    def writestr(self, filename, bytes, comment=''):
        """Write a file into the archive. The contents is the string
        'bytes'. 'filename' is the name of the file in the archive."""
        zinfo = zipfile.ZipInfo(filename=filename,
                                date_time=time.localtime(time.time())[:6])
        # Uncompressed size
        zinfo.file_size = len(bytes)
        # The data below is always deflated, so the header has to say so.
        zinfo.compress_type = self.compression
        zinfo.comment = comment
        zinfo.external_attr = 0755 << 16L
        zinfo.CRC = binascii.crc32(bytes) & 0xffffffff
        co = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
        bytes = co.compress(bytes) + co.flush()
        zinfo.compress_size = len(bytes)  # Compressed size
        zinfo.header_offset = self.tell()  # Start of header bytes
        file_header = zinfo.FileHeader()
        self.write(file_header)
        self.write(bytes)
        if zinfo.flag_bits & 0x08:
            # Write CRC and file sizes after the file data
            self.write(struct.pack("<LLL", zinfo.CRC, zinfo.compress_size,
                                   zinfo.file_size))
        self.filelist.append(zinfo)

    def flush(self):
        """write the ending records."""
        count = 0
        pos1 = self.tell()
        records = []
        # write central directory
        centdir_len = 0
        for zinfo in self.filelist:
            count = count + 1
            dt = zinfo.date_time
            dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
            dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
            file_size = zinfo.file_size
            compress_size = zinfo.compress_size
            header_offset = zinfo.header_offset
            extra_data = zinfo.extra
            extract_version = zinfo.extract_version
            create_version = zinfo.create_version
            centdir = struct.pack(zipfile.structCentralDir,
                zipfile.stringCentralDir, create_version,
                zinfo.create_system, extract_version, zinfo.reserved,
                zinfo.flag_bits, zinfo.compress_type, dostime, dosdate,
                zinfo.CRC, compress_size, file_size,
                len(zinfo.filename), len(extra_data), len(zinfo.comment),
                0, zinfo.internal_attr, zinfo.external_attr,
                header_offset)
            centdir = ''.join([centdir, zinfo.filename, extra_data, zinfo.comment])
            centdir_len += len(centdir)
            records.append(centdir)
        self.write(''.join(records))
        # Write end-of-zip-archive record
        endrec = struct.pack(zipfile.structEndArchive, zipfile.stringEndArchive,
                             0, 0, count, count, centdir_len, pos1, 0)
        self.write(endrec)
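
# The `dosdate` and `dostime` expressions in `flush()` pack a timestamp into
# the two 16 bit fields of the ZIP central directory. A small round trip in
# Python 3 shows what that means; `encode_dos` and `decode_dos` are
# hypothetical helper names used only for this illustration.

```python
def encode_dos(dt):
    """Pack (year, month, day, hour, minute, second) into DOS date and time."""
    dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
    dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
    return dosdate, dostime


def decode_dos(dosdate, dostime):
    """Unpack DOS date and time fields back into a six element tuple."""
    year = (dosdate >> 9) + 1980
    month = (dosdate >> 5) & 0x0F
    day = dosdate & 0x1F
    hour = dostime >> 11
    minute = (dostime >> 5) & 0x3F
    second = (dostime & 0x1F) * 2  # seconds are stored in 2 second steps
    return (year, month, day, hour, minute, second)


packed = encode_dos((2011, 5, 1, 12, 30, 45))
unpacked = decode_dos(*packed)
```

# The round trip returns second 44 instead of 45: the format only has five
# bits for seconds, so odd seconds are rounded down.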