Skip to content

Instantly share code, notes, and snippets.

@hadoopsters
Created April 22, 2019 16:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save hadoopsters/3fd7c585d3e320ab0daaede801ee9a7b to your computer and use it in GitHub Desktop.
Save hadoopsters/3fd7c585d3e320ab0daaede801ee9a7b to your computer and use it in GitHub Desktop.
package tv.spotx.scala.dbutils
import org.apache.logging.log4j.scala.Logging
import scalaj.http.{Http, HttpOptions, HttpResponse}
/**
 * Connection settings for an InfluxDB endpoint.
 *
 * @param hostname Host to connect to; defaults to "console".
 * @param port     Port of the InfluxDB HTTP API; defaults to 8086.
 * @param database Name of the target database; defaults to "devtest".
 * @param ssl      Whether the connection should use SSL; defaults to false.
 * @param username Optional user name; absent by default.
 * @param password Optional password; absent by default.
 */
case class InfluxConfig(
  hostname: String = "console",
  port: Int = 8086, // scalastyle:off magic.number
  database: String = "devtest",
  ssl: Boolean = false,
  username: Option[String] = None,
  password: Option[String] = None
)
/**
 * Timestamp precisions understood by the InfluxDB write endpoint.
 *
 * The name of each value is the exact string used for the `precision` query parameter.
 * @see https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
 */
object InfluxPrecision extends Enumeration {
  /** Alias so that callers can refer to the value type as `InfluxPrecision`. */
  type InfluxPrecision = Value

  // Declaration order determines the numeric ids (0 to 5); keep it stable.
  val ns: Value = Value("ns") // nanoseconds
  val u: Value = Value("u")   // microseconds
  val ms: Value = Value("ms") // milliseconds
  val s: Value = Value("s")   // seconds
  val m: Value = Value("m")   // minutes
  val h: Value = Value("h")   // hours
}
/**
 * Base class for writers that send a single measurement line to InfluxDB.
 *
 * Holds the timestamp precision and the shared helpers used to build the
 * tag/field portion of a line; subclasses supply the actual `write`.
 */
abstract class InfluxDBWriter extends Serializable with Logging {

  /** Precision used for writes; seconds unless changed through the setter. */
  protected var _precision: InfluxPrecision.Value = InfluxPrecision.s

  /**
   * Change the precision used for subsequent writes.
   * @see https://docs.influxdata.com/influxdb/v1.7/tools/api/#write-http-endpoint
   * @param newValue Preferred precision.
   */
  def precision_= (newValue: InfluxPrecision.Value): Unit = { _precision = newValue } // scalastyle:off

  /**
   * Current precision.
   * @return The precision that writes will use.
   */
  def precision: InfluxPrecision.Value = _precision

  /**
   * Invoke a write on influx.
   * @see https://docs.influxdata.com/influxdb/v1.7/introduction/getting-started/
   * @param database The database containing the measurement to write to.
   * @param measurement The measurement to write to.
   * @param tags Any number of tags as key/value tuples.
   * @param fields Any number of fields as key/value tuples.
   * @return HttpResponse describing the outcome of the write.
   */
  def write(database: String,
            measurement: String,
            tags: Seq[(String, Any)],
            fields: Seq[(String, Any)]): HttpResponse[String]

  /**
   * Sanitises a string for use as a tag or field value: newlines are removed,
   * commas, equals signs and spaces are prefixed with a backslash, and double
   * quotes are turned into single quotes.
   * @param data Arbitrary data string.
   * @return Sanitised data.
   */
  def prepareInfluxData(data: String): String =
    data
      .replace("\n", "")
      .replace(",", "\\,")
      .replace("=", "\\=")
      .replace(" ", "\\ ")
      .replace("\"", "'")

  /**
   * Build the `tags fields` part of a write line: comma-separated `key=value`
   * tags, one space, then comma-separated `key=value` fields. Values are
   * emitted as given (no escaping is applied here).
   * @param tags Any number of tags as key/value tuples.
   * @param fields Any number of fields as key/value tuples.
   * @return The clause as a String.
   */
  protected def generateTagFieldClause(tags: Seq[(String, Any)], fields: Seq[(String, Any)]): String = {
    val quote = "\""
    val tagClause = tags.map { case (key, value) => s"$key=$value" }.mkString(",")
    val fieldClause = fields.map {
      // String field values are wrapped in double quotes.
      case (key, text: String) => s"$key=$quote$text$quote"
      case (key, other) => s"$key=$other"
    }.mkString(",")
    s"$tagClause $fieldClause"
  }
}
/**
 * InfluxDBWriter that posts one line per call to the InfluxDB HTTP write API.
 *
 * @param host     InfluxDB host name.
 * @param port     InfluxDB HTTP port; defaults to 8086.
 * @param ssl      When true, `https` is used and the credentials are sent as basic auth.
 * @param username User name, required (non-empty) when `ssl` is true.
 * @param password Password, required (non-empty) when `ssl` is true.
 */
class InfluxHTTPWriter(host: String,
                       port: Int = 8086,
                       ssl: Boolean = false,
                       username: String = "",
                       password: String = "") extends InfluxDBWriter {

  /**
   * Validates the combination of ssl related fields passed through the constructor.
   * An alternative option would be to validate this in a constructor and throw an exception
   * in the event of an invalid combination of fields.
   * @return Boolean indicating we have a useful set of credentials.
   */
  protected def hasValidCredentials: Boolean = (ssl && username.nonEmpty && password.nonEmpty) || !ssl

  /**
   * Post `measurement[,tags] fields` to `/write` on the configured host.
   *
   * @param database The database containing the measurement to write to.
   * @param measurement The measurement to write to.
   * @param tags Any number of tags as key/value tuples.
   * @param fields Any number of fields as key/value tuples.
   * @return The server's response, or a synthetic 403 response with an empty body
   *         when ssl is enabled but the user name or password is empty.
   */
  override def write(database: String,
                     measurement: String,
                     tags: Seq[(String, Any)],
                     fields: Seq[(String, Any)]): HttpResponse[String] =
    if (hasValidCredentials) {
      val scheme = if (ssl) "https" else "http"
      // The database name goes into the query string, so it must be URL-encoded.
      val encodedDatabase = java.net.URLEncoder.encode(database, "UTF-8")
      val endpointUrl = s"$scheme://$host:$port/write?db=$encodedDatabase&precision=${_precision}"
      val baseRequest = Http(endpointUrl)
      // HttpRequest is immutable: options/auth return a NEW request, which has to be
      // the one that is sent, otherwise the SSL option and credentials are silently dropped.
      // NOTE(review): allowUnsafeSSL disables certificate validation; confirm this is intended.
      val request =
        if (ssl) baseRequest.options(HttpOptions.allowUnsafeSSL).auth(username, password)
        else baseRequest
      val line = new StringBuilder(measurement)
      // The comma only separates the measurement from tags; with no tags the
      // clause already starts with the space that precedes the fields.
      if (tags.nonEmpty) {
        line.append(",")
      }
      if (tags.nonEmpty || fields.nonEmpty) {
        line.append(generateTagFieldClause(tags, fields))
      }
      request.postData(line.toString()).asString
    } else {
      logger.error("Username and/or password is empty! Provide official credentials.")
      HttpResponse[String]("", 403, Map())
    }
}
/**
 * InfluxDBWriter that logs the line it would write instead of contacting a server.
 *
 * @param output Not used by this implementation (output always goes through the
 *               logger); retained so existing callers keep compiling.
 */
class InfluxConsoleWriter(output: Int) extends InfluxDBWriter {

  /**
   * Log the write as a pseudo URL followed by `measurement[,tags] fields`.
   *
   * @param database The database containing the measurement to write to.
   * @param measurement The measurement to write to.
   * @param tags Any number of tags as key/value tuples.
   * @param fields Any number of fields as key/value tuples.
   * @return Always a synthetic 204 response with an empty body.
   */
  override def write(database: String,
                     measurement: String,
                     tags: Seq[(String, Any)],
                     fields: Seq[(String, Any)]
                    ): HttpResponse[String] = {
    val line = new StringBuilder(measurement)
    // The comma only separates the measurement from tags; emitting it with no
    // tags would produce "measurement, field=..", which is not a valid line.
    if (tags.nonEmpty) {
      line.append(",")
    }
    if (tags.nonEmpty || fields.nonEmpty) {
      line.append(generateTagFieldClause(tags, fields))
    }
    logger.info(s"influx-write:///write?db=$database&precision=${_precision}/" + line.toString())
    HttpResponse[String]("", 204, Map())
  }
}
/** Factory that picks the InfluxDBWriter implementation based on the host name. */
object InfluxDBWriter {

  /**
   * Create a writer for the given target.
   *
   * @param host     "console" selects the console writer; any other value selects the HTTP writer.
   * @param port     HTTP port; for the console writer it is passed on as its `output` argument.
   * @param ssl      Forwarded to the HTTP writer.
   * @param username Forwarded to the HTTP writer.
   * @param password Forwarded to the HTTP writer.
   * @return The selected writer.
   */
  def create(host: String,
             port: Int = 8086,
             ssl: Boolean = false,
             username: String = "",
             password: String = ""): InfluxDBWriter =
    host match {
      case "console" => new InfluxConsoleWriter(port)
      case _ => new InfluxHTTPWriter(host, port, ssl, username, password)
    }
}
/** Base class for writers that send a pre-built payload to InfluxDB in one call. */
abstract class InfluxDBBulkWriter {

  /**
   * Write a pre-formatted payload.
   *
   * @param database Target database name.
   * @param value    Payload to write, passed through as given.
   */
  // Explicit result type: procedure syntax is deprecated in Scala 2.13 and not accepted by Scala 3.
  def write(database: String, value: String): Unit
}
/**
 * InfluxDBBulkWriter that posts the whole payload in a single HTTP call.
 *
 * @param host InfluxDB host name.
 * @param port InfluxDB HTTP port; defaults to 8086.
 */
class InfluxHTTPBulkWriter(host: String, port: Int = 8086) extends InfluxDBBulkWriter {

  /**
   * Post `value` to `/write` for the given database over plain http.
   * The server's response is not inspected.
   *
   * @param database Target database name.
   * @param value    Payload to post, sent as given.
   */
  override def write(database: String, value: String): Unit = {
    // The database name goes into the query string, so it must be URL-encoded.
    val encodedDatabase = java.net.URLEncoder.encode(database, "UTF-8")
    Http(s"http://$host:$port/write?db=$encodedDatabase").postData(value).asString
    // Make the discarded response explicit.
    ()
  }
}
/**
 * InfluxDBBulkWriter that logs what would be posted in bulk instead of posting it.
 *
 * @param output Not used by this implementation (output always goes through the
 *               logger); retained so existing callers keep compiling.
 */
class InfluxConsoleBulkWriter(output: Int) extends InfluxDBBulkWriter with Logging {

  /**
   * Log the database name and the payload at info level.
   *
   * @param database Target database name.
   * @param value    Payload that would have been posted.
   */
  override def write(database: String, value: String): Unit =
    // The former `output match { ... }` only selected a stream and discarded it,
    // so it had no effect and has been removed.
    logger.info(s"Would post to Influx db $database, with data \n $value")
}
/** Factory that picks the InfluxDBBulkWriter implementation based on the host name. */
object InfluxDBBulkWriter {

  /**
   * Create a bulk writer for the given target.
   *
   * @param host "console" selects the console bulk writer; any other value selects the HTTP bulk writer.
   * @param port HTTP port; for the console bulk writer it is passed on as its `output` argument.
   * @return The selected bulk writer.
   */
  def create(host: String, port: Int = 8086): InfluxDBBulkWriter =
    host match {
      case "console" => new InfluxConsoleBulkWriter(port)
      case _ => new InfluxHTTPBulkWriter(host, port)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment