Skip to content

Instantly share code, notes, and snippets.

@gousiosg
Last active December 15, 2015 12:59
Show Gist options
  • Save gousiosg/5264201 to your computer and use it in GitHub Desktop.
Save gousiosg/5264201 to your computer and use it in GitHub Desktop.
A reactive port scanner written using RxJava (0.13 onwards) and Scala 2.10 Futures
import java.net.Socket
import rx.subscriptions.Subscriptions
import rx.lang.scala.Observable
import scala.concurrent.{Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
// A reactive parallel port scanner written with RxJava
// @author: @gousiosg, @headinthebox
/** Outcome of probing a single TCP port.
  *
  * Sealed so that pattern matches over a `Result` are checked for
  * exhaustiveness by the compiler.
  */
sealed trait Result

/** The probe produced a socket for the port.
  *
  * @param s the socket obtained by the probe; whoever consumes this result
  *          is expected to close it
  */
final case class Open(s: Socket) extends Result

/** The probe could not connect.
  *
  * @param port the port number that was probed
  */
final case class Closed(port: Int) extends Result
/** Entry point: tries to connect to every TCP port of `host` concurrently
  * (one future per port) and prints each port that accepted the connection.
  */
object RxPortScan extends App {
  import FutureExtensions._

  val host = "myhost"

  // The probes run on the global execution context, whose worker threads are
  // daemons. Without waiting here the main thread would fall off the end of
  // the program and the JVM could exit before any result is reported.
  private val done = new java.util.concurrent.CountDownLatch(1)

  // Valid TCP ports are 1..65535; 65536 is rejected by java.net.Socket.
  Observable(1 to 65535).flatMap(port =>
    future {
      try {
        Open(new Socket(host, port))
      } catch {
        // Any failure to connect is reported as a closed port.
        case _: Exception => Closed(port)
      }
    }.asObservable
  ).subscribe(
    (result: Result) => result match {
      case Open(socket) =>
        // Always release the socket, even if printing fails.
        try println(s"Port ${socket.getPort} is open")
        finally socket.close()
      case Closed(_) => // closed ports are not reported
    },
    (error: Throwable) => {
      Console.err.println(s"Port scan aborted: $error")
      done.countDown()
    },
    () => done.countDown()
  )

  // Block until the merged stream has completed (or failed).
  done.await()
}
/** Adds an `asObservable` method to `Future` through the implicit
  * conversion `richFuture`.
  */
object FutureExtensions {
  import rx.lang.scala.Observable
  import rx.lang.scala.Observer

  /** Wrapper exposing a `Future` as an `Observable`.
    *
    * @param f the future to adapt
    */
  class ObservableFuture[T](f: Future[T]) {

    /** Builds an `Observable` that registers a completion callback on `f`
      * for each subscribing observer.
      *
      * On success the observer receives the value followed by
      * `onCompleted`; on failure it receives `onError` with the cause
      * wrapped in a new `Exception`. The returned subscription is empty.
      */
    def asObservable: Observable[T] =
      Observable { (observer: Observer[T]) =>
        f.onComplete {
          case Success(value) =>
            observer.onNext(value)
            observer.onCompleted
          case Failure(cause) =>
            observer.onError(new Exception(cause))
        }
        Subscriptions.empty
      }
  }

  /** Implicit view from `Future[T]` to [[ObservableFuture]]. */
  implicit def richFuture[T](f: Future[T]): ObservableFuture[T] =
    new ObservableFuture[T](f)
}
@gousiosg
Copy link
Author

gousiosg commented Oct 8, 2013

Well, I was running it from SBT, so I did not notice the problems you mention. Thanks for the update!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment