Skip to content

Instantly share code, notes, and snippets.

@johntbush
Last active January 15, 2016 05:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johntbush/e1b65497087d92ade9e1 to your computer and use it in GitHub Desktop.
Save johntbush/e1b65497087d92ade9e1 to your computer and use it in GitHub Desktop.
slow query ES stuff
val slowQueryESclient = new Client("http://localhost:80")
var queue = new LinkedBlockingQueue[Map[String, Any]]()
def getExtraProps(tags: Seq[String]): Map[String, Any] = {
var extraProps = Map[String, Any]()
extraProps = extraProps + ("app" -> appName)
if (tags.size > 0)
extraProps = extraProps + ("server" -> tags(0))
if (tags.size > 1)
extraProps = extraProps + ("database" -> tags(1))
if (tags.size > 2)
extraProps = extraProps + ("url" -> tags(2))
if (tags.size > 3)
extraProps = extraProps + ("user" -> tags(3))
if (tags.size > 4)
extraProps = extraProps + ("driver" -> tags(4))
extraProps
}
GlobalSettings.taggedQueryCompletionListener = (sql: String, params: Seq[Any], millis: Long, tags: Seq[String]) => {
if (millis > warningThresholdMillis) {
val data = Map(
"stmt" -> sql,
"message" -> "completion",
"timestamp" -> DateTimeUtils.currentIsoString,
"params" -> params.mkString("[", ",", "]"),
"millis" -> millis)
slowQueryLog(data ++ getExtraProps(tags))
}
}
GlobalSettings.taggedQueryFailureListener = (sql: String, params: Seq[Any], e: Throwable, tags: Seq[String]) => {
val data = Map(
"stmt" -> sql,
"message" -> "failure",
"timestamp" -> DateTimeUtils.currentIsoString,
"params" -> params.mkString("[", ",", "]"),
"error" -> e.getMessage)
slowQueryLog(data ++ getExtraProps(tags))
}
def flushSlowQueryLog = {
Future {
implicit val formats = org.json4s.DefaultFormats
if (queue.size > 0) {
val dtf = DateTimeFormat.forPattern("yyyy-MM")
val indexName = "platform-logs-" + dtf.print(DateTimeUtils.nowUtc)
val reqs = new util.ArrayList[Map[String, Any]]()
queue.drainTo(reqs)
val payload = new StringBuffer()
reqs.foreach { data =>
payload.append( s"""{ "index" : { "_index":"$indexName", "_type":"$SLOW_QUERY_DOCTYPE"}}""" + "\n")
payload.append(Serialization.write(data) + "\n")
}
payload.append("\n")
logger.debug("flushing slow query log to elasticsearch:\n" + payload.toString)
slowQueryESclient.bulk(Option(indexName), Option(SLOW_QUERY_DOCTYPE), payload.toString).onComplete {
case Failure(e) => logger.error(e.getMessage)
case Success(r) => logger.debug("ES response code:" + r.getStatusCode.toString)
}
}
}
}
def slowQueryLog(data: Map[String, Any]) = {
queue.put(data)
if (queue.size() >= SLOW_QUERY_BATCH_SIZE) flushSlowQueryLog
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment