Skip to content

Instantly share code, notes, and snippets.

@dat-vikash
Created October 9, 2015 15:45
Show Gist options
  • Save dat-vikash/ae93b96466b110f5a8fe to your computer and use it in GitHub Desktop.
Save dat-vikash/ae93b96466b110f5a8fe to your computer and use it in GitHub Desktop.
geoip sink example
trait EventStoreServiceWithLogstashSink extends EventStoreService
{
this: EventStore =>
// set config
val config = EventStoreCollectorConfig(logstashEndpoint = Some(Application.CONFIGS.get("logstash-endpoint").get.convertTo[String]))
case class MyCustomEvent(fact: Any,
timestamp: Option[Long] = None,
apiVersion: Option[String] = None,
originHost: Option[String]= None,
originService: Option[String] = None,
httpData: Option[HttpHeaderData],
nonce: Option[Long] = None
) extends EventBase {
override def toJson : String =
s"""
{"resourceURI" : ${stringLiteralForJson(resourceURI)},
"apiVersion": ${stringLiteralForJson(apiVersion)},
"checksum": ${stringLiteralForJson(checksum)},
"originHost": ${stringLiteralForJson(originHost)},
"originService": ${stringLiteralForJson(originService)},
"userAgent": ${httpData.map(d => stringLiteralForJson(d.userAgent)).orNull},
"acceptCharset": ${httpData.map(d => stringLiteralForJson(d.acceptCharset)).orNull},
"acceptDateTime": ${httpData.map(d => stringLiteralForJson(d.acceptDatetime)).orNull},
"authorization": ${httpData.map(d => stringLiteralForJson(d.authorization)).orNull},
"contentLength": ${httpData.map(d => d.contentLength).orNull},
"contentType": ${httpData.map(d => stringLiteralForJson(d.contentType)).orNull},
"server": ${httpData.map(d => stringLiteralForJson(d.server)).orNull},
"date": ${httpData.map(d => stringLiteralForJson(d.date)).orNull},
"nonce": ${nonce.orNull},
"fact": $fact
} """.stripLineEnd.stripMargin
}
object ConcreteSink extends LogstashSink(Some(config), Application.system )
implicit def geoipRequestToEvent(event: HttpRequest, fact: Any) : Option[MyCustomEvent]=
{
import spray.http.HttpHeaders._
// create our event type
val userAgent = event.headers.find(header => header.name.equals(`User-Agent`.name)) match
{
case Some(v) => v.value
case None => ""
}
val acceptCharset = event.headers.filter(header => header.name.equals(`Accept-Charset`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val acceptDatetime = event.headers.filter(header => header.name.equals(`Date`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val authorization = event.headers.filter(header => header.name.equals(`Authorization`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val contentLength = event.headers.filter(header => header.name.equals(`Content-Length`.name)).headOption match
{
case Some(v) => v.value.toInt
case None => 0
}
val contentType = event.headers.filter(header => header.name.equals(`Content-Type`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val host = event.headers.filter(header => header.name.equals(`Host`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val remoteAddress = event.headers.filter(header => header.name.equals(`Remote-Address`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val accept = event.headers.filter(header => header.name.equals(`Accept`.name)).headOption match
{
case Some(v) => v.value
case None => ""
}
val requestUri = event.uri.toString()
val httpHeaderData = HttpHeaderData(userAgent = Some(userAgent),
acceptCharset = Some(acceptCharset),
acceptDatetime = Some(acceptDatetime),
authorization = Some(authorization),
contentLength = Some(contentLength),
contentType = Some(contentType),
host = Some(host),
requestUri = Some(requestUri),
accept = Some(accept),
remoteAddress = Some(remoteAddress))
Some(MyCustomEvent(httpData = Some(httpHeaderData), fact = fact))
}
def putItem(jsonFactToInsert: spray.json.JsValue, ctx: RequestContext): Unit =
{
geoipRequestToEvent(ctx.request, jsonFactToInsert.toString()) match {
case Some(event) => ConcreteSink.persist(event).onComplete {
case Success(event) =>
ctx.complete(StatusCodes.Created, "Sent to logstash")
eventStoreServiceLog.info("Link info data saved to Logstash")
case Failure(error) =>
eventStoreServiceLog.error("Error occurred with EventStore",error)
ctx.complete(StatusCodes.InternalServerError, error.getMessage)
}
case None =>
eventStoreServiceLog.info("Unable to marshall to Event")
ctx.complete(StatusCodes.InternalServerError, "Unable to marshall to Event")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment