Skip to content

Instantly share code, notes, and snippets.

@kevingessner
Created November 2, 2020 18:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevingessner/cb93fcdaee9961183fd2f8fb9ef27a8c to your computer and use it in GitHub Desktop.
Save kevingessner/cb93fcdaee9961183fd2f8fb9ef27a8c to your computer and use it in GitHub Desktop.
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 711cdd199e..713d6886d8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -46,6 +46,7 @@ import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -244,7 +245,16 @@ class ByteStreamUploader extends AbstractReferenceCounted {
Futures.catchingAsync(
uploadResult,
StatusRuntimeException.class,
- (sre) -> Futures.immediateFailedFuture(new IOException(sre)),
+ (sre) -> {
+ IOException failure = new IOException(String.format("failed uploading %s for %s (at offset %d/%d, %s)", hash, chunker, chunker.getOffset(), chunker.getSize(), chunker.hasNext() ? "has-next" : "no-next"), sre);
+ // Debug aid: dump the blob to /tmp. The dump is best-effort; if it throws, the upload error must still be what the future fails with.
+ try (FileOutputStream out = new FileOutputStream("/tmp/bazel-chunker-" + hash.toString())) {
+ for (chunker.reset(); chunker.hasNext(); ) {
+ chunker.next().getData().writeTo(out);
+ }
+ } catch (IOException | RuntimeException dumpFailure) { failure.addSuppressed(dumpFailure); }
+ return Futures.immediateFailedFuture(failure);
+ },
MoreExecutors.directExecutor());
uploadsInProgress.put(hash, uploadResult);
@@ -281,9 +291,10 @@ class ByteStreamUploader extends AbstractReferenceCounted {
try {
chunker.reset();
} catch (IOException e) {
- return Futures.immediateFailedFuture(e);
+ return Futures.immediateFailedFuture(new IOException(String.format("chunker failed for %s", hash), e));
}
+ chunker.validating(hash);
UUID uploadId = UUID.randomUUID();
String resourceName = uploadResourceName(instanceName, uploadId, hash, chunker.getSize());
AsyncUpload newUpload =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index 7ce80ee24b..591cedc940 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -18,7 +18,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
import com.google.common.base.Throwables;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -101,6 +106,8 @@ public final class Chunker {
private InputStream data;
private long offset;
private byte[] chunkCache;
+ private HashCode expectedHash;
+ private Hasher hasher;
// Set to true on the first call to next(). This is so that the Chunker can open its data source
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
@@ -113,6 +120,10 @@ public final class Chunker {
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
}
+ public void validating(HashCode hash) { // Arms a SHA-256 check of the chunked data against `hash`, evaluated once no bytes are left; seek() disarms it.
+ this.expectedHash = hash; // null means no check is performed
+ }
+
public long getOffset() {
return offset;
}
@@ -134,6 +145,7 @@ public final class Chunker {
offset = 0;
initialized = false;
chunkCache = null;
+ hasher = Hashing.sha256().newHasher();
}
/**
@@ -142,6 +154,7 @@ public final class Chunker {
* <p>May close open resources in order to seek to an earlier offset.
*/
public void seek(long toOffset) throws IOException {
+ expectedHash = null;
if (toOffset < offset) {
reset();
}
@@ -204,8 +217,13 @@ public final class Chunker {
offset += bytesToRead;
ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
+ hasher.putBytes(blob.toByteArray());
if (bytesLeft() == 0) {
+ HashCode hash = hasher.hash();
+ if (expectedHash != null && !expectedHash.equals(hash)) {
+ throw new StatusRuntimeException(Status.FAILED_PRECONDITION.withDescription(String.format("mismatched hash! expected %s/%d, got %s", expectedHash, offset, hash)));
+ }
data.close();
data = null;
chunkCache = null;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
index a4aa232220..790a78dab6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java
@@ -16,6 +16,7 @@ package com.google.devtools.build.lib.remote;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static java.lang.String.format;
+import com.google.common.flogger.GoogleLogger;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.collect.ImmutableSet;
@@ -37,6 +38,7 @@ import java.util.Map;
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
+ private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
public RemoteExecutionCache(
RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
@@ -74,13 +76,14 @@ public class RemoteExecutionCache extends RemoteCache {
throws IOException, InterruptedException {
Iterable<Digest> allDigests =
Iterables.concat(merkleTree.getAllDigests(), additionalInputs.keySet());
- ImmutableSet<Digest> missingDigests =
- getFromFuture(cacheProtocol.findMissingDigests(allDigests));
+ ImmutableSet<Digest> missingDigests = ImmutableSet.copyOf(allDigests);
+ //getFromFuture(cacheProtocol.findMissingDigests(allDigests));
Map<Digest, Path> filesToUpload = new HashMap<>();
Map<Digest, ByteString> blobsToUpload = new HashMap<>();
for (Digest missingDigest : missingDigests) {
Directory node = merkleTree.getDirectoryByDigest(missingDigest);
if (node != null) {
+ logger.atWarning().log(String.format("uploading directory blob %s", DigestUtil.toString(missingDigest)));
blobsToUpload.put(missingDigest, node.toByteString());
continue;
}
@@ -88,15 +91,18 @@ public class RemoteExecutionCache extends RemoteCache {
PathOrBytes file = merkleTree.getFileByDigest(missingDigest);
if (file != null) {
if (file.getBytes() != null) {
+ logger.atWarning().log(String.format("uploading file blob %s for %s", DigestUtil.toString(missingDigest), file.getPath()));
blobsToUpload.put(missingDigest, file.getBytes());
continue;
}
+ logger.atWarning().log(String.format("uploading file path %s for %s", DigestUtil.toString(missingDigest), file.getPath()));
filesToUpload.put(missingDigest, file.getPath());
continue;
}
Message message = additionalInputs.get(missingDigest);
if (message != null) {
+ logger.atWarning().log(String.format("uploading message blob %s for %s", DigestUtil.toString(missingDigest), message.toByteString().toString()));
blobsToUpload.put(missingDigest, message.toByteString());
continue;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD
index 51a29ee892..c02ca5d69c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD
@@ -18,5 +18,6 @@ java_library(
"//third_party:guava",
"//third_party/protobuf:protobuf_java",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
+ "//third_party/grpc:grpc-jar",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
index ae6a15113f..85cc6757a2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
+import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.io.OutputStream;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java
index 54bbeef432..dc0f0054f4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java
@@ -139,6 +139,7 @@ class DirectoryTreeBuilder {
switch (metadata.getType()) {
case REGULAR_FILE:
Digest d = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
+ System.out.println(String.format("adding input %s = %s = %s", path, input, metadata));
currDir.addChild(
new FileNode(
path.getBaseName(), ActionInputHelper.toInputPath(input, execRoot), d));
diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java
index 7fac053ae3..418899c823 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java
@@ -35,6 +35,10 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import com.google.common.io.ByteStreams;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
/** A merkle tree representation as defined by the remote execution api. */
public class MerkleTree {
@@ -174,6 +178,24 @@ public class MerkleTree {
for (DirectoryTree.FileNode file : files) {
b.addFiles(buildProto(file));
digestPathMap.put(file.getDigest(), toPathOrBytes(file));
+ try {
+ if (file.getPath() != null) {
+ try (java.io.InputStream in = file.getPath().getInputStream()) {
+ HashCode realHash = Hashing.sha256().hashBytes(ByteStreams.toByteArray(in));
+ if (!realHash.toString().equals(file.getDigest().getHash())) {
+ throw new RuntimeException(String.format("merkle path %s/%s mismatch %s != %s", dirname, file.getPath(), realHash, file.getDigest().getHash()));
+ }
+ }
+ }
+ if (file.getBytes() != null) {
+ HashCode realHash = Hashing.sha256().hashBytes(file.getBytes().toByteArray());
+ if (!realHash.toString().equals(file.getDigest().getHash())) {
+ throw new RuntimeException(String.format("merkle bytes %s/%s mismatch %s != %s", dirname, file.getPath(), realHash, file.getDigest().getHash()));
+ }
+ }
+ } catch (IOException e) {
+ throw new java.io.UncheckedIOException(e);
+ }
inputBytes.addAndGet(file.getDigest().getSizeBytes());
}
for (DirectoryTree.DirectoryNode dir : dirs) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment