Created
July 6, 2016 19:35
-
-
Save markpapadakis/d1745c661e031ce680d53d1a7dbac410 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <memcachedclient.h>
#include <switch.h>
#include <tank_client.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
// Main-loop run flag: non-zero while the process should keep running.
// It is assigned from a signal handler and read from the main loop, so it is
// declared `volatile sig_atomic_t`; a plain `bool` gives no guarantee that the
// store made by the handler is well-defined or ever observed by the loop.
static volatile sig_atomic_t running{1};

// Signal handler that requests shutdown by clearing `running`.
// It performs a single store and nothing else, which keeps it
// async-signal-safe. The signal number is not used.
static void sig_handler(int /*sig*/)
{
    running = 0;
}
// Persists `seqNum` (raw 8 bytes, host byte order) to
// "cache_observer.checkpoint" in the current working directory.
//
// The value is first written to a temporary sibling file, flushed to disk and
// then renamed over the real checkpoint. Truncating and rewriting the live
// file in place would leave an empty checkpoint if the process died between
// the truncation and the write.
//
// Best-effort: on any failure the temporary file is removed and whatever
// checkpoint was there before is left untouched. Nothing is reported to the
// caller (the function returns void).
static void checkpoint(const uint64_t seqNum)
{
    static constexpr const char *finalPath = "cache_observer.checkpoint";
    static constexpr const char *tmpPath = "cache_observer.checkpoint.tmp";

    // 0644: the checkpoint is plain data, there is no reason to mark it executable.
    const int fd = open(tmpPath, O_WRONLY | O_CREAT | O_LARGEFILE | O_TRUNC, 0644);

    if (fd == -1)
        return;

    // A short write is treated as a failure: a partial sequence number is useless.
    const bool written = pwrite64(fd, &seqNum, sizeof(seqNum), 0) == static_cast<ssize_t>(sizeof(seqNum));
    // Make sure the bytes are on disk before the rename makes them visible.
    const bool synced = written && fsync(fd) == 0;
    const bool closed = close(fd) == 0;

    if (!synced || !closed || rename(tmpPath, finalPath) == -1)
        unlink(tmpPath);
}
int main() | |
{ | |
TankClient tankClient; | |
uint64_t seqNum{0}; | |
static constexpr size_t defaultMinFetchSize{820 * 1024}; | |
size_t minFetchSize{defaultMinFetchSize}; | |
const TankClient::topic_partition topicPartition("cds_updates", 1); | |
uint32_t pendingResp{0}; | |
MemcachedClient memcachedClient; | |
strwlen8_t ks, cf; | |
time32_t nextCheckpoint{0}; | |
tankClient.set_default_leader("10.5.5.200:11011"); | |
signal(SIGHUP, SIG_IGN); | |
signal(SIGPIPE, SIG_IGN); | |
signal(SIGINT, sig_handler); | |
{ | |
int fd = open("cache_observer.checkpoint", O_RDONLY); | |
if (fd != -1) | |
{ | |
if (pread64(fd, &seqNum, sizeof(seqNum), 0) == sizeof(seqNum)) | |
Print("Resuiming from ", seqNum, "\n"); | |
close(fd); | |
} | |
else if (errno != ENOENT) | |
{ | |
Print("Failed to access checkpoint file:", strerror(errno), "\n"); | |
return 1; | |
} | |
} | |
while (likely(running)) | |
{ | |
if (!pendingResp) | |
{ | |
pendingResp = tankClient.consume( | |
{{topicPartition, {seqNum, minFetchSize}}}, 8e3, 0); | |
if (!pendingResp) | |
{ | |
Timings::Milliseconds::Sleep(800); | |
continue; | |
} | |
} | |
try | |
{ | |
tankClient.poll(800); | |
} | |
catch (...) | |
{ | |
continue; | |
} | |
const time32_t now = time(nullptr); | |
if (now > nextCheckpoint) | |
{ | |
checkpoint(seqNum); | |
nextCheckpoint = now + 8; | |
} | |
for (const auto &it : tankClient.faults()) | |
{ | |
if (it.clientReqId == pendingResp) | |
pendingResp = 0; | |
} | |
for (const auto &it : tankClient.consumed()) | |
{ | |
std::sort(it.msgs.offset, it.msgs.offset + it.msgs.len, [](const auto &a, const auto &b) { | |
return a.key.Cmp(b.key) < 0; | |
}); | |
for (const auto *p = it.msgs.offset, *const e = p + it.msgs.len; p != e;) | |
{ | |
const auto k = p->key; | |
for (++p; p != e && p->key == k; ++p) | |
continue; | |
const auto *i = (uint8_t *)k.p; | |
keyspace.Set((char *)i + 1, *i); | |
i += keyspace.len + sizeof(uint8_t); | |
columnFamily.Set((char *)i + 1, *i); | |
i += columnFamily.len + sizeof(uint8_t); | |
const auto keyLen = *i++; | |
if (keyspace == "inventory" && columnFamily == "products") | |
{ | |
require(keyLen == sizeof(uint32_t)); | |
expire_inventory_product(*(uint32_t *(i))); | |
} | |
} | |
minFetchSize = Max<size_t>(it.next.minFetchSize, defaultMinFetchSize); | |
seqNum = it.next.seqNum; | |
pendingResp = 0; | |
} | |
} | |
checkpoint(seqNum); | |
return 0; | |
} | |
134,1 All |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment