Skip to content

Instantly share code, notes, and snippets.

@spacewander
Last active January 26, 2024 15:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spacewander/dab2ec1c9f478837d6a1c25f51ce6303 to your computer and use it in GitHub Desktop.
Save spacewander/dab2ec1c9f478837d6a1c25f51ce6303 to your computer and use it in GitHub Desktop.
Envoy on demand copy headers vs copy headers
diff --git a/contrib/golang/common/dso/dso.cc b/contrib/golang/common/dso/dso.cc
index ee8b0abda4..8cfb641c91 100644
--- a/contrib/golang/common/dso/dso.cc
+++ b/contrib/golang/common/dso/dso.cc
@@ -93,9 +93,9 @@ GoUint64 HttpFilterDsoImpl::envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1
return envoy_go_filter_on_http_data_(p0, p1, p2, p3);
}
-void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1) {
+void HttpFilterDsoImpl::envoyGoFilterOnHttpLog(httpRequest* p0, int p1, GoUint64 p2, GoUint64 p3) {
ASSERT(envoy_go_filter_on_http_log_ != nullptr);
- envoy_go_filter_on_http_log_(p0, GoUint64(p1));
+ envoy_go_filter_on_http_log_(p0, GoUint64(p1), p2, p3);
}
void HttpFilterDsoImpl::envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) {
diff --git a/contrib/golang/common/dso/dso.h b/contrib/golang/common/dso/dso.h
index 2ad0be8ada..b7540b2252 100644
--- a/contrib/golang/common/dso/dso.h
+++ b/contrib/golang/common/dso/dso.h
@@ -39,7 +39,7 @@ public:
GoUint64 p3) PURE;
virtual GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) PURE;
- virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) PURE;
+ virtual void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, GoUint64 p2, GoUint64 p3) PURE;
virtual void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) PURE;
virtual void envoyGoRequestSemaDec(httpRequest* p0) PURE;
};
@@ -56,7 +56,7 @@ public:
GoUint64 envoyGoFilterOnHttpHeader(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) override;
GoUint64 envoyGoFilterOnHttpData(httpRequest* p0, GoUint64 p1, GoUint64 p2, GoUint64 p3) override;
- void envoyGoFilterOnHttpLog(httpRequest* p0, int p1) override;
+ void envoyGoFilterOnHttpLog(httpRequest* p0, int p1, GoUint64 p2, GoUint64 p3) override;
void envoyGoFilterOnHttpDestroy(httpRequest* p0, int p1) override;
void envoyGoRequestSemaDec(httpRequest* p0) override;
@@ -69,7 +69,8 @@ private:
GoUint64 p3) = {nullptr};
GoUint64 (*envoy_go_filter_on_http_data_)(httpRequest* p0, GoUint64 p1, GoUint64 p2,
GoUint64 p3) = {nullptr};
- void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1) = {nullptr};
+ void (*envoy_go_filter_on_http_log_)(httpRequest* p0, GoUint64 p1, GoUint64 p2,
+ GoUint64 p3) = {nullptr};
void (*envoy_go_filter_on_http_destroy_)(httpRequest* p0, GoUint64 p1) = {nullptr};
void (*envoy_go_filter_go_request_sema_dec_)(httpRequest* p0) = {nullptr};
};
diff --git a/contrib/golang/common/dso/libgolang.h b/contrib/golang/common/dso/libgolang.h
index 7d400fbc1e..c0da3adca8 100644
--- a/contrib/golang/common/dso/libgolang.h
+++ b/contrib/golang/common/dso/libgolang.h
@@ -120,7 +120,7 @@ extern GoUint64 envoyGoFilterOnHttpData(httpRequest* r, GoUint64 end_stream, GoU
// go:linkname envoyGoFilterOnHttpLog
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpLog
-extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type);
+extern void envoyGoFilterOnHttpLog(httpRequest* r, GoUint64 type, GoUint64 p2, GoUint64 p3);
// go:linkname envoyGoFilterOnHttpDestroy
// github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http.envoyGoFilterOnHttpDestroy
diff --git a/contrib/golang/common/go/api/api.h b/contrib/golang/common/go/api/api.h
index 504d412d3e..6b79cc2305 100644
--- a/contrib/golang/common/go/api/api.h
+++ b/contrib/golang/common/go/api/api.h
@@ -61,6 +61,7 @@ CAPIStatus envoyGoFilterHttpSendPanicReply(void* r, void* details_data, int deta
CAPIStatus envoyGoFilterHttpGetHeader(void* r, void* key_data, int key_len, uint64_t* value_data,
int* value_len);
CAPIStatus envoyGoFilterHttpCopyHeaders(void* r, void* strs, void* buf);
+CAPIStatus envoyGoFilterHttpOnDemandCopyHeaders(void* r, void* strs, void* buf);
CAPIStatus envoyGoFilterHttpSetHeaderHelper(void* r, void* key_data, int key_len, void* value_data,
int value_len, headerAction action);
CAPIStatus envoyGoFilterHttpRemoveHeader(void* r, void* key_data, int key_len);
diff --git a/contrib/golang/common/go/api/capi.go b/contrib/golang/common/go/api/capi.go
index 2ff034c7f5..996e411559 100644
--- a/contrib/golang/common/go/api/capi.go
+++ b/contrib/golang/common/go/api/capi.go
@@ -32,6 +32,8 @@ type HttpCAPI interface {
HttpSetHeader(r unsafe.Pointer, key string, value string, add bool)
HttpRemoveHeader(r unsafe.Pointer, key string)
+ HttpOnDemandCopyHeaders(r unsafe.Pointer, num uint64, bytes uint64) map[string][]string
+
HttpGetBuffer(r unsafe.Pointer, bufferPtr uint64, length uint64) []byte
HttpDrainBuffer(r unsafe.Pointer, bufferPtr uint64, length uint64)
HttpSetBufferHelper(r unsafe.Pointer, bufferPtr uint64, value string, action BufferAction)
diff --git a/contrib/golang/filters/http/source/cgo.cc b/contrib/golang/filters/http/source/cgo.cc
index 5cd8e047ba..827dc035b8 100644
--- a/contrib/golang/filters/http/source/cgo.cc
+++ b/contrib/golang/filters/http/source/cgo.cc
@@ -125,6 +125,14 @@ CAPIStatus envoyGoFilterHttpCopyHeaders(void* r, void* strs, void* buf) {
});
}
+// cgo entry point for the on-demand header copy: reinterprets the opaque Go pointers as the
+// GoString array and the byte buffer, then forwards them to Filter::onDemandCopyHeaders through
+// envoyGoFilterHandlerWrapper and returns that status unchanged.
+CAPIStatus envoyGoFilterHttpOnDemandCopyHeaders(void* r, void* strs, void* buf) {
+  auto* header_strs = static_cast<GoString*>(strs);
+  auto* header_buf = static_cast<char*>(buf);
+  return envoyGoFilterHandlerWrapper(
+      r, [header_strs, header_buf](std::shared_ptr<Filter>& filter) -> CAPIStatus {
+        return filter->onDemandCopyHeaders(header_strs, header_buf);
+      });
+}
+
CAPIStatus envoyGoFilterHttpSetHeaderHelper(void* r, void* key_data, int key_len, void* value_data,
int value_len, headerAction act) {
return envoyGoFilterHandlerWrapper(r,
diff --git a/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go b/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
index a195c3b261..9aff4c0b43 100644
--- a/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
+++ b/contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
@@ -341,6 +341,53 @@ func (c *httpCApiImpl) HttpGetDynamicMetadata(rr unsafe.Pointer, filterName stri
return meta.AsMap()
}
+// HttpOnDemandCopyHeaders asks Envoy to copy the current header map into Go memory and returns
+// it as a map from header key to every value seen for that key.
+//
+// rr must point to the Go-side httpRequest (its r.req field is what gets handed to C). num is the
+// number of header entries to read back (two strings, key then value, per entry) and bytes is the
+// size of the byte buffer handed to Envoy for the string data.
+//
+// The call holds r.mutex for its whole duration. When Envoy answers CAPIYield the function blocks
+// on r.sema until it is released; for any other status the result is passed to handleCApiStatus.
+func (c *httpCApiImpl) HttpOnDemandCopyHeaders(rr unsafe.Pointer, num uint64, bytes uint64) map[string][]string {
+	r := (*httpRequest)(rr)
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	r.sema.Add(1)
+
+	var strs []string
+	if num <= maxStackAllocedHeaderSize {
+		// NOTE: only const length slice may be allocated on stack.
+		strs = make([]string, maxStackAllocedSliceLen)
+	} else {
+		// TODO: maybe we could use a memory pool for better performance,
+		// since these go strings in strs, will be copied into the following map.
+		strs = make([]string, num*2)
+	}
+	// NOTE: this buffer cannot be reused safely: the strings filled in by Envoy may use it as
+	// their backing data, and strings are immutable in Go. Reuse would require proving that no
+	// such string is still referenced, but they may outlive the request.
+	buf := make([]byte, bytes)
+
+	// The waiting mark is set before calling into Envoy.
+	// NOTE(review): presumably so a wake-up from the worker thread cannot run ahead of the
+	// mark - confirm against the code that consumes waitingOnEnvoy.
+	atomic.AddInt32(&r.waitingOnEnvoy, 1)
+	res := C.envoyGoFilterHttpOnDemandCopyHeaders(unsafe.Pointer(r.req), unsafe.Pointer(unsafe.SliceData(strs)), unsafe.Pointer(unsafe.SliceData(buf)))
+	if res == C.CAPIYield {
+		r.sema.Wait()
+	} else {
+		// Envoy did not yield, so nothing is pending on its side: withdraw the waiting mark
+		// taken above, otherwise the counter grows by one on every synchronous call.
+		atomic.AddInt32(&r.waitingOnEnvoy, -1)
+		r.sema.Done()
+		handleCApiStatus(res)
+	}
+
+	// Clone every string so the returned map does not keep strs or buf alive.
+	m := make(map[string][]string, num)
+	for i := uint64(0); i < num*2; i += 2 {
+		key := strings.Clone(strs[i])
+		value := strings.Clone(strs[i+1])
+		// append on the nil slice of a missing key creates the first entry.
+		m[key] = append(m[key], value)
+	}
+	return m
+}
+
+
func (c *httpCApiImpl) HttpSetDynamicMetadata(r unsafe.Pointer, filterName string, key string, value interface{}) {
v, err := structpb.NewValue(value)
if err != nil {
diff --git a/contrib/golang/filters/http/source/go/pkg/http/shim.go b/contrib/golang/filters/http/source/go/pkg/http/shim.go
index 45d4cf1d4f..ee7abc98f2 100644
--- a/contrib/golang/filters/http/source/go/pkg/http/shim.go
+++ b/contrib/golang/filters/http/source/go/pkg/http/shim.go
@@ -35,9 +35,12 @@ import "C"
import (
"errors"
"fmt"
+ "reflect"
"runtime"
"sync"
"sync/atomic"
+ "time"
+ "unsafe"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)
@@ -157,6 +160,41 @@ func envoyGoFilterOnHttpHeader(r *C.httpRequest, endStream, headerNum, headerByt
},
}
status = f.EncodeHeaders(header, endStream == 1)
+ status = api.Running
+ go func() {
+ var h1, h2 map[string][]string
+ now := time.Now()
+ for i := 0; i < 5; i++ {
+ h := &responseHeaderMapImpl{
+ requestOrResponseHeaderMapImpl{
+ headerMapImpl{
+ request: req,
+ headerNum: headerNum,
+ headerBytes: headerBytes,
+ },
+ },
+ }
+ h1 = cAPI.HttpOnDemandCopyHeaders(unsafe.Pointer(req), h.headerNum, h.headerBytes)
+ }
+ onDemandTime := time.Since(now)
+ now = time.Now()
+ for i := 0; i < 5; i++ {
+ h := &responseHeaderMapImpl{
+ requestOrResponseHeaderMapImpl{
+ headerMapImpl{
+ request: req,
+ headerNum: headerNum,
+ headerBytes: headerBytes,
+ },
+ },
+ }
+ h2 = cAPI.HttpCopyHeaders(unsafe.Pointer(h.request.req), h.headerNum, h.headerBytes)
+ }
+ api.LogCriticalf("h1 %v, h2 %v", h1, h2)
+ api.LogCriticalf("async time cost: %v, onDemand time cost: %v, pass: %v, ratio: %v", time.Since(now), onDemandTime,
+ reflect.DeepEqual(h1, h2), float64(time.Since(now))/float64(onDemandTime))
+ req.Continue(api.Continue)
+ }()
case api.EncodeTrailerPhase:
header := &responseTrailerMapImpl{
requestOrResponseTrailerMapImpl{
@@ -207,7 +245,7 @@ func envoyGoFilterOnHttpData(r *C.httpRequest, endStream, buffer, length uint64)
}
//export envoyGoFilterOnHttpLog
-func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
+func envoyGoFilterOnHttpLog(r *C.httpRequest, logType, headerNum, headerBytes uint64) {
req := getRequest(r)
if req == nil {
req = createRequest(r)
@@ -227,6 +265,36 @@ func envoyGoFilterOnHttpLog(r *C.httpRequest, logType uint64) {
case api.AccessLogDownstreamPeriodic:
f.OnLogDownstreamPeriodic()
case api.AccessLogDownstreamEnd:
+ var h1, h2 map[string][]string
+ now := time.Now()
+ for i := 0; i < 100000; i++ {
+ h := &requestHeaderMapImpl{
+ requestOrResponseHeaderMapImpl{
+ headerMapImpl{
+ request: req,
+ headerNum: headerNum,
+ headerBytes: headerBytes,
+ },
+ },
+ }
+ h1 = cAPI.HttpOnDemandCopyHeaders(unsafe.Pointer(req), h.headerNum, h.headerBytes)
+ }
+ onDemandTime := time.Since(now)
+ now = time.Now()
+ for i := 0; i < 100000; i++ {
+ h := &requestHeaderMapImpl{
+ requestOrResponseHeaderMapImpl{
+ headerMapImpl{
+ request: req,
+ headerNum: headerNum,
+ headerBytes: headerBytes,
+ },
+ },
+ }
+ h2 = cAPI.HttpCopyHeaders(unsafe.Pointer(h.request.req), h.headerNum, h.headerBytes)
+ }
+ api.LogCriticalf("time cost: %v, onDemand time cost: %v, pass: %v, ratio: %v", time.Since(now), onDemandTime,
+ reflect.DeepEqual(h1, h2), float64(time.Since(now))/float64(onDemandTime))
f.OnLog()
default:
api.LogErrorf("access log type %d is not supported yet", logType)
diff --git a/contrib/golang/filters/http/source/golang_filter.cc b/contrib/golang/filters/http/source/golang_filter.cc
index 01642e6ac7..5fb8f33bf7 100644
--- a/contrib/golang/filters/http/source/golang_filter.cc
+++ b/contrib/golang/filters/http/source/golang_filter.cc
@@ -230,8 +230,10 @@ void Filter::log(const Http::RequestHeaderMap* headers, const Http::ResponseHead
}
state.enterLog();
+ headers_ = static_cast<Http::RequestOrResponseHeaderMap*>(
+ const_cast<Http::RequestHeaderMap*>(headers));
req_->phase = static_cast<int>(state.phase());
- dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(type));
+ dynamic_lib_->envoyGoFilterOnHttpLog(req_, int(type), headers_->size(), headers_->byteSize());
state.leaveLog();
} break;
default:
@@ -671,6 +673,54 @@ CAPIStatus Filter::copyHeaders(GoString* go_strs, char* go_buf) {
return CAPIStatus::CAPIOK;
}
+// Copies the header map currently referenced by headers_ into the Go-provided go_strs / go_buf
+// storage (via copyHeaderMapToGo) when the Go side asks for it.
+//
+// Returns CAPIFilterIsDestroy if the filter has been destroyed, CAPINotInGo if the processor
+// state is not processing in Go, and CAPIInvalidPhase if there is no header map. Otherwise:
+//  - CAPIOK: the state is thread safe and the copy was performed inline.
+//  - CAPIYield: the copy was posted to the state's dispatcher. go_strs and go_buf are written
+//    later by that callback, which then calls envoyGoRequestSemaDec(req_); the caller has to
+//    keep both buffers alive and untouched until then.
+CAPIStatus Filter::onDemandCopyHeaders(GoString* go_strs, char* go_buf) {
+  Thread::LockGuard lock(mutex_);
+  if (has_destroyed_) {
+    ENVOY_LOG(debug, "golang filter has been destroyed");
+    return CAPIStatus::CAPIFilterIsDestroy;
+  }
+  auto& state = getProcessorState();
+  if (!state.isProcessingInGo()) {
+    ENVOY_LOG(debug, "golang filter is not processing Go");
+    return CAPIStatus::CAPINotInGo;
+  }
+  if (headers_ == nullptr) {
+    ENVOY_LOG(debug, "invoking cgo api at invalid phase: {}", __func__);
+    return CAPIStatus::CAPIInvalidPhase;
+  }
+
+  if (state.isThreadSafe()) {
+    copyHeaderMapToGo(*headers_, go_strs, go_buf);
+    ENVOY_LOG(debug, "golang filter onDemandCopyHeaders replying directly");
+    return CAPIStatus::CAPIOK;
+  }
+
+  // The weak_ptr lets the deferred callback notice that the filter was freed before it ran.
+  auto weak_ptr = weak_from_this();
+  ENVOY_LOG(debug, "golang filter onDemandCopyHeaders posting request to dispatcher");
+  // Capture only what the callback uses: it never touches the processor state, so no reference
+  // to it is carried into the deferred call.
+  state.getDispatcher().post([this, weak_ptr, go_strs, go_buf] {
+    ENVOY_LOG(debug, "golang filter onDemandCopyHeaders request in worker thread");
+    if (!weak_ptr.expired() && !hasDestroyed()) {
+      copyHeaderMapToGo(*headers_, go_strs, go_buf);
+      dynamic_lib_->envoyGoRequestSemaDec(req_);
+    } else {
+      // Nothing is copied and envoyGoRequestSemaDec is not called on this path.
+      ENVOY_LOG(info, "golang filter has gone or destroyed in onDemandCopyHeaders");
+    }
+  });
+  return CAPIStatus::CAPIYield;
+}
+
+
// It won't take affect immidiately while it's invoked from a Go thread, instead, it will post a
// callback to run in the envoy worker thread.
CAPIStatus Filter::setHeader(absl::string_view key, absl::string_view value, headerAction act) {
diff --git a/contrib/golang/filters/http/source/golang_filter.h b/contrib/golang/filters/http/source/golang_filter.h
index 925c182e44..39cb5f7c2b 100644
--- a/contrib/golang/filters/http/source/golang_filter.h
+++ b/contrib/golang/filters/http/source/golang_filter.h
@@ -222,6 +222,7 @@ public:
CAPIStatus getHeader(absl::string_view key, uint64_t* value_data, int* value_len);
CAPIStatus copyHeaders(GoString* go_strs, char* go_buf);
+ CAPIStatus onDemandCopyHeaders(GoString* go_strs, char* go_buf);
CAPIStatus setHeader(absl::string_view key, absl::string_view value, headerAction act);
CAPIStatus removeHeader(absl::string_view key);
CAPIStatus copyBuffer(Buffer::Instance* buffer, char* data);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment