Last active
February 9, 2024 02:36
-
-
Save romain-grecourt/778019f91ef8bd01bc47b08aac0bc007 to your computer and use it in GitHub Desktop.
Helidon Request buffering to compute a payload hash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.acme; | |
import java.io.IOException; | |
import java.io.UncheckedIOException; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.ByteChannel; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.nio.file.StandardOpenOption; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.Flow; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicLong; | |
import io.helidon.common.http.DataChunk; | |
import io.helidon.common.reactive.BufferedEmittingPublisher; | |
import io.helidon.common.reactive.IoMulti; | |
import io.helidon.common.reactive.Multi; | |
import io.helidon.media.common.MessageBodyFilter; | |
import io.helidon.media.common.MessageBodyReadableContent; | |
import io.helidon.webserver.ServerRequest; | |
import io.helidon.webserver.ServerResponse; | |
class RequestBuffering { | |
private static final long THRESHOLD = 5000 * 1000; // 5MB | |
static MessageBodyReadableContent content(ServerRequest req, ServerResponse res) { | |
FilterImpl filter = new FilterImpl(); | |
res.whenSent().thenRun(filter::deleteFile); | |
MessageBodyReadableContent original = req.content(); | |
return MessageBodyReadableContent.create(filter.apply(original), original.readerContext()); | |
} | |
static class FilterImpl implements MessageBodyFilter { | |
final Path tempFile; | |
final List<ByteBuffer> inMemoryData = new ArrayList<>(); | |
final BufferedEmittingPublisher<ByteBuffer> emitter; | |
final BufferedEmittingPublisher<ByteBuffer> fileEmitter; | |
final AtomicBoolean inMemory = new AtomicBoolean(true); | |
final AtomicBoolean supplied = new AtomicBoolean(); | |
final AtomicBoolean done = new AtomicBoolean(true); | |
volatile Throwable error; | |
FilterImpl() { | |
try { | |
tempFile = Files.createTempFile(null, null); | |
tempFile.toFile().deleteOnExit(); | |
} catch (IOException ex) { | |
throw new UncheckedIOException(ex); | |
} | |
// a "proxy" emitter for the first subscription | |
emitter = BufferedEmittingPublisher.create(); | |
// an emitter used to write the chunks to a file | |
fileEmitter = BufferedEmittingPublisher.create(); | |
Multi.create(fileEmitter) | |
.to(IoMulti.writeToFile(tempFile) | |
.build()); | |
} | |
@Override | |
public Flow.Publisher<DataChunk> apply(Flow.Publisher<DataChunk> publisher) { | |
AtomicLong totalBytes = new AtomicLong(); | |
Multi.create(publisher) | |
.forEach(chunk -> { | |
// duplicate the chunk and release the original | |
byte[] bytes = chunk.bytes(); | |
chunk.release(); | |
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); | |
emitter.emit(byteBuffer); | |
if (inMemory.get()) { | |
long total = totalBytes.addAndGet(bytes.length); | |
if (total <= THRESHOLD) { | |
// accumulate data memory | |
inMemoryData.add(byteBuffer); | |
return; | |
} else { | |
// we passed the threshold | |
// write the data accumulated in-memory to the temporary file | |
// and mark inMemory=false | |
inMemoryData.forEach(fileEmitter::emit); | |
inMemoryData.clear(); | |
inMemory.set(false); | |
} | |
} | |
fileEmitter.emit(byteBuffer); | |
}) | |
.onError(th -> { | |
emitter.fail(th); | |
fileEmitter.fail(th); | |
error = th; | |
}) | |
.onComplete(() -> { | |
emitter.complete(); | |
fileEmitter.complete(); | |
done.set(true); | |
}).ignoreElement(); | |
// using "defer" to supply the proxy publisher the first time | |
// or one backed by the file if the buffering is complete | |
return Multi.defer(this::publisher).map(DataChunk::create); | |
} | |
Flow.Publisher<ByteBuffer> publisher() { | |
if (!supplied.getAndSet(true)) { | |
// can be used only once | |
return emitter; | |
} | |
if (error != null) { | |
return Multi.error(error); | |
} | |
if (done.get()) { | |
try { | |
ByteChannel byteChannel = Files.newByteChannel(tempFile, StandardOpenOption.READ); | |
return IoMulti.multiFromByteChannel(byteChannel); | |
} catch (IOException ex) { | |
return Multi.error(ex); | |
} | |
} | |
return Multi.error(new IllegalStateException("Cannot replay data while buffering")); | |
} | |
void deleteFile() { | |
try { | |
Files.delete(tempFile); | |
} catch (IOException ex) { | |
throw new UncheckedIOException(ex); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.acme; | |
import java.security.MessageDigest; | |
import java.security.NoSuchAlgorithmException; | |
import java.util.Base64; | |
import io.helidon.media.common.MessageBodyReadableContent; | |
import io.helidon.webserver.Routing; | |
import io.helidon.webserver.ServerRequest; | |
import io.helidon.webserver.ServerResponse; | |
import io.helidon.webserver.Service; | |
public class TestService implements Service { | |
@Override | |
public void update(Routing.Rules rules) { | |
rules.put(this::handlePut); | |
} | |
private void handlePut(ServerRequest req, ServerResponse res) { | |
MessageBodyReadableContent content = RequestBuffering.content(req, res); | |
content.as(byte[].class).forSingle(bytes -> { | |
res.headers().add("Payload-Hash", hash(bytes)); | |
res.send(content); | |
}); | |
} | |
private static String hash(byte[] bytes) { | |
try { | |
MessageDigest messageDigest = MessageDigest.getInstance("SHA-256"); | |
messageDigest.update(bytes); | |
return Base64.getEncoder().encodeToString(messageDigest.digest()); | |
} catch (NoSuchAlgorithmException ex) { | |
throw new RuntimeException(ex); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment