Skip to content

Instantly share code, notes, and snippets.

@gousiosg
Last active December 15, 2015 12:59
Show Gist options
  • Save gousiosg/5264201 to your computer and use it in GitHub Desktop.
Save gousiosg/5264201 to your computer and use it in GitHub Desktop.
A reactive port scanner written using Rx.Java (0.13 onwards) and Scala 2.10 Futures
import java.net.Socket
import rx.subscriptions.Subscriptions
import rx.lang.scala.Observable
import scala.concurrent.{Future, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
// A reactive parallel port scanner written with Rx.Java
// @author: @gousiosg, @headinthebox
// Port scanning results
trait Result
case class Open(s: Socket) extends Result
case class Closed(port: Int) extends Result
object RxPortScan extends App {
import FutureExtensions._
val host = "myhost"
Observable(1 to 65536).flatMap(port =>
future {
try {
Open(new Socket(host, port))
} catch {
case e: Exception => Closed(port)
}
}.asObservable
).subscribe(x =>
x match {
case y: Open =>
println("Port " + y.s.getPort + " is open")
y.s.close
case y: Closed =>
})
}
// Extend Future with the asObservable method
object FutureExtensions {
import rx.lang.scala.Observable
import rx.lang.scala.Observer
class ObservableFuture[T](f: Future[T]) {
def asObservable: Observable[T] = {
Observable((o: Observer[T]) => {
f.onComplete {
t => t match {
case Success(s) =>
o.onNext(s)
o.onCompleted
case Failure(s) =>
o.onError(new Exception(s))
}
}
Subscriptions.empty
})
}
}
implicit def richFuture[T](f: Future[T]): ObservableFuture[T] = new ObservableFuture[T](f)
}
@samuelgruetter
Copy link

There's a problem with this code: The thread pool on which the futures are executed consists of deamon threads, which are killed once the main thread terminates, so this app terminates before all open ports are printed.

You can see my modifications from which I conclude the above here.

@gousiosg
Copy link
Author

gousiosg commented Oct 8, 2013

Well, I was running it from SBT, so I did not notice the problems you mention. Thanks for the update!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment