Last active
December 29, 2015 10:18
-
-
Save haitaoyao/7655599 to your computer and use it in GitHub Desktop.
add read time out for HTTP request to avoid spark http broadcast hang
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.scalatest.{Assertions, FunSuite} | |
import org.eclipse.jetty.server.handler._ | |
import org.eclipse.jetty.server._ | |
import org.eclipse.jetty.server.bio.SocketConnector | |
import javax.servlet.http.{HttpServletResponse, HttpServletRequest} | |
import java.util.concurrent.{Executors, TimeUnit} | |
import java.net.{URLConnection, URL} | |
import org.apache.commons.io.IOUtils | |
import java.io.ByteArrayOutputStream | |
/** | |
* | |
* User: haitao.yao | |
* Date: 13-11-20 | |
* Time: 6:20 pm | |
* | |
*/ | |
class HTTPTimeoutSuite extends FunSuite with Assertions{ | |
def sleepURL:String = "http://localhost:9999/sleep" | |
test("http no timeout"){ | |
val testServer = new TestServer | |
val executor = Executors.newSingleThreadExecutor() | |
testServer.start() | |
try{ | |
val future = executor.submit(new Runnable(){ | |
override def run():Unit ={ | |
try{ | |
new URL(sleepURL).openStream() | |
}catch{ | |
case e : Throwable => | |
} | |
} | |
}) | |
TimeUnit.SECONDS.sleep(10) | |
assert(future.isDone() == false) | |
future.cancel(true) | |
} finally{ | |
testServer.stop() | |
} | |
} | |
test("http with timeout") { | |
val testServer = new TestServer | |
testServer.start() | |
try{ | |
val connection:URLConnection = new URL(sleepURL).openConnection() | |
connection.setReadTimeout(TimeUnit.MILLISECONDS.convert(25,TimeUnit.SECONDS).toInt) | |
val in = connection.getInputStream() | |
IOUtils.copy(in, new ByteArrayOutputStream()) | |
}finally{ | |
testServer.stop() | |
} | |
} | |
class DeadHandler extends HandlerWrapper { | |
override def handle(target:String, baseRequest:Request, request:HttpServletRequest, response:HttpServletResponse){ | |
baseRequest.setHandled(true) | |
val out = response.getOutputStream | |
if("/sleep".equals(target)){ | |
out.println("test") | |
System.out.println("handle sleeping") | |
try{ | |
TimeUnit.SECONDS.sleep(20) | |
}catch{ | |
case e:Throwable => | |
} | |
out.println("test done") | |
}else{ | |
System.out.println("handle no sleep") | |
} | |
} | |
} | |
class TestServer{ | |
private var server:Server = null | |
private val port = 9999 | |
def start():Unit = { | |
this.server = new Server() | |
val connector = new SocketConnector | |
connector.setPort(this.port) | |
server.addConnector(connector) | |
server.setHandler(new DeadHandler) | |
server.start() | |
} | |
def stop():Unit = { | |
if(this.server != null){ | |
this.server.stop() | |
this.server = null | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment