Skip to content

Instantly share code, notes, and snippets.

@tony2001
Created June 30, 2016 15:01
Show Gist options
  • Save tony2001/df80ca22916d2fe8b912337f54d32ca6 to your computer and use it in GitHub Desktop.
Save tony2001/df80ca22916d2fe8b912337f54d32ca6 to your computer and use it in GitHub Desktop.
diff --git a/src/mnotify2_storage.c b/src/mnotify2_storage.c
index 2dd2a29..5ea86b4 100644
--- a/src/mnotify2_storage.c
+++ b/src/mnotify2_storage.c
@@ -48,6 +48,7 @@ struct nmpa_s mn2_updates_nmpa_previous;
static struct local_storage *ls;
static time_t current_state_ts;
static time_t update_state_ts;
+static time_t update_now;
static pid_t dump_child;
static uint32_t total_autoconfirm;
@@ -502,7 +503,7 @@ static void dump_conf_data_full(mn2_configuration_t *cfg, void *bs)
nmpa_free(&nmpa);
}
-static int dump_update(Pvoid_t updates, struct nmpa_s *updates_nmpa, time_t updates_ts)
+static int dump_update(Pvoid_t updates, struct nmpa_s *updates_nmpa, time_t updates_ts, time_t now)
{
int rv = 0;
char *storage_filename_tmp = NULL, *storage_filename = NULL;
@@ -541,7 +542,7 @@ static int dump_update(Pvoid_t updates, struct nmpa_s *updates_nmpa, time_t upda
/* create meta message */
Badoo__Storage__MetaInfo meta = BADOO__STORAGE__META_INFO__INIT;
- meta.current_ts = updates_ts;
+ meta.current_ts = now;
if (0 > bstorage_put(bs, BADOO__STORAGE__MSGID__META_INFO, &meta.base)) {
rv = -1;
@@ -591,24 +592,18 @@ end:
static int dump_data()
{
int rv = 0;
- time_t now = time(NULL);
- struct tm now_tm;
char *storage_filename_tmp = NULL, *storage_filename = NULL;
int storage_file = -1;
struct bstream_s *bs = NULL;
+ struct tm current_state_tm;
- /* prevent existing file overwrite */
- if (current_state_ts >= now) {
- now = current_state_ts + 1;
- }
-
- if (!gmtime_r(&now, &now_tm)) {
+ if (!gmtime_r(&current_state_ts, &current_state_tm)) {
zlog_error("gmtime_r() error");
return -1;
}
- storage_filename_tmp = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &now_tm, 1);
- storage_filename = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &now_tm, 0);
+ storage_filename_tmp = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &current_state_tm, 1);
+ storage_filename = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &current_state_tm, 0);
if (!storage_filename || !storage_filename_tmp) {
zlog_error("could not construct storage_filename or storage_filename_tmp");
@@ -633,7 +628,7 @@ static int dump_data()
/* create meta message */
Badoo__Storage__MetaInfo meta = BADOO__STORAGE__META_INFO__INIT;
- meta.current_ts = now;
+ meta.current_ts = current_state_ts;
if (0 > bstorage_put(bs, BADOO__STORAGE__MSGID__META_INFO, &meta.base)) {
rv = -1;
@@ -670,11 +665,68 @@ end:
return rv;
}
+/*
+ * Park the update set collected so far so that it can be written out.
+ *
+ * With the updates lock held, and only when a "current" update hash exists:
+ *  - the current hash and its nmpa are moved into the "previous" slot,
+ *  - the current slot is reset (NULL hash, freshly initialised nmpa),
+ *  - update_state_ts receives the state timestamp in effect before the move,
+ *  - current_state_ts and update_now receive the new timestamp: the wall
+ *    clock, or current_state_ts + 1 when the clock is not ahead of it.
+ *
+ * Nothing is changed when there is no current update hash.
+ *
+ * NOTE(review): the "previous" slot is overwritten without checking whether
+ * it still holds an unwritten set - confirm callers guarantee it was consumed.
+ */
+static void mn2_schedule_update()
+{
+	time_t stamp = time(NULL);
+
+	UPDATES_LOCK();
+
+	if (mn2_updates_hash_current) {
+		/* hand the collected hash and its allocator over to the "previous" slot */
+		mn2_updates_hash_previous = mn2_updates_hash_current;
+		mn2_updates_nmpa_previous = mn2_updates_nmpa_current;
+
+		/* start collecting into an empty set again */
+		mn2_updates_hash_current = NULL;
+		nmpa_init(&mn2_updates_nmpa_current, MN2_UPDATE_NMPA_SIZE);
+
+		/* remember the state timestamp that was in effect before this rotation */
+		update_state_ts = current_state_ts;
+
+		/* the new state timestamp must be strictly greater than the old one */
+		stamp = (current_state_ts >= stamp) ? current_state_ts + 1 : stamp;
+
+		update_now = stamp;
+		current_state_ts = stamp;
+	}
+
+	UPDATES_UNLOCK();
+}
+
+/*
+ * Write out the update set parked in the "previous" slot, if there is one.
+ *
+ * With the updates lock held, the parked hash and its nmpa are detached into
+ * locals, the slot is reset (NULL hash, freshly initialised nmpa) and the
+ * timestamps belonging to that set are captured. dump_update() is then called
+ * after the lock has been released, and is retried until it returns 0.
+ *
+ * now - fallback for the "end" timestamp handed to dump_update(); it is used
+ *       only while update_now has never been set (is still 0).
+ */
+static void mn2_write_update(time_t now)
+{
+	Pvoid_t updates = NULL;
+	struct nmpa_s updates_nmpa;
+	/* only meaningful when `updates` is set; initialised so they are never indeterminate */
+	time_t ts = 0, ts_now = 0;
+
+	UPDATES_LOCK();
+	if (mn2_updates_hash_previous) {
+		updates = mn2_updates_hash_previous;
+		updates_nmpa = mn2_updates_nmpa_previous;
+		nmpa_init(&mn2_updates_nmpa_previous, MN2_UPDATE_NMPA_SIZE);
+		mn2_updates_hash_previous = NULL;
+		ts = update_state_ts;
+		/*
+		 * Prefer the timestamp mn2_schedule_update() stored in update_now: it is
+		 * the value current_state_ts was advanced to (already bumped past the
+		 * old state timestamp). The caller's raw wall-clock `now` has not been
+		 * through that adjustment.
+		 */
+		ts_now = (update_now != 0) ? update_now : now;
+	}
+	UPDATES_UNLOCK();
+
+	if (updates) {
+		/*
+		 * NOTE(review): this retries immediately and forever while dump_update()
+		 * keeps failing - confirm that dump_update() leaves `updates` and
+		 * `updates_nmpa` usable after a failed attempt.
+		 */
+		while (dump_update(updates, &updates_nmpa, ts, ts_now) != 0) {
+			/* try writing update until we succeed */
+		}
+	}
+}
+
int dump_data_sync()
{
- if (0 > dump_data()) {
- zlog_error("dump_data() error");
- return -1;
+ time_t now = time(NULL);
+
+ int is_snapshot = mn2_need_to_dump_a_snapshot(now - config->storage->storage_snapshot_interval);
+
+ if (is_snapshot) {
+ if (0 > dump_data()) {
+ zlog_error("dump_data() error");
+ return -1;
+ }
+ } else {
+ mn2_schedule_update();
+ mn2_write_update(now);
}
return 0;
@@ -742,8 +794,6 @@ static void dump_data_child_exited(pid_t pid, int status, struct rusage *ru, voi
return;
}
- current_state_ts = msg->ts;
-
badoo__mnotify2internal__mn2_dump_result__free_unpacked(msg, 0);
close(stdio[0]);
@@ -763,23 +813,8 @@ static void *mn2_storage_update_thread_func(void *a)
time_delta = now - start;
if (in_shutdown || time_delta >= config->storage->storage_update_interval) {
- Pvoid_t updates = NULL;
- struct nmpa_s updates_nmpa;
- time_t ts;
-
- UPDATES_LOCK();
- if (mn2_updates_hash_previous) {
- updates = mn2_updates_hash_previous;
- updates_nmpa = mn2_updates_nmpa_previous;
- nmpa_init(&mn2_updates_nmpa_previous, MN2_UPDATE_NMPA_SIZE);
- mn2_updates_hash_previous = NULL;
- ts = update_state_ts;
- }
- UPDATES_UNLOCK();
- if (updates) {
- dump_update(updates, &updates_nmpa, ts);
- }
+ mn2_write_update(now);
if (in_shutdown) {
return NULL;
@@ -810,28 +845,16 @@ static void *mn2_storage_judy_clean(void *a)
return NULL;
}
+
static void mn2_storage_update_tick(int fd, short which, void *arg)
{
static struct event *timer_event;
- time_t now = time(NULL);
if (!timer_event) {
goto skip;
}
- UPDATES_LOCK();
- if (current_state_ts >= now) {
- now = current_state_ts + 1;
- }
-
- if (mn2_updates_hash_current) {
- mn2_updates_hash_previous = mn2_updates_hash_current;
- mn2_updates_hash_current = NULL;
- mn2_updates_nmpa_previous = mn2_updates_nmpa_current;
- nmpa_init(&mn2_updates_nmpa_current, MN2_UPDATE_NMPA_SIZE);
- update_state_ts = now;
- }
- UPDATES_UNLOCK();
+ mn2_schedule_update();
skip:
if (!timer_event) {
@@ -856,6 +879,9 @@ static void mn2_storage_snapshot_tick(int fd, short which, void *arg)
if (!dump_child) {
zlog_notice("saving snapshot/update");
+ /* TODO: only if there is a snapshot on disk */
+ mn2_schedule_update();
+
dump_child = libangel_proc_fork(dump_data_child, 0, dump_data_child_exited, 0, "state snapshot dump child");
if (dump_child < 0) { /* error */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment