Skip to content

Instantly share code, notes, and snippets.

@ArtemGr
Created August 28, 2016 00:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ArtemGr/179fb669fa1b065fd2c11a6fd474da8a to your computer and use it in GitHub Desktop.
Save ArtemGr/179fb669fa1b065fd2c11a6fd474da8a to your computer and use it in GitHub Desktop.
// Asynchronous PostgreSQL INSERT.
glim::NsecTimer timer;
std::unique_ptr<PGconn, void(*)(PGconn*)> pg (PQconnectStart (pcs.c_str()), PQfinish);
if (pg.get() == nullptr || PQstatus (pg.get()) == CONNECTION_BAD) GTHROW ("!PQconnectStart");
int sock = PQsocket (pg.get());
auto evBase = EvServ::instance()->evbase();
event_callback_fn cbcoroInvokeFromCallback = [](evutil_socket_t, short, void* cbcoro) {((CBCoro*) cbcoro)->invokeFromCallback();};
std::unique_ptr<struct event, void(*)(struct event*)> evRead (event_new (evBase.get(), sock, EV_READ, cbcoroInvokeFromCallback, cbcoro), event_free);
std::unique_ptr<struct event, void(*)(struct event*)> evWrite (event_new (evBase.get(), sock, EV_WRITE, cbcoroInvokeFromCallback, cbcoro), event_free);
auto waitFor = [&](int sock, short events) {
cbcoro->yieldForCallback ([&]() {event_add (events == EV_READ ? evRead.get() : evWrite.get(), nullptr);});
};
for (;;) {
auto pollStatus = PQconnectPoll (pg.get()); if (pollStatus == PGRES_POLLING_FAILED) GTHROW ("!PQconnectPoll");
if (pollStatus == PGRES_POLLING_OK) break;
waitFor (sock, pollStatus == PGRES_POLLING_READING ? EV_READ : EV_WRITE);
}
PQsetnonblocking (pg.get(), 1);
log_info ("logWindowOnerror: Connected to PostgreSQL! (In " << timer.seconds (3) << " seconds).");
timer.restart();
GSTRING_ON_STACK (sql, 256) << "BEGIN;";
sql << "DELETE FROM window_onerror_log WHERE time < now() - interval '30 days';";
sql << "INSERT INTO window_onerror_log (error) VALUES (" << PQescapeLiteral (pg.get(), post.data(), post.size()) << ");";
sql << "COMMIT";
int rc = PQsendQuery (pg.get(), sql.c_str());
if (!rc) GTHROW (PQerrorMessage (pg.get()));
// Fetches the next PGresult of the query in flight, yielding the coroutine (via waitFor)
// instead of blocking while libpq reports that PQgetResult would have to wait.
// The returned pointer owns the result (released with PQclear); it holds whatever
// PQgetResult returned, which may be null.
auto nextResult = [&]()->std::unique_ptr<PGresult, void(*)(PGresult*)> {
  for (;;) {
    const bool stillBusy = PQisBusy (pg.get()) == 1;
    if (!stillBusy) break;
    // More input is needed: wait until the socket is readable, then let libpq absorb it.
    waitFor (sock, EV_READ);
    const int consumed = PQconsumeInput (pg.get());
    if (consumed == 0) GTHROW (PQerrorMessage (pg.get()));
    // Keep pushing any still-queued outgoing data while PQflush reports more to send.
    for (;;) {
      if (PQflush (pg.get()) != 1) break;
      waitFor (sock, EV_WRITE);
    }
  }
  PGresult* raw = PQgetResult (pg.get());
  return std::unique_ptr<PGresult, void(*)(PGresult*)> (raw, PQclear);
};
while (auto pr = nextResult()) {
ExecStatusType status = PQresultStatus (pr.get());
if (status == PGRES_COMMAND_OK) log_info ("logWindowOnerror: PGRES_COMMAND_OK");
else if (status == PGRES_TUPLES_OK) {
int rows = PQntuples (pr.get()), columns = PQnfields (pr.get());
GSTRING_ON_STACK (got, 64); for (int row = 0; row < rows; ++row) for (int col = 0; col < columns; ++col) got << "; " << PQgetvalue (pr.get(), row, col);
log_info ("logWindowOnerror: PGRES_TUPLES_OK" << got);
} else log_info ("logWindowOnerror: PGresult: " << PQresStatus (status) << "; " << PQresultErrorMessage (pr.get()));
}
log_info ("logWindowOnerror: INSERT in " << timer.seconds (3) << " seconds.");
evhtp_send_reply (req, HTTP_OK); evhtp_request_resume (req);
@Globik
Copy link

Globik commented Jul 13, 2018

What's this? Where is a coroutine from?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment