Skip to content

Instantly share code, notes, and snippets.

@Arachnid
Created September 14, 2009 12:10
Show Gist options
  • Save Arachnid/186619 to your computer and use it in GitHub Desktop.
Save Arachnid/186619 to your computer and use it in GitHub Desktop.
"""Implements a distributed transaction for transfers of funds.
Invariant: The sum of all accounts, plus all partially applied transfers, is 0.
Steps:
1) In a transaction, an Account is debited, and a Transaction object is created, with the
appropriate amount and target, but no other transaction.
2) In a transaction, the target Account is credited, and a Transaction object is created with
a predefined key name, and the other transaction referencing the one created in step 1.
3) The original Transaction object created in step 1 is updated to reference the transaction
object created in step 2.
A failure at step 1 causes the transaction to have never taken place.
Failures at step 2 or 3 can be recovered from by rerunning the roll_forward process on the
transaction object created in step 1.
"""
class Account(db.Model):
"""Represents an account, which has a balance."""
owner = db.StringProperty(required=True)
balance = db.IntegerProperty(required=True, default=0)
class Transfer(db.Model):
"""A transfer between accounts.
A transfer is partially applied if its other field is not set.
Transfer objects are immutable after creation, except for setting the 'other' field when
a partially applied transaction is completed.
"""
amount = db.IntegerProperty(required=True)
target = db.ReferenceProperty(Account, required=True) # The account the transfer is to/from
other = db.SelfReferenceProperty() # The other Transfer object involved in this transaction
timestamp = db.DateTimeProperty(required=True, auto_now_add=True)
def transfer_funds(src, dest, amount):
"""Creates a transfer from one account to another.
The returned Transfer object represents the transaction. Funds are immediately debited from
the source account, but the transfer is not complete (and the funds are not credited to the
destination account) until roll_forward() is called on the returned Transfer object.
Args:
src: A Key object for the account to be debited.
dest: A Key object for the account to be credited.
amount: The amount to transfer.
Returns:
A Transfer object representing the transaction.
"""
def _tx():
"""Create the 'from' transfer."""
account = Account.get(src)
account.balance -= amount
transfer = Transfer(
parent=account,
amount=-amount,
target=dest)
db.put([account, transfer])
return transfer
return db.run_in_transaction(_tx)
def roll_forward(transfer):
def _tx():
"""Create the 'to' transfer."""
target_key = Transfer.target.get_value_for_datastore(transfer)
# The destination transfer object is a child of the destination account,
# and named after the source transfer object to ensure uniqueness.
dest_transfer = Transfer.get_by_key_name(parent=target_key, str(transfer))
if not dest_transfer:
dest_transfer = Transfer(
parent=target_key,
amount=transfer.amount,
target=transfer.key().parent(),
other=transfer)
account = Account.get(target_key)
account.balance += transfer.amount
db.put([account, dest_transfer])
return dest_transfer
dest_transfer = db.run_in_transaction(_tx)
# No need to run this in a TX, because the only possible change after creation is
# setting other
transfer.other = dest_transfer
transfer.put()
def find_unapplied_transactions(count=100):
"""Finds and rolls forwards unapplied transactions more than 30 seconds old."""
cutoff = datetime.datetime.now() - datetime.timedelta(seconds=30)
q = Transfer.all().filter("other =", None).filter("timestamp <", cutoff)
for transfer in q.fetch(count):
roll_forward(transfer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment