Skip to content

Instantly share code, notes, and snippets.

@yondonfu
Last active November 20, 2019 15:17
Show Gist options
  • Save yondonfu/3b1afee2e1a99d3d07fd4ec5ae500831 to your computer and use it in GitHub Desktop.
Save yondonfu/3b1afee2e1a99d3d07fd4ec5ae500831 to your computer and use it in GitHub Desktop.
Orchestrator selection
package main
import (
// "encoding/csv"
// "encoding/hex"
"encoding/csv"
"encoding/hex"
"fmt"
"log"
"math/rand"
"os"
)
// config bundles every tunable of a single simulated experiment run.
type config struct {
	// Upper and lower bounds for the work set size; forwarded to newStore.
	maxWorkSetSize, minWorkSetSize int
	// Weights forwarded to newStore: gtvWeight balances GTV against LTV in
	// the rep value, psWeight balances PS against rep in the selection weight.
	gtvWeight, psWeight float64
	// Latency tolerance and random-selection probability; forwarded to newStore.
	latencyTolerance, randomSelectProb float64
	// Duration of one segment, passed to store.Request for every segment.
	segDuration float64
	// Range from which each simulated orch's latency is derived.
	maxLatency, minLatency float64
	// Probability threshold for creating an orch that returns satisfactory
	// responses; it is compared against a rand.Float64() draw.
	satProb float64
	// Number of orchs to create and number of segments to request.
	numOrchs, numSegs int
	// Produces the stake assigned to each orch.
	stakeGen stakeGenFunc
}
// stakeGenFunc produces the stakes for numOrchs orchs. runExperiment indexes
// the result by orch id, so it is expected to hold at least numOrchs entries.
type stakeGenFunc func(numOrchs int) []int64
// runExperiment builds a store with cfg.numOrchs simulated orchs, sends
// cfg.numSegs segment requests through it, logs summary statistics and
// writes one CSV row per orch to "result-<random id>.csv" in the current
// directory.
//
// CSV columns, in order: latency, strategy, SAT, LTV, GTV, REP, requests
// served. Rows are written in ascending orch id order (row N describes orch
// N); the file has no id column, so the row position is the only way to
// identify an orch.
//
// Any failure to create or write the result file terminates the process via
// log.Fatal, as does a stake generator that returns too few stakes.
func runExperiment(cfg config) {
	store := newStore(
		cfg.maxWorkSetSize,
		cfg.minWorkSetSize,
		cfg.gtvWeight,
		cfg.psWeight,
		cfg.latencyTolerance,
		cfg.randomSelectProb,
	)

	// Create orchs
	stakes := cfg.stakeGen(cfg.numOrchs)
	if len(stakes) < cfg.numOrchs {
		log.Fatalf("stake generator returned %d stakes for %d orchs", len(stakes), cfg.numOrchs)
	}
	remoteOrchs := make(map[int]*orch, cfg.numOrchs)
	for i := 0; i < cfg.numOrchs; i++ {
		// NOTE(review): the same draw r decides both the strategy and the
		// latency, so orchs with r < satProb (satisfactory) always receive
		// the lower latencies. Confirm this coupling is intended.
		r := rand.Float64()
		strat := unsatisfactory
		if r < cfg.satProb {
			strat = satisfactory
		}
		latency := cfg.minLatency + r*(cfg.maxLatency-cfg.minLatency)
		o := &orch{id: i, strat: strat, latency: latency}
		remoteOrchs[i] = o
		store.AddOrch(i, stakes[i], o)
	}

	// Run the simulated stream
	numSat := 0
	numUnsat := 0
	totalLatency := 0.0
	totalSegs := 0
	for i := 0; i < cfg.numSegs; i++ {
		res, latency := store.Request(int64(i), cfg.segDuration)
		if res {
			numSat++
		} else {
			numUnsat++
		}
		totalLatency += latency
		totalSegs++
	}

	// Avoid reporting NaN when no segment was requested.
	avgLatency := 0.0
	if totalSegs > 0 {
		avgLatency = totalLatency / float64(totalSegs)
	}
	log.Printf("avg latency: %v", avgLatency)
	log.Printf("# satisfactory responses: %v", numSat)
	log.Printf("# unsatisfactory responses: %v", numUnsat)

	f, err := os.Create(fmt.Sprintf("result-%v.csv", randID()))
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	w := csv.NewWriter(f)
	createRecord := func(id int, o *orch) []string {
		return []string{
			fmt.Sprintf("%f", o.latency),
			fmt.Sprintf("%d", o.strat),
			fmt.Sprintf("%d", store.Sat(id)),
			fmt.Sprintf("%f", store.LTV(id)),
			fmt.Sprintf("%f", store.GTV(id)),
			fmt.Sprintf("%f", store.Rep(id)),
			fmt.Sprintf("%d", o.reqs),
		}
	}
	// Iterate ids in order instead of ranging over the map: map iteration
	// order is random, which would shuffle the rows on every run.
	for id := 0; id < cfg.numOrchs; id++ {
		if err := w.Write(createRecord(id, remoteOrchs[id])); err != nil {
			log.Fatal(err)
		}
	}
	// Flush explicitly so that buffered write errors are not lost.
	w.Flush()
	if err := w.Error(); err != nil {
		log.Fatal(err)
	}

	// for id, orch := range remoteOrchs {
	// log.Printf("Orch %v Latency %v Strategy: %v SAT: %v LTV: %v GTV: %v REP: %v Requests: %v", id, orch.latency, orch.strat, store.Sat(id), store.LTV(id), store.GTV(id), store.Rep(id), orch.reqs)
	// }
}
// randID returns a 20-character lowercase hex string built from 10 bytes,
// each taken from the low byte of a math/rand Uint32 draw. It is not
// suitable for anything security sensitive.
func randID() string {
	var raw [10]byte
	for i := range raw {
		raw[i] = byte(rand.Uint32())
	}
	return hex.EncodeToString(raw[:])
}
func main() {
cfg := config{
maxWorkSetSize: 15,
minWorkSetSize: 10,
gtvWeight: .5,
psWeight: 0.7,
randomSelectProb: .1,
latencyTolerance: 0.15,
satProb: .7,
segDuration: 2.0,
maxLatency: 6.0,
minLatency: 1.0,
numOrchs: 50,
numSegs: 100000,
}
// cfg.stakeGen = func (numOrchs int) []int64 {
// stakes := make([]int64, numOrchs)
// for i := 0; i < numOrchs; i++ {
// stakes[i] = int64(100000) / int64(numOrchs)
// }
// return stakes
// }
// runExperiment(cfg)
cfg.stakeGen = func(numOrchs int) []int64 {
r := rand.New(rand.NewSource(7))
zr := rand.NewZipf(r, 1.01, 1, 100000)
stakes := make([]int64, numOrchs)
for i := 0; i < numOrchs; i++ {
stakes[i] = int64(zr.Uint64())
}
return stakes
}
runExperiment(cfg)
}
package main
import (
"log"
"math/rand"
)
// strategy identifies how a simulated orch answers requests.
type strategy int

const (
	// satisfactory orchs report every request as satisfactory.
	satisfactory strategy = iota
	// unsatisfactory orchs report every request as unsatisfactory.
	unsatisfactory
)
// orch is a simulated remote orch with a fixed strategy and latency.
type orch struct {
	id      int
	strat   strategy
	latency float64
	// Number of requests this orch has received so far.
	reqs int
}

// Request counts the request and returns whether the response is
// satisfactory (true only for the satisfactory strategy) together with the
// orch's fixed latency.
func (o *orch) Request() (bool, float64) {
	o.reqs++
	ok := o.strat == satisfactory
	return ok, o.latency
}
// orchRecord is the store's bookkeeping entry for one known orch.
type orchRecord struct {
	id    int
	stake int64

	// Handle of the remote orch that requests are sent to.
	remoteOrch *orch

	// Running balance: satisfactory responses minus unsatisfactory ones.
	sat int64

	// Remaining number of work set refreshes during which the orch stays
	// ineligible for selection into the work set.
	numRefreshesFrozen int64
}
// workRecord accumulates the latency scores of one orch while it is a
// member of the work set.
type workRecord struct {
	id int
	// Sum of all recorded latency scores and the number of scores recorded.
	totalLatencyScore   float64
	latencyScoreSamples int64
}

// avgLatencyScore returns the mean of the recorded latency scores, or 0 when
// nothing has been recorded yet.
func (wr *workRecord) avgLatencyScore() float64 {
	if n := wr.latencyScoreSamples; n != 0 {
		return wr.totalLatencyScore / float64(n)
	}
	return 0.0
}
// store tracks every known orch plus the current work set, and holds the
// parameters that drive orch selection.
type store struct {
	// All known orchs, keyed by orch id.
	orchs map[int]*orchRecord
	// Orchs currently in the work set, keyed by orch id.
	workSet map[int]*workRecord

	// Size bounds for the work set. Request refreshes the work set whenever
	// it holds fewer than minWorkSetSize orchs.
	maxWorkSetSize, minWorkSetSize int

	// Weight of gtv relative to ltv in the rep calculation.
	gtvWeight float64
	// Weight of ps relative to rep in the selection weight calculation.
	psWeight float64
	// Maximum fraction below the work set's average latency score that is
	// still tolerated.
	latencyTolerance float64
	// Chance of running a purely random selection instead of a weighted
	// random selection.
	randomSelectProb float64
}
// newStore returns an empty store (no orchs, empty work set) configured with
// the given work set bounds and selection parameters.
func newStore(maxWorkSetSize int, minWorkSetSize int, gtvWeight, psWeight, latencyTolerance, randomSelectProb float64) *store {
	st := new(store)

	st.orchs = make(map[int]*orchRecord)
	st.workSet = make(map[int]*workRecord)

	st.maxWorkSetSize = maxWorkSetSize
	st.minWorkSetSize = minWorkSetSize

	st.gtvWeight = gtvWeight
	st.psWeight = psWeight
	st.latencyTolerance = latencyTolerance
	st.randomSelectProb = randomSelectProb

	return st
}
// AddOrch registers remoteOrch under id with the given stake. An existing
// record with the same id is replaced by a fresh one, which resets its sat
// balance and freeze counter to zero.
func (s *store) AddOrch(id int, stake int64, remoteOrch *orch) {
	rec := &orchRecord{id: id, stake: stake, remoteOrch: remoteOrch}
	s.orchs[id] = rec
}
// RemoveOrch forgets the orch with the given id. Removing an unknown id is a
// no-op.
func (s *store) RemoveOrch(id int) {
	delete(s.orchs, id)
	// Also evict the orch from the work set. Otherwise it could still be
	// selected, and the lookup in s.orchs would yield a nil record.
	delete(s.workSet, id)
}
// Request sends segment segNo to an orch selected from the work set and
// records the outcome.
//
// The work set is refreshed first when it holds fewer than minWorkSetSize
// orchs. A satisfactory response increments the orch's sat balance and
// records the latency score segDuration/latency; an unsatisfactory response
// is recorded via recordUnsat, which drops the orch from the work set.
//
// It returns whether the response was satisfactory and the latency reported
// by the orch. If no orch can be selected (empty work set, or the selected
// id is not a known orch) it returns (false, 0) without contacting any orch.
func (s *store) Request(segNo int64, segDuration float64) (bool, float64) {
	// Make sure work set is populated
	if len(s.workSet) < s.minWorkSetSize {
		s.refreshWorkSet()
		s.updateEligibleOrchs()
		log.Printf("refreshed work set: %v", s.workSetLst())
	}

	// Selecting from an empty work set cannot succeed.
	if len(s.workSet) == 0 {
		log.Printf("no orch available in work set for segment %v", segNo)
		return false, 0
	}

	// Send request to selected orch
	pick := s.selectOrch()
	rec, ok := s.orchs[pick]
	if !ok {
		// Either selection failed or the orch is no longer known; make sure
		// the stale id cannot be picked again.
		delete(s.workSet, pick)
		log.Printf("selected unknown orch %v for segment %v", pick, segNo)
		return false, 0
	}

	res, latency := rec.remoteOrch.Request()
	if res {
		s.recordSat(pick)
		latencyScore := segDuration / latency
		s.recordLatencyScore(pick, latencyScore)
	} else {
		s.recordUnsat(pick)
	}
	return res, latency
}
// Sat returns the sat balance (satisfactory minus unsatisfactory responses)
// of the orch with the given id. The id must belong to a known orch.
func (s *store) Sat(id int) int64 {
	rec := s.orchs[id]
	return rec.sat
}
// LTV returns the orch's share of the total positive sat balance across all
// known orchs.
//
// It returns 0 when the store has no orchs, and an equal share
// (1/number of orchs) when no orch has a positive balance. An orch whose own
// balance is not positive gets 0.
func (s *store) LTV(id int) float64 {
	if len(s.orchs) == 0 {
		return 0
	}
	positiveTotal := s.totalSat()
	if positiveTotal == 0 {
		return 1.0 / float64(len(s.orchs))
	}
	if own := s.orchs[id].sat; own > 0 {
		return float64(own) / float64(positiveTotal)
	}
	return 0
}
// GTV returns the orch's share of the total stake across all known orchs.
//
// It returns 0 when the store has no orchs, and an equal share
// (1/number of orchs) when the total stake is zero.
func (s *store) GTV(id int) float64 {
	count := len(s.orchs)
	if count == 0 {
		return 0
	}
	if total := s.totalStake(); total != 0 {
		return float64(s.orchs[id].stake) / float64(total)
	}
	return 1.0 / float64(count)
}
// Rep is historical and persists across streams. All orchs have a rep value.
// It is the gtvWeight-weighted blend of GTV and LTV:
// gtvWeight*GTV + (1-gtvWeight)*LTV.
func (s *store) Rep(id int) float64 {
	gtvPart := s.gtvWeight * s.GTV(id)
	ltvPart := (1.0 - s.gtvWeight) * s.LTV(id)
	return gtvPart + ltvPart
}
// PS is per-stream. Only orchs in the work set have a PS value.
//
// PS is the orch's average latency score divided by the sum of the average
// latency scores of all work set members. While that sum is zero, every
// member gets an equal share (1/work set size). For an id that is not in the
// work set (including when the work set is empty) PS returns 0.
func (s *store) PS(id int) float64 {
	// Look the member up first: calling avgLatencyScore on a missing (nil)
	// record would dereference a nil pointer.
	rec, ok := s.workSet[id]
	if !ok {
		return 0
	}
	sum := 0.0
	for _, member := range s.workSet {
		sum += member.avgLatencyScore()
	}
	if sum == 0.0 {
		return 1.0 / float64(len(s.workSet))
	}
	return rec.avgLatencyScore() / sum
}
// Weight returns the selection weight of an orch: the psWeight-weighted
// blend of its PS and Rep values, psWeight*PS + (1-psWeight)*Rep.
func (s *store) Weight(id int) float64 {
	psPart := s.psWeight * s.PS(id)
	repPart := (1.0 - s.psWeight) * s.Rep(id)
	return psPart + repPart
}
// selectOrch picks an orch id from the work set. With probability
// randomSelectProb the pick is uniformly random; otherwise it is weighted by
// each orch's selection weight.
func (s *store) selectOrch() int {
	if useRandom := rand.Float64() < s.randomSelectProb; useRandom {
		return s.randomSelect()
	}
	return s.weightedSelect()
}
// randomSelect returns a uniformly random id from the work set. The work set
// must not be empty: rand.Intn panics for a non-positive argument.
func (s *store) randomSelect() int {
	ids := s.workSetLst()
	return ids[rand.Intn(len(ids))]
}
// weightedSelect returns an id from the work set, chosen at random with
// probability proportional to each orch's Weight. It returns -1 only when
// the work set is empty.
func (s *store) weightedSelect() int {
	lst := s.workSetLst()
	if len(lst) == 0 {
		return -1
	}

	// Compute every weight exactly once: each Weight call iterates over the
	// work set and over all orchs, so recomputing it in the second loop
	// would double the cost for identical values.
	weights := make([]float64, len(lst))
	totalWeight := float64(0)
	for i, id := range lst {
		weights[i] = s.Weight(id)
		totalWeight += weights[i]
	}

	r := rand.Float64() * totalWeight
	for i, id := range lst {
		r -= weights[i]
		if r <= 0.0 {
			return id
		}
	}

	// Floating point rounding can leave a tiny positive remainder after
	// subtracting every weight. Fall back to the last candidate instead of
	// reporting a failed selection.
	return lst[len(lst)-1]
}
// totalSat sums the sat balances of all known orchs, counting only strictly
// positive balances.
func (s *store) totalSat() int64 {
	var total int64
	for _, rec := range s.orchs {
		if rec.sat <= 0 {
			continue
		}
		total += rec.sat
	}
	return total
}
// totalStake returns the combined stake of all known orchs.
func (s *store) totalStake() int64 {
	var total int64
	for _, rec := range s.orchs {
		total += rec.stake
	}
	return total
}
// workSetAvgLatencyScore returns the mean of the average latency scores of
// the work set members, considering only members whose average is strictly
// positive. It returns 0 when no member qualifies.
func (s *store) workSetAvgLatencyScore() float64 {
	var (
		total float64
		count int
	)
	for _, member := range s.workSet {
		score := member.avgLatencyScore()
		// Written as a negated "> 0" so that every value the positive test
		// rejects is skipped.
		if !(score > 0) {
			continue
		}
		total += score
		count++
	}
	if count == 0 {
		return 0.0
	}
	return total / float64(count)
}
// recordSat credits one satisfactory response to the orch with the given id.
func (s *store) recordSat(id int) {
	rec := s.orchs[id]
	rec.sat++
}
// recordUnsat debits one unsatisfactory response from the orch, freezes it
// and drops it from the work set.
//
// When the balance is negative after the debit, the freeze counter is set to
// the magnitude of the balance; otherwise the counter is increased by one.
func (s *store) recordUnsat(id int) {
	rec := s.orchs[id]
	rec.sat--
	switch {
	case rec.sat < 0:
		rec.numRefreshesFrozen = -rec.sat
	default:
		rec.numRefreshesFrozen++
	}
	delete(s.workSet, id)
	log.Printf("unsatisfactory response - dropped orch %v from work set", id)
}
// recordLatencyScore either adds latencyScore to the orch's work record or,
// if the score is too low, freezes the orch for one more refresh and drops
// it from the work set.
//
// A score is too low only when all of the following hold: the work set has a
// positive average latency score, the score is below 1.0 ("good enough"),
// and the score is below the baseline, i.e. the work set average reduced by
// latencyTolerance.
func (s *store) recordLatencyScore(id int, latencyScore float64) {
	// Scores at or above this value are always accepted.
	const goodEnough = 1.0

	avg := s.workSetAvgLatencyScore()
	// Scores at or above the baseline are reasonable relative to the rest of
	// the work set.
	baseline := avg * (1.0 - s.latencyTolerance)

	tooSlow := avg > 0 && latencyScore < goodEnough && latencyScore < baseline
	if !tooSlow {
		rec := s.workSet[id]
		rec.totalLatencyScore += latencyScore
		rec.latencyScoreSamples++
		return
	}

	s.orchs[id].numRefreshesFrozen++
	delete(s.workSet, id)
	log.Printf("actual: %v vs. baseline: %v", latencyScore, baseline)
	log.Printf("failed latency threshold check - dropped orch %v from work set", id)
}
// updateEligibleOrchs counts one refresh down on every orch that is still
// frozen; counters that are not positive are left untouched.
func (s *store) updateEligibleOrchs() {
	for _, rec := range s.orchs {
		if rec.numRefreshesFrozen <= 0 {
			continue
		}
		rec.numRefreshesFrozen--
	}
}
// eligibleOrchs returns the ids of all known orchs whose freeze counter is
// zero, in no particular order (map iteration order).
func (s *store) eligibleOrchs() []int {
	var ids []int
	for id, rec := range s.orchs {
		if rec.numRefreshesFrozen != 0 {
			continue
		}
		ids = append(ids, id)
	}
	return ids
}
// refreshWorkSet tops up the work set with randomly drawn eligible orchs.
//
// It makes at most maxWorkSetSize draws (with replacement) from the eligible
// orchs and adds every drawn orch that is not already a member, stopping as
// soon as the work set holds maxWorkSetSize orchs. Existing members and
// their latency records are kept. When no orch is eligible the work set is
// left unchanged.
func (s *store) refreshWorkSet() {
	orchs := s.eligibleOrchs()
	// rand.Intn panics for a non-positive argument, which would otherwise
	// happen as soon as every orch is frozen.
	if len(orchs) == 0 {
		return
	}

	// Stop early once the cap is reached so that existing members plus new
	// picks never exceed maxWorkSetSize.
	for i := 0; i < s.maxWorkSetSize && len(s.workSet) < s.maxWorkSetSize; i++ {
		id := orchs[rand.Intn(len(orchs))]
		// Only add new orchs that are not already in the work set
		if _, ok := s.workSet[id]; !ok {
			s.workSet[id] = &workRecord{id: id}
		}
	}
}
// workSetLst returns the ids of the current work set members in no
// particular order (map iteration order). The result is nil when the work
// set is empty.
func (s *store) workSetLst() []int {
	if len(s.workSet) == 0 {
		return nil
	}
	ids := make([]int, 0, len(s.workSet))
	for id := range s.workSet {
		ids = append(ids, id)
	}
	return ids
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment