Skip to content

Instantly share code, notes, and snippets.

@matiwinnetou
Last active February 12, 2016 02:25
Show Gist options
  • Save matiwinnetou/84a4af72cb9dc6b9cfe7 to your computer and use it in GitHub Desktop.
Save matiwinnetou/84a4af72cb9dc6b9cfe7 to your computer and use it in GitHub Desktop.
Dedupe service requests from various pagelets
public class HttpRequest {
private String requestId;
private HttpRequestsCache httpRequestsCache;
public HttpRequest(final HttpRequestsCache httpRequestsCache, final String prefix, final WSRequestHolder holder) {
this.requestId = prefix.concat(":").concat(UUID.randomUUID().toString());
this.httpRequestsCache = httpRequestsCache.addRequest(requestId, holder);
}
public HttpRequest(final HttpRequestsCache httpRequestsCache, final WSRequestHolder holder) {
this.requestId = UUID.randomUUID().toString();
this.httpRequestsCache = httpRequestsCache.addRequest(requestId, holder);
}
public String getRequestId() {
return requestId;
}
public F.Promise<WSResponse> get() {
return httpRequestsCache.get(requestId);
}
}
package controllers;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import play.api.libs.ws.WSRequest;
import play.libs.F;
import play.libs.ws.WSRequestHolder;
import play.libs.ws.WSResponse;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Created by mati on 24/05/2014.
 *
 * <p>De-duplicates outgoing web-service calls. Every request registered through
 * {@link #addRequest(String, WSRequestHolder)} is assigned to a group keyed by
 * {@code WSUtils.hashCode(holder)}. The first {@link #get(String)} for a group issues the
 * real call using the group's first holder; every request id of that group is answered
 * with that same response (or failure).
 *
 * <p>NOTE(review): the group key is a plain hash code, so two different requests whose
 * hashes collide would share one response — confirm this is acceptable.
 */
@NotThreadSafe
public class HttpRequestsCache {

    private static final String NOT_ENQUEUED_MESSAGE = "You must first enqueue a holder via addRequest method!";

    /** Identifier of this cache instance: {@code appPrefix + ":" + randomUUID}. */
    private final String httpRequestId;

    /** Group id to group. */
    private final Map<Integer, ClientsGroup> cache = Maps.newHashMap();

    /** Request id to the id of the group the request was assigned to. */
    private final Map<String, Integer> requestIdsToGroupIds = Maps.newHashMap();

    /**
     * @param appPrefix label prepended to the generated cache id; must not be {@code null}
     */
    public HttpRequestsCache(final String appPrefix) {
        this.httpRequestId = appPrefix.concat(":").concat(UUID.randomUUID().toString());
    }

    /** @return the identifier generated for this cache instance */
    public String getHttpRequestId() {
        return httpRequestId;
    }

    /**
     * Registers a request. No call is issued here; that happens on {@link #get(String)}.
     *
     * @param requestId caller-chosen id used later to fetch the response
     * @param holder    the web-service request
     * @return this cache, to allow chaining
     */
    public HttpRequestsCache addRequest(final String requestId, final WSRequestHolder holder) {
        final Integer clientsGroupId = WSUtils.hashCode(holder);
        requestIdsToGroupIds.put(requestId, clientsGroupId);
        final ClientsGroup clientsGroup = cache.computeIfAbsent(clientsGroupId, ClientsGroup::empty);
        clientsGroup.lazyProxyPromises.put(requestId, createLazyPromise());
        clientsGroup.wsRequestHolders.add(holder);
        return this;
    }

    /**
     * @return for every group that has at least one holder, the group's hash mapped to the
     *         URL rendered from its first holder
     */
    public Map<Integer, String> getUniqueRequestedUrls() {
        final Set<Integer> ids = new HashSet<>(requestIdsToGroupIds.values());
        return ids.stream()
                .map(id -> cache.get(id))
                .filter(group -> !group.wsRequestHolders.isEmpty())
                .map(group -> group.wsRequestHolders.iterator().next())
                .collect(Collectors.toMap(holder -> WSUtils.hashCode(holder), holder -> WSUtils.toUrl(holder)));
    }

    /**
     * Like {@link #getRequestsUrls()}, but evaluated once every real call started so far
     * has been redeemed.
     *
     * <p>NOTE(review): the per-group completion bookkeeping runs in a separate callback on
     * the same promise; whether it has run before this mapping does is not established by
     * this class — verify if exact {@code done}/timing values matter.
     */
    public F.Promise<Map<String, RemoteCallStats>> getRequestsUrlsP() {
        return getRealPromises().map(responses -> getRequestsUrls());
    }

    /**
     * @return one {@link RemoteCallStats} per registered request id, built from the state of
     *         the group the id belongs to at the moment of the call
     */
    public Map<String, RemoteCallStats> getRequestsUrls() {
        final Map<String, RemoteCallStats> urls = Maps.newHashMap();
        for (final Map.Entry<String, Integer> entry : requestIdsToGroupIds.entrySet()) {
            final String requestId = entry.getKey();
            final ClientsGroup clientsGroup = cache.get(entry.getValue());
            if (clientsGroup == null || clientsGroup.wsRequestHolders.isEmpty()) {
                continue;
            }
            final String url = WSUtils.toUrl(clientsGroup.wsRequestHolders.iterator().next());
            urls.put(requestId, new RemoteCallStats(url, requestId, clientsGroup.stopwatch, clientsGroup.isCompleted));
        }
        return urls;
    }

    /** Combines the real promises of all groups whose call has already been started. */
    private F.Promise<List<WSResponse>> getRealPromises() {
        final List<F.Promise<WSResponse>> started = cache.values().stream()
                .filter(clientsGroup -> clientsGroup.realPromise.isPresent())
                .map(clientsGroup -> clientsGroup.realPromise.get())
                .collect(Collectors.toList());
        // Generic array creation is unavoidable for the varargs sequence(...) overload.
        @SuppressWarnings("unchecked")
        final F.Promise<WSResponse>[] promises = new F.Promise[started.size()];
        return F.Promise.sequence(started.toArray(promises));
    }

    /** Creates an {@link HttpRequest} with a prefixed random id, registered with this cache. */
    public HttpRequest createHttpRequest(final String prefix, final WSRequestHolder holder) {
        return new HttpRequest(this, prefix, holder);
    }

    /** Creates an {@link HttpRequest} with a random id, registered with this cache. */
    public HttpRequest createHttpRequest(final WSRequestHolder holder) {
        return new HttpRequest(this, holder);
    }

    /**
     * Returns the response promise for a registered request, starting the group's real call
     * if it has not been started yet.
     *
     * @param requestId id previously passed to {@link #addRequest(String, WSRequestHolder)}
     * @return a promise completed with the group's shared response or failure; a promise
     *         failed with a {@link RuntimeException} when the id was never registered
     */
    public F.Promise<WSResponse> get(final String requestId) {
        final Integer clientsGroupId = requestIdsToGroupIds.get(requestId);
        final ClientsGroup clientsGroup = clientsGroupId == null ? null : cache.get(clientsGroupId);
        if (clientsGroup == null || clientsGroup.wsRequestHolders.isEmpty()) {
            return F.Promise.throwing(new RuntimeException(NOT_ENQUEUED_MESSAGE));
        }
        if (!clientsGroup.realPromise.isPresent()) {
            startRealCall(clientsGroup);
        }
        return clientsGroup.getLazyPromise(requestId);
    }

    /** Issues the group's single real call using its first holder and starts the timer. */
    private void startRealCall(final ClientsGroup clientsGroup) {
        final WSRequestHolder representative = clientsGroup.wsRequestHolders.iterator().next();
        clientsGroup.stopwatch.start();
        final F.Promise<WSResponse> realResponseP = representative.get();
        clientsGroup.realPromise = Optional.of(realResponseP);
        realResponseP.onRedeem(response -> clientsGroup.markCompleted());
        realResponseP.onFailure(t -> clientsGroup.markCompleted());
    }

    private scala.concurrent.Promise<WSResponse> createLazyPromise() {
        return scala.concurrent.Promise$.MODULE$.<play.libs.ws.WSResponse>apply();
    }

    /** All requests that hash to the same key, plus the state of their single shared call. */
    private static class ClientsGroup {
        private final Integer groupId;
        private Optional<F.Promise<WSResponse>> realPromise = Optional.empty();
        private final Collection<WSRequestHolder> wsRequestHolders = Lists.newArrayList();
        private final Map<String, scala.concurrent.Promise<WSResponse>> lazyProxyPromises = Maps.newHashMap();
        // Written from the response callback; presumably that runs on another thread, hence volatile.
        private volatile boolean isCompleted = false;
        private final Stopwatch stopwatch = Stopwatch.createUnstarted();

        private ClientsGroup(final Integer groupId) {
            this.groupId = groupId;
        }

        /**
         * Returns the proxy promise of {@code requestId}, wired to the group's real promise.
         *
         * <p>The wiring is done per request at lookup time rather than in bulk when the real
         * call finishes, so a request that joins the group after the response has already
         * arrived is still completed. {@code trySuccess}/{@code tryFailure} make repeated
         * lookups of the same id harmless.
         */
        public F.Promise<WSResponse> getLazyPromise(final String requestId) {
            final scala.concurrent.Promise<WSResponse> proxy = asScalaPromise(requestId);
            realPromise.ifPresent(real -> {
                real.onRedeem(response -> proxy.trySuccess(response));
                real.onFailure(t -> proxy.tryFailure(t));
            });
            return F.Promise.<WSResponse>wrap(proxy.future());
        }

        public boolean isCompleted() {
            return isCompleted;
        }

        /** Records that the real call finished (successfully or not) and stops the timer. */
        private void markCompleted() {
            isCompleted = true;
            stopwatch.stop();
        }

        private scala.concurrent.Promise<WSResponse> asScalaPromise(final String requestId) {
            return lazyProxyPromises.get(requestId);
        }

        public static ClientsGroup empty(final Integer groupId) {
            return new ClientsGroup(groupId);
        }
    }

    /** Snapshot of one request's URL, id, completion flag and (shared) group stopwatch. */
    public static class RemoteCallStats {
        private final String url;
        private final String requestId;
        private final Stopwatch stopwatch;
        private final boolean isCompleted;

        public RemoteCallStats(final String url, final String requestId, final Stopwatch stopwatch, boolean isCompleted) {
            this.url = url;
            this.requestId = requestId;
            this.stopwatch = stopwatch;
            this.isCompleted = isCompleted;
        }

        /** @return the completion flag as it was when this snapshot was created */
        public boolean isCompleted() {
            return isCompleted;
        }

        public String getUrl() {
            return url;
        }

        public String getRequestId() {
            return requestId;
        }

        /**
         * @param timeUnit unit to express the result in
         * @return the stopwatch's elapsed time in {@code timeUnit} (despite the method name,
         *         not necessarily milliseconds)
         */
        public long getTimeInMs(final TimeUnit timeUnit) {
            return stopwatch.elapsed(timeUnit);
        }
    }
}
package controllers;
import com.google.common.base.Objects;
import play.libs.ws.WSRequestHolder;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
/**
 * Created by mati on 24/05/2014.
 *
 * <p>Static helpers for rendering and hashing a {@link WSRequestHolder}.
 */
public class WSUtils {

    /**
     * Renders the holder's URL followed by its query parameters.
     *
     * <p>Each parameter is written as {@code key=v1,v2,...}; parameters are separated by
     * {@code &}. Keys and values are emitted as-is, without URL encoding.
     *
     * @param holder request to render
     * @return the bare URL when there are no query parameters, otherwise
     *         {@code url + "?" + params}
     */
    public static String toUrl(final WSRequestHolder holder) {
        final String url = holder.getUrl();
        final Map<String, Collection<String>> queryParameters = holder.getQueryParameters();
        final String params = queryParameters.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
                // The separator goes between the two parts, giving "a=1&b=2".
                .reduce((left, right) -> left + "&" + right)
                .orElse("");
        return params.isEmpty() ? url : url.concat("?").concat(params);
    }

    /**
     * @param holder request to hash
     * @return a hash combining the holder's URL, query parameters and headers
     */
    public static int hashCode(final WSRequestHolder holder) {
        return Objects.hashCode(holder.getUrl(), holder.getQueryParameters(), holder.getHeaders());
    }
}
@matiwinnetou
Copy link
Author

/**
 * Demo endpoint that simulates a slow remote service: logs the caller, blocks the current
 * thread for a random 1000–10000 ms (in whole seconds) and then answers with a fixed body.
 *
 * @return an already-completed promise of a 200 response
 */
public F.Promise<Result> index2() {
    System.out.println("********remote call********:" + request().remoteAddress());
    System.out.println("remote:" + request().queryString());

    try {
        final int val = 1000 + new Random().nextInt(10) * 1000;
        System.out.println("val:" + val);
        Thread.sleep(val);
    } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
    }

    return F.Promise.pure(ok("NA DU?"));
}

/**
 * Demo action: enqueues four requests against the slow endpoint through one
 * {@link HttpRequestsCache}, logs each outcome, and responds with the per-request stats.
 *
 * <p>POD1/POD2 target the same URL and POD3/POD4 the same URL plus query parameter, so
 * each pair falls into the same cache group.
 */
public F.Promise<Result> index3() {
    final String slowEndpoint = "http://localhost:9000/index2";
    final HttpRequestsCache dedupingCache = new HttpRequestsCache("play-soy-view-example");

    // Plain URL, requested twice.
    final F.Promise<WSResponse> pod1Response =
            dedupingCache.createHttpRequest("POD1", WS.url(slowEndpoint)).get();
    final F.Promise<WSResponse> pod2Response =
            dedupingCache.createHttpRequest("POD2", WS.url(slowEndpoint)).get();

    // Same URL with a query parameter, requested twice.
    final F.Promise<WSResponse> pod3Response =
            dedupingCache.createHttpRequest("POD3", WS.url(slowEndpoint).setQueryParameter("param1", "param2")).get();
    final F.Promise<WSResponse> pod4Response =
            dedupingCache.createHttpRequest("POD4", WS.url(slowEndpoint).setQueryParameter("param1", "param2")).get();

    pod1Response.onRedeem(body -> System.out.println("resp1_SUCCESS:" + body.getBody()));
    pod1Response.onFailure(error -> System.out.println("resp1_FAIL:" + error.getMessage()));

    pod2Response.onRedeem(body -> System.out.println("resp2_SUCCESS:" + body.getBody()));
    pod2Response.onFailure(error -> System.out.println("resp2_FAIL:" + error.getMessage()));

    pod3Response.onRedeem(body -> System.out.println("resp3_SUCCESS:" + body.getBody()));
    pod3Response.onFailure(error -> System.out.println("resp3_FAIL:" + error.getMessage()));

    pod4Response.onRedeem(body -> System.out.println("resp4_SUCCESS:" + body.getBody()));
    pod4Response.onFailure(error -> System.out.println("resp4_FAIL:" + error.getMessage()));

    return dedupingCache.getRequestsUrlsP().map(callStats -> ok(stats(callStats)));
}

/**
 * Formats the collected call statistics as text, one line per request id.
 *
 * @param stats request id mapped to its call statistics
 * @return the report; each entry is terminated by {@code "\n"}
 */
private String stats(final Map<String, HttpRequestsCache.RemoteCallStats> stats) {
    System.out.println("stats:" + stats);

    final StringBuilder report = new StringBuilder();
    stats.forEach((id, callStats) -> report
            .append(String.format("url - requestId:%s,url:%s,time:%d ms, done:%s",
                    id,
                    callStats.getUrl(),
                    callStats.getTimeInMs(TimeUnit.MILLISECONDS),
                    callStats.isCompleted()))
            .append("\n"));
    return report.toString();
}

@matiwinnetou
Copy link
Author

url - requestId:POD3:40a2983d-ff82-4b40-ad75-1134d8a43e24,url:http://localhost:9000/index2?param1=param2,time:8005 ms, done:true
url - requestId:POD1:9cf259f7-726b-47eb-b883-8030ac5a5188,url:http://localhost:9000/index2,time:6005 ms, done:true
url - requestId:POD2:2f603a2d-6bde-4ea0-96dd-094408beda04,url:http://localhost:9000/index2,time:6005 ms, done:true
url - requestId:POD4:f1a6ece4-17cd-4cf6-95c0-74ebe363ebb6,url:http://localhost:9000/index2?param1=param2,time:8005 ms, done:true

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment