Skip to content

Instantly share code, notes, and snippets.

@nuno-faria
Created January 14, 2022 11:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nuno-faria/90f62a6969a54b8b59ea8b0e01cd33a9 to your computer and use it in GitHub Desktop.
Save nuno-faria/90f62a6969a54b8b59ea8b0e01cd33a9 to your computer and use it in GitHub Desktop.
int
sql_trans_commit(sql_trans *tr)
{
int ok = LOG_OK;
sqlstore *store = tr->store;
if (!list_empty(tr->changes)) {
int flush = 0;
ulng commit_ts = 0, oldest = 0;
//////////MT_lock_set(&store->commit);
if (!tr->parent && !list_empty(tr->predicates)) {
ok = sql_trans_valid(tr);
if (ok != LOG_OK) {
sql_trans_rollback(tr, true);
//////////MT_lock_unset(&store->commit);
return ok == LOG_CONFLICT ? SQL_CONFLICT : SQL_ERR;
}
}
if (!tr->parent && (!list_empty(tr->dependencies) || !list_empty(tr->depchanges))) {
ok = transaction_check_dependencies_and_removals(tr);
if (ok != LOG_OK) {
sql_trans_rollback(tr, true);
//////////MT_lock_unset(&store->commit);
return ok == LOG_CONFLICT ? SQL_CONFLICT : SQL_ERR;
}
}
/* log changes should only be done if there is something to log */
if (!tr->parent && tr->logchanges > 0) {
int min_changes = GDKdebug & FORCEMITOMASK ? 5 : 1000000;
flush = (tr->logchanges > min_changes && list_empty(store->changes));
if (flush)
MT_lock_set(&store->flush);
ok = store->logger_api.log_tstart(store, flush);
/* log */
for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
sql_change *c = n->data;
if (c->log && ok == LOG_OK)
ok = c->log(tr, c);
}
if (ok == LOG_OK) {
if (!list_empty(store->seqchanges)) {
sequences_lock(store);
for(node *n = store->seqchanges->h; n; n = n->next) {
log_store_sequence(store, n->data);
}
list_destroy(store->seqchanges);
store->seqchanges = list_create(NULL);
sequences_unlock(store);
}
}
if (ok == LOG_OK && store->prev_oid != store->obj_id)
ok = store->logger_api.log_sequence(store, OBJ_SID, store->obj_id);
store->prev_oid = store->obj_id;
if (ok == LOG_OK && !flush)
ok = store->logger_api.log_tend(store); /* flush/sync */
store_lock(store);
commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
if (ok == LOG_OK && !flush) /* mark as done */
ok = store->logger_api.log_tdone(store, commit_ts);
} else {
store_lock(store);
commit_ts = tr->parent ? tr->parent->tid : store_timestamp(store);
if (tr->parent)
tr->parent->logchanges += tr->logchanges;
}
oldest = tr->parent ? commit_ts : store_oldest(store);
tr->logchanges = 0;
TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
/* apply committed changes */
if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
oldest = commit_ts;
store_pending_changes(store, oldest);
for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
sql_change *c = n->data;
if (c->commit && ok == LOG_OK)
ok = c->commit(tr, c, commit_ts, oldest);
else
c->obj->new = 0;
c->ts = commit_ts;
}
/* when directly flushing: flush logger after changes got applied */
if (flush) {
if (ok == LOG_OK) {
ok = store->logger_api.log_tend(store); /* flush/sync */
if (ok == LOG_OK)
ok = store->logger_api.log_tdone(store, commit_ts); /* mark as done */
}
MT_lock_unset(&store->flush);
}
/* propagate transaction dependencies to the storage only if other transactions are running */
if (ok == LOG_OK && !tr->parent && ATOMIC_GET(&store->nr_active) > 1) {
if (!list_empty(tr->dependencies)) {
for (node *n = tr->dependencies->h; n && ok == LOG_OK; n = n->next) {
sql_dependency_change *lchange = (sql_dependency_change*) n->data;
ok = transaction_add_hash_entry(store->dependencies, lchange->objid, lchange->type, commit_ts);
}
}
if (!list_empty(tr->depchanges)) {
for (node *n = tr->depchanges->h; n && ok == LOG_OK; n = n->next) {
sql_dependency_change *lchange = (sql_dependency_change*) n->data;
ok = transaction_add_hash_entry(store->depchanges, lchange->objid, lchange->type, commit_ts);
}
}
}
/* garbage collect */
for(node *n=tr->changes->h; n && ok == LOG_OK; ) {
node *next = n->next;
sql_change *c = n->data;
if (!c->cleanup || c->cleanup(store, c, oldest)) {
_DELETE(c);
} else if (tr->parent) { /* need to keep everything */
tr->parent->changes = sa_list_append(tr->sa, tr->parent->changes, c);
} else {
store->changes = sa_list_append(tr->sa, store->changes, c);
}
n = next;
}
tr->ts = commit_ts;
store_unlock(store);
//////////MT_lock_unset(&store->commit);
list_destroy(tr->changes);
tr->changes = NULL;
} else if (ATOMIC_GET(&store->nr_active) == 1) { /* just me cleanup */
//////////MT_lock_set(&store->commit);
store_lock(store);
ulng oldest = store_timestamp(store);
store_pending_changes(store, oldest);
store_unlock(store);
//////////MT_lock_unset(&store->commit);
}
/* drop local temp tables with commit action CA_DROP, after cleanup */
if (cs_size(&tr->localtmps)) {
for(node *n=tr->localtmps.set->h; n; ) {
node *next = n->next;
sql_table *tt = n->data;
if (tt->commit_action == CA_DROP)
(void) sql_trans_drop_table_id(tr, tt->s, tt->base.id, DROP_RESTRICT);
n = next;
}
}
if (tr->localtmps.dset) {
list_destroy2(tr->localtmps.dset, store);
tr->localtmps.dset = NULL;
}
tr->localtmps.nelm = NULL;
if (ok == LOG_OK)
ok = clean_predicates_and_propagate_to_parent(tr);
return (ok==LOG_OK)?SQL_OK:SQL_ERR;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment