Skip to content

Instantly share code, notes, and snippets.

@alabotski
Created November 3, 2022 09:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alabotski/a8629913d8606e25d5e120c4a02e19ab to your computer and use it in GitHub Desktop.
Save alabotski/a8629913d8606e25d5e120c4a02e19ab to your computer and use it in GitHub Desktop.
Just an example of how to read a value from the request body inside a WebFilter in a reactive WebFlux application and put it into the exchange (request) attributes.
/**
 * Example {@link WebFilter} that reads the request body once, copies a value from it into the
 * exchange attributes, and then passes a replayable copy of the body down the filter chain.
 *
 * <p>The body is aggregated into a single {@link DataBuffer} and kept in the exchange attribute
 * {@link #CACHED_REQUEST_BODY_ATTR}. A {@link ServerHttpRequestDecorator} serves a fresh view of
 * that buffer on every {@code getBody()} call, so both this filter and the downstream handler
 * can consume it. The cached buffer is released when the exchange terminates.
 */
@Order
@Slf4j
@Component
@RequiredArgsConstructor
public class PrepareHeaderWebFilter implements WebFilter {

    private static final byte[] EMPTY_BYTES = {};

    /** Exchange attribute holding the aggregated request body while the exchange is in flight. */
    public static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";

    /** Exchange attribute temporarily holding the request decorator that replays the cached body. */
    public static final String CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR = "cachedServerHttpRequestDecorator";

    private static final List<HttpMessageReader<?>> MESSAGE_READERS = HandlerStrategies.withDefaults()
        .messageReaders();

    /**
     * Caches the request body, decodes it as {@code BaseRequest}, stores its {@code rqUID} as an
     * exchange attribute (keyed by {@code BaseRequest.Fields.rqUID}) and continues the chain with
     * the body-replaying request.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return a {@code Mono} that completes when the rest of the chain has completed
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return cacheRequestBody(exchange, serverHttpRequest -> {
            var exchangeMutate = exchange.mutate()
                .request(serverHttpRequest)
                .build();
            var serverRequest = ServerRequest.create(exchangeMutate, MESSAGE_READERS);
            return serverRequest.bodyToMono(BaseRequest.class)
                .doOnNext(baseRequest -> {
                    // The default exchange keeps attributes in a ConcurrentHashMap, which
                    // rejects null values, so a body without rqUID must not be stored.
                    Object rqUid = baseRequest.getRqUID();
                    if (rqUid != null) {
                        exchange.getAttributes()
                            .put(BaseRequest.Fields.rqUID, rqUid);
                    }
                })
                .then(removeCacheAndChain(exchange, chain));
        })
            // Runs on completion, error and cancellation alike, so the aggregated body
            // buffer is always handed back to its allocator.
            .doFinally(signalType -> releaseCachedBody(exchange));
    }

    /**
     * Aggregates the request body into one buffer, wraps the request in a decorator that can
     * replay it, and passes that decorated request to {@code function}.
     *
     * @param exchange the current server exchange
     * @param function continuation receiving the body-replaying request
     * @return the {@code Mono} produced by {@code function}
     */
    private static Mono<Void> cacheRequestBody(ServerWebExchange exchange, Function<ServerHttpRequest, Mono<Void>> function) {
        DataBufferFactory factory = exchange.getResponse()
            .bufferFactory();
        return DataBufferUtils.join(exchange.getRequest()
                .getBody())
            // An empty body still yields a (zero-length) buffer, so the pipeline never runs dry.
            .defaultIfEmpty(factory.wrap(EMPTY_BYTES))
            .map(dataBuffer -> decorate(exchange, dataBuffer))
            .flatMap(function);
    }

    /**
     * Takes the cached decorator out of the exchange attributes and continues the chain with it.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining filter chain
     * @return the result of the downstream chain
     * @throws IllegalArgumentException if no decorator was cached on the exchange
     */
    private static Mono<Void> removeCacheAndChain(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest cachedRequest = exchange.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
        Assert.notNull(cachedRequest, "cache request shouldn't be null");
        exchange.getAttributes()
            .remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
        return chain.filter(exchange.mutate()
            .request(cachedRequest)
            .build());
    }

    /**
     * Removes the cached body from the exchange attributes and releases it.
     *
     * <p>Removing the attribute first makes the decorator's {@code getBody()} return an empty
     * body afterwards instead of touching a released buffer.
     *
     * @param exchange the exchange whose cached body should be released
     */
    private static void releaseCachedBody(ServerWebExchange exchange) {
        Object cachedBody = exchange.getAttributes()
            .remove(CACHED_REQUEST_BODY_ATTR);
        if (cachedBody instanceof DataBuffer) {
            DataBufferUtils.release((DataBuffer) cachedBody);
        }
    }

    /**
     * Stores a non-empty body buffer in the exchange attributes and builds a request decorator
     * whose {@code getBody()} replays that buffer on every subscription.
     *
     * @param exchange   the current server exchange
     * @param dataBuffer the aggregated request body (possibly zero-length)
     * @return the decorator, which is also stored under
     *         {@link #CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR}
     */
    private static ServerHttpRequest decorate(ServerWebExchange exchange, DataBuffer dataBuffer) {
        if (dataBuffer.readableByteCount() > 0) {
            if (log.isTraceEnabled()) {
                log.trace("retaining body in exchange attribute");
            }
            exchange.getAttributes()
                .put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
        }
        ServerHttpRequest decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public Flux<DataBuffer> getBody() {
                return Mono.fromSupplier(() -> {
                    if (exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null) == null) {
                        // probably == downstream closed or no body
                        // A null supplier result makes the Mono complete empty.
                        return null;
                    }
                    if (dataBuffer instanceof NettyDataBuffer) {
                        NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer;
                        // Each subscriber gets its own retained slice, so releasing it
                        // does not invalidate the cached buffer.
                        return pdb.factory()
                            .wrap(pdb.getNativeBuffer()
                                .retainedSlice());
                    } else if (dataBuffer instanceof DefaultDataBuffer) {
                        DefaultDataBuffer ddf = (DefaultDataBuffer) dataBuffer;
                        return ddf.factory()
                            .wrap(Unpooled.wrappedBuffer(ddf.getNativeBuffer())
                                .nioBuffer());
                    } else {
                        throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());
                    }
                })
                    .flux();
            }
        };
        exchange.getAttributes()
            .put(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, decorator);
        return decorator;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment