Skip to content

Instantly share code, notes, and snippets.

@alexanderdean
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexanderdean/9588012 to your computer and use it in GitHub Desktop.
Save alexanderdean/9588012 to your computer and use it in GitHub Desktop.
/**
 * Loader for Thrift SnowplowRawEvent objects which
 * are inbound as a simple Byte Array.
 */
object ThriftByteArrayLoader extends CollectorLoader[Array[Byte]] {

  import scala.util.control.NonFatal

  // One deserializer instance is shared by all calls; every use of it
  // below is wrapped in `this.synchronized`, so concurrent callers never
  // use it at the same time.
  private val thriftDeserializer = new TDeserializer

  /**
   * Converts a Thrift-serialized SnowplowRawEvent into a MaybeCanonicalInput.
   *
   * The bytes are deserialized into a fresh SnowplowRawEvent, which is
   * then handed to [[ThriftRawEventLoader.toCanonicalInput]].
   *
   * @param line The raw bytes of a Thrift-serialized SnowplowRawEvent
   * @return either a set of validation errors or an Option-boxed
   *         CanonicalInput object, wrapped in a Scalaz ValidationNel.
   *         Any non-fatal exception thrown while deserializing or
   *         converting the record is reported as a single validation
   *         error; fatal errors propagate to the caller.
   */
  def toCanonicalInput(line: Array[Byte]): ValidatedMaybeCanonicalInput = {
    val snowplowRawEvent = new SnowplowRawEvent()
    try {
      this.synchronized {
        thriftDeserializer.deserialize(snowplowRawEvent, line)
      }
      ThriftRawEventLoader.toCanonicalInput(snowplowRawEvent)
    } catch {
      // NonFatal deliberately does not match VirtualMachineError,
      // InterruptedException, LinkageError or control-flow throwables,
      // so those are not misreported as a schema mismatch.
      case NonFatal(_) =>
        "Record does not match Thrift SnowplowRawEvent schema".failNel[Option[CanonicalInput]]
    }
  }
}
/**
 * Loader for Thrift SnowplowRawEvent objects which have
 * already been unmarshalled.
 */
object ThriftRawEventLoader extends CollectorLoader[ThriftRawEvent] {

  /**
   * Converts an already-deserialized ThriftRawEvent into a MaybeCanonicalInput.
   *
   * @param line The deserialized Thrift raw event. Its nullable fields
   *        (payload, payload data, hostname, user agent, referer URI,
   *        network user ID and headers) are wrapped in Option, so a
   *        missing value becomes None (or Nil for the headers).
   * @return either a set of validation errors (from parsing the GET
   *         payload) or an Option-boxed CanonicalInput object, wrapped
   *         in a Scalaz ValidationNel.
   */
  def toCanonicalInput(line: ThriftRawEvent): ValidatedMaybeCanonicalInput = {
    // Guard against a missing payload struct as well as missing payload
    // data: both are passed on as None instead of throwing a
    // NullPointerException on `line.payload.data`.
    val payloadData = Option(line.payload).flatMap(tp => Option(tp.data))
    val payload = TrackerPayload.extractGetPayload(payloadData, line.encoding)

    // NOTE(review): `.some` wraps the raw value, so a null ipAddress would
    // become Some(null) — confirm this field is always populated.
    val ip = line.ipAddress.some // Required
    val hostname = Option(line.hostname)
    val userAgent = Option(line.userAgent)
    val refererUri = Option(line.refererUri)
    val networkUserId = Option(line.networkUserId)
    val headers = Option(line.headers).map(_.toList).getOrElse(Nil)

    payload.toValidationNel map { (p: NameValueNel) =>
      Some(
        CanonicalInput(
          // The event timestamp is interpreted in UTC.
          new DateTime(line.timestamp, DateTimeZone.UTC),
          new NvGetPayload(p),
          InputSource(line.collector, hostname),
          line.encoding,
          ip,
          userAgent,
          refererUri,
          headers,
          networkUserId
        )
      )
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment