@CAFxX
Last active November 15, 2018 01:09
Sarama BalancedAffinityPartitioner
package partitioner
/*
A partitioner that assigns affinity from keys to partitions while at the same
time attempting to spread load across partitions. Affinity, to this end, is
temporary: the affinity of a key to a partition changes over time.
Three parameters control how often the affinity changes:
- every timeWindow, the affinity changes for all keys;
- when, within a timeWindow, a key produces more than switchEveryBytes bytes
  or switchEveryMessages messages, the affinity of that key changes.
As long as the clocks are reasonably synchronized and the inputs to the
producers reasonably even, multiple instances of the producer should be able
to independently agree on the affinity of a key to a partition.
Note that the goal is not to *consistently* select the same partition for a
key, only to ensure that most messages for a given key end up in one partition
rather than being scattered across all of them. For this reason, consumers
must be prepared to handle messages for a key regardless of the partition the
messages arrive on.
*/
import (
	"fmt"
	"hash/fnv"
	"math"
	"time"

	"github.com/Shopify/sarama"
)
type balancedAffinityPartitioner struct {
	switchEveryBytes    uint64
	switchEveryMessages uint64
	timeWindow          time.Duration
	counts              map[string]appStats
	nextReset           time.Time
}

type appStats struct {
	bytes              uint64
	messages           uint64
	preferredPartition uint64
}
func NewBalancedAffinityPartitioner(timeWindow time.Duration, switchEveryMessages, switchEveryBytes uint64) sarama.Partitioner {
	if switchEveryBytes == 0 {
		switchEveryBytes = math.MaxUint64
	}
	if switchEveryMessages == 0 {
		switchEveryMessages = math.MaxUint64
	}
	// TODO: handle the case of timeWindow <= 0
	return &balancedAffinityPartitioner{
		timeWindow:          timeWindow,
		switchEveryMessages: switchEveryMessages,
		switchEveryBytes:    switchEveryBytes,
		counts:              make(map[string]appStats),
		nextReset:           getNextReset(time.Now(), timeWindow),
	}
}
func (p *balancedAffinityPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	if now := time.Now(); now.After(p.nextReset) {
		p.counts = make(map[string]appStats)
		p.nextReset = getNextReset(now, p.timeWindow)
	}
	appGuid := getAppGuidFromMessage(message)
	app, found := p.counts[appGuid]
	if !found {
		app = appStats{
			// consider also the current interval (ending at nextReset) when deciding the
			// preferred partition, to avoid pathological cases of multiple applications
			// constantly being assigned to the same partition
			preferredPartition: getPreferredPartition(appGuid, p.nextReset),
		}
	}
	app.messages++
	app.bytes += uint64(message.Value.Length())
	p.counts[appGuid] = app
	offset := app.messages / p.switchEveryMessages
	if offsetBytes := app.bytes / p.switchEveryBytes; offsetBytes > offset {
		offset = offsetBytes
	}
	return int32((app.preferredPartition + offset) % uint64(numPartitions)), nil
}
func (p *balancedAffinityPartitioner) RequiresConsistency() bool {
	// we only need affinity, not strict consistency: this allows sarama to
	// continue producing messages on a different partition in case the preferred
	// partition is temporarily not available
	return false
}
func getNextReset(now time.Time, window time.Duration) time.Time {
	// round up to the next multiple of window since the Unix epoch
	return time.Unix(0, (now.UnixNano()/window.Nanoseconds()+1)*window.Nanoseconds())
}
func getAppGuidFromMessage(message *sarama.ProducerMessage) string {
	// TODO: get the AppGUID out of the message
	panic("not implemented")
}
func getPreferredPartition(appGuid string, nextReset time.Time) uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s%d", appGuid, nextReset.UnixNano())
	return h.Sum64()
}