Skip to content

Instantly share code, notes, and snippets.

@terabyte
Created June 23, 2016 00:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save terabyte/fdbb6469703da51cf605daf650cefb72 to your computer and use it in GitHub Desktop.
Save terabyte/fdbb6469703da51cf605daf650cefb72 to your computer and use it in GitHub Desktop.
package com.cloudera;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import misc1.commons.ExceptionUtils;
import misc1.commons.options.OptionsFragment;
import misc1.commons.options.OptionsLibrary;
import misc1.commons.options.OptionsResults;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.RedirectLocations;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qbt.HelpTier;
import qbt.QbtCommand;
import qbt.QbtCommandName;
import qbt.QbtCommandOptions;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * {@code verifyUrls} command: checks that the tree of files served under {@code --dest} matches the tree served
 * under {@code --src}.
 *
 * <p>Starting from the last path segment of the source URL, each path is requested with HEAD. If the source
 * request was redirected to a URL whose path ends in {@code /}, the path is treated as a directory. Both directory
 * listings are then fetched with GET. Names present on only one side are reported, and names present on both sides
 * are compared recursively as new jobs on the executor. Otherwise the path is treated as a file, and the
 * {@code ETag} headers of the source and destination HEAD responses are compared.
 *
 * <p>NOTE(review): the same relative path is resolved against both base URLs, so the source and destination URLs
 * are assumed to end in the same path segment (e.g. {@code .../a/foo} and {@code .../b/foo}).
 */
public class VerifyUrls extends QbtCommand<VerifyUrls.Options> {
    private static final Logger LOGGER = LoggerFactory.getLogger(VerifyUrls.class);

    private static final String ETAG_HEADER_NAME = "ETag";
    // Marks the start of a directory listing; entry links are only looked for after this line.
    private static final Pattern PARENT_DIR_REGEX = Pattern.compile(".*<a.*[Hh][Rr][Ee][Ff]=\"[^\"]+\".*>Parent Directory</a>.*");
    // Matches an entry link whose text is identical to its href, e.g. <a href="foo/">foo/</a>.
    private static final Pattern FILENAME_REGEX = Pattern.compile(".*<a.*[Hh][Rr][Ee][Ff]=\"([^\"]+)\".*>\\1</a>.*");

    private static final String MISSING_ETAG_STR = "MISSING ETAG: ";
    private static final String MISMATCH_ETAG_STR = "DIFFERENT CONTENT: ";
    private static final String SRC_ONLY_STR = "SOURCE ONLY: ";
    private static final String DEST_ONLY_STR = "DESTINATION ONLY: ";
    private static final String ERROR_STR = "ERROR: ";

    // Interval between re-checks of outstanding jobs, and how long to wait for the pool to terminate at the end.
    private static final long EXECUTOR_SERVICE_TIMEOUT_MILLIS = 1000;

    // NOTE(review): run()'s return value is assumed to become the process exit status, which POSIX truncates to
    // 8 bits. Without a cap, exactly 256 differences would look like success. Confirm against QbtCommand's caller.
    private static final int MAX_EXIT_STATUS = 255;

    @QbtCommandName("verifyUrls")
    public static interface Options extends QbtCommandOptions {
        public static final OptionsLibrary<Options> o = OptionsLibrary.of();
        public static final OptionsFragment<Options, Integer> parallelism = o.oneArg("parallelism", "j").transform(o.singleton(null)).transform(o.parseInt()).helpDesc("Parallelize up to this width");
        public static final OptionsFragment<Options, Boolean> infiniteParallelism = o.zeroArg("infinite-parallelism", "J").transform(o.flag()).helpDesc("Parallelize as widely as possible");
        public static final OptionsFragment<Options, URI> sourceUrl = o.oneArg("src").transform(o.singleton()).transform((String h, String s) -> { return URI.create(s); }).helpDesc("Source URL to compare");
        public static final OptionsFragment<Options, URI> destUrl = o.oneArg("dest").transform(o.singleton()).transform((String h, String s) -> { return URI.create(s); }).helpDesc("Destination URL to compare");
    }

    @Override
    public Class<Options> getOptionsClass() {
        return Options.class;
    }

    @Override
    public HelpTier getHelpTier() {
        return HelpTier.COMMON;
    }

    @Override
    public String getDescription() {
        return "Compare two URLs contents";
    }

    /**
     * Walks both trees, logs every difference found (sorted, at ERROR level) and returns how many there were.
     *
     * @return 0 when the trees match; otherwise the number of differences, capped at {@value #MAX_EXIT_STATUS}.
     *     Jobs that failed with an exception are counted as {@code ERROR:} differences.
     * @throws IOException if the wait for outstanding jobs is interrupted (the interrupt flag is restored)
     */
    @Override
    public int run(OptionsResults<? extends Options> options) throws IOException {
        URI src = stripTrailingSlashes(options.get(Options.sourceUrl));
        URI dest = stripTrailingSlashes(options.get(Options.destUrl));
        LOGGER.debug("Verifying URL {} content matches URL {}", src, dest);
        LinkedList<String> differences = new LinkedList<>();
        LinkedList<Future<?>> futures = new LinkedList<>();
        ExecutorService es = configureExecutorService(options);
        try (CloseableHttpClient hc = HttpClients.createDefault()) {
            try {
                submitPath(src, dest, differences, futures, es, hc, getLastPathComponent(src));
                awaitCompletion(futures, differences);
            } finally {
                // Runs on every exit path (including interruption) so pool threads never outlive the command
                // and stop before the client they share is closed.
                es.shutdownNow();
            }
            es.awaitTermination(EXECUTOR_SERVICE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while verifying " + src + " against " + dest, e);
        }
        if (differences.isEmpty()) {
            return 0;
        }
        // Jobs finish in nondeterministic order; sort so repeated runs print the same report.
        differences.stream().sorted().forEach(LOGGER::error);
        return Math.min(differences.size(), MAX_EXIT_STATUS);
    }

    /**
     * Blocks until every job in {@code futures} is done, including jobs added while waiting.
     *
     * <p>A job adds its child jobs to {@code futures} before it completes, so once every listed future is done no
     * more can appear. Each finished future is removed, and an exception it ended with is recorded as an
     * {@code ERROR:} difference instead of being dropped.
     */
    private static void awaitCompletion(LinkedList<Future<?>> futures, LinkedList<String> differences) throws InterruptedException {
        synchronized (futures) {
            while (true) {
                boolean stillRunning = false;
                LOGGER.debug("futures size: {}", futures.size());
                ListIterator<Future<?>> listIterator = futures.listIterator();
                while (listIterator.hasNext()) {
                    Future<?> future = listIterator.next();
                    if (!future.isDone()) {
                        stillRunning = true;
                        continue;
                    }
                    listIterator.remove();
                    recordFailure(future, differences);
                }
                LOGGER.debug("Still Running? {}", stillRunning);
                if (!stillRunning) {
                    return;
                }
                // Woken early by submitPath's notifyAll; the timeout also covers jobs that finish without notifying.
                futures.wait(EXECUTOR_SERVICE_TIMEOUT_MILLIS);
            }
        }
    }

    /** Adds an {@code ERROR:} difference if the already-completed {@code future} ended with an exception. */
    private static void recordFailure(Future<?> future, LinkedList<String> differences) throws InterruptedException {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            LOGGER.debug("Verification job failed", cause);
            addDifference(differences, ERROR_STR + cause);
        }
    }

    /**
     * Returns {@code uri} with trailing {@code '/'} characters removed from its path.
     *
     * <p>Without this, {@code .../foo/} would yield the segment {@code foo}, which resolves to {@code .../foo/foo}.
     * The HEAD request would also not be redirected, so the directory would go undetected. A path of just
     * {@code "/"}, or a URI with a query or fragment, is returned unchanged.
     */
    private static URI stripTrailingSlashes(URI uri) {
        String rawPath = uri.getRawPath();
        if (rawPath == null || rawPath.length() <= 1 || !rawPath.endsWith("/")
                || uri.getRawQuery() != null || uri.getRawFragment() != null) {
            return uri;
        }
        String str = uri.toString();
        int end = str.length();
        while (end > 0 && str.charAt(end - 1) == '/') {
            end--;
        }
        return URI.create(str.substring(0, end));
    }

    /**
     * Returns the last non-empty segment of {@code path}'s raw (still percent-encoded) path, or {@code ""} if
     * there is none (e.g. an empty path or {@code "/"}).
     *
     * <p>The raw form is used because the result is passed back to {@link URI#resolve(String)}, which rejects
     * decoded characters such as spaces.
     */
    private static String getLastPathComponent(URI path) {
        String pathStr = path.getRawPath();
        if (pathStr == null) {
            return "";
        }
        int end = pathStr.length();
        while (end > 0 && pathStr.charAt(end - 1) == '/') {
            end--;
        }
        int start = pathStr.lastIndexOf('/', end - 1) + 1;
        return pathStr.substring(start, end);
    }

    /**
     * Extracts entry paths ({@code path + "/" + name}, without a trailing {@code '/'}) from an HTML directory
     * listing, which is read as UTF-8 and closed before returning.
     *
     * <p>NOTE(review): only links whose text equals their href are recognised, so entries whose href is
     * percent-encoded differently from the displayed name are skipped. Listings without a "Parent Directory" link
     * yield no entries.
     *
     * @throws IOException if reading the listing fails. The error is propagated rather than returning a partial
     *     listing, which would silently hide entries.
     */
    private static ImmutableSet<String> getPathsFromDirectoryListing(String path, InputStream input) throws IOException {
        LOGGER.debug("Getting directory listing for path {}", path);
        ImmutableSet.Builder<String> b = ImmutableSet.builder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))) {
            // Ok, I know this is MEGA stupid. It's gonna be fine for now, but what we should really do here is put in
            // an HTML parser.
            boolean dirListingStarted = false;
            String line;
            while ((line = br.readLine()) != null) {
                if (!dirListingStarted) {
                    // looking for start of listing
                    if (PARENT_DIR_REGEX.matcher(line).matches()) {
                        dirListingStarted = true;
                    }
                    continue;
                }
                // listing has started
                Matcher m = FILENAME_REGEX.matcher(line);
                if (m.find()) {
                    String newPath = path + "/" + m.group(1);
                    if (newPath.endsWith("/")) {
                        // we detect directories by noticing that "http://example.com/foo" gets redirected to
                        // "http://example.com/foo/", so we need to remove the trailing '/'.
                        newPath = newPath.substring(0, newPath.length() - 1);
                    }
                    LOGGER.trace("Adding path {}", newPath);
                    b.add(newPath);
                }
                // after end of listing, we'll just have a bunch of lines not match.
                // The main danger with this approach is if the "footer" contains a URL that happens to match:
                // <a href="$X">$X</a> for some value of $X. This definitely doesn't happen for the servers I've looked
                // at so far.
            }
        }
        return b.build();
    }

    /** Schedules {@link #processPath} for {@code path} and registers its future so {@link #run} waits for it. */
    private void submitPath(URI src, URI dest, LinkedList<String> differences, LinkedList<Future<?>> futures, ExecutorService es, CloseableHttpClient hc, String path) {
        Future<?> f = es.submit(() -> processPath(src, dest, differences, futures, es, hc, path));
        synchronized (futures) {
            futures.add(f);
            futures.notifyAll();
        }
    }

    /**
     * Compares one relative {@code path} between {@code src} and {@code dest}. A directory schedules jobs for its
     * shared children; a file has its ETags compared.
     *
     * <p>Each response is closed before the next request is issued, so a job never holds more than one pooled
     * connection. NOTE(review): holding several at once could exhaust the client's per-route connection limit
     * when src and dest share a host. Confirm that limit for the HttpClient version in use.
     *
     * @throws UncheckedIOException if an HTTP request or listing read fails; {@link #run} records it as an error
     */
    private void processPath(URI src, URI dest, LinkedList<String> differences, LinkedList<Future<?>> futures, ExecutorService es, CloseableHttpClient hc, String path) {
        URI srcUri = src.resolve(path);
        URI destUri = dest.resolve(path);
        LOGGER.debug("Processing path {}", srcUri);
        try {
            HttpContext srcContext = HttpClientContext.create();
            String srcETag;
            try (CloseableHttpResponse srcRes = hc.execute(new HttpHead(srcUri), srcContext)) {
                srcETag = getETag(srcRes);
            }
            if (wasRedirectedToDirectory(srcContext)) {
                LOGGER.debug("Found directory for path {}", path);
                processDirectory(src, dest, differences, futures, es, hc, path);
                return;
            }
            LOGGER.debug("Found file for path {}", path);
            String destETag;
            try (CloseableHttpResponse destRes = hc.execute(new HttpHead(destUri))) {
                destETag = getETag(destRes);
            }
            if (srcETag == null || destETag == null) {
                LOGGER.debug("Missing ETag for path {}", path);
                addDifference(differences, MISSING_ETAG_STR + path);
            } else if (!srcETag.equals(destETag)) {
                LOGGER.debug("ETag mismatch for path {}", path);
                addDifference(differences, MISMATCH_ETAG_STR + path);
            } else {
                LOGGER.debug("ETags match: {}", srcETag);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to compare path " + path, e);
        }
    }

    /**
     * Fetches both listings for directory {@code path}. It reports entries present on only one side and schedules
     * a job for every entry present on both.
     */
    private void processDirectory(URI src, URI dest, LinkedList<String> differences, LinkedList<Future<?>> futures, ExecutorService es, CloseableHttpClient hc, String path) throws IOException {
        // Listings need a full GET; the HEAD response has no body.
        ImmutableSet<String> srcPaths = fetchDirectoryListing(hc, src.resolve(path), path);
        ImmutableSet<String> destPaths = fetchDirectoryListing(hc, dest.resolve(path), path);
        for (String srcOnly : Sets.difference(srcPaths, destPaths)) {
            addDifference(differences, SRC_ONLY_STR + srcOnly);
        }
        for (String destOnly : Sets.difference(destPaths, srcPaths)) {
            addDifference(differences, DEST_ONLY_STR + destOnly);
        }
        for (String newPath : Sets.intersection(srcPaths, destPaths)) {
            LOGGER.debug("Adding job for path {}", newPath);
            submitPath(src, dest, differences, futures, es, hc, newPath);
        }
    }

    /** GETs {@code uri} and parses it as a directory listing; a response without a body yields an empty set. */
    private static ImmutableSet<String> fetchDirectoryListing(CloseableHttpClient hc, URI uri, String path) throws IOException {
        try (CloseableHttpResponse res = hc.execute(new HttpGet(uri))) {
            if (res.getEntity() == null) {
                return ImmutableSet.of();
            }
            return getPathsFromDirectoryListing(path, res.getEntity().getContent());
        }
    }

    /**
     * Returns true if the request made with {@code context} was redirected and the final redirect target's path
     * ends in {@code '/'}. This is how directories are told apart from files.
     */
    private static boolean wasRedirectedToDirectory(HttpContext context) {
        RedirectLocations rl = (RedirectLocations) context.getAttribute(HttpClientContext.REDIRECT_LOCATIONS);
        if (rl == null) {
            return false;
        }
        List<URI> locations = rl.getAll();
        if (locations.isEmpty()) {
            return false;
        }
        // Use the last hop: with chained redirects (e.g. http -> https -> trailing slash) only the final one
        // reveals whether the target is a directory.
        String redirectPath = locations.get(locations.size() - 1).getPath();
        LOGGER.trace("Redirect path => {}", redirectPath);
        return redirectPath != null && redirectPath.endsWith("/");
    }

    /** Returns the value of the response's first ETag header, or null if there is none. */
    private static String getETag(CloseableHttpResponse response) {
        Header header = response.getFirstHeader(ETAG_HEADER_NAME);
        return header == null ? null : header.getValue();
    }

    /** Appends {@code difference} under the list's own lock, since worker threads report concurrently. */
    private static void addDifference(LinkedList<String> differences, String difference) {
        synchronized (differences) {
            differences.add(difference);
        }
    }

    /**
     * Chooses the worker pool. {@code --parallelism N} gives a fixed pool of N threads, {@code --infinite-parallelism}
     * an unbounded cached pool, and otherwise a fixed pool of twice the available processors.
     *
     * @throws IllegalArgumentException if {@code --parallelism} is less than 1
     */
    private ExecutorService configureExecutorService(OptionsResults<? extends Options> options) {
        Integer parallelism = options.get(Options.parallelism);
        if (parallelism != null) {
            if (parallelism < 1) {
                throw new IllegalArgumentException("--parallelism must be at least 1, got " + parallelism);
            }
            return Executors.newFixedThreadPool(parallelism);
        }
        if (options.get(Options.infiniteParallelism)) {
            return Executors.newCachedThreadPool();
        }
        // we do a lot of network I/O so let's default to nCpu * 2 threads
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment