Skip to content

Instantly share code, notes, and snippets.

@ujjwal
Created June 1, 2013 07:56
Show Gist options
  • Save ujjwal/5689625 to your computer and use it in GitHub Desktop.
Save ujjwal/5689625 to your computer and use it in GitHub Desktop.
sugar
private[this] def send() {
if (preCommitBuffer.size > maxQueueSize || System.currentTimeMillis - lastSendTime > maxHoldTime) {
lastSendTime = System.currentTimeMillis
val batches = preCommitBuffer groupBy (_.sendHour)
batches foreach {
case (hour, events) => {
try {
val inputStream = new ByteArrayInputStream(
(events map (JSON.toJson(_)) mkString (System.getProperty("line.separator"))).getBytes("UTF-8"))
info("Sending batch of %d messages, de-queued from %s, for hour boundary %d".format(events.size, queueName, hour))
val filename = cloudStorageClient.uploadObject(eventType, new Date(hour), inputStream, accountId)
info("Successfully uploaded with filename %s".format(filename))
} catch {
case e: Exception => e.printStackTrace(); error("SEVERE: Error uploading to GCS", e)
//handled failed items - add back into the queue
}
}
}
preCommitBuffer.clear()
session.commit
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment