Last active
January 26, 2024 15:15
-
-
Save spacewander/dab2ec1c9f478837d6a1c25f51ce6303 to your computer and use it in GitHub Desktop.
Envoy on demand copy headers vs copy headers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/contrib/golang/common/dso/dso.cc b/contrib/golang/common/dso/dso.cc | |
index ee8b0abda4..8cfb641c91 100644 | |
--- a/contrib/golang/common/dso/dso.cc | |
+++ b/contrib/golang/common/dso/dso.cc | |
@@ -93,9 +93,9 @@ GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1 | |
return envoy_go_filter_on_http_data_(p0, p1, p2, p3); | |
} | |
-void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1) { | |
+void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1, GoUint64 p2, GoUint64 p3) { | |
ASSERT(envoy_go_filter_on_http_log_ != nullptr); | |
- envoy_go_filter_on_http_log_(p0, GoUint64(p1)); | |
+ envoy_go_filter_on_http_log_(p0, GoUint64(p1), p2, p3); | |
} | |
void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) { | |
diff --git a/contrib/golang/common/dso/dso.h b/contrib/golang/common/dso/dso.h | |
index 2ad0be8ada..b7540b2252 100644 | |
--- a/contrib/golang/common/dso/dso.h | |
+++ b/contrib/golang/common/dso/dso.h | |
@@ -39,7 +39,7 @@ public: | |
GoUint64 p3) PURE; | |
virtual GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2, | |
GoUint64 p3) PURE; | |
- virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) PURE; | |
+ virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, GoUint64 p2, GoUint64 p3) PURE; | |
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE; | |
virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE; | |
}; | |
@@ -56,7 +56,7 @@ public: | |
GoUint64 envoyGoFilterOnHttpHeader(httpRequest* p0, GoUint64 p1, GoUint64 p2, | |
GoUint64 p3) override; | |
GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) override; | |
- void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) override; | |
+ void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, GoUint64 p2, GoUint64 p3) override; | |
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override; | |
void envoyGoRequestSemaDec(httpRequest* p0) override; | |
@@ -69,7 +69,8 @@ private: | |
GoUint64 p3) = {nullptr}; | |
GoUint64 (*envoy_go_filter_on_http_data_)(httpRequest* p0, GoUint64 p1, GoUint64 p2, | |
GoUint64 p3) = {nullptr}; | |
- void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1) = {nullptr}; | |
+ void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1, GoUint64 p2, | |
+ GoUint64 p3) = {nullptr}; | |
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr}; | |
void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr}; | |
}; | |
diff --git a/contrib/golang/common/dso/libgolang.h b/contrib/golang/common/dso/libgolang.h | |
index 7d400fbc1e..c0da3adca8 100644 | |
--- a/contrib/golang/common/dso/libgolang.h | |
+++ b/contrib/golang/common/dso/libgolang.h | |
@@ -120,7 +120,7 @@ extern GoUint64 envoyGoFilterOnHttpData(httpRequest* r, GoUint64 end_stream, GoU | |
// go:linkname envoyGoFilterOnHttpLog | |
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpLog | |
-extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type); | |
+extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type, GoUint64 p2, GoUint64 p3); | |
// go:linkname envoyGoFilterOnHttpDestroy | |
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy | |
diff --git a/contrib/golang/common/go/api/api.h b/contrib/golang/common/go/api/api.h | |
index 504d412d3e..6b79cc2305 100644 | |
--- a/contrib/golang/common/go/api/api.h | |
+++ b/contrib/golang/common/go/api/api.h | |
@@ -61,6 +61,7 @@ CAPIStatus envoyGoFilterHttpSendPanicReply(void* r, void* details_data, int deta | |
CAPIStatus envoyGoFilterHttpGetHeader(void* r, void* key_data, int key_len, uint64_t* value_data, | |
int* value_len); | |
CAPIStatus envoyGoFilterHttpCopyHeaders(void* r, void* strs, void* buf); | |
+CAPIStatus envoyGoFilterHttpOnDemandCopyHeaders(void* r, void* strs, void* buf); | |
CAPIStatus envoyGoFilterHttpSetHeaderHelper(void* r, void* key_data, int key_len, void* value_data, | |
int value_len, headerAction action); | |
CAPIStatus envoyGoFilterHttpRemoveHeader(void* r, void* key_data, int key_len); | |
diff --git a/contrib/golang/common/go/api/capi.go b/contrib/golang/common/go/api/capi.go | |
index 2ff034c7f5..996e411559 100644 | |
--- a/contrib/golang/common/go/api/capi.go | |
+++ b/contrib/golang/common/go/api/capi.go | |
@@ -32,6 +32,8 @@ type HttpCAPI interface { | |
HttpSetHeader(r unsafe.Pointer, key string, value string, add bool) | |
HttpRemoveHeader(r unsafe.Pointer, key string) | |
+ HttpOnDemandCopyHeaders(r unsafe.Pointer, num uint64, bytes uint64) map[string][]string | |
+ | |
HttpGetBuffer(r unsafe.Pointer, bufferPtr uint64, length uint64) []byte | |
HttpDrainBuffer(r unsafe.Pointer, bufferPtr uint64, length uint64) | |
HttpSetBufferHelper(r unsafe.Pointer, bufferPtr uint64, value string, action BufferAction) | |
diff --git a/contrib/golang/filters/http/source/cgo.cc b/contrib/golang/filters/http/source/cgo.cc | |
index 5cd8e047ba..827dc035b8 100644 | |
--- a/contrib/golang/filters/http/source/cgo.cc | |
+++ b/contrib/golang/filters/http/source/cgo.cc | |
@@ -125,6 +125,14 @@ CAPIStatus envoyGoFilterHttpCopyHeaders(void* r, void* strs, void* buf) { | |
}); | |
} | |
+CAPIStatus envoyGoFilterHttpOnDemandCopyHeaders(void* r, void* strs, void* buf) { | |
+ return envoyGoFilterHandlerWrapper(r, [strs, buf](std::shared_ptr<Filter>& filter) -> CAPIStatus { | |
+ auto go_strs = reinterpret_cast<GoString*>(strs); | |
+ auto go_buf = reinterpret_cast<char*>(buf); | |
+ return filter->onDemandCopyHeaders(go_strs, go_buf); | |
+ }); | |
+} | |
+ | |
CAPIStatus envoyGoFilterHttpSetHeaderHelper(void* r, void* key_data, int key_len, void* value_data, | |
int value_len, headerAction act) { | |
return envoyGoFilterHandlerWrapper(r, | |
diff --git a/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go b/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go | |
index a195c3b261..9aff4c0b43 100644 | |
--- a/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go | |
+++ b/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go | |
@@ -341,6 +341,53 @@ func (c *httpCApiImpl) HttpGetDynamicMetadata(rr unsafe.Pointer, filterName stri | |
return meta.AsMap() | |
} | |
+func (c *httpCApiImpl) HttpOnDemandCopyHeaders(rr unsafe.Pointer, num uint64, bytes uint64) map[string][]string { | |
+ r := (*httpRequest)(rr) | |
+ //api.LogCritical("hit lock") | |
+ r.mutex.Lock() | |
+ defer r.mutex.Unlock() | |
+ r.sema.Add(1) | |
+ | |
+ var strs []string | |
+ if num <= maxStackAllocedHeaderSize { | |
+ // NOTE: only a slice of constant length may be allocated on the stack. | |
+ strs = make([]string, maxStackAllocedSliceLen) | |
+ } else { | |
+ // TODO: maybe we could use a memory pool for better performance, | |
+ // since the Go strings in strs will be copied into the map built below. | |
+ strs = make([]string, num*2) | |
+ } | |
+ // NOTE: this buffer cannot be reused safely, | |
+ // since strings may refer to this buffer as their string data, and strings are immutable in Go. | |
+ // We would have to make sure that none of those strings is still in use before reusing it, | |
+ // but strings may stay alive beyond the lifetime of the request. | |
+ buf := make([]byte, bytes) | |
+ atomic.AddInt32(&r.waitingOnEnvoy, 1) | |
+ //now := time.Now() | |
+ res := C.envoyGoFilterHttpOnDemandCopyHeaders(unsafe.Pointer(r.req), unsafe.Pointer(unsafe.SliceData(strs)), unsafe.Pointer(unsafe.SliceData(buf))) | |
+ //api.LogCriticalf("go call c time: %v", time.Since(now)) | |
+ if res == C.CAPIYield { | |
+ r.sema.Wait() | |
+ } else { | |
+ r.sema.Done() | |
+ handleCApiStatus(res) | |
+ } | |
+ //api.LogCriticalf("copy headers time: %v", time.Since(now)) | |
+ | |
+ m := make(map[string][]string, num) | |
+ for i := uint64(0); i < num*2; i += 2 { | |
+ key := strings.Clone(strs[i]) | |
+ value := strings.Clone(strs[i+1]) | |
+ | |
+ if v, found := m[key]; !found { | |
+ m[key] = []string{value} | |
+ } else { | |
+ m[key] = append(v, value) | |
+ } | |
+ } | |
+ return m | |
+} | |
+ | |
func (c *httpCApiImpl) HttpSetDynamicMetadata(r unsafe.Pointer, filterName string, key string, value interface{}) { | |
v, err := structpb.NewValue(value) | |
if err != nil { | |
diff --git a/contrib/golang/filters/http/source/go/pkg/http/shim.go b/contrib/golang/filters/http/source/go/pkg/http/shim.go | |
index 45d4cf1d4f..ee7abc98f2 100644 | |
--- a/contrib/golang/filters/http/source/go/pkg/http/shim.go | |
+++ b/contrib/golang/filters/http/source/go/pkg/http/shim.go | |
@@ -35,9 +35,12 @@ import "C" | |
import ( | |
"errors" | |
"fmt" | |
+ "reflect" | |
"runtime" | |
"sync" | |
"sync/atomic" | |
+ "time" | |
+ "unsafe" | |
"github.com/envoyproxy/envoy/contrib/golang/common/go/api" | |
) | |
@@ -157,6 +160,41 @@ func envoyGoFilterOnHttpHeader(r *C.httpRequest, endStream, headerNum, headerByt | |
}, | |
} | |
status = f.EncodeHeaders(header, endStream == 1) | |
+ status = api.Running | |
+ go func() { | |
+ var h1, h2 map[string][]string | |
+ now := time.Now() | |
+ for i := 0; i < 5; i++ { | |
+ h := &responseHeaderMapImpl{ | |
+ requestOrResponseHeaderMapImpl{ | |
+ headerMapImpl{ | |
+ request: req, | |
+ headerNum: headerNum, | |
+ headerBytes: headerBytes, | |
+ }, | |
+ }, | |
+ } | |
+ h1 = cAPI.HttpOnDemandCopyHeaders(unsafe.Pointer(req), h.headerNum, h.headerBytes) | |
+ } | |
+ onDemandTime := time.Since(now) | |
+ now = time.Now() | |
+ for i := 0; i < 5; i++ { | |
+ h := &responseHeaderMapImpl{ | |
+ requestOrResponseHeaderMapImpl{ | |
+ headerMapImpl{ | |
+ request: req, | |
+ headerNum: headerNum, | |
+ headerBytes: headerBytes, | |
+ }, | |
+ }, | |
+ } | |
+ h2 = cAPI.HttpCopyHeaders(unsafe.Pointer(h.request.req), h.headerNum, h.headerBytes) | |
+ } | |
+ api.LogCriticalf("h1 %v, h2 %v", h1, h2) | |
+ api.LogCriticalf("async time cost: %v, onDemand time cost: %v, pass: %v, ratio: %v", time.Since(now), onDemandTime, | |
+ reflect.DeepEqual(h1, h2), float64(time.Since(now))/float64(onDemandTime)) | |
+ req.Continue(api.Continue) | |
+ }() | |
case api.EncodeTrailerPhase: | |
header := &responseTrailerMapImpl{ | |
requestOrResponseTrailerMapImpl{ | |
@@ -207,7 +245,7 @@ func envoyGoFilterOnHttpData(r *C.httpRequest, endStream, buffer, length uint64) | |
} | |
//export envoyGoFilterOnHttpLog | |
-func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) { | |
+func envoyGoFilterOnHttpLog(r *C.httpRequest, logType, headerNum, headerBytes uint64) { | |
req := getRequest(r) | |
if req == nil { | |
req = createRequest(r) | |
@@ -227,6 +265,36 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) { | |
case api.AccessLogDownstreamPeriodic: | |
f.OnLogDownstreamPeriodic() | |
case api.AccessLogDownstreamEnd: | |
+ var h1, h2 map[string][]string | |
+ now := time.Now() | |
+ for i := 0; i < 100000; i++ { | |
+ h := &requestHeaderMapImpl{ | |
+ requestOrResponseHeaderMapImpl{ | |
+ headerMapImpl{ | |
+ request: req, | |
+ headerNum: headerNum, | |
+ headerBytes: headerBytes, | |
+ }, | |
+ }, | |
+ } | |
+ h1 = cAPI.HttpOnDemandCopyHeaders(unsafe.Pointer(req), h.headerNum, h.headerBytes) | |
+ } | |
+ onDemandTime := time.Since(now) | |
+ now = time.Now() | |
+ for i := 0; i < 100000; i++ { | |
+ h := &requestHeaderMapImpl{ | |
+ requestOrResponseHeaderMapImpl{ | |
+ headerMapImpl{ | |
+ request: req, | |
+ headerNum: headerNum, | |
+ headerBytes: headerBytes, | |
+ }, | |
+ }, | |
+ } | |
+ h2 = cAPI.HttpCopyHeaders(unsafe.Pointer(h.request.req), h.headerNum, h.headerBytes) | |
+ } | |
+ api.LogCriticalf("time cost: %v, onDemand time cost: %v, pass: %v, ratio: %v", time.Since(now), onDemandTime, | |
+ reflect.DeepEqual(h1, h2), float64(time.Since(now))/float64(onDemandTime)) | |
f.OnLog() | |
default: | |
api.LogErrorf("access log type %d is not supported yet", logType) | |
diff --git a/contrib/golang/filters/http/source/golang_filter.cc b/contrib/golang/filters/http/source/golang_filter.cc | |
index 01642e6ac7..5fb8f33bf7 100644 | |
--- a/contrib/golang/filters/http/source/golang_filter.cc | |
+++ b/contrib/golang/filters/http/source/golang_filter.cc | |
@@ -230,8 +230,10 @@ void Filter::log(const Http::RequestHeaderMap* headers, const Http::ResponseHead | |
} | |
state.enterLog(); | |
+ headers_ = static_cast<Http::RequestOrResponseHeaderMap*>( | |
+ const_cast<Http::RequestHeaderMap*>(headers)); | |
req_->phase = static_cast<int>(state.phase()); | |
- dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(type)); | |
+ dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(type), headers_->size(), headers_->byteSize()); | |
state.leaveLog(); | |
} break; | |
default: | |
@@ -671,6 +673,54 @@ CAPIStatus Filter::copyHeaders(GoString* go_strs, char* go_buf) { | |
return CAPIStatus::CAPIOK; | |
} | |
+CAPIStatus Filter::onDemandCopyHeaders(GoString* go_strs, char* go_buf) { | |
+ // auto start = std::chrono::high_resolution_clock::now(); | |
+ Thread::LockGuard lock(mutex_); | |
+ if (has_destroyed_) { | |
+ ENVOY_LOG(debug, "golang filter has been destroyed"); | |
+ return CAPIStatus::CAPIFilterIsDestroy; | |
+ } | |
+ auto& state = getProcessorState(); | |
+ if (!state.isProcessingInGo()) { | |
+ ENVOY_LOG(debug, "golang filter is not processing Go"); | |
+ return CAPIStatus::CAPINotInGo; | |
+ } | |
+ if (headers_ == nullptr) { | |
+ ENVOY_LOG(debug, "invoking cgo api at invalid phase: {}", __func__); | |
+ return CAPIStatus::CAPIInvalidPhase; | |
+ } | |
+ | |
+ if (!state.isThreadSafe()) { | |
+ auto weak_ptr = weak_from_this(); | |
+ ENVOY_LOG(debug, "golang filter onDemandCopyHeaders posting request to dispatcher"); | |
+ auto post_start = std::chrono::high_resolution_clock::now(); | |
+ state.getDispatcher().post([this, &state, weak_ptr, go_strs, go_buf, post_start] { | |
+ ENVOY_LOG(debug, "golang filter onDemandCopyHeaders request in worker thread"); | |
+ if (!weak_ptr.expired() && !hasDestroyed()) { | |
+ copyHeaderMapToGo(*headers_, go_strs, go_buf); | |
+ // auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>( | |
+ // std::chrono::high_resolution_clock::now() - post_start); | |
+ // ENVOY_LOG(error, "onDemandCopyHeaders after post {} ns", duration.count()); | |
+ dynamic_lib_->envoyGoRequestSemaDec(req_); | |
+ } else { | |
+ ENVOY_LOG(info, "golang filter has gone or destroyed in onDemandCopyHeaders"); | |
+ } | |
+ }); | |
+ // auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>( | |
+ // std::chrono::high_resolution_clock::now() - start); | |
+ // ENVOY_LOG(error, "onDemandCopyHeaders {} ns", duration.count()); | |
+ return CAPIStatus::CAPIYield; | |
+ } | |
+ | |
+ copyHeaderMapToGo(*headers_, go_strs, go_buf); | |
+ ENVOY_LOG(debug, "golang filter onDemandCopyHeaders replying directly"); | |
+ | |
+ // auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>( | |
+ // std::chrono::high_resolution_clock::now() - start); | |
+ // ENVOY_LOG(error, "onDemandCopyHeaders {} ns", duration.count()); | |
+ return CAPIStatus::CAPIOK; | |
+} | |
+ | |
// It won't take effect immediately when it's invoked from a Go thread; instead, it will post a | |
// callback to run in the envoy worker thread. | |
CAPIStatus Filter::setHeader(absl::string_view key, absl::string_view value, headerAction act) { | |
diff --git a/contrib/golang/filters/http/source/golang_filter.h b/contrib/golang/filters/http/source/golang_filter.h | |
index 925c182e44..39cb5f7c2b 100644 | |
--- a/contrib/golang/filters/http/source/golang_filter.h | |
+++ b/contrib/golang/filters/http/source/golang_filter.h | |
@@ -222,6 +222,7 @@ public: | |
CAPIStatus getHeader(absl::string_view key, uint64_t* value_data, int* value_len); | |
CAPIStatus copyHeaders(GoString* go_strs, char* go_buf); | |
+ CAPIStatus onDemandCopyHeaders(GoString* go_strs, char* go_buf); | |
CAPIStatus setHeader(absl::string_view key, absl::string_view value, headerAction act); | |
CAPIStatus removeHeader(absl::string_view key); | |
CAPIStatus copyBuffer(Buffer::Instance* buffer, char* data); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment