Last active
November 20, 2019 15:17
-
-
Save yondonfu/3b1afee2e1a99d3d07fd4ec5ae500831 to your computer and use it in GitHub Desktop.
Orchestrator selection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
// "encoding/csv" | |
// "encoding/hex" | |
"encoding/csv" | |
"encoding/hex" | |
"fmt" | |
"log" | |
"math/rand" | |
"os" | |
) | |
// stakeGenFunc produces the stake values for numOrchs orchs.
type stakeGenFunc func(numOrchs int) []int64

// config bundles every parameter of a single experiment run.
type config struct {
	// Work set size bounds handed to the store.
	maxWorkSetSize, minWorkSetSize int

	// Selection tuning values handed to the store.
	gtvWeight        float64
	psWeight         float64
	latencyTolerance float64
	randomSelectProb float64

	// Segment duration passed along with every request.
	segDuration float64
	// Bounds of the latency range assigned to generated orchs.
	maxLatency, minLatency float64
	// % probability for creating an orch that returns a satisfactory response
	satProb float64

	// Number of orchs to create and number of segments to request.
	numOrchs, numSegs int
	// Generator for the per-orch stake values.
	stakeGen stakeGenFunc
}
func runExperiment(cfg config) { | |
store := newStore( | |
cfg.maxWorkSetSize, | |
cfg.minWorkSetSize, | |
cfg.gtvWeight, | |
cfg.psWeight, | |
cfg.latencyTolerance, | |
cfg.randomSelectProb, | |
) | |
// Create orchs | |
remoteOrchs := make(map[int]*orch) | |
stakes := cfg.stakeGen(cfg.numOrchs) | |
for i := 0; i < cfg.numOrchs; i++ { | |
r := rand.Float64() | |
var strat strategy | |
if r < cfg.satProb { | |
strat = satisfactory | |
} else { | |
strat = unsatisfactory | |
} | |
latency := cfg.minLatency + r*(cfg.maxLatency-cfg.minLatency) | |
orch := &orch{id: i, strat: strat, latency: latency} | |
remoteOrchs[i] = orch | |
store.AddOrch(i, stakes[i], orch) | |
} | |
numSat := 0 | |
numUnsat := 0 | |
totalLatency := 0.0 | |
totalSegs := 0 | |
for i := 0; i < cfg.numSegs; i++ { | |
res, latency := store.Request(int64(i), cfg.segDuration) | |
if res { | |
numSat++ | |
} else { | |
numUnsat++ | |
} | |
totalLatency += latency | |
totalSegs++ | |
} | |
log.Printf("avg latency: %v", totalLatency/float64(totalSegs)) | |
log.Printf("# satisfactory responses: %v", numSat) | |
log.Printf("# unsatisfactory responses: %v", numUnsat) | |
f, err := os.Create(fmt.Sprintf("result-%v.csv", randID())) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer f.Close() | |
w := csv.NewWriter(f) | |
defer w.Flush() | |
createRecord := func(id int, orch *orch) []string { | |
record := []string{} | |
record = append(record, fmt.Sprintf("%f", orch.latency)) | |
record = append(record, fmt.Sprintf("%d", orch.strat)) | |
record = append(record, fmt.Sprintf("%d", store.Sat(id))) | |
record = append(record, fmt.Sprintf("%f", store.LTV(id))) | |
record = append(record, fmt.Sprintf("%f", store.GTV(id))) | |
record = append(record, fmt.Sprintf("%f", store.Rep(id))) | |
record = append(record, fmt.Sprintf("%d", orch.reqs)) | |
return record | |
} | |
for id, orch := range remoteOrchs { | |
w.Write(createRecord(id, orch)) | |
} | |
// for id, orch := range remoteOrchs { | |
// log.Printf("Orch %v Latency %v Strategy: %v SAT: %v LTV: %v GTV: %v REP: %v Requests: %v", id, orch.latency, orch.strat, store.Sat(id), store.LTV(id), store.GTV(id), store.Rep(id), orch.reqs) | |
// } | |
} | |
// randID returns a 20-character lowercase hex string that encodes 10 bytes
// drawn from the package-level math/rand source.
func randID() string {
	buf := make([]byte, 10)
	for i := range buf {
		// Keep only the low byte of each 32-bit draw.
		buf[i] = byte(rand.Uint32())
	}
	return hex.EncodeToString(buf)
}
func main() { | |
cfg := config{ | |
maxWorkSetSize: 15, | |
minWorkSetSize: 10, | |
gtvWeight: .5, | |
psWeight: 0.7, | |
randomSelectProb: .1, | |
latencyTolerance: 0.15, | |
satProb: .7, | |
segDuration: 2.0, | |
maxLatency: 6.0, | |
minLatency: 1.0, | |
numOrchs: 50, | |
numSegs: 100000, | |
} | |
// cfg.stakeGen = func (numOrchs int) []int64 { | |
// stakes := make([]int64, numOrchs) | |
// for i := 0; i < numOrchs; i++ { | |
// stakes[i] = int64(100000) / int64(numOrchs) | |
// } | |
// return stakes | |
// } | |
// runExperiment(cfg) | |
cfg.stakeGen = func(numOrchs int) []int64 { | |
r := rand.New(rand.NewSource(7)) | |
zr := rand.NewZipf(r, 1.01, 1, 100000) | |
stakes := make([]int64, numOrchs) | |
for i := 0; i < numOrchs; i++ { | |
stakes[i] = int64(zr.Uint64()) | |
} | |
return stakes | |
} | |
runExperiment(cfg) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"math/rand" | |
) | |
// strategy describes how an orch answers requests.
type strategy int

const (
	// satisfactory orchs report a satisfactory response on every request.
	satisfactory strategy = 0
	// unsatisfactory orchs never report a satisfactory response.
	unsatisfactory strategy = 1
)

// orch is a simulated remote orchestrator.
type orch struct {
	id    int
	strat strategy
	// Latency reported for every request.
	latency float64
	// Number of requests received so far.
	reqs int
}

// Request counts the call and reports whether the response is satisfactory,
// together with the orch's latency.
func (o *orch) Request() (bool, float64) {
	o.reqs++
	ok := o.strat == satisfactory
	return ok, o.latency
}
type orchRecord struct { | |
id int | |
stake int64 | |
// Reference to a remote orch to send requests to | |
remoteOrch *orch | |
// # of satisfactory responses - # of unsatisfactory responses | |
sat int64 | |
// # of work set refreshes that orch will not be eligible | |
numRefreshesFrozen int64 | |
} | |
// workRecord accumulates the latency scores recorded for one work set member.
type workRecord struct {
	id int
	// Sum of every latency score recorded so far.
	totalLatencyScore float64
	// Number of scores folded into totalLatencyScore.
	latencyScoreSamples int64
}

// avgLatencyScore returns the mean of the recorded latency scores, or 0 when
// no score has been recorded yet.
func (wr *workRecord) avgLatencyScore() float64 {
	if n := wr.latencyScoreSamples; n != 0 {
		return wr.totalLatencyScore / float64(n)
	}
	return 0.0
}
type store struct { | |
orchs map[int]*orchRecord | |
workSet map[int]*workRecord | |
maxWorkSetSize int | |
minWorkSetSize int | |
// % weight for gtv relative to ltv in rep calculation | |
gtvWeight float64 | |
// % weight for ps relative to rep in selection weight calculation | |
psWeight float64 | |
// Max % lower than work set average latency score to tolerate | |
latencyTolerance float64 | |
// % chance of running a purely random selection instead of a weighted random selection | |
randomSelectProb float64 | |
} | |
func newStore(maxWorkSetSize int, minWorkSetSize int, gtvWeight, psWeight, latencyTolerance, randomSelectProb float64) *store { | |
return &store{ | |
orchs: make(map[int]*orchRecord), | |
workSet: make(map[int]*workRecord), | |
maxWorkSetSize: maxWorkSetSize, | |
minWorkSetSize: minWorkSetSize, | |
gtvWeight: gtvWeight, | |
psWeight: psWeight, | |
latencyTolerance: latencyTolerance, | |
randomSelectProb: randomSelectProb, | |
} | |
} | |
func (s *store) AddOrch(id int, stake int64, remoteOrch *orch) { | |
s.orchs[id] = &orchRecord{ | |
id: id, | |
stake: stake, | |
remoteOrch: remoteOrch, | |
} | |
} | |
func (s *store) RemoveOrch(id int) { | |
delete(s.orchs, id) | |
} | |
func (s *store) Request(segNo int64, segDuration float64) (bool, float64) { | |
// Make sure work set is populated | |
if len(s.workSet) < s.minWorkSetSize { | |
s.refreshWorkSet() | |
s.updateEligibleOrchs() | |
log.Printf("refreshed work set: %v", s.workSetLst()) | |
} | |
// Send request to selected orch | |
pick := s.selectOrch() | |
res, latency := s.orchs[pick].remoteOrch.Request() | |
if res { | |
s.recordSat(pick) | |
latencyScore := segDuration / latency | |
s.recordLatencyScore(pick, latencyScore) | |
} else { | |
s.recordUnsat(pick) | |
} | |
return res, latency | |
} | |
func (s *store) Sat(id int) int64 { | |
return s.orchs[id].sat | |
} | |
func (s *store) LTV(id int) float64 { | |
numOrchs := len(s.orchs) | |
if numOrchs == 0 { | |
return 0 | |
} | |
totalSat := s.totalSat() | |
if totalSat == 0 { | |
return 1.0 / float64(numOrchs) | |
} | |
sat := s.orchs[id].sat | |
if sat < 0 { | |
sat = 0 | |
} | |
return float64(sat) / float64(totalSat) | |
} | |
func (s *store) GTV(id int) float64 { | |
numOrchs := len(s.orchs) | |
if numOrchs == 0 { | |
return 0 | |
} | |
totalStake := s.totalStake() | |
if totalStake == 0 { | |
return 1.0 / float64(numOrchs) | |
} | |
return float64(s.orchs[id].stake) / float64(totalStake) | |
} | |
// Rep is historical and persists across streams. All orchs have a rep value | |
func (s *store) Rep(id int) float64 { | |
return (s.gtvWeight * s.GTV(id)) + ((1.0 - s.gtvWeight) * s.LTV(id)) | |
} | |
// PS is per-stream. Only orchs in the work set have a PS value | |
func (s *store) PS(id int) float64 { | |
numOrchs := len(s.workSet) | |
if numOrchs == 0 { | |
return 0 | |
} | |
sum := 0.0 | |
for _, orch := range s.workSet { | |
sum += orch.avgLatencyScore() | |
} | |
if sum == 0.0 { | |
return 1.0 / float64(numOrchs) | |
} | |
return s.workSet[id].avgLatencyScore() / sum | |
} | |
func (s *store) Weight(id int) float64 { | |
return (s.psWeight * s.PS(id)) + ((1.0 - s.psWeight) * s.Rep(id)) | |
} | |
func (s *store) selectOrch() int { | |
r := rand.Float64() | |
if r < s.randomSelectProb { | |
return s.randomSelect() | |
} | |
return s.weightedSelect() | |
} | |
func (s *store) randomSelect() int { | |
lst := s.workSetLst() | |
r := rand.Intn(len(lst)) | |
return lst[r] | |
} | |
func (s *store) weightedSelect() int { | |
lst := s.workSetLst() | |
totalWeight := float64(0) | |
for _, id := range lst { | |
totalWeight += s.Weight(id) | |
} | |
r := rand.Float64() * totalWeight | |
for _, id := range lst { | |
r -= s.Weight(id) | |
if r <= 0.0 { | |
return id | |
} | |
} | |
return -1 | |
} | |
func (s *store) totalSat() int64 { | |
sum := int64(0) | |
for _, orch := range s.orchs { | |
sat := orch.sat | |
if sat > 0 { | |
sum += sat | |
} | |
} | |
return sum | |
} | |
func (s *store) totalStake() int64 { | |
sum := int64(0) | |
for _, orch := range s.orchs { | |
sum += orch.stake | |
} | |
return sum | |
} | |
func (s *store) workSetAvgLatencyScore() float64 { | |
sum := 0.0 | |
samples := 0 | |
for _, orch := range s.workSet { | |
als := orch.avgLatencyScore() | |
if als > 0 { | |
sum += als | |
samples++ | |
} | |
} | |
if samples == 0 { | |
return 0.0 | |
} | |
return sum / float64(samples) | |
} | |
func (s *store) recordSat(id int) { | |
s.orchs[id].sat++ | |
} | |
func (s *store) recordUnsat(id int) { | |
s.orchs[id].sat-- | |
if s.orchs[id].sat < 0 { | |
s.orchs[id].numRefreshesFrozen = -s.orchs[id].sat | |
} else { | |
s.orchs[id].numRefreshesFrozen++ | |
} | |
delete(s.workSet, id) | |
log.Printf("unsatisfactory response - dropped orch %v from work set", id) | |
} | |
func (s *store) recordLatencyScore(id int, latencyScore float64) { | |
workSetAvgLatencyScore := s.workSetAvgLatencyScore() | |
// If the latency score is above this value, it is "good enough" | |
goodEnoughLatencyScore := 1.0 | |
// If the latency score is above this value the orch is performing reasonably relative to the baseline | |
baselineLatencyScore := workSetAvgLatencyScore * (1.0 - s.latencyTolerance) | |
// Drop the orch if it is not "good enough" and if it is below the baseline | |
if workSetAvgLatencyScore > 0 && latencyScore < goodEnoughLatencyScore && latencyScore < baselineLatencyScore { | |
s.orchs[id].numRefreshesFrozen++ | |
delete(s.workSet, id) | |
log.Printf("actual: %v vs. baseline: %v", latencyScore, baselineLatencyScore) | |
log.Printf("failed latency threshold check - dropped orch %v from work set", id) | |
} else { | |
s.workSet[id].totalLatencyScore += latencyScore | |
s.workSet[id].latencyScoreSamples++ | |
} | |
} | |
func (s *store) updateEligibleOrchs() { | |
for _, orch := range s.orchs { | |
if orch.numRefreshesFrozen > 0 { | |
orch.numRefreshesFrozen-- | |
} | |
} | |
} | |
func (s *store) eligibleOrchs() []int { | |
var lst []int | |
for id, orch := range s.orchs { | |
if orch.numRefreshesFrozen == 0 { | |
lst = append(lst, id) | |
} | |
} | |
return lst | |
} | |
func (s *store) refreshWorkSet() { | |
// Randomly create work set from list of eligible orchs | |
orchs := s.eligibleOrchs() | |
var newOrchs []int | |
for i := 0; i < s.maxWorkSetSize; i++ { | |
r := rand.Intn(len(orchs)) | |
newOrchs = append(newOrchs, orchs[r]) | |
} | |
// Only add new orchs that are not already in the work set | |
for _, id := range newOrchs { | |
if _, ok := s.workSet[id]; !ok { | |
s.workSet[id] = &workRecord{id: id} | |
} | |
} | |
} | |
func (s *store) workSetLst() []int { | |
var lst []int | |
for id := range s.workSet { | |
lst = append(lst, id) | |
} | |
return lst | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment