Skip to content

Instantly share code, notes, and snippets.

@romain-grecourt
Last active February 9, 2024 02:36
Show Gist options
  • Save romain-grecourt/778019f91ef8bd01bc47b08aac0bc007 to your computer and use it in GitHub Desktop.
Save romain-grecourt/778019f91ef8bd01bc47b08aac0bc007 to your computer and use it in GitHub Desktop.
Helidon Request buffering to compute a payload hash
package com.acme;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.IoMulti;
import io.helidon.common.reactive.Multi;
import io.helidon.media.common.MessageBodyFilter;
import io.helidon.media.common.MessageBodyReadableContent;
import io.helidon.webserver.ServerRequest;
import io.helidon.webserver.ServerResponse;
class RequestBuffering {
private static final long THRESHOLD = 5000 * 1000; // 5MB
static MessageBodyReadableContent content(ServerRequest req, ServerResponse res) {
FilterImpl filter = new FilterImpl();
res.whenSent().thenRun(filter::deleteFile);
MessageBodyReadableContent original = req.content();
return MessageBodyReadableContent.create(filter.apply(original), original.readerContext());
}
static class FilterImpl implements MessageBodyFilter {
final Path tempFile;
final List<ByteBuffer> inMemoryData = new ArrayList<>();
final BufferedEmittingPublisher<ByteBuffer> emitter;
final BufferedEmittingPublisher<ByteBuffer> fileEmitter;
final AtomicBoolean inMemory = new AtomicBoolean(true);
final AtomicBoolean supplied = new AtomicBoolean();
final AtomicBoolean done = new AtomicBoolean(true);
volatile Throwable error;
FilterImpl() {
try {
tempFile = Files.createTempFile(null, null);
tempFile.toFile().deleteOnExit();
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
// a "proxy" emitter for the first subscription
emitter = BufferedEmittingPublisher.create();
// an emitter used to write the chunks to a file
fileEmitter = BufferedEmittingPublisher.create();
Multi.create(fileEmitter)
.to(IoMulti.writeToFile(tempFile)
.build());
}
@Override
public Flow.Publisher<DataChunk> apply(Flow.Publisher<DataChunk> publisher) {
AtomicLong totalBytes = new AtomicLong();
Multi.create(publisher)
.forEach(chunk -> {
// duplicate the chunk and release the original
byte[] bytes = chunk.bytes();
chunk.release();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
emitter.emit(byteBuffer);
if (inMemory.get()) {
long total = totalBytes.addAndGet(bytes.length);
if (total <= THRESHOLD) {
// accumulate data memory
inMemoryData.add(byteBuffer);
return;
} else {
// we passed the threshold
// write the data accumulated in-memory to the temporary file
// and mark inMemory=false
inMemoryData.forEach(fileEmitter::emit);
inMemoryData.clear();
inMemory.set(false);
}
}
fileEmitter.emit(byteBuffer);
})
.onError(th -> {
emitter.fail(th);
fileEmitter.fail(th);
error = th;
})
.onComplete(() -> {
emitter.complete();
fileEmitter.complete();
done.set(true);
}).ignoreElement();
// using "defer" to supply the proxy publisher the first time
// or one backed by the file if the buffering is complete
return Multi.defer(this::publisher).map(DataChunk::create);
}
Flow.Publisher<ByteBuffer> publisher() {
if (!supplied.getAndSet(true)) {
// can be used only once
return emitter;
}
if (error != null) {
return Multi.error(error);
}
if (done.get()) {
try {
ByteChannel byteChannel = Files.newByteChannel(tempFile, StandardOpenOption.READ);
return IoMulti.multiFromByteChannel(byteChannel);
} catch (IOException ex) {
return Multi.error(ex);
}
}
return Multi.error(new IllegalStateException("Cannot replay data while buffering"));
}
void deleteFile() {
try {
Files.delete(tempFile);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}
}
package com.acme;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import io.helidon.media.common.MessageBodyReadableContent;
import io.helidon.webserver.Routing;
import io.helidon.webserver.ServerRequest;
import io.helidon.webserver.ServerResponse;
import io.helidon.webserver.Service;
public class TestService implements Service {
@Override
public void update(Routing.Rules rules) {
rules.put(this::handlePut);
}
private void handlePut(ServerRequest req, ServerResponse res) {
MessageBodyReadableContent content = RequestBuffering.content(req, res);
content.as(byte[].class).forSingle(bytes -> {
res.headers().add("Payload-Hash", hash(bytes));
res.send(content);
});
}
private static String hash(byte[] bytes) {
try {
MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
messageDigest.update(bytes);
return Base64.getEncoder().encodeToString(messageDigest.digest());
} catch (NoSuchAlgorithmException ex) {
throw new RuntimeException(ex);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment