Skip to content

Instantly share code, notes, and snippets.

@WonderBeat
Created November 13, 2015 08:25
Show Gist options
  • Save WonderBeat/c7b2e25cfa901e857598 to your computer and use it in GitHub Desktop.
Save WonderBeat/c7b2e25cfa901e857598 to your computer and use it in GitHub Desktop.
// Three buses: one for flow control, one for read-side I/O, one for write-side I/O.
// NOTE(review): the numeric dispatcher arguments are presumably buffer size (64) and
// pool size / backlog (15, 32) — confirm against the Reactor dispatcher constructors.

// Flow bus, backed by a ring-buffer dispatcher named "flow-bus".
val flowEventBus = {
  val dispatcher = new RingBufferDispatcher("flow-bus", 64)
  EventBus.create(dispatcher)
}

// Input bus, backed by a thread-pool dispatcher named "io-input-bus".
val inIOEventBus = {
  val dispatcher = new ThreadPoolExecutorDispatcher(15, 32, "io-input-bus")
  EventBus.create(dispatcher)
}

// Output bus, backed by a thread-pool dispatcher named "io-output-bus".
val outIOEventBus = {
  val dispatcher = new ThreadPoolExecutorDispatcher(15, 32, "io-output-bus")
  EventBus.create(dispatcher)
}
// Read stage: for each event selected by the ReadKafka type on the input bus, read one
// batch through the ticket's reader and publish the resulting WriteTask on the flow bus
// under the WriteKafkaRequest key.
inIOEventBus.on(Selectors.`type`(ReadKafka.getClass), new Consumer[Event[Ticket]] {
  override def accept(readEvent: Event[Ticket]): Unit = {
    val request = readEvent.getData
    // Perform the read before touching the writer, so the task is built from fetched data.
    val batch = request.reader.read(request.offset, request.batchSize)
    val pendingWrite = new WriteTask(batch, request.writer)
    flowEventBus.notify(WriteKafkaRequest, Event.wrap(pendingWrite))
  }
})
// Write stage: for each event selected by the WriteKafka type on the output bus, hand the
// task's data to the task's own writer.
outIOEventBus.on(Selectors.`type`(WriteKafka.getClass), new Consumer[Event[WriteTask]] {
  override def accept(writeEvent: Event[WriteTask]): Unit = {
    val pending = writeEvent.getData
    val sink = pending.writer
    sink.write(pending.data)
  }
})
// Routing stage: every event selected by the WriteKafkaRequest type on the flow bus is
// re-published, as the same event instance, on the output bus under the WriteKafka key.
flowEventBus.on(Selectors.`type`(WriteKafkaRequest.getClass), new Consumer[Event[WriteTask]] {
  override def accept(writeRequest: Event[WriteTask]): Unit = {
    outIOEventBus.notify(WriteKafka, writeRequest)
  }
})
// Driver: build the ticket source, then publish every ticket it yields on the input bus
// under the ReadKafka key until the iterator is exhausted.
val iterator = {
  val tickets =
    new TicketsIterator(fromTopic, toTopic, initialOffsets, sourceTopic.partitions.get, BatchSize)
  tickets.iterator()
}
while (iterator.hasNext) {
  val nextTicket = iterator.next()
  inIOEventBus.notify(ReadKafka, Event.wrap(nextTicket))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment