// Gist mmizutani/9405222 by @mmizutani, created March 7, 2014.
// Example: publishing and handling events with the LMAX Disruptor from Scala.
import com.lmax.disruptor.dsl.Disruptor
import java.util.concurrent.Executors
import com.lmax.disruptor._
/**
 * Event carrying a single `Long` payload.
 *
 * @param value the payload; declared `var` so that an existing instance can be
 *              overwritten in place (ValueEventTranslator.translateTo does this)
 */
case class ValueEvent(
  var value: Long
)
/**
 * Translator that copies a fixed `Long` into a [[ValueEvent]].
 *
 * @param value the number written into the target event's `value` field
 */
case class ValueEventTranslator(value: Long) extends EventTranslator[ValueEvent] {

  /**
   * Overwrites `event.value` with this translator's `value`.
   *
   * @param event    the event instance to fill in
   * @param sequence sequence number supplied by the caller; not used here
   * @return the same `event` instance, after it has been mutated
   */
  def translateTo(event: ValueEvent, sequence: Long): ValueEvent = {
    event.value = this.value
    event
  }
}
/**
 * Event handler that prints the value of every event whose value is a
 * multiple of 10000 and ignores all others.
 */
class ValueEventHandler extends EventHandler[ValueEvent] {

  /**
   * Prints `event.value` to standard output when it is divisible by 10000.
   *
   * Declared with an explicit `: Unit =` result type rather than the
   * deprecated procedure syntax (`def f(...) { ... }`).
   *
   * @param event      the event to inspect
   * @param sequence   sequence number supplied by the caller; not used here
   * @param endOfBatch batch flag supplied by the caller; not used here
   */
  def onEvent(event: ValueEvent, sequence: Long, endOfBatch: Boolean): Unit = {
    if (event.value % 10000 == 0)
      println(event.value)
  }
}
/**
 * Demo entry point: wires three [[ValueEventHandler]]s to a Disruptor and
 * publishes the numbers 1 to 1000000 through it.
 */
object Main {

  /**
   * Builds the Disruptor, registers the handlers, publishes one million
   * events, then shuts the Disruptor and its executor down.
   *
   * Shutdown is performed in `finally` blocks so that the thread pool is
   * released even if setup or publishing throws.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val ringSize = 1024 * 8
    val executor = Executors.newFixedThreadPool(10)
    val factory = new EventFactory[ValueEvent] {
      def newInstance(): ValueEvent = ValueEvent(0L)
    }
    val handler1 = new ValueEventHandler
    val handler2 = new ValueEventHandler
    val handler3 = new ValueEventHandler
    val disruptor = new Disruptor[ValueEvent](
      factory,
      executor,
      new SingleThreadedClaimStrategy(ringSize),
      new SleepingWaitStrategy()
    )

    try {
      // handler1 and handler2 are registered together; handler3 is chained
      // after them. `then` is backquoted because it is a reserved word in Scala.
      disruptor.handleEventsWith(handler1, handler2).`then`(handler3)
      disruptor.start()
      try {
        // Publishing
        for (i <- 1 to 1000000) {
          disruptor.publishEvent(ValueEventTranslator(i))
        }
        // Fixed 2-second pause before shutting down, kept from the original.
        // NOTE(review): presumably a grace period for the handlers to catch up;
        // confirm whether disruptor.shutdown() already waits for the backlog.
        Thread.sleep(2000)
      } finally {
        disruptor.shutdown()
      }
    } finally {
      executor.shutdown()
    }
  }
}
// (GitHub page footer: sign-in prompt for commenting on the gist.)