-
-
Save ordnungswidrig/df3c51fc511b56d7e6499c2263bba32d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defrecord DynamoDBTxLog [ddb-client table-name partition-name] | |
db/TxLog | |
(submit-tx [this tx-events] | |
(log/debug "submit-tx" :tx-events tx-events) | |
(try | |
(let [last-id (last-tx-id ddb-client table-name partition-name) | |
_ (log/debug "submitting tx, last tx id was" last-id) | |
content-hashes (->> (set (map c/new-id (mapcat txc/tx-events->docs tx-events)))) | |
tx-id (inc (or last-id 0)) | |
tx-time (System/currentTimeMillis) | |
item-values {partition-key (s partition-name) | |
sort-key (n tx-id) | |
"tx-time" (n tx-time) | |
"events" (b (nippy/fast-freeze tx-events)) | |
"crux.tx/docs" (b (nippy/fast-freeze content-hashes))} | |
tx-put-items (-> (Put/builder) (.tableName table-name) (.item item-values) .build) | |
tx-update-id (cond-> (-> (Put/builder) | |
(.tableName table-name) | |
(.item {"partition" (s partition-name) | |
"tx" (n -1) | |
"last" (n tx-id)})) | |
last-id (.conditionExpression "#last = :lastid") | |
last-id (.expressionAttributeValues {":lastid" (n last-id)}) | |
last-id (.expressionAttributeNames {"#last" "last"}) | |
:always .build) | |
twir (-> (TransactWriteItemsRequest/builder) | |
(.transactItems [(-> (TransactWriteItem/builder) | |
(.put tx-put-items) | |
.build) | |
(-> (TransactWriteItem/builder) | |
(.put tx-update-id) | |
.build)]) | |
.build)] | |
(delay | |
(let [r (.transactWriteItems ddb-client twir)] | |
{::twi-response r | |
::tx/tx-id tx-id | |
::tx/tx-time tx-time}))))) | |
(open-tx-log [this after-tx-id] | |
(log/debug "Opening tx-log" :after-tx-id after-tx-id) | |
(cio/->cursor (fn [] ) | |
(query-tx-table ddb-client table-name partition-name after-tx-id))) | |
(latest-submitted-tx [this] | |
{:crux.tx/tx-id (last-tx-id ddb-client table-name partition-name)}) | |
status/Status | |
(status-map [_] | |
{:crux.dynamodb/table-info | |
(try | |
(let [r (-> (DescribeTableRequest/builder) (.tableName table-name) .build) | |
table-info | |
^TableDescription (-> (.describeTable ddb-client r) .table)] | |
{:status (.tableStatus table-info) | |
:item-count (.itemCount table-info) | |
:table-size (.tableSizeBytes table-info)}) | |
(catch Exception e | |
(log/debug e "Could not describe dynamodb table") | |
false))})) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment