Skip to content

Instantly share code, notes, and snippets.

@msukmanowsky
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save msukmanowsky/8826152 to your computer and use it in GitHub Desktop.
Save msukmanowsky/8826152 to your computer and use it in GitHub Desktop.
Thoughts on how to achieve strict and opaque transactional Trident topologies with a hypothetical Mongo Trident state.

Storm Transactional Topologies with Mongo DB State

Strict Transactional State

  1. Batches for a given txid are always the same. Replays of batches for a txid will exact same set of tuples as the first time that batch was emitted for that txid.
  2. There's no overlap between batches of tuples (tuples are in one batch or another, never multiple).
  3. Every tuple is in a batch (no tuples are skipped)

Current DB document State

{
    "_id": ObjectId("1234"),
    "txid": 123,
    "hits": 100
}

Scenario 1: Different txid

{"_id": "1234", "txid": 456, "hits": 155}


# Mongo bits
query = {
    "txid": {"$ne": 456},
}
update = {
    "$inc": {"hits": 155},
    "$set": {"txid": 456},
}
db.collection.find_and_modify(query, update)  # update succeeds

DB state gets properly updated:

{
    "_id": ObjectId("1234"),
    "txid": 456,   # new txid
    "hits": 255,   # 100 + 155 = 255
}

Scenario 2: Same txid

Say we have an increment operation come across that looks something like this:

{"txid": 456, "hits": 120}

We'd update Mongo with

query = {
    "txid": {"$ne": 123},  # only update document if txid doesn't match
}
update = {
    "$inc": {"hits": 120},
    "$set": {"txid": 123},
}
db.collection.update(query, update, multi=True)  # has no effect

Update is ignored, DB State would remain unchanged:

{
    "_id": ObjectId("1234"),
    "txid": 123,
    "hits": 100
}

Opaque Transactional

  1. Every tuple is successfully processed in exactly one batch. However, it's possible for a tuple to fail to process in one batch and then succeed to process in a later batch.

Current DB State

{
    "_id": ObjectId("1234"),
    "txid": 123,
    "hits_prev": 50,
    "hits": 100
}

Scenario 1: Different txid

Assume we have a batch of updates to be applied that when reduced, look like:

{"txid": 456, "hits": 200}

Since transaction ID differes from database, we know we are dealing with a brand new batch of updates, so we can update value and prev value. Problem is, we need the previous value of each document, and to get this we have to iterate over docs in the collection:

query = {"txid": {"$ne": 456}}
update = {
    "$set": {"txid": 456},
    "$inc": {"hits": 200},
}
for doc in db.collection.find(query):
    update["$set"]["hits_prev"] = doc["hits"]
    db.collection.update({"_id": doc["_id"]}, update)

This causes problems if multiple processes/threads are updating this collection. For this to work, you'd need to implement some kind of locking mechanism using something like Redis setnx.

Scenario 2: Same txid

Now we have a batch of updates to be applied that when reduced, look like:

{"txid": 123, "hits": 123}

Since the transaction ID is identical to that of the DB currently, know that the "hits" in the database contains an update from a previous batch for your current txid, but that batch may have been different so you have to ignore it. What you do in this case is increment "hits_prev" by your partial count to compute the new "hits" value.

query = {"txid": 123}  # query for documents that match the original batch
update = {
    "$set": {},
}
for doc in db.collection.find(query):
    update["$set"]["hits"] = doc.get("hits_prev", 0) + 123
    db.collection.update({"_id": doc["_id"]}, update)

Handling both cases

So altogether, we'd have to have something that handles both cases.

batch = {"txid": 123, "hits": 213}

with acquire_lock(db.collection.name): # assume that this blocks, now we're the only writer
    
    # Same transaction ID as batch
    query = {"txid": batch["txid"]}
    update = {"$set": {}}
    for doc in db.collection.find(query):
        update["$set"]["hits"] = doc.get("hits_prev", 0) + batch["hits"]
        db.collection.update({"_id": doc["_id"]}, update)

    # Different transaction ID than batch
    query["txid"] = {"$ne": batch["txid"]}
    update = {
        "$set": {"txid": 456},
        "$inc": {"hits": 200},
    }
    for doc in db.collection.find(query):
        update["$set"]["hits_prev"] = doc["hits"]
        db.collection.update({"_id": doc["_id"]}, update)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment