Skip to content

Instantly share code, notes, and snippets.

@zhong-j-yu
Last active February 8, 2016 04:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zhong-j-yu/033ee7ef447a3f63dcc9 to your computer and use it in GitHub Desktop.
Save zhong-j-yu/033ee7ef447a3f63dcc9 to your computer and use it in GitHub Desktop.
Demo async HTTP server that completes user requests after being pinged back by a remote server.
import bayou.async.*;
import bayou.http.*;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
// see question https://groups.google.com/forum/#!topic/bayou-io/9Mqtcl3OoWM
//
// my server sends a task to the remote server.
// the remote server pings back my server when the task is completed.
//
public class PingBackDemo
{
public static void main(String[] args) throws Exception
{
// for testing, we can run both MyServer and MockRemoteServer in the same VM
MockRemoteServer.main(args);
MyServer.main(args);
}
// MyServer =============================================================================================
static final int myServerPort = 8080;
static final String myServerAddress = "http://localhost:"+myServerPort;
static class MyServer
{
public static void main(String[] args) throws Exception
{
HttpServer server = new HttpServer(request->
{
switch(request.uriPath())
{
case "/requestA" : return handleRequestA(request);
case "/requestC" : return handleRequestC(request);
default: return HttpResponse.text(200, "Usage: /requestA?inputs=FooBar");
}
});
server.conf().port(myServerPort);
server.conf().trafficDump(System.out::print);
server.start();
}
static AtomicLong taskIdSeq = new AtomicLong();
static ConcurrentHashMap<String, Promise<HttpResponse>> promises = new ConcurrentHashMap<>();
static HttpClient httpClient = new HttpClientConf()
.trafficDump(System.err::print)
.newClient();
static Async<HttpResponse> handleRequestA(HttpRequest requestA)
{
String inputs = requestA.uriParam("inputs");
// todo: validate `inputs`; may return error response here.
String taskId = ""+taskIdSeq.incrementAndGet(); // new unique task id
Promise<HttpResponse> promise = new Promise<>();
promises.put(taskId, promise);
// to be completed by requestC, errorB, or cancel
Async<Void> sendB = httpClient
.doGet(remoteServerAddress + "/requestB?taskId=" + taskId + "&inputs=" + inputs)
.then(responseB->checkResponseB(responseB))
.catch_(Exception.class, err -> handleErrorB(taskId, err));
// test: try incorrect remote address, to raise errorB
promise.timeout(Duration.ofSeconds(10)); // cancel after timeout
promise.onCancel(err->cancel(err, taskId, sendB));
// test: try shorter timeout (1 sec), to cancel before requestC arrives
return promise; // responseA (pending)
}
static void completeResponseA(String taskId, int status, String text)
{
Promise<HttpResponse> promise = promises.remove(taskId);
if(promise==null) // already completed
return;
promise.succeed(HttpResponse.text(status, text)); // responseA (complete)
}
static Async<HttpResponse> handleRequestC(HttpRequest requestC)
{
String taskId = requestC.uriParam("taskId");
// result is in POST request body. read it and send it back to user.
Async<String> bodyAsync = requestC.entity().bodyString(1000); // may fail
bodyAsync.peek(
str -> completeResponseA(taskId, 200, "answer:\n" + str),
err -> completeResponseA(taskId, 503, "failed:\n" + err)
);
// if necessary, we can also stream body from requestC to responseA
return bodyAsync.then(v->HttpResponse.text(200,"")); // responseC
// note that we must drain body of requestC before sending responseC
}
static Async<Void> checkResponseB(HttpResponse responseB)
{
// in any case, we must drain responseB's body, though we don't use it
return responseB.bodyString(1000) // may fail
.then(bodyB->
{
if(responseB.statusCode()!=200) // test: try different status, to raise errorB
throw new Exception("responseB error: "+responseB.statusCode());
return Async.VOID;
});
}
static Void handleErrorB(String taskId, Exception err)
{
// something wrong with responseB. tell that to user in responseA.
completeResponseA(taskId, 503, "error contacting remote server\n"+err);
return (Void)null;
}
static void cancel(Exception err, String taskId, Async<Void> sendB)
{
sendB.cancel(err); // may stuck in sendB
completeResponseA(taskId, 500, "cancelled: "+err); // timeout
}
} // MyServer
// mock remote server =======================================================================================
//
// upon receiving requestB, server will pretend to work 2 seconds, before sending requestC.
static final int remoteServerPort = 9090;
static final String remoteServerAddress = "http://localhost:"+remoteServerPort;
static class MockRemoteServer
{
static final int port = 9090;
public static void main(String[] args) throws Exception
{
HttpServer server = new HttpServer(request->handleRequest(request));
server.conf().port(port);
server.start();
}
static HttpResponseImpl handleRequest(HttpRequest request)
{
if(!request.uriPath().equals("/requestB"))
return HttpResponse.text(404, "unknown request :"+request.absoluteUri());
String taskId = request.uriParam("taskId");
String inputs = request.uriParam("inputs");
// todo: validate them; may return error response here.
doTaskAsync(taskId, inputs);
return HttpResponse.text(200, "request received..."); // responseB
}
static HttpClient httpClient = new HttpClient();
static void doTaskAsync(String taskId, String inputs)
{
new Thread(()->
{
// computing ...
try{ Thread.sleep(2000); }catch(Exception e){}
// done
String result = "result for ["+inputs+"] .... \n" + Instant.now();
// POST the result to my server
HttpRequest requestC = HttpRequest.toPost(
myServerAddress + "/requestC?taskId=" + taskId,
"text/plain", result.getBytes());
httpClient
.send(requestC)
.then(responseC->handleResponseC(responseC));
}).start();
}
// we'll just ignore responseC. however we must drain the response body.
static Async<Void> handleResponseC(HttpResponse responseC)
{
responseC.bodyString(1000);
return Async.VOID;
}
} // MockRemoteServer
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment