Skip to content

Instantly share code, notes, and snippets.

@haitaoyao
Last active December 29, 2015 10:18
Show Gist options
  • Save haitaoyao/7655599 to your computer and use it in GitHub Desktop.
Save haitaoyao/7655599 to your computer and use it in GitHub Desktop.
add read time out for HTTP request to avoid spark http broadcast hang
import org.scalatest.{Assertions, FunSuite}
import org.eclipse.jetty.server.handler._
import org.eclipse.jetty.server._
import org.eclipse.jetty.server.bio.SocketConnector
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import java.util.concurrent.{Executors, TimeUnit}
import java.net.{URLConnection, URL}
import org.apache.commons.io.IOUtils
import java.io.ByteArrayOutputStream
/**
*
* User: haitao.yao
* Date: 13-11-20
* Time: 6:20 pm
*
*/
class HTTPTimeoutSuite extends FunSuite with Assertions{
def sleepURL:String = "http://localhost:9999/sleep"
test("http no timeout"){
val testServer = new TestServer
val executor = Executors.newSingleThreadExecutor()
testServer.start()
try{
val future = executor.submit(new Runnable(){
override def run():Unit ={
try{
new URL(sleepURL).openStream()
}catch{
case e : Throwable =>
}
}
})
TimeUnit.SECONDS.sleep(10)
assert(future.isDone() == false)
future.cancel(true)
} finally{
testServer.stop()
}
}
test("http with timeout") {
val testServer = new TestServer
testServer.start()
try{
val connection:URLConnection = new URL(sleepURL).openConnection()
connection.setReadTimeout(TimeUnit.MILLISECONDS.convert(25,TimeUnit.SECONDS).toInt)
val in = connection.getInputStream()
IOUtils.copy(in, new ByteArrayOutputStream())
}finally{
testServer.stop()
}
}
class DeadHandler extends HandlerWrapper {
override def handle(target:String, baseRequest:Request, request:HttpServletRequest, response:HttpServletResponse){
baseRequest.setHandled(true)
val out = response.getOutputStream
if("/sleep".equals(target)){
out.println("test")
System.out.println("handle sleeping")
try{
TimeUnit.SECONDS.sleep(20)
}catch{
case e:Throwable =>
}
out.println("test done")
}else{
System.out.println("handle no sleep")
}
}
}
class TestServer{
private var server:Server = null
private val port = 9999
def start():Unit = {
this.server = new Server()
val connector = new SocketConnector
connector.setPort(this.port)
server.addConnector(connector)
server.setHandler(new DeadHandler)
server.start()
}
def stop():Unit = {
if(this.server != null){
this.server.stop()
this.server = null
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment