Skip to content

Instantly share code, notes, and snippets.

@johntbush
Last active April 9, 2016 16:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johntbush/25986e3cb0303d8e9da6527d4a0ea19e to your computer and use it in GitHub Desktop.
Save johntbush/25986e3cb0303d8e9da6527d4a0ea19e to your computer and use it in GitHub Desktop.
/**
 * Outcome of processing a single tuple.
 *
 * @param tuple the tuple whose processing has finished
 * @param exOpt the failure raised while processing, or `None` (the default)
 *              when no failure was recorded
 */
case class Completion(
  tuple: Tuple,
  exOpt: Option[Throwable] = None
)
/**
 * Bolt that settles queued [[Completion]]s against the collector.
 *
 * Completions are placed on `completions` (the producer is not visible in this
 * snippet) and are acked or failed at the start of every `execute` call.
 */
class SomeBolt extends BaseRichBolt {
  // Outcomes waiting to be acked or failed. Left as a `var` because code elided
  // from this snippet may reassign it.
  var completions = new LinkedBlockingQueue[Completion]()

  /**
   * Drains every completion currently queued and settles each one with the
   * collector: a completion without an exception is acked; one carrying an
   * exception is reported via `reportError` and then failed.
   *
   * Does nothing when the queue is empty.
   */
  def flushCompletions: Unit = {
    // Explicit Java->Scala decorators instead of relying on the deprecated
    // implicit JavaConversions being in scope at file level.
    import scala.collection.JavaConverters._

    if (!completions.isEmpty) {
      val drained = new util.ArrayList[Completion]()
      completions.drainTo(drained)
      logger.debug(s"completing ${drained.size()} tuples")
      drained.asScala.foreach {
        case Completion(tuple, None) =>
          collector.ack(tuple)
        case Completion(tuple, Some(ex)) =>
          // Report before failing, matching the original call order.
          collector.reportError(ex)
          collector.fail(tuple)
      }
    }
  }

  /** Flushes pending completions before handling the incoming tuple. */
  override def execute(tuple: Tuple): Unit = {
    flushCompletions
...
  }
...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment