Skip to content

Instantly share code, notes, and snippets.

@bionoren
Created February 26, 2020 21:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bionoren/c8e45a802dbe0c0be6f1924b56a375f7 to your computer and use it in GitHub Desktop.
Save bionoren/c8e45a802dbe0c0be6f1924b56a375f7 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// collection groups several accounts so that the requests they issue during a
// transition can be sent as one batch (see collection.Transition).
type collection struct {
// Account is embedded, so a collection also exposes Account's fields and methods.
Account
// accounts holds the plain Account values; the benchmark fills it alongside
// instances, and Transition does not read it.
accounts []Account
// instances are the Accounter implementations that Transition runs, one
// goroutine per entry.
instances []Accounter
}
// Transition runs the Transition method of every instance concurrently, gathers
// the request each one issues into a single batched call made through
// transport, and hands entry i of the batched reply to the i-th request that
// was collected.
//
// Each instance receives a collectionTransport as its transport. A request sent
// through it is parked on a channel until the collection loop picks it up, and
// the issuing goroutine then stays blocked until its callback is invoked. The
// channel is only drained by the collection loop, so every instance is expected
// to issue at most one request per Transition.
//
// If the batched call or the decoding of its reply fails, every collected
// callback is invoked with a nil message and Transition returns without waiting
// for the instance goroutines. On success it returns once all of them finished.
func (c collection) Transition(ctx context.Context, transport http.RoundTripper) {
	queue := collectionTransport(make(chan collectionRequest))
	total := len(c.instances)
	activeAccounts := uint32(total)

	var wg sync.WaitGroup
	wg.Add(total)
	for _, a := range c.instances {
		// The instance is passed as an argument: before Go 1.22 the loop
		// variable is shared by all iterations, so a closure capturing it
		// could run the wrong account.
		go func(a Accounter) {
			defer func() {
				if r := recover(); r != nil {
					fmt.Printf("recovered from panic: %v\n", r)
					debug.PrintStack()
				}
			}()
			defer wg.Done()
			// Deferred so that an account which panics still counts as
			// finished; otherwise the collection loop would wait for a
			// request that never comes.
			defer atomic.AddUint32(&activeAccounts, ^uint32(0))
			a.Transition(ctx, queue)
		}(a)
	}

	// Collect one request from every account that is still running. Slots
	// belonging to accounts that finished without issuing a request stay nil
	// (and are marshalled as JSON null in the batch).
	msgs := make([]json.RawMessage, total)
	callbacks := make([]collectionCallback, total)
	received := uint32(0)
	for received < atomic.LoadUint32(&activeAccounts) {
		select {
		case req := <-queue:
			msgs[received] = req.data
			callbacks[received] = req.callback
			received++
		default:
			time.Sleep(time.Microsecond)
		}
	}

	// release answers every callback that has not been answered yet with a nil
	// message. Answered slots are cleared first, so no callback runs twice.
	release := func() {
		for i, cb := range callbacks {
			if cb == nil {
				continue
			}
			callbacks[i] = nil
			cb(ctx, nil)
		}
	}
	// Also covers the early returns below and a panic in Request.
	defer release()

	response, err := Request(ctx, transport, "", msgs)
	if err != nil {
		return
	}
	var responses []json.RawMessage
	if err = json.Unmarshal(response, &responses); err != nil {
		return
	}
	for i, resp := range responses {
		if i >= len(callbacks) {
			break // the reply carries more entries than requests were sent
		}
		if cb := callbacks[i]; cb != nil {
			callbacks[i] = nil
			cb(ctx, resp)
		}
	}
	// A reply shorter than the batch must not leave accounts blocked, or the
	// wait below would never return.
	release()
	wg.Wait()
}
// collectionCallback delivers the reply for one queued request. A nil message
// means no reply is available and is reported to the requester as a failure.
type collectionCallback func(ctx context.Context, message json.RawMessage)

// collectionRequest is a request body captured by collectionTransport together
// with the callback that completes the pending RoundTrip.
type collectionRequest struct {
	data     json.RawMessage
	callback collectionCallback
}

// collectionTransport is an http.RoundTripper that, instead of performing a
// request, publishes its body on the channel and waits for whoever receives it
// to supply the reply through the attached callback.
type collectionTransport chan collectionRequest

// RoundTrip reads the request body, sends it on the channel and blocks until
// the receiver invokes the callback or the request's context is done.
//
// A non-nil message yields a 200 response whose body is that message; a nil
// message yields a 500 response with an empty body. The callback may safely be
// invoked more than once: only the first invocation has an effect.
func (t collectionTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	var body []byte
	if r.Body != nil {
		// The RoundTripper contract requires the body to be closed, also on
		// error paths.
		defer r.Body.Close()
		var err error
		if body, err = ioutil.ReadAll(r.Body); err != nil {
			return nil, err
		}
	}

	// Buffered so the callback never blocks, even when RoundTrip has already
	// given up because the context ended.
	result := make(chan *http.Response, 1)
	var once sync.Once
	pending := collectionRequest{
		data: body,
		callback: func(ctx context.Context, message json.RawMessage) {
			defer func() {
				if p := recover(); p != nil {
					fmt.Printf("recovered from panic: %v\n", p)
					debug.PrintStack()
				}
			}()
			once.Do(func() {
				resp := &http.Response{
					StatusCode:    http.StatusOK,
					Body:          ioutil.NopCloser(bytes.NewBuffer(message)),
					ContentLength: int64(len(message)),
					Uncompressed:  true,
					Request:       r,
				}
				if message == nil {
					resp.StatusCode = http.StatusInternalServerError
				}
				resp.Status = http.StatusText(resp.StatusCode)
				result <- resp
			})
		},
	}

	ctx := r.Context()
	select {
	case t <- pending:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	select {
	case resp := <-result:
		return resp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// Compile-time check that collectionTransport satisfies http.RoundTripper.
var _ http.RoundTripper = collectionTransport(nil)
// Request JSON-encodes request, POSTs it to "http://localhost/"+api through
// transport and returns the raw value of the "response_data" member of the JSON
// reply.
//
// ctx is attached to the outgoing request, so its cancellation, deadline and
// values are visible to the transport; a nil ctx is treated as
// context.Background(). The call is additionally limited to 30 seconds by the
// client timeout.
//
// An error is returned when the request cannot be encoded or built, when the
// transport fails, when the reply has a 4xx/5xx status, or when the reply body
// is not valid JSON.
func Request(ctx context.Context, transport http.RoundTripper, api string, request interface{}) (json.RawMessage, error) {
	if ctx == nil {
		// Kept for callers that relied on ctx being ignored.
		ctx = context.Background()
	}

	data, err := json.Marshal(request)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost/"+api, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	client := http.Client{
		Transport: transport,
		Timeout:   30 * time.Second,
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Do not try to decode an error page as a successful reply.
	if resp.StatusCode >= http.StatusBadRequest {
		return nil, fmt.Errorf("POST %s: unexpected status %d", req.URL, resp.StatusCode)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	var wrapper struct {
		Data json.RawMessage `json:"response_data"`
	}
	if err := json.Unmarshal(body, &wrapper); err != nil {
		return nil, err
	}
	return wrapper.Data, nil
}
// Account is a single account that may belong to a collection.
type Account struct {
	// index is the value reported by Index.
	index int
	// transactions holds the recorded transactions in buckets; AddTransaction
	// appends to the first bucket.
	transactions [][]interface{}
	// collection points at the owning collection, when there is one.
	collection *collection
}

// Index returns the account's index.
func (a Account) Index() int {
	return a.index
}

// AddTransaction appends an empty (nil) transaction to the first transaction
// bucket, creating that bucket when the account has none yet, so a zero-value
// Account is usable. ctx is currently unused and the returned error is always
// nil.
func (a *Account) AddTransaction(ctx context.Context) error {
	if len(a.transactions) == 0 {
		a.transactions = make([][]interface{}, 1)
	}
	a.transactions[0] = append(a.transactions[0], nil)
	return nil
}
// Accounter is implemented by account types that collection.Transition can
// drive.
type Accounter interface {
// Transition performs the account's transition, sending any HTTP requests it
// makes through transport.
Transition(ctx context.Context, transport http.RoundTripper)
}
// MockTransport is a stateless http.RoundTripper stand-in that answers every
// request with the same canned batch reply.
type MockTransport struct{}

// numAccounts is the number of entries in the canned reply and the number of
// accounts the benchmark creates.
const numAccounts = 4000

// RoundTrip ignores req and returns a 200 response whose body is a JSON object
// with one member, "response_data", holding an array of numAccounts objects of
// the form {"response_data":null}. No network access takes place.
func (m MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	type responseData struct {
		ResponseData json.RawMessage `json:"response_data"`
	}

	encoded, err := json.Marshal(make([]responseData, numAccounts))
	if err != nil {
		return nil, err
	}

	payload := fmt.Sprintf(`{"response_data": %s}`, encoded)
	return &http.Response{
		StatusCode: http.StatusOK,
		Body:       ioutil.NopCloser(strings.NewReader(payload)),
	}, nil
}
// testAccount is the Accounter used by the benchmark; it embeds Account and
// adds a fixed, single-request transition.
type testAccount struct {
	Account
}

// Transition sends the value 50 to the "/foo/bar" API through transport and
// then records one transaction. The reply and any error from either step are
// discarded.
func (a testAccount) Transition(ctx context.Context, transport http.RoundTripper) {
	const api = "/foo/bar"
	payload := 50

	_, _ = Request(ctx, transport, api, payload)
	_ = a.AddTransaction(ctx)
}

// NetworkOrder returns the API paths associated with this account type, here
// only "/foo/bar".
func (a testAccount) NetworkOrder() []string {
	order := make([]string, 0, 1)
	order = append(order, "/foo/bar")
	return order
}
// BenchmarkCollection_Transition measures collection.Transition for a
// collection of numAccounts test accounts served by MockTransport. Building the
// collection is excluded from the timed region.
func BenchmarkCollection_Transition(b *testing.B) {
	c := collection{
		accounts:  make([]Account, numAccounts),
		instances: make([]Accounter, numAccounts),
	}
	for idx := range c.accounts {
		acct := Account{
			collection:   &c,
			transactions: make([][]interface{}, 6),
		}
		c.accounts[idx] = acct
		c.instances[idx] = testAccount{Account: acct}
	}

	ctx := context.Background()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		c.Transition(ctx, MockTransport{})
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment