Created
February 26, 2020 21:04
-
-
Save bionoren/c8e45a802dbe0c0be6f1924b56a375f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"runtime/debug" | |
"strings" | |
"sync" | |
"sync/atomic" | |
"testing" | |
"time" | |
) | |
type collection struct { | |
Account | |
accounts []Account | |
instances []Accounter | |
} | |
func (c collection) Transition(ctx context.Context, transport http.RoundTripper) { | |
queue := collectionTransport(make(chan collectionRequest)) | |
activeAccounts := uint32(len(c.instances)) | |
var wg sync.WaitGroup | |
wg.Add(len(c.instances)) | |
for _, a := range c.instances { | |
go func() { | |
defer func() { | |
if r := recover(); r != nil { | |
fmt.Printf("recovered from panic: %s\n", r) | |
debug.PrintStack() | |
} | |
}() | |
defer wg.Done() | |
a.Transition(ctx, queue) | |
atomic.AddUint32(&activeAccounts, ^uint32(0)) | |
}() | |
} | |
msgs := make([]json.RawMessage, activeAccounts) | |
callbacks := make([]collectionCallback, activeAccounts) | |
for i := uint32(0); i < activeAccounts; i++ { | |
select { | |
case msg := <- queue: | |
msgs[i] = msg.data | |
callbacks[i] = msg.callback | |
default: | |
time.Sleep(time.Microsecond) | |
i-- | |
} | |
} | |
defer func(ctx context.Context) { | |
for _, callback := range callbacks { | |
callback(ctx, nil) | |
} | |
}(ctx) | |
var response json.RawMessage | |
response, err := Request(ctx, transport, "", msgs) | |
if err != nil { | |
return | |
} | |
var responses []json.RawMessage | |
if err = json.Unmarshal(response, &responses); err != nil { | |
return | |
} | |
for i, resp := range responses { | |
callbacks[i](ctx, resp) | |
} | |
callbacks = nil // make sure the defer doesn't re-invoke callbacks | |
wg.Wait() | |
} | |
// collectionCallback completes a queued request. message becomes the body of
// the HTTP response handed back to the caller of RoundTrip; a nil message
// produces a 500 response.
type collectionCallback func(ctx context.Context, message json.RawMessage)

// collectionRequest is one request captured by a collectionTransport.
type collectionRequest struct {
	// data is the raw body of the captured HTTP request.
	data json.RawMessage
	// callback must be invoked to unblock the RoundTrip call that produced
	// this request.
	callback collectionCallback
}

// collectionTransport is an http.RoundTripper that, instead of performing the
// request, hands its body to whoever receives from the channel and then waits
// for that receiver to supply the response through the request's callback.
type collectionTransport chan collectionRequest

// RoundTrip reads and closes the request body, publishes it on the channel
// and blocks until the published callback has been invoked.
//
// The returned response carries the callback's message as its body, with
// status 200, or status 500 when the message is nil. Only the first
// invocation of the callback has any effect. A request without a body is
// published with empty data. The only error returned is a failure to read
// the request body.
func (t collectionTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	var body []byte
	if r.Body != nil {
		data, err := ioutil.ReadAll(r.Body)
		// The RoundTripper contract requires the body to be closed, including
		// on errors. The close error is dropped: the read result already
		// decides whether the request can proceed.
		_ = r.Body.Close()
		if err != nil {
			return nil, err
		}
		body = data
	}

	var (
		resp *http.Response
		once sync.Once
		done = make(chan struct{})
	)
	t <- collectionRequest{
		data: body,
		callback: func(_ context.Context, message json.RawMessage) {
			// once makes repeated invocations harmless: the response is
			// built a single time and done is closed a single time.
			once.Do(func() {
				defer close(done)
				defer func() {
					if r := recover(); r != nil {
						fmt.Printf("recovered from panic: %s\n", r)
						debug.PrintStack()
					}
				}()
				resp = &http.Response{
					StatusCode:    http.StatusOK,
					Body:          ioutil.NopCloser(bytes.NewBuffer(message)),
					ContentLength: int64(len(message)),
					Uncompressed:  true,
					Request:       r,
				}
				if message == nil {
					resp.StatusCode = http.StatusInternalServerError
				}
				resp.Status = http.StatusText(resp.StatusCode)
			})
		},
	}

	// Closing done happens after resp is assigned, so resp is safe to read
	// once the receive returns.
	<-done
	return resp, nil
}

// Compile-time check that collectionTransport implements http.RoundTripper.
var _ http.RoundTripper = collectionTransport(nil)
// Request JSON-encodes request, POSTs it to http://localhost/<api> through
// transport and returns the raw "response_data" member of the JSON reply.
//
// ctx is attached to the outgoing HTTP request so the transport can observe
// cancellation and context values; a nil ctx is treated as
// context.Background(). A leading slash in api is dropped so that "foo/bar"
// and "/foo/bar" address the same path. The call is limited to 30 seconds by
// the client timeout.
//
// An error is returned when encoding the request, building or performing the
// HTTP call, reading the reply, or decoding the reply as JSON fails. The
// response status code is not inspected.
func Request(ctx context.Context, transport http.RoundTripper, api string, request interface{}) (json.RawMessage, error) {
	if ctx == nil {
		ctx = context.Background()
	}

	data, err := json.Marshal(request)
	if err != nil {
		return nil, err
	}

	// The base URL already ends in a slash; keeping api's own leading slash
	// would produce "http://localhost//...".
	url := "http://localhost/" + strings.TrimPrefix(api, "/")
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	client := http.Client{
		Transport: transport,
		Timeout:   30 * time.Second,
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var wrapper struct {
		Data json.RawMessage `json:"response_data"`
	}
	if err := json.Unmarshal(body, &wrapper); err != nil {
		return nil, err
	}
	return wrapper.Data, nil
}
type Account struct { | |
index int | |
transactions [][]interface{} | |
collection *collection | |
} | |
func (a Account) Index() int { | |
return a.index | |
} | |
func (a *Account) AddTransaction(ctx context.Context) error { | |
a.transactions[0] = append(a.transactions[0], nil) | |
return nil | |
} | |
// Accounter is anything that can carry out a transition, performing whatever
// HTTP requests it needs through the transport it is given.
type Accounter interface {
	// Transition performs the account's transition using transport for its
	// network traffic.
	Transition(ctx context.Context, transport http.RoundTripper)
}
// MockTransport is an http.RoundTripper that answers every request with the
// same canned reply and never touches the network.
type MockTransport struct{}

// numAccounts is the number of entries in MockTransport's canned reply and
// the number of accounts the benchmark creates.
const numAccounts = 4000

// RoundTrip ignores req and returns a 200 response whose body is a JSON
// object with a "response_data" array of numAccounts entries, each of which
// is itself an object with a null "response_data" member.
func (m MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	type entry struct {
		ResponseData json.RawMessage `json:"response_data"`
	}

	encoded, err := json.Marshal(make([]entry, numAccounts))
	if err != nil {
		return nil, err
	}

	payload := fmt.Sprintf(`{"response_data": %s}`, string(encoded))
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       ioutil.NopCloser(strings.NewReader(payload)),
	}, nil
}
type testAccount struct { | |
Account | |
} | |
func (a testAccount) Transition(ctx context.Context, transport http.RoundTripper) { | |
Request(ctx, transport, "/foo/bar", 50) | |
_ = a.AddTransaction(ctx) | |
} | |
func (a testAccount) NetworkOrder() []string { | |
return []string{ | |
"/foo/bar", | |
} | |
} | |
func BenchmarkCollection_Transition(b *testing.B) { | |
ctx := context.Background() | |
c := collection{ | |
accounts: make([]Account, numAccounts), | |
instances: make([]Accounter, numAccounts), | |
} | |
for i := 0; i < numAccounts; i++ { | |
a := Account{ | |
collection: &c, | |
transactions: make([][]interface{}, 6), | |
} | |
instance := testAccount{a} | |
c.accounts[i] = a | |
c.instances[i] = instance | |
} | |
b.ResetTimer() | |
for i := 0; i < b.N; i++ { | |
c.Transition(ctx, MockTransport{}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment