Skip to content

Instantly share code, notes, and snippets.

Last active Dec 15, 2015
What would you like to do?
A reactive port scanner written using Rx.Java (0.13 onwards) and Scala 2.10 Futures
import rx.subscriptions.Subscriptions
import rx.lang.scala.Observable
import scala.concurrent.{Future, future}
import scala.util.{Failure, Success}
// A reactive parallel port scanner written with Rx.Java
// @author: @gousiosg, @headinthebox
// Port scanning results
trait Result
case class Open(s: Socket) extends Result
case class Closed(port: Int) extends Result
object RxPortScan extends App {
import FutureExtensions._
val host = "myhost"
Observable(1 to 65536).flatMap(port =>
future {
try {
Open(new Socket(host, port))
} catch {
case e: Exception => Closed(port)
).subscribe(x =>
x match {
case y: Open =>
println("Port " + y.s.getPort + " is open")
case y: Closed =>
// Extend Future with the asObservable method
object FutureExtensions {
import rx.lang.scala.Observable
import rx.lang.scala.Observer
class ObservableFuture[T](f: Future[T]) {
def asObservable: Observable[T] = {
Observable((o: Observer[T]) => {
f.onComplete {
t => t match {
case Success(s) =>
case Failure(s) =>
o.onError(new Exception(s))
implicit def richFuture[T](f: Future[T]): ObservableFuture[T] = new ObservableFuture[T](f)

This comment has been minimized.

Copy link

@samuelgruetter samuelgruetter commented Sep 29, 2013

There's a problem with this code: The thread pool on which the futures are executed consists of deamon threads, which are killed once the main thread terminates, so this app terminates before all open ports are printed.

You can see my modifications from which I conclude the above here.


This comment has been minimized.

Copy link
Owner Author

@gousiosg gousiosg commented Oct 8, 2013

Well, I was running it from SBT, so I did not notice the problems you mention. Thanks for the update!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment