Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save koen-dejonghe/39c10357607c698c0b04 to your computer and use it in GitHub Desktop.
Save koen-dejonghe/39c10357607c698c0b04 to your computer and use it in GitHub Desktop.
Implementation of a connection pool for use with Spark Streaming. See the related discussion at http://stackoverflow.com/questions/30450763/spark-streaming-and-connection-pool-implementation
package net.atos.sparti.pub
import java.io.PrintStream
import java.net.Socket
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{ObjectPool, PooledObject, BasePooledObjectFactory}
import org.apache.spark.streaming.dstream.DStream
/**
 * Writes the events of a DStream to a TCP socket at `host`:`port`, borrowing
 * one pooled PrintStream per partition from [[PrintStreamPool]].
 *
 * The closures passed to `foreachRDD` / `foreachPartition` read `host` and
 * `port`, which is presumably why the class extends Serializable.
 *
 * @param host host name of the socket to publish to
 * @param port port of the socket to publish to
 * @tparam T type of the events carried by the stream
 */
class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

  /**
   * Publish the stream to a socket.
   *
   * For every partition of every RDD a stream is borrowed from the pool, each
   * event is converted with `callback` and printed as one line, and the
   * stream is handed back to the pool.
   *
   * @param stream   the stream whose events are published
   * @param callback converts an event into the line written to the socket
   */
  def publish(stream: DStream[T], callback: (T) => String): Unit =
    stream foreachRDD { rdd =>
      rdd foreachPartition { partition =>
        val pool = PrintStreamPool(host, port)
        // Return the borrowed stream even when the callback or the write
        // throws; otherwise every failed task would leak one pooled object.
        try {
          val s = pool.printStream
          partition foreach { event =>
            s println callback(event)
          }
        } finally {
          pool.release()
        }
      }
    }
}
/**
 * Couples a borrowed PrintStream with the pool it was taken from, so the
 * holder can give it back without knowing about the pool.
 *
 * @param pool        the pool `printStream` is returned to
 * @param printStream the borrowed stream
 */
class ManagedPrintStream(private val pool: ObjectPool[PrintStream], val printStream: PrintStream) {

  /** Returns `printStream` to `pool`. */
  def release(): Unit = {
    pool.returnObject(printStream)
  }
}
/**
 * Registry of PrintStream pools, one per (host, port) pair.
 *
 * Pools are created on first use and closed by a JVM shutdown hook.
 */
object PrintStreamPool {

  // Reassigned only inside `poolFor` (under the object's lock); @volatile so
  // that unsynchronized readers such as the shutdown hook see the latest map.
  @volatile var hostPortPool: Map[(String, Int), ObjectPool[PrintStream]] = Map()

  sys.addShutdownHook {
    hostPortPool.values.foreach { pool => pool.close() }
  }

  /**
   * Factory method: borrows a stream from the pool for `host`:`port`,
   * creating that pool first if it does not exist yet.
   *
   * @param host host name of the socket
   * @param port port of the socket
   * @return the borrowed stream together with the pool it must be released to
   */
  def apply(host: String, port: Int): ManagedPrintStream = {
    val pool = poolFor(host, port)
    // Borrow outside the lock: borrowObject may block or open a socket.
    new ManagedPrintStream(pool, pool.borrowObject())
  }

  /**
   * Looks up, or atomically creates and registers, the pool for a host/port.
   * Synchronized so that concurrent callers cannot each create a pool for the
   * same key and lose one of them (which would then never be closed).
   */
  private def poolFor(host: String, port: Int): ObjectPool[PrintStream] = synchronized {
    val key = (host, port)
    hostPortPool.getOrElse(key, {
      val p = new GenericObjectPool[PrintStream](new SocketStreamFactory(host, port))
      // Without this the factory's validateObject is never invoked and a
      // stream whose socket has failed would be handed out again.
      p.setTestOnBorrow(true)
      hostPortPool = hostPortPool + (key -> p)
      p
    })
  }
}
/**
 * Pooled-object factory producing PrintStreams that write to a socket
 * connected to `host`:`port`.
 *
 * @param host host name to connect to
 * @param port port to connect to
 */
class SocketStreamFactory(host: String, port: Int) extends BasePooledObjectFactory[PrintStream] {

  /** Opens a new socket and wraps its output stream in a PrintStream. */
  override def create(): PrintStream = {
    val socket = new Socket(host, port)
    new PrintStream(socket.getOutputStream)
  }

  /** Wraps a stream in the pool's bookkeeping object. */
  override def wrap(stream: PrintStream): DefaultPooledObject[PrintStream] =
    new DefaultPooledObject[PrintStream](stream)

  /** A stream is valid as long as `checkError()` reports no error. */
  override def validateObject(po: PooledObject[PrintStream]): Boolean = {
    val hasError = po.getObject.checkError()
    !hasError
  }

  /** Closes the stream when the pool discards it. */
  override def destroyObject(po: PooledObject[PrintStream]): Unit = {
    po.getObject.close()
  }

  /** Flushes the stream when it is returned to the pool. */
  override def passivateObject(po: PooledObject[PrintStream]): Unit = {
    po.getObject.flush()
  }
}
@asdf2014
Copy link

Is this approach not suitable for Jedis?

@asdf2014
Copy link

16/06/24 13:40:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/24 13:42:17 ERROR Executor: Exception in task 16.0 in stage 1.0 (TID 17)
redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool
at redis.clients.util.Pool.returnResourceObject(Pool.java:65)
at redis.clients.jedis.JedisPool.returnResource(JedisPool.java:113)
at util.RedisDB$.returnRedis(RedisDB.scala:58)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalStateException: Returned object not currently part of this pool
at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:537)
at redis.clients.util.Pool.returnResourceObject(Pool.java:63)
... 18 more

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment