Last active
February 8, 2016 04:17
-
-
Save zhong-j-yu/033ee7ef447a3f63dcc9 to your computer and use it in GitHub Desktop.
Demo async HTTP server that completes user requests after being pinged back by a remote server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bayou.async.*;
import bayou.http.*;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
// see question https://groups.google.com/forum/#!topic/bayou-io/9Mqtcl3OoWM | |
// | |
// my server sends a task to the remote server. | |
// the remote server pings back my server when the task is completed. | |
// | |
public class PingBackDemo | |
{ | |
public static void main(String[] args) throws Exception | |
{ | |
// for testing, we can run both MyServer and MockRemoteServer in the same VM | |
MockRemoteServer.main(args); | |
MyServer.main(args); | |
} | |
// MyServer ============================================================================================= | |
static final int myServerPort = 8080; | |
static final String myServerAddress = "http://localhost:"+myServerPort; | |
static class MyServer | |
{ | |
public static void main(String[] args) throws Exception | |
{ | |
HttpServer server = new HttpServer(request-> | |
{ | |
switch(request.uriPath()) | |
{ | |
case "/requestA" : return handleRequestA(request); | |
case "/requestC" : return handleRequestC(request); | |
default: return HttpResponse.text(200, "Usage: /requestA?inputs=FooBar"); | |
} | |
}); | |
server.conf().port(myServerPort); | |
server.conf().trafficDump(System.out::print); | |
server.start(); | |
} | |
static AtomicLong taskIdSeq = new AtomicLong(); | |
static ConcurrentHashMap<String, Promise<HttpResponse>> promises = new ConcurrentHashMap<>(); | |
static HttpClient httpClient = new HttpClientConf() | |
.trafficDump(System.err::print) | |
.newClient(); | |
static Async<HttpResponse> handleRequestA(HttpRequest requestA) | |
{ | |
String inputs = requestA.uriParam("inputs"); | |
// todo: validate `inputs`; may return error response here. | |
String taskId = ""+taskIdSeq.incrementAndGet(); // new unique task id | |
Promise<HttpResponse> promise = new Promise<>(); | |
promises.put(taskId, promise); | |
// to be completed by requestC, errorB, or cancel | |
Async<Void> sendB = httpClient | |
.doGet(remoteServerAddress + "/requestB?taskId=" + taskId + "&inputs=" + inputs) | |
.then(responseB->checkResponseB(responseB)) | |
.catch_(Exception.class, err -> handleErrorB(taskId, err)); | |
// test: try incorrect remote address, to raise errorB | |
promise.timeout(Duration.ofSeconds(10)); // cancel after timeout | |
promise.onCancel(err->cancel(err, taskId, sendB)); | |
// test: try shorter timeout (1 sec), to cancel before requestC arrives | |
return promise; // responseA (pending) | |
} | |
static void completeResponseA(String taskId, int status, String text) | |
{ | |
Promise<HttpResponse> promise = promises.remove(taskId); | |
if(promise==null) // already completed | |
return; | |
promise.succeed(HttpResponse.text(status, text)); // responseA (complete) | |
} | |
static Async<HttpResponse> handleRequestC(HttpRequest requestC) | |
{ | |
String taskId = requestC.uriParam("taskId"); | |
// result is in POST request body. read it and send it back to user. | |
Async<String> bodyAsync = requestC.entity().bodyString(1000); // may fail | |
bodyAsync.peek( | |
str -> completeResponseA(taskId, 200, "answer:\n" + str), | |
err -> completeResponseA(taskId, 503, "failed:\n" + err) | |
); | |
// if necessary, we can also stream body from requestC to responseA | |
return bodyAsync.then(v->HttpResponse.text(200,"")); // responseC | |
// note that we must drain body of requestC before sending responseC | |
} | |
static Async<Void> checkResponseB(HttpResponse responseB) | |
{ | |
// in any case, we must drain responseB's body, though we don't use it | |
return responseB.bodyString(1000) // may fail | |
.then(bodyB-> | |
{ | |
if(responseB.statusCode()!=200) // test: try different status, to raise errorB | |
throw new Exception("responseB error: "+responseB.statusCode()); | |
return Async.VOID; | |
}); | |
} | |
static Void handleErrorB(String taskId, Exception err) | |
{ | |
// something wrong with responseB. tell that to user in responseA. | |
completeResponseA(taskId, 503, "error contacting remote server\n"+err); | |
return (Void)null; | |
} | |
static void cancel(Exception err, String taskId, Async<Void> sendB) | |
{ | |
sendB.cancel(err); // may stuck in sendB | |
completeResponseA(taskId, 500, "cancelled: "+err); // timeout | |
} | |
} // MyServer | |
// mock remote server ======================================================================================= | |
// | |
// upon receiving requestB, the server pretends to work for 2 seconds before sending requestC.
static final int remoteServerPort = 9090; | |
static final String remoteServerAddress = "http://localhost:"+remoteServerPort; | |
static class MockRemoteServer | |
{ | |
static final int port = 9090; | |
public static void main(String[] args) throws Exception | |
{ | |
HttpServer server = new HttpServer(request->handleRequest(request)); | |
server.conf().port(port); | |
server.start(); | |
} | |
static HttpResponseImpl handleRequest(HttpRequest request) | |
{ | |
if(!request.uriPath().equals("/requestB")) | |
return HttpResponse.text(404, "unknown request :"+request.absoluteUri()); | |
String taskId = request.uriParam("taskId"); | |
String inputs = request.uriParam("inputs"); | |
// todo: validate them; may return error response here. | |
doTaskAsync(taskId, inputs); | |
return HttpResponse.text(200, "request received..."); // responseB | |
} | |
static HttpClient httpClient = new HttpClient(); | |
static void doTaskAsync(String taskId, String inputs) | |
{ | |
new Thread(()-> | |
{ | |
// computing ... | |
try{ Thread.sleep(2000); }catch(Exception e){} | |
// done | |
String result = "result for ["+inputs+"] .... \n" + Instant.now(); | |
// POST the result to my server | |
HttpRequest requestC = HttpRequest.toPost( | |
myServerAddress + "/requestC?taskId=" + taskId, | |
"text/plain", result.getBytes()); | |
httpClient | |
.send(requestC) | |
.then(responseC->handleResponseC(responseC)); | |
}).start(); | |
} | |
// we'll just ignore responseC. however we must drain the response body. | |
static Async<Void> handleResponseC(HttpResponse responseC) | |
{ | |
responseC.bodyString(1000); | |
return Async.VOID; | |
} | |
} // MockRemoteServer | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment