Created
June 30, 2016 14:59
-
-
Save tony2001/2ece5847198fc35bfc9f96a1516b3ce7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/mnotify2_storage.c b/src/mnotify2_storage.c | |
index 2dd2a29..6f31027 100644 | |
--- a/src/mnotify2_storage.c | |
+++ b/src/mnotify2_storage.c | |
@@ -48,6 +48,7 @@ struct nmpa_s mn2_updates_nmpa_previous; | |
static struct local_storage *ls; | |
static time_t current_state_ts; | |
static time_t update_state_ts; | |
+static time_t update_now; | |
static pid_t dump_child; | |
static uint32_t total_autoconfirm; | |
@@ -502,7 +503,7 @@ static void dump_conf_data_full(mn2_configuration_t *cfg, void *bs) | |
nmpa_free(&nmpa); | |
} | |
-static int dump_update(Pvoid_t updates, struct nmpa_s *updates_nmpa, time_t updates_ts) | |
+static int dump_update(Pvoid_t updates, struct nmpa_s *updates_nmpa, time_t updates_ts, time_t now) | |
{ | |
int rv = 0; | |
char *storage_filename_tmp = NULL, *storage_filename = NULL; | |
@@ -541,7 +542,7 @@ static int dump_update(Pvoid_t updates, struct nmpa_s *updates_nmpa, time_t upda | |
/* create meta message */ | |
Badoo__Storage__MetaInfo meta = BADOO__STORAGE__META_INFO__INIT; | |
- meta.current_ts = updates_ts; | |
+ meta.current_ts = now; | |
if (0 > bstorage_put(bs, BADOO__STORAGE__MSGID__META_INFO, &meta.base)) { | |
rv = -1; | |
@@ -591,24 +592,18 @@ end: | |
static int dump_data() | |
{ | |
int rv = 0; | |
- time_t now = time(NULL); | |
- struct tm now_tm; | |
char *storage_filename_tmp = NULL, *storage_filename = NULL; | |
int storage_file = -1; | |
struct bstream_s *bs = NULL; | |
+ struct tm current_state_tm; | |
- /* prevent existing file overwrite */ | |
- if (current_state_ts >= now) { | |
- now = current_state_ts + 1; | |
- } | |
- | |
- if (!gmtime_r(&now, &now_tm)) { | |
+ if (!gmtime_r(&current_state_ts, &current_state_tm)) { | |
zlog_error("gmtime_r() error"); | |
return -1; | |
} | |
- storage_filename_tmp = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &now_tm, 1); | |
- storage_filename = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &now_tm, 0); | |
+ storage_filename_tmp = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &current_state_tm, 1); | |
+ storage_filename = local_storage_construct_name_from_tm(ls, &mn2_snapshot, &current_state_tm, 0); | |
if (!storage_filename || !storage_filename_tmp) { | |
zlog_error("could not construct storage_filename or storage_filename_tmp"); | |
@@ -633,7 +628,7 @@ static int dump_data() | |
/* create meta message */ | |
Badoo__Storage__MetaInfo meta = BADOO__STORAGE__META_INFO__INIT; | |
- meta.current_ts = now; | |
+ meta.current_ts = current_state_ts; | |
if (0 > bstorage_put(bs, BADOO__STORAGE__MSGID__META_INFO, &meta.base)) { | |
rv = -1; | |
@@ -672,9 +667,18 @@ end: | |
int dump_data_sync() | |
{ | |
- if (0 > dump_data()) { | |
- zlog_error("dump_data() error"); | |
- return -1; | |
+ time_t now = time(NULL); | |
+ | |
+ int is_snapshot = mn2_need_to_dump_a_snapshot(now - config->storage->storage_snapshot_interval); | |
+ | |
+ if (is_snapshot) { | |
+ if (0 > dump_data()) { | |
+ zlog_error("dump_data() error"); | |
+ return -1; | |
+ } | |
+ } else { | |
+ mn2_schedule_update(); | |
+ mn2_write_update(now); | |
} | |
return 0; | |
@@ -742,13 +746,37 @@ static void dump_data_child_exited(pid_t pid, int status, struct rusage *ru, voi | |
return; | |
} | |
- current_state_ts = msg->ts; | |
- | |
badoo__mnotify2internal__mn2_dump_result__free_unpacked(msg, 0); | |
close(stdio[0]); | |
} | |
+/* | |
+ * Hand the pending update set (mn2_updates_hash_previous), if there is one, | |
+ * to dump_update(). | |
+ * | |
+ * While holding UPDATES_LOCK the pending hash and its nmpa are taken over by | |
+ * this function: the global hash pointer is reset to NULL, the global nmpa is | |
+ * re-initialised, and update_state_ts is sampled. dump_update() is then | |
+ * called outside the lock and retried until it returns 0. | |
+ * | |
+ * now: forwarded unchanged as the last argument of dump_update(). | |
+ * | |
+ * NOTE(review): the retry loop has no backoff and no shutdown check, so a | |
+ * persistent dump_update() failure spins forever. It also assumes that | |
+ * dump_update() leaves `updates` and `updates_nmpa` usable after a failed | |
+ * attempt -- confirm against dump_update(). | |
+ */ | |
+static void mn2_write_update(time_t now) | |
+{ | |
+    Pvoid_t updates = NULL; | |
+    struct nmpa_s updates_nmpa; | |
+    time_t ts = 0; /* only meaningful when `updates` is non-NULL */ | |
+ | |
+    UPDATES_LOCK(); | |
+    if (mn2_updates_hash_previous) { | |
+        /* take ownership of the pending set and leave the globals empty */ | |
+        updates = mn2_updates_hash_previous; | |
+        updates_nmpa = mn2_updates_nmpa_previous; | |
+        nmpa_init(&mn2_updates_nmpa_previous, MN2_UPDATE_NMPA_SIZE); | |
+        mn2_updates_hash_previous = NULL; | |
+        ts = update_state_ts; | |
+    } | |
+    UPDATES_UNLOCK(); | |
+ | |
+    if (updates) { | |
+        while (dump_update(updates, &updates_nmpa, ts, now) != 0) { | |
+            /* try writing update until we succeed */ | |
+        } | |
+    } | |
+} | |
+ | |
static void *mn2_storage_update_thread_func(void *a) | |
{ | |
time_t start, now; | |
@@ -763,23 +791,8 @@ static void *mn2_storage_update_thread_func(void *a) | |
time_delta = now - start; | |
if (in_shutdown || time_delta >= config->storage->storage_update_interval) { | |
- Pvoid_t updates = NULL; | |
- struct nmpa_s updates_nmpa; | |
- time_t ts; | |
- | |
- UPDATES_LOCK(); | |
- if (mn2_updates_hash_previous) { | |
- updates = mn2_updates_hash_previous; | |
- updates_nmpa = mn2_updates_nmpa_previous; | |
- nmpa_init(&mn2_updates_nmpa_previous, MN2_UPDATE_NMPA_SIZE); | |
- mn2_updates_hash_previous = NULL; | |
- ts = update_state_ts; | |
- } | |
- UPDATES_UNLOCK(); | |
- if (updates) { | |
- dump_update(updates, &updates_nmpa, ts); | |
- } | |
+ mn2_write_update(now); | |
if (in_shutdown) { | |
return NULL; | |
@@ -810,28 +823,37 @@ static void *mn2_storage_judy_clean(void *a) | |
return NULL; | |
} | |
-static void mn2_storage_update_tick(int fd, short which, void *arg) | |
+static void mn2_schedule_update() | |
{ | |
- static struct event *timer_event; | |
time_t now = time(NULL); | |
- if (!timer_event) { | |
- goto skip; | |
- } | |
- | |
UPDATES_LOCK(); | |
- if (current_state_ts >= now) { | |
- now = current_state_ts + 1; | |
- } | |
if (mn2_updates_hash_current) { | |
mn2_updates_hash_previous = mn2_updates_hash_current; | |
mn2_updates_hash_current = NULL; | |
mn2_updates_nmpa_previous = mn2_updates_nmpa_current; | |
nmpa_init(&mn2_updates_nmpa_current, MN2_UPDATE_NMPA_SIZE); | |
- update_state_ts = now; | |
+ update_state_ts = current_state_ts; | |
+ if (current_state_ts >= now) { | |
+ now = current_state_ts + 1; | |
+ } | |
+ update_now = now; | |
+ current_state_ts = now; | |
} | |
+ | |
UPDATES_UNLOCK(); | |
+} | |
+ | |
+static void mn2_storage_update_tick(int fd, short which, void *arg) | |
+{ | |
+ static struct event *timer_event; | |
+ | |
+ if (!timer_event) { | |
+ goto skip; | |
+ } | |
+ | |
+ mn2_schedule_update(); | |
skip: | |
if (!timer_event) { | |
@@ -856,6 +878,9 @@ static void mn2_storage_snapshot_tick(int fd, short which, void *arg) | |
if (!dump_child) { | |
zlog_notice("saving snapshot/update"); | |
+ /* TODO: only if there is a snapshot on disk */ | |
+ mn2_schedule_update(); | |
+ | |
dump_child = libangel_proc_fork(dump_data_child, 0, dump_data_child_exited, 0, "state snapshot dump child"); | |
if (dump_child < 0) { /* error */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment