public
Last active

Simple disruptor example in Scala

  • Download Gist
disruptor.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
import com.lmax.disruptor.dsl.Disruptor
import java.util.concurrent.Executors
import com.lmax.disruptor._
 
/**
 * Mutable event carrying a single `Long` payload.
 *
 * `value` is deliberately a `var`: instances are created once by the event
 * factory in `Main` and are then overwritten in place by
 * `ValueEventTranslator` rather than being re-allocated.
 *
 * @param value the payload currently stored in this event
 */
case class ValueEvent(
  var value: Long
)
 
/**
 * Translator that writes a fixed `Long` into a [[ValueEvent]].
 *
 * @param value the payload to copy into the event passed to `translateTo`
 */
case class ValueEventTranslator(value: Long) extends EventTranslator[ValueEvent] {

  /**
   * Stores this translator's `value` in `event` and hands the same instance back.
   *
   * @param event    the event to overwrite
   * @param sequence the sequence number supplied by the caller; not used here
   * @return the `event` argument, after mutation
   */
  override def translateTo(event: ValueEvent, sequence: Long): ValueEvent = {
    val payload = this.value
    event.value = payload
    event
  }
}
 
/**
 * Event handler that prints the payload of every event whose value is an
 * exact multiple of 10000 and ignores all others.
 */
class ValueEventHandler extends EventHandler[ValueEvent] {

  /**
   * Prints `event.value` to standard output when it is divisible by 10000.
   *
   * @param event      the event to inspect
   * @param sequence   sequence number of the event; not used here
   * @param endOfBatch end-of-batch flag supplied by the caller; not used here
   */
  override def onEvent(event: ValueEvent, sequence: Long, endOfBatch: Boolean): Unit = {
    val isMilestone = event.value % 10000 == 0
    if (isMilestone) {
      println(event.value)
    }
  }
}
 
/**
 * Minimal Disruptor demo: wires three [[ValueEventHandler]]s to a ring buffer,
 * publishes the values 1 to 1000000 through it, then shuts everything down.
 */
object Main {

  /**
   * Builds the disruptor, publishes one million events and releases all
   * resources, even if wiring or publishing fails.
   *
   * @param args command-line arguments; ignored
   */
  def main(args: Array[String]): Unit = {
    val ringSize = 1024 * 8
    val eventCount = 1000000
    val executor = Executors.newFixedThreadPool(10)

    // The pool's threads are non-daemon: if the executor is not shut down on
    // every path (including failures), the JVM never exits.
    try {
      val factory = new EventFactory[ValueEvent] {
        def newInstance(): ValueEvent = ValueEvent(0L)
      }

      val handler1 = new ValueEventHandler
      val handler2 = new ValueEventHandler
      val handler3 = new ValueEventHandler

      val disruptor = new Disruptor[ValueEvent](
        factory,
        executor,
        new SingleThreadedClaimStrategy(ringSize),
        new SleepingWaitStrategy()
      )

      // `then` is a reserved word in newer Scala versions (deprecated as an
      // identifier since 2.10, a keyword in Scala 3), so the Java DSL method
      // has to be back-quoted. handler3 is chained after handler1/handler2.
      disruptor.handleEventsWith(handler1, handler2).`then`(handler3)

      disruptor.start()

      try {
        // Publishing
        for (i <- 1 to eventCount) {
          disruptor.publishEvent(ValueEventTranslator(i))
        }

        // Grace period for the handlers to catch up before shutting down.
        // NOTE(review): Disruptor.shutdown() is documented to wait for the
        // backlog to drain, so this sleep is probably redundant — confirm
        // against the library version in use before removing it.
        Thread.sleep(2000)
      } finally {
        disruptor.shutdown()
      }
    } finally {
      executor.shutdown()
    }
  }

}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.