Skip to content

Instantly share code, notes, and snippets.

@nicholasren
Created October 13, 2014 03:18
Show Gist options
  • Save nicholasren/0c01a4e79b1658704d2f to your computer and use it in GitHub Desktop.
Save nicholasren/0c01a4e79b1658704d2f to your computer and use it in GitHub Desktop.
A small example of a real-time indexing and search engine
package actors
import akka.actor._
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
import java.io.{PrintWriter, FileOutputStream}
import rx.lang.scala.Observable
import rx.lang.scala.Observable._
/**
 * Messages understood by the indexing actor.
 *
 * The trait is sealed and every message extends it, so the compiler can
 * check matches over an `IndexAction` for exhaustiveness.
 */
sealed trait IndexAction

/** Request to add the listing with the given id to the index. */
final case class Index(id: String) extends IndexAction

/** Request to refresh the index entry of the listing with the given id. */
final case class ReIndex(id: String) extends IndexAction

/** Request to remove the listing with the given id from the index. */
final case class UnIndex(id: String) extends IndexAction
/**
 * Actor that forwards each indexing message to the matching
 * `IndexService` operation, passing along the listing id it carries.
 */
class Indexer extends Actor {

  override def receive: Actor.Receive = {
    case Index(listingId)   => IndexService.index(listingId)
    case ReIndex(listingId) => IndexService.reindex(listingId)
    case UnIndex(listingId) => IndexService.unindex(listingId)
  }
}
/**
 * Stand-in for a real index backend: each operation only prints a line
 * naming the operation and the listing id to standard output.
 */
object IndexService {

  /** Prints the "Indexing" line for `id`. */
  def index(id: String): Unit = {
    Console.println(s"Indexing $id")
  }

  /** Prints the "Reindexing" line for `id`. */
  def reindex(id: String): Unit = {
    Console.println(s"Reindexing $id")
  }

  /** Prints the "Unindexing" line for `id`. */
  def unindex(id: String): Unit = {
    Console.println(s"Unindexing $id")
  }
}
/**
 * A change fetched for a single listing.
 *
 * @param id     identifier of the listing
 * @param title  title of the listing
 * @param action name of the change; the extractor in this file reacts to
 *               "create", "update" and "delete"
 */
case class ListingAction(
  id: String,
  title: String,
  action: String
)
/**
 * Actor that receives a batch of listing ids, fetches the detail of each
 * one, forwards the matching indexing message to an [[Indexer]] and then
 * persists the fetched detail.
 */
class ListingExtractor extends Actor {
  val indexer = context.system.actorOf(Props[Indexer])

  override def receive: Actor.Receive = {
    // The element type of a List is erased at runtime, so match on List[_]
    // and keep only the elements that really are strings instead of relying
    // on an unchecked `List[String]` pattern.
    case ids: List[_] =>
      ids.collect { case id: String => id }.foreach(process)
  }

  /** Fetches the detail for `id`, dispatches it to the indexer and persists it. */
  private def process(id: String): Unit = {
    val handled = Service.fetchDetail(id).map { detail =>
      dispatch(detail)
      Service.persist(detail)
    }
    // A failed Future would otherwise vanish without a trace.
    handled.failed.foreach { error =>
      Console.err.println(s"Failed to process listing $id: $error")
    }
  }

  /** Sends the indexing message that corresponds to the detail's action. */
  private def dispatch(detail: ListingAction): Unit = detail.action match {
    case "create" => indexer ! Index(detail.id)
    case "delete" => indexer ! UnIndex(detail.id)
    case "update" => indexer ! ReIndex(detail.id)
    // Previously an opaque MatchError; an unknown action is still not
    // persisted, but the failure now says what went wrong.
    case other =>
      throw new IllegalArgumentException(
        s"Unknown action '$other' for listing ${detail.id}")
  }
}
/**
 * Simulated backend: produces random listing changes and details, and
 * appends every persisted detail to /tmp/fetched-listings.txt.
 *
 * The random delay of the Future-returning operations runs inside the
 * Future, so callers get the Future back immediately instead of having
 * their own thread put to sleep first.
 */
object Service {
  val rnd = new scala.util.Random
  // Auto-flushing writer; opening the stream truncates any existing file.
  val output = new PrintWriter(new FileOutputStream("/tmp/fetched-listings.txt"), true)
  val actions = List("delete", "update", "create")

  /**
   * Sleeps for a random multiple of 100 ms (0 to 900 ms) on the current
   * thread, then evaluates and returns `f`.
   */
  def randomDelay[A](f: => A): A = {
    val tenths = rnd.nextInt(10)
    // Tell the execution context that this thread is about to block.
    scala.concurrent.blocking {
      Thread.sleep(tenths * 100L)
    }
    f
  }

  /** Asynchronously builds a detail for `id` with a randomly chosen action. */
  def fetchDetail(id: String): Future[ListingAction] = Future {
    randomDelay {
      val action = actions(rnd.nextInt(actions.size))
      ListingAction(id, s"listing-$id", action)
    }
  }

  /** Writes `detail` as one line to the output file after a random delay. */
  def persist(detail: ListingAction): Unit = randomDelay {
    output.println(detail.toString)
  }

  /** Asynchronously produces two pseudo-random listing ids for poll tick `i`. */
  def pollChanges(i: Long): Future[List[String]] = Future {
    randomDelay {
      val d = rnd.nextInt(10)
      List(s"1000$d$i", s"10000$d$i")
    }
  }
}
/**
 * Entry point: once per second asks `Service.pollChanges` for changed
 * listing ids and sends every resulting batch to the extractor actor.
 */
object Main extends App {
  val system = ActorSystem("ActorSys")
  val extractor = system.actorOf(Props[ListingExtractor], "extractor")

  // `1.second` avoids the postfix-operator form `1 seconds`, which needs
  // the language.postfixOps feature import.
  val changes: Observable[List[String]] =
    interval(1.second)
      .map(tick => Service.pollChanges(tick))
      .flatMap(pending => from(pending))

  changes.subscribe(
    (ids: List[String]) => extractor ! ids,
    // Without an error callback a failed poll would surface as an
    // unhandled error from the subscription.
    (error: Throwable) => Console.err.println(s"Polling for changes failed: $error")
  )
}
//boot from microkernel
//class Boot extends Bootable {
//
// val system = ActorSystem("ActorSys")
// val extractor = system.actorOf(Props[ListingExtractor], "extractor")
//
// def startup = {
//
// val changes: Observable[List[String]] = interval(1 seconds).map {
// i => Service.pollChanges(i)
// }.flatMap(from(_))
//
//
// changes.subscribe(ids => {
// extractor ! ids
// })
// }
//
// def shutdown = {
// system.shutdown()
// }
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment