-
-
Save koen-dejonghe/39c10357607c698c0b04 to your computer and use it in GitHub Desktop.
package net.atos.sparti.pub | |
import java.io.PrintStream | |
import java.net.Socket | |
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool} | |
import org.apache.commons.pool2.{ObjectPool, PooledObject, BasePooledObjectFactory} | |
import org.apache.spark.streaming.dstream.DStream | |
/**
 * Publishes the elements of a Spark DStream to a TCP socket, reusing pooled
 * `PrintStream` connections per (host, port) endpoint across partitions.
 *
 * @param host target host to publish to
 * @param port target port to publish to
 */
class PooledSocketStreamPublisher[T](host: String, port: Int)
  extends Serializable {

  /**
   * Publish the stream to a socket.
   *
   * Each partition borrows one `PrintStream` from the shared pool, writes one
   * line per event (formatted by `callback`), and returns the stream to the
   * pool in a `finally` block — the original code skipped the return when an
   * event threw, permanently leaking the pooled connection.
   *
   * @param stream   the DStream whose elements are published
   * @param callback converts one element to the line written to the socket
   */
  def publish(stream: DStream[T], callback: (T) => String) =
    stream foreachRDD ( rdd =>
      rdd foreachPartition { partition =>
        // A ManagedPrintStream couples the borrowed stream with the pool
        // it must be returned to.
        val managed = PrintStreamPool(host, port)
        try {
          // Hoist the stream: it is the same borrowed object for the
          // whole partition, so there is no need to re-read it per event.
          val s = managed.printStream
          partition foreach { event =>
            s println callback(event)
          }
        } finally {
          // Always hand the stream back, even if callback or println failed.
          managed.release()
        }
      }
    )
}
/**
 * Couples a borrowed `PrintStream` with the pool it was taken from, so a
 * caller can return the stream without holding a reference to the pool.
 *
 * @param pool        the pool the stream was borrowed from
 * @param printStream the borrowed stream
 */
class ManagedPrintStream(private val pool: ObjectPool[PrintStream], val printStream: PrintStream) {

  /** Hand the wrapped stream back to its originating pool. */
  def release(): Unit = pool.returnObject(printStream)
}
/**
 * Registry of commons-pool2 pools of `PrintStream`s, one per (host, port)
 * endpoint, shared by all tasks in the executor JVM.
 */
object PrintStreamPool {

  // Guarded by `this.synchronized` in apply(): Spark executors run many
  // tasks concurrently, and the original unsynchronized read-then-write on
  // this var could race — two tasks both creating a pool for the same key,
  // with the losing pool overwritten, its sockets leaked, and never closed
  // by the shutdown hook below.
  var hostPortPool: Map[(String, Int), ObjectPool[PrintStream]] = Map()

  // Close every registered pool (and thus its sockets) on JVM shutdown.
  sys.addShutdownHook {
    hostPortPool.values.foreach { pool => pool.close() }
  }

  /**
   * Borrow a `PrintStream` connected to (host, port), lazily creating the
   * backing pool on first use. The returned [[ManagedPrintStream]] remembers
   * exactly which pool it came from, so release() cannot return the object
   * to the wrong pool.
   */
  def apply(host: String, port: Int): ManagedPrintStream = {
    // Only the lookup/registration is locked; borrowObject() may block when
    // the pool is exhausted and must not be called while holding the lock.
    val pool = this.synchronized {
      hostPortPool.getOrElse((host, port), {
        val p = new GenericObjectPool[PrintStream](new SocketStreamFactory(host, port))
        hostPortPool += (host, port) -> p
        p
      })
    }
    new ManagedPrintStream(pool, pool.borrowObject())
  }
}
/**
 * commons-pool2 factory that manufactures `PrintStream`s backed by TCP
 * sockets to a fixed (host, port) endpoint.
 *
 * @param host host each pooled stream connects to
 * @param port port each pooled stream connects to
 */
class SocketStreamFactory(host: String, port: Int) extends BasePooledObjectFactory[PrintStream] {

  /** Open a fresh socket and wrap its output stream. */
  override def create(): PrintStream =
    new PrintStream(new Socket(host, port).getOutputStream)

  /** Wrap a raw stream for the pool's bookkeeping. */
  override def wrap(stream: PrintStream): PooledObject[PrintStream] =
    new DefaultPooledObject[PrintStream](stream)

  /** A stream is valid while its sticky error flag is clear
    * (PrintStream swallows IOExceptions and only sets checkError()). */
  override def validateObject(po: PooledObject[PrintStream]): Boolean =
    !po.getObject.checkError()

  /** Close the underlying socket stream when the pool evicts it. */
  override def destroyObject(po: PooledObject[PrintStream]): Unit =
    po.getObject.close()

  /** Flush buffered output each time a stream is returned to the pool. */
  override def passivateObject(po: PooledObject[PrintStream]): Unit =
    po.getObject.flush()
}
16/06/24 13:40:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/24 13:42:17 ERROR Executor: Exception in task 16.0 in stage 1.0 (TID 17)
redis.clients.jedis.exceptions.JedisException: Could not return the resource to the pool
at redis.clients.util.Pool.returnResourceObject(Pool.java:65)
at redis.clients.jedis.JedisPool.returnResource(JedisPool.java:113)
at util.RedisDB$.returnRedis(RedisDB.scala:58)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.IllegalStateException: Returned object not currently part of this pool
at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:537)
at redis.clients.util.Pool.returnResourceObject(Pool.java:63)
... 18 more
Is this approach not suitable for Jedis? The stack trace shows `returnResource` failing with "Returned object not currently part of this pool", which suggests the resource is being returned to a different pool instance than the one it was borrowed from.