Skip to content

Instantly share code, notes, and snippets.

@markpapadakis
Created July 6, 2016 19:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markpapadakis/d1745c661e031ce680d53d1a7dbac410 to your computer and use it in GitHub Desktop.
Save markpapadakis/d1745c661e031ce680d53d1a7dbac410 to your computer and use it in GitHub Desktop.
#include <memcachedclient.h>
#include <switch.h>
#include <tank_client.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
/// Run flag polled by the main loop; cleared by sig_handler().
/// Declared `volatile sig_atomic_t` because it is written from an asynchronous
/// signal handler: that is the object type the C and C++ standards guarantee can
/// be safely stored to in that context, and `volatile` stops the compiler from
/// caching the value across loop iterations.
static volatile sig_atomic_t running = 1;

/// Signal handler (installed for SIGINT in main): requests a clean shutdown by
/// clearing the run flag. It does nothing else, so it stays async-signal-safe.
/// The signal number is not needed and is left unnamed.
static void sig_handler(int /*sig*/)
{
    running = 0;
}
/// Persists `seqNum` (raw, host byte order, sizeof(uint64_t) bytes) to
/// "cache_observer.checkpoint" in the current working directory.
///
/// The value is first written to a temporary sibling file, and only after the
/// write and close have both succeeded is that file renamed over the real
/// checkpoint. Truncating and rewriting the live file in place meant that being
/// interrupted between the truncate and the write left an empty checkpoint, which
/// makes the reader fall back to sequence 0.
///
/// Best-effort: failures are not reported to the caller; on any failure the
/// previous checkpoint file (if any) is left untouched.
///
/// @param seqNum  sequence number to record.
static void checkpoint(const uint64_t seqNum)
{
    static constexpr const char *checkpointPath = "cache_observer.checkpoint";
    static constexpr const char *scratchPath = "cache_observer.checkpoint.tmp";

    // 0664 rather than 0775: this is a data file, it does not need execute bits.
    const int fd = open(scratchPath, O_WRONLY | O_CREAT | O_TRUNC, 0664);

    if (fd == -1)
        return;

    // The file was just truncated, so the descriptor's offset is 0.
    const bool wroteAll = write(fd, &seqNum, sizeof(seqNum)) == static_cast<ssize_t>(sizeof(seqNum));
    const bool closedOk = close(fd) == 0;

    // Publish only a complete file; otherwise discard the scratch copy.
    if (!wroteAll || !closedOk || rename(scratchPath, checkpointPath) != 0)
        unlink(scratchPath);
}
/// Consumes partition 1 of the "cds_updates" topic through TankClient and, for
/// every distinct message key that names keyspace "inventory" / column family
/// "products", calls expire_inventory_product() with the product id embedded in
/// the key.
///
/// The sequence number to resume from is loaded from "cache_observer.checkpoint"
/// at startup, saved through checkpoint() at most once every 8 seconds while
/// running, and saved once more on exit. The loop ends when `running` is cleared
/// (SIGINT).
///
/// @return 0 on normal shutdown, 1 if the checkpoint file exists but cannot be
///         opened.
int main()
{
    TankClient tankClient;
    uint64_t seqNum{0};
    static constexpr size_t defaultMinFetchSize{820 * 1024};
    size_t minFetchSize{defaultMinFetchSize};
    const TankClient::topic_partition topicPartition("cds_updates", 1);
    uint32_t pendingResp{0}; // id of the in-flight consume request, 0 when there is none
    // NOTE(review): not referenced anywhere in this function; kept as-is in case
    // its construction is relied upon.
    MemcachedClient memcachedClient;
    // These were declared as `ks, cf` while the decoding code below refers to
    // `keyspace` / `columnFamily`; the declaration now matches the uses.
    strwlen8_t keyspace, columnFamily;
    time32_t nextCheckpoint{0};

    tankClient.set_default_leader("10.5.5.200:11011");

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGINT, sig_handler);

    // Restore the last checkpointed sequence number, if there is one.
    {
        int fd = open("cache_observer.checkpoint", O_RDONLY);

        if (fd != -1)
        {
            // Read into a temporary so that a short read cannot leave seqNum
            // partially overwritten.
            uint64_t stored{0};

            if (pread64(fd, &stored, sizeof(stored), 0) == sizeof(stored))
            {
                seqNum = stored;
                Print("Resuming from ", seqNum, "\n");
            }
            close(fd);
        }
        else if (errno != ENOENT)
        {
            // A missing file just means a first run; anything else is fatal.
            Print("Failed to access checkpoint file:", strerror(errno), "\n");
            return 1;
        }
    }

    while (likely(running))
    {
        if (!pendingResp)
        {
            // No request outstanding: issue a new consume from the current position.
            // NOTE(review): the meaning of the trailing `8e3, 0` arguments is defined
            // by TankClient::consume() — confirm against its declaration.
            pendingResp = tankClient.consume(
                {{topicPartition, {seqNum, minFetchSize}}}, 8e3, 0);

            if (!pendingResp)
            {
                // The request could not be scheduled; back off before retrying.
                Timings::Milliseconds::Sleep(800);
                continue;
            }
        }

        try
        {
            tankClient.poll(800);
        }
        catch (...)
        {
            // Deliberately best-effort: a failed poll is retried on the next iteration.
            continue;
        }

        const time32_t now = time(nullptr);

        if (now > nextCheckpoint)
        {
            checkpoint(seqNum);
            nextCheckpoint = now + 8;
        }

        // A fault for our request means it will never complete; clear it so that
        // the next iteration issues a fresh one.
        for (const auto &it : tankClient.faults())
        {
            if (it.clientReqId == pendingResp)
                pendingResp = 0;
        }

        for (const auto &it : tankClient.consumed())
        {
            // Sort by key so that equal keys are adjacent and each distinct key is
            // handled once.
            std::sort(it.msgs.offset, it.msgs.offset + it.msgs.len, [](const auto &a, const auto &b) {
                return a.key.Cmp(b.key) < 0;
            });

            for (const auto *p = it.msgs.offset, *const e = p + it.msgs.len; p != e;)
            {
                const auto k = p->key;

                // Skip over the remaining messages that carry the same key.
                for (++p; p != e && p->key == k; ++p)
                    continue;

                // Key layout as decoded here:
                //   [u8 keyspace length][keyspace bytes]
                //   [u8 column-family length][column-family bytes]
                //   [u8 key length][key bytes]
                const auto *i = (uint8_t *)k.p;

                keyspace.Set((char *)i + 1, *i);
                i += keyspace.len + sizeof(uint8_t);
                columnFamily.Set((char *)i + 1, *i);
                i += columnFamily.len + sizeof(uint8_t);

                const auto keyLen = *i++;

                if (keyspace == "inventory" && columnFamily == "products")
                {
                    require(keyLen == sizeof(uint32_t));

                    // The original `*(uint32_t *(i))` was not valid C++. Copy the
                    // bytes out instead of dereferencing a cast pointer: `i` has no
                    // alignment guarantee at this offset.
                    uint32_t productId;

                    memcpy(&productId, i, sizeof(productId));
                    expire_inventory_product(productId);
                }
            }

            // Advance to where the broker says the next fetch should start.
            minFetchSize = Max<size_t>(it.next.minFetchSize, defaultMinFetchSize);
            seqNum = it.next.seqNum;
            pendingResp = 0;
        }
    }

    // Record the final position on a clean shutdown.
    checkpoint(seqNum);
    return 0;
}
134,1 All
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment