Skip to content

Instantly share code, notes, and snippets.

@kroggen
Last active August 11, 2016 20:36
Show Gist options
  • Save kroggen/8329210e5f52a0b8b60e9c7f98b059a7 to your computer and use it in GitHub Desktop.
Save kroggen/8329210e5f52a0b8b60e9c7f98b059a7 to your computer and use it in GitHub Desktop.
sqlite3 session changeset example
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "sqlite3.h"
int save_binary_file(char *file_path, void *buffer, int size) {
FILE *fp;
fp = fopen(file_path, "wb");
if (fp) {
fwrite(buffer, 1, size, fp);
fclose(fp);
}
}
void * load_binary_file(char *file_path, int *psize) {
FILE *fp;
void *buffer=0;
int size;
fp = fopen(file_path, "rb");
if (fp) {
fseek(fp, 0, SEEK_END);
size = ftell(fp);
rewind(fp);
buffer = malloc(size);
if (buffer) {
fread(buffer, 1, size, fp);
*psize = size;
}
fclose(fp);
}
return buffer;
}
sqlite3_stmt * get_table_columns(sqlite3 *db, const char *table) {
sqlite3_stmt *stmt=0;
char *sql=0;
sql = sqlite3_mprintf("SELECT * FROM %s WHERE 1=0", table);
if (sql == 0) goto loc_exit;
if (sqlite3_prepare_v2(db, sql, -1, &stmt, NULL) != SQLITE_OK) {
sqlite3_finalize(stmt);
stmt=0;
}
loc_exit:
if (sql) sqlite3_free(sql);
return stmt;
}
int create_base_db(char *db_path) {
sqlite3 *db;
char *error;
if (sqlite3_open(db_path, &db) != SQLITE_OK) goto loc_error;
// the table must have a primary key
if (sqlite3_exec(db, "CREATE TABLE contacts (id INTEGER PRIMARY KEY,name,email)", 0, 0, &error) != SQLITE_OK) goto loc_error;
if (sqlite3_exec(db, "INSERT INTO contacts (name,email) VALUES ('John','john@email.com')", 0, 0, &error) != SQLITE_OK) goto loc_error;
if (sqlite3_exec(db, "INSERT INTO contacts (name,email) VALUES ('Eric','eric@email.com')", 0, 0, &error) != SQLITE_OK) goto loc_error;
if (sqlite3_exec(db, "INSERT INTO contacts (name,email) VALUES ('Lisa','lisa@email.com')", 0, 0, &error) != SQLITE_OK) goto loc_error;
sqlite3_close(db);
return 0;
loc_error:
printf("create_base_db error: %s\n", error);
sqlite3_free(error);
return -1;
}
int write_to_db(sqlite3 *db) {
char *error;
if (sqlite3_exec(db, "UPDATE contacts SET email='new_john@gmail.com' WHERE name='John'", 0, 0, &error) != SQLITE_OK) goto loc_error;
if (sqlite3_exec(db, "DELETE FROM contacts WHERE name='Eric'", 0, 0, &error) != SQLITE_OK) goto loc_error;
if (sqlite3_exec(db, "INSERT INTO contacts (name,email) VALUES ('Rose','rose@email.com')", 0, 0, &error) != SQLITE_OK) goto loc_error;
return 0;
loc_error:
printf("write_to_db error: %s\n", error);
sqlite3_free(error);
return -1;
}
void log_changes(char *db_path, char *session_path) {
sqlite3 *db;
sqlite3_session *session=0;
void *buffer;
int size;
if (sqlite3_open(db_path, &db) != SQLITE_OK) goto loc_error;
if (sqlite3session_create(db, "main", &session) != SQLITE_OK) goto loc_error;
if (sqlite3session_attach(session, "contacts") != SQLITE_OK) goto loc_error;
// make changes to the tables
if (write_to_db(db) != 0) goto loc_error;
if (sqlite3session_changeset(session, &size, &buffer) != SQLITE_OK) goto loc_error;
printf("changeset buffer=%p size=%d\n", buffer, size);
save_binary_file(session_path, buffer, size);
puts("session logged");
loc_exit:
if (session) sqlite3session_delete(session);
if (db) sqlite3_close(db);
return;
loc_error:
puts("log_changes error");
goto loc_exit;
}
void print_value(const char *column, char *which, sqlite3_value *value) {
printf("%s (%s): ", column, which);
if (value == 0) {
puts("<empty value>");
return;
}
switch(sqlite3_value_type(value)) {
case SQLITE_INTEGER:
printf("%d\n", sqlite3_value_int(value));
break;
case SQLITE_FLOAT:
printf("%g\n", sqlite3_value_double(value));
break;
case SQLITE_TEXT:
printf("%s\n", sqlite3_value_text(value));
break;
case SQLITE_BLOB:
printf("%p size=%d\n", sqlite3_value_blob(value), sqlite3_value_bytes(value));
break;
case SQLITE_NULL:
printf("null\n");
break;
default:
printf("undefined\n");
}
}
char * operation_name(int oper) {
switch (oper) {
case SQLITE_INSERT:
return "INSERT";
case SQLITE_UPDATE:
return "UPDATE";
case SQLITE_DELETE:
return "DELETE";
}
return "not recognized";
}
void print_operation(sqlite3 *db, sqlite3_changeset_iter *iter, int has_conflicting_value) {
sqlite3_value *old_value, *found_value, *new_value;
sqlite3_stmt *stmt=0;
int rc, ncol, ncols, oper, isIndirectChange;
char *old_label;
const char *table, *column;
if ((rc = sqlite3changeset_op(iter, &table, &ncols, &oper, &isIndirectChange)) != SQLITE_OK) return;
printf("table: %s operation: %s values:\n", table, operation_name(oper));
// get the columns names
if ((stmt = get_table_columns(db, table)) == 0) goto loc_exit;
if (ncols != sqlite3_column_count(stmt)) {
puts("different number of columns");
goto loc_exit;
}
if (has_conflicting_value)
old_label = "expected";
else
old_label = "old";
for (ncol=0; ncol < ncols; ncol++) {
column = sqlite3_column_name(stmt, ncol);
if (oper != SQLITE_INSERT) {
if ((rc = sqlite3changeset_old(iter, ncol, &old_value)) == SQLITE_OK) {
print_value(column, old_label, old_value);
}
}
if (has_conflicting_value) {
if ((rc = sqlite3changeset_conflict(iter, ncol, &found_value)) == SQLITE_OK) {
print_value(column, "found", found_value);
}
}
if (oper != SQLITE_DELETE) {
if ((rc = sqlite3changeset_new(iter, ncol, &new_value)) == SQLITE_OK) {
print_value(column, "new", new_value);
}
}
}
puts("");
loc_exit:
// release the columns names
if (stmt) sqlite3_finalize(stmt);
}
void print_changes(char *db_path, char *session_file) {
sqlite3_changeset_iter *iter;
sqlite3 *db;
int rc, size;
void *buffer;
if (sqlite3_open(db_path, &db) != SQLITE_OK) goto loc_error;
buffer = load_binary_file(session_file, &size);
if (buffer == 0) {
puts("could not load binary file");
return;
}
printf("changeset (%d bytes):\n", size);
if (sqlite3changeset_start(&iter, size, buffer) != SQLITE_OK) return;
while ((rc = sqlite3changeset_next(iter)) == SQLITE_ROW) {
print_operation(db, iter, 0);
}
loc_exit:
sqlite3changeset_finalize(iter);
sqlite3_close(db);
return;
loc_error:
puts("error");
goto loc_exit;
}
int on_conflict (
void *pCtx, /* Copy of sixth arg to _apply() */
int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
sqlite3_changeset_iter *iter /* Handle describing change and conflict */
) {
sqlite3 *db = pCtx;
int has_conflicting_value=0;
printf("conflict found: ");
switch (eConflict) {
case SQLITE_CHANGESET_NOTFOUND: // DELETE or UPDATE: a row with the required PRIMARY KEY fields is not present in the table
puts("ROW NOT FOUND");
break;
case SQLITE_CHANGESET_DATA: // DELETE or UPDATE: a row with the required PRIMARY KEY fields is present in the table, but one or more other (non primary-key) fields modified by the update do not contain the expected "before" values
puts("UNEXPECTED DATA IN ROW");
has_conflicting_value = 1;
break;
case SQLITE_CHANGESET_CONFLICT: // INSERT will result in duplicate primary key values
puts("DUPLICATE PRIMARY KEY OR ROWID");
has_conflicting_value = 1;
break;
case SQLITE_CHANGESET_FOREIGN_KEY: // foreign key violation
puts("FOREIGN KEY");
break;
case SQLITE_CHANGESET_CONSTRAINT: // other constraint violation (i.e. a UNIQUE, CHECK or NOT NULL constraint)
puts("CONSTRAINT");
break;
default:
puts("UNRECOGNIZED");
}
print_operation(db, iter, has_conflicting_value);
return SQLITE_CHANGESET_OMIT;
//return SQLITE_CHANGESET_REPLACE; used with SQLITE_CHANGESET_DATA and SQLITE_CHANGESET_CONFLICT
//return SQLITE_CHANGESET_ABORT;
}
void apply_changes(char *db_path, char *session_path) {
sqlite3 *db;
void *buffer;
int size;
if (sqlite3_open(db_path, &db) != SQLITE_OK) goto loc_error;
buffer = load_binary_file(session_path, &size);
if (buffer == 0) {
puts("could not load binary file");
goto loc_exit;
}
printf("apllying changeset... (size: %d bytes)\n", size);
//sqlite3changeset_apply(db, size, buffer, on_filter, on_conflict, 0);
sqlite3changeset_apply(db, size, buffer, 0, on_conflict, db);
puts("changeset applied!");
loc_exit:
if (db) sqlite3_close(db);
return;
loc_error:
puts("apply_changes error");
goto loc_exit;
}
int main() {
char *master_db = "./master.db";
char *slave_db = "./slave.db";
char *session_path = "./test.session";
create_base_db(master_db);
create_base_db(slave_db);
log_changes(master_db, session_path);
print_changes(master_db, session_path);
system("pause");
apply_changes(slave_db, session_path);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment