Skip to content

Instantly share code, notes, and snippets.

@AFutureD
Last active June 6, 2024 09:40
Show Gist options
  • Save AFutureD/57530807815efdb7da6faa3391e7c8eb to your computer and use it in GitHub Desktop.
Save AFutureD/57530807815efdb7da6faa3391e7c8eb to your computer and use it in GitHub Desktop.
package *.gateway.fliter;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import static com.yigegroup.gateway.constants.GatewayConstants.EXCHANGE_CACHED_FORM_DATA;
// Difficult to use `AdaptCachedBodyGlobalFilter`, so using this instead.
@Component
@RequiredArgsConstructor
public class GlobalCacheRequestFilter implements GlobalFilter, Ordered {
private final ServerCodecConfigurer serverCodecConfigurer;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> {
if (serverHttpRequest == exchange.getRequest()) {
return chain.filter(exchange);
}
ServerWebExchange mutatedExchange = exchange.mutate().request(serverHttpRequest).build();
return chain.filter(mutatedExchange);
});
}
@Override
public int getOrder() {
return -1024;
}
}
package *.gateway.fliter;
import com.alibaba.fastjson.JSONObject;
import com.yigegroup.core.common.util.RequestHolder;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.springframework.http.MediaType.*;
@Data
class GatewayIn {
// 请求时间(请求到达服务器时间戳)
private long t;
private GatewayRequestCtx context;
private String headers;
private String request;
}
@Data
class GatewayLogRecordDTO {
// 客户端环境
private String env;
private long outstamp;
private GatewayIn in;
private GatewayOut out;
// The only constructor for this Class.
// Notice: Try not use AllArgsConstructor or other Annotations.
public GatewayLogRecordDTO(GatewayIn in, GatewayOut out) {
this.in = in;
this.out = out;
this.tryMapCriticalInfo(in);
}
// In this method, it's not allowed to directly call `this.in` property, instead you should call parameter `in`.
public void tryMapCriticalInfo(GatewayIn in) {
this.env = in.getContext().getEnv();
this.outstamp = out.getT();
}
}
@Data
class GatewayOut {
// 服务端返回的信息。
// 一般得,这里只提供客户端需要的内容,更细节的信息,通过 TraceID 获取。
// 返回时间
private long t;
private String headers;
private String response;
private long spent;
}
@Data
class GatewayRequestCtx {
private String requestID;
private String api;
private String apiVersion;
private String env;
private String station;
private String language;
private String country;
private String uid;
private String ip;
private List<String> tags;
// 客户端发出请求的时间戳
private String t;
// 请求方法 GET、POST、DELETE、UPDATE
private String method;
private String param;
}
@Slf4j
@Component
public class LogFilter implements GlobalFilter, Ordered {
@Value("${spring.profiles.active}")
private String active;
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Override
public int getOrder() {
return -1023;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
GatewayIn recordIn = this.prepareGatewayIn(exchange);
GatewayOut recordOut = new GatewayOut();
Consumer<String> receiveResponse = (String responseBody) -> {
long stamp = System.currentTimeMillis();
HttpHeaders responseHeaders = exchange.getResponse().getHeaders();
responseHeaders.set("x-rc-outstamp", String.valueOf(stamp));
recordOut.setResponse(responseBody);
recordOut.setHeaders(JSONObject.toJSONString(responseHeaders.toSingleValueMap()));
recordOut.setT(stamp);
recordOut.setSpent(stamp - recordIn.getT());
};
ServerHttpResponseDecorator decoratedResponse = responseDecorator(exchange, receiveResponse);
// 记录普通的
return chain.filter(
exchange.mutate().response(decoratedResponse).build()
)
.then(Mono.fromRunnable(() -> {
// 打印日志
GatewayLogRecordDTO record = new GatewayLogRecordDTO(recordIn, recordOut);
writeAccessLog(record);
}));
}
private void writeAccessLog(GatewayLogRecordDTO record) {
log.info(JSONObject.toJSONString(record));
}
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
private GatewayRequestCtx tryPrepareRequestContext(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
Map<String, String> requestMap = request.getHeaders().toSingleValueMap();
Map<String, String> queryParams = request.getQueryParams().toSingleValueMap();
String requestPath = request.getPath().pathWithinApplication().value();
String ipAddress = RequestHolder.getServerHttpRequestIpAddress(request);
GatewayRequestCtx ctx = new GatewayRequestCtx();
ctx.setRequestID(requestMap.get("X-Request-ID"));
ctx.setApi(requestPath);
ctx.setUid(requestMap.get("x-rc-uid"));
ctx.setTags(Collections.emptyList());
ctx.setIp(ipAddress);
ctx.setT(requestMap.get("x-rc-timestamp"));
ctx.setMethod(request.getMethodValue());
ctx.setParam(JSONObject.toJSONString(queryParams));
return ctx;
}
private GatewayIn prepareGatewayIn(ServerWebExchange exchange) {
GatewayRequestCtx ctx = this.tryPrepareRequestContext(exchange);
Map<String, String> requestMap = exchange.getRequest().getHeaders().toSingleValueMap();
String body = this.prepareRequestBody(exchange);
GatewayIn recordIn = new GatewayIn();
recordIn.setT(System.currentTimeMillis());
recordIn.setContext(ctx);
recordIn.setHeaders(JSONObject.toJSONString(requestMap));
recordIn.setRequest(body);
return recordIn;
}
@SneakyThrows
private String prepareRequestBody(ServerWebExchange exchange) {
ServerHttpRequest request = exchange.getRequest();
MediaType mediaType = request.getHeaders().getContentType();
DataBuffer cachedRequestBody = exchange.getAttribute(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR);
if (cachedRequestBody == null) {
return null;
}
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(cachedRequestBody.asByteBuffer());
String bodyStr = charBuffer.toString();
if (APPLICATION_JSON.isCompatibleWith(mediaType)) {
return bodyStr;
} else if (
APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)
|| MULTIPART_FORM_DATA.isCompatibleWith(mediaType)
) {
// TODO Parse form data.
return bodyStr;
}
return null;
}
// ModifyResponseBodyGatewayFilterFactory.java
private ServerHttpResponseDecorator responseDecorator(
ServerWebExchange exchange,
Consumer<String> action
) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// 合并多个流集合,解决返回体分段传输
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 释放掉内存
DataBufferUtils.release(join);
// 大于 1GB 丢弃
if (content.length <= 1024 * 1024 * 1024) {
String responseResult = new String(content, StandardCharsets.UTF_8);
action.accept(responseResult);
}
return bufferFactory.wrap(content);
}));
} else if (body instanceof Mono) {
Mono<? extends DataBuffer> monoBody = Mono.from(body);
return super.writeWith(monoBody.map(dataBuffer -> {
byte[] content = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(content);
// 释放掉内存
DataBufferUtils.release(dataBuffer);
// 大于 1GB 丢弃
if (content.length <= 1024 * 1024 * 1024) {
String responseResult = new String(content, StandardCharsets.UTF_8);
action.accept(responseResult);
}
return bufferFactory.wrap(content);
}));
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment