@jsmorph
Last active May 13, 2022 03:19
RemPattern for Quamina

Basic problem and proposed solution

Quamina doesn't currently support RemPattern. Some of the contemplated implementations of RemPattern are difficult. At least one approach is pretty easy: wrap the current matcher, filter removed patterns out of match results, and periodically rebuild the matcher from scratch with only the live patterns. More specifically:

  1. Remember patterns that have been added
  2. Remember patterns that have been removed (implicitly, by dropping them from the set of live patterns)
  3. Filter MatchesFor...() results to drop any removed patterns
  4. Support rebuilding the matcher state periodically with only the live patterns
  5. Maintain some statistics to help decide when to rebuild
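The five steps can be illustrated without Quamina at all. This is a toy sketch under stated assumptions, not the gist's implementation: `appendOnly` is a hypothetical add-only matcher whose "patterns" are plain strings that must equal the event exactly, and `remMatcher` layers the live set, result filtering, rebuilding, and a filtered-results counter on top of it.

```go
package main

import "fmt"

// appendOnly is a hypothetical stand-in for an add-only matcher. Its
// "patterns" are plain strings that must equal the event exactly.
type appendOnly struct {
	byPattern map[string][]int // pattern -> ids registered with it
}

func newAppendOnly() *appendOnly {
	return &appendOnly{byPattern: make(map[string][]int)}
}

func (a *appendOnly) add(id int, pat string) {
	a.byPattern[pat] = append(a.byPattern[pat], id)
}

func (a *appendOnly) matches(event string) []int {
	return a.byPattern[event]
}

// remMatcher layers removal on top of appendOnly.
type remMatcher struct {
	inner    *appendOnly
	live     map[int]string // steps 1 and 2: removed ids are simply absent
	filtered int            // step 5: results dropped by filtering
}

func newRemMatcher() *remMatcher {
	return &remMatcher{inner: newAppendOnly(), live: make(map[int]string)}
}

// add registers pat for id; it refuses an id that is already live.
func (m *remMatcher) add(id int, pat string) bool {
	if _, ok := m.live[id]; ok {
		return false
	}
	m.live[id] = pat
	m.inner.add(id, pat)
	return true
}

// rem forgets id; the inner matcher still holds its pattern.
func (m *remMatcher) rem(id int) bool {
	_, had := m.live[id]
	delete(m.live, id)
	return had
}

// matches filters the inner results against the live set (step 3).
func (m *remMatcher) matches(event string) []int {
	var acc []int
	for _, id := range m.inner.matches(event) {
		if _, ok := m.live[id]; ok {
			acc = append(acc, id)
		} else {
			m.filtered++
		}
	}
	return acc
}

// rebuild starts over with only the live patterns (step 4).
func (m *remMatcher) rebuild() {
	fresh := newAppendOnly()
	for id, pat := range m.live {
		fresh.add(id, pat)
	}
	m.inner = fresh
	m.filtered = 0
}

func main() {
	m := newRemMatcher()
	m.add(1, "tacos")
	m.add(2, "tacos")
	m.rem(1)
	got := m.matches("tacos")
	fmt.Println(got, m.filtered) // [2] 1
	m.rebuild()
	got = m.matches("tacos")
	fmt.Println(got, m.filtered) // [2] 0
}
```

One caveat this sketch shares with any wrapper that filters only by id: if an id is removed and then re-added with a different pattern before a rebuild, the inner matcher still holds the old pattern for that id, so events matching only the old pattern would still be reported.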

The implementation of the set of live patterns is pluggable via a Go interface. The default implementation is a map[quamina.X]string.
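To illustrate the pluggability, here is a hypothetical alternative State that iterates in insertion order, so a rebuild re-adds patterns in the order they were originally added. It re-declares `X` (assumed here to be an interface type, like quamina.X), `ErrExists`, and the `State` interface, all of which appear later in this gist, so that it compiles on its own; `orderedState` is an invented name.

```go
package main

import (
	"errors"
	"fmt"
)

// X stands in for quamina.X (assumed to be an interface type).
type X = any

var ErrExists = errors.New("pattern already exists for that X")

// State mirrors the gist's rem.State interface.
type State interface {
	Add(x X, pattern string) error
	Rem(x X) (bool, error)
	Iterate(func(x X, pattern string) error) error
	Get(x X) (string, error)
}

// orderedState is a hypothetical State that iterates in insertion order.
type orderedState struct {
	order []X          // live Xs, oldest first
	pats  map[X]string // X -> pattern for live Xs
}

func newOrderedState() *orderedState {
	return &orderedState{pats: make(map[X]string)}
}

func (s *orderedState) Add(x X, pattern string) error {
	if _, have := s.pats[x]; have {
		return ErrExists
	}
	s.pats[x] = pattern
	s.order = append(s.order, x)
	return nil
}

func (s *orderedState) Get(x X) (string, error) { return s.pats[x], nil }

// Rem is O(n) in the number of live patterns because it compacts order.
func (s *orderedState) Rem(x X) (bool, error) {
	if _, had := s.pats[x]; !had {
		return false, nil
	}
	delete(s.pats, x)
	for i, y := range s.order {
		if y == x {
			s.order = append(s.order[:i], s.order[i+1:]...)
			break
		}
	}
	return true, nil
}

// Iterate visits live patterns oldest first, stopping at the first error.
func (s *orderedState) Iterate(f func(x X, pattern string) error) error {
	for _, x := range s.order {
		if err := f(x, s.pats[x]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var s State = newOrderedState()
	s.Add(3, "c")
	s.Add(1, "a")
	s.Add(2, "b")
	s.Rem(1)
	s.Iterate(func(x X, p string) error {
		fmt.Println(x, p) // prints "3 c", then "2 b"
		return nil
	})
}
```

The price of the ordering is that Rem becomes linear in the number of live patterns, versus a single map delete for the default map.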

Rebuilding can be triggered automatically using either an application-provided trigger function, which can be based on state statistics, or the default trigger policy, which fires on RemPattern when the removed/live ratio is greater than 1 and there are at least 100 live patterns.
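As a hedged illustration of application-provided triggers, the sketch below re-declares minimal copies of the gist's `Stats` and `RebuildTrigger` (defined later in this gist) so it compiles on its own. `neverTrigger` and `filteredTrigger` are hypothetical names: the first never fires, leaving rebuilds to explicit Rebuild calls; the second fires on a removal once many match results have been filtered out. The threshold of 1000 is an arbitrary example value.

```go
package main

import "fmt"

// Stats mirrors a subset of the gist's rem.Stats.
type Stats struct {
	Added    int
	Removed  int
	Filtered int64
}

// RebuildTrigger mirrors the gist's rem.RebuildTrigger.
type RebuildTrigger interface {
	Rebuild(added bool, s *Stats) bool
}

// neverTrigger never fires, leaving rebuilds to explicit Rebuild calls.
type neverTrigger struct{}

func (neverTrigger) Rebuild(bool, *Stats) bool { return false }

// filteredTrigger (hypothetical) fires on a removal once at least
// MaxFiltered match results have been discarded since the last rebuild.
type filteredTrigger struct {
	MaxFiltered int64
}

func (t filteredTrigger) Rebuild(added bool, s *Stats) bool {
	if added {
		return false // only consider rebuilding on removal
	}
	return s.Filtered >= t.MaxFiltered
}

func main() {
	triggers := []RebuildTrigger{neverTrigger{}, filteredTrigger{MaxFiltered: 1000}}
	s := &Stats{Added: 500, Removed: 20, Filtered: 1500}
	for _, t := range triggers {
		fmt.Println(t.Rebuild(false, s)) // prints false, then true
	}
}
```

Per the RebuildTrigger documentation, a trigger is consulted only by AddPattern and RemPattern, while Filtered grows inside MatchesFor... calls, so a trigger like `filteredTrigger` fires at the next removal after the threshold is crossed, not at the moment it is crossed. Note also that NewMatcher in this gist always installs DefaultRebuildTrigger and offers no exported setter; using a different trigger (or nil, which maybeRebuild treats as "never") would currently mean setting the unexported rebuildTrigger field from inside package rem.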

Design

  1. Embed the real matcher in a new type that extends the real matcher with the new method RemPattern. This new type will have its own AddPattern and MatchesFor... methods that wrap those of the underlying matcher.

    The new RemPattern and AddPattern will also call a pluggable rebuild trigger function. If that function returns true, a rebuild is executed synchronously inside the method bodies. This behavior can result in substantial latency for that method call. As an alternative, an application can use a rebuild trigger that never returns true and instead manually call Rebuild (perhaps after consulting Stats).

    Since the nature of this beast is somewhat different from that of the core Quamina design, use a new package (within the Quamina repo). That way core Quamina isn't contaminated, and an application can use pure Quamina without the overhead of RemPattern support.

  2. Maintain a possibly persistent map of live patterns, from quamina.X to pattern, which is modified by AddPattern and RemPattern.

  3. MatchesFor... is implemented by calling the underlying matcher's MatchesFor... and then removing the returned quamina.Xs that are not in the live set of patterns.

  4. Provide a Rebuild method that rebuilds the entire underlying matcher from scratch based on the live set of patterns. Unless the rebuild trigger fires, the application has to call Rebuild to compact the matcher state.

    The Rebuild method could release the old matcher before it builds the new one. That technique might reduce heap demands compared to keeping the old matcher around until the rebuild completes. In theory, the rebuild should never encounter a problem except for memory exhaustion, and in that case prospects are grim anyway. But if the rebuild process does run into a non-fatal problem, the matcher state would be lost. As an experiment, the current Rebuild method takes a boolean fearlessly argument; if it is false, Rebuild does not release the previous matcher until the rebuild completes successfully.

  5. Maintain some basic statistics (Added, Removed, and Filtered counts) to aid callers in implementing reasonable rebuild policies.

Discussion

Time and space

The default state is a map[quamina.X]string with the obvious dimensions: an entry for every live pattern. With that implementation, here's the straightforward story on time and space overhead:

  1. AddPattern: Overhead: A hash table get and a hash table add.

  2. RemPattern: No overhead in the sense that this method is what we're here for. Costs: a hash table get and a hash table delete. (We could omit the get if we didn't want to report whether the pattern was actually removed.)

  3. MatchesFor...: A hash table get for every returned quamina.X, under a write lock (to update the Filtered count); also allocates a new slice for the results.

  4. Rebuild: As discussed above, "fearless" rebuilds at least pretend to minimize demands on the heap during a rebuild. This method performs an underlying AddPattern for every live pattern.
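The allocation in MatchesFor... could be avoided by filtering the returned slice in place, the usual Go idiom of reusing the backing array. This is a hypothetical alternative, not the gist's code, and it is only safe if (an assumption about Quamina that would need checking) the slice returned by the underlying matcher belongs to the caller and is not retained or shared. `X` is a local stand-in for quamina.X and `filterInPlace` is an invented name; it also returns the number of dropped entries, which is what Stats.Filtered counts.

```go
package main

import "fmt"

// X stands in for quamina.X (assumed to be an interface type).
type X = any

// filterInPlace keeps only the xs present in live, overwriting xs's
// backing array instead of allocating. It returns the kept prefix and
// the number of entries dropped.
func filterInPlace(xs []X, live map[X]string) ([]X, int) {
	kept := xs[:0]
	for _, x := range xs {
		if _, ok := live[x]; ok {
			kept = append(kept, x) // never writes past the current index
		}
	}
	return kept, len(xs) - len(kept)
}

func main() {
	live := map[X]string{1: `{"likes":["tacos"]}`, 3: `{"likes":["beer"]}`}
	kept, dropped := filterInPlace([]X{1, 2, 3}, live)
	fmt.Println(kept, dropped) // [1 3] 1
}
```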

package rem

// RebuildTrigger provides a way to control when rebuilds are
// automatically triggered during mutations.
type RebuildTrigger interface {
	// Rebuild should return true to trigger a rebuild.
	//
	// This method is called by AddPattern and RemPattern. added
	// is true when called by AddPattern; false otherwise. These
	// methods do not return until the rebuild is complete, so
	// beware.
	Rebuild(added bool, s *Stats) bool
}

// DefaultRebuildTrigger fires on removal when removed patterns
// outnumber live patterns and there are at least 100 live patterns.
var DefaultRebuildTrigger = NewRatioTrigger(1, 100)

// RatioTrigger's Rebuild function returns true when there are at
// least MinLive live patterns and the ratio of removed to live
// patterns is greater than Ratio.
type RatioTrigger struct {
	Ratio   float64
	MinLive int
}

// NewRatioTrigger returns a RatioTrigger with the given Ratio and
// MinLive.
func NewRatioTrigger(ratio float64, minLive int) *RatioTrigger {
	return &RatioTrigger{
		Ratio:   ratio,
		MinLive: minLive,
	}
}

func (t *RatioTrigger) Rebuild(added bool, s *Stats) bool {
	if added {
		return false
	}
	live := s.Added - s.Removed
	if live == 0 || live < t.MinLive {
		return false
	}
	return float64(s.Removed)/float64(live) > t.Ratio
}

package rem

import (
	"sync"
	"time"

	quamina "quamina/lib"
)

// Stats reports basic counts to aid in deciding when to Rebuild.
type Stats struct {
	// Some of these values are ints instead of uints because Go
	// likes to print uints in hex, and I'd like to see some
	// temporary logging output in decimal. ToDo: back to uints?

	// Added is the count of total patterns added.
	Added int
	// Removed is the count of the patterns removed.
	Removed int
	// Filtered is the count of results that have been removed
	// because their patterns had been removed.
	Filtered int64
	// LastRebuilt is the time the last rebuild started.
	LastRebuilt time.Time
	// RebuildDuration is the duration of the last rebuild.
	RebuildDuration time.Duration
	// RebuildPurged is the count of patterns removed during
	// rebuild.
	RebuildPurged int
}

// Matcher provides RemPattern on top of quamina.Matcher.
type Matcher struct {
	// Matcher is the underlying matcher that does the hard work.
	//
	// Matcher should maybe not be public.
	*quamina.Matcher

	// lock should probably really be quamina.Matcher.lock.
	//
	// ToDo: Expose that lock or move this implementation or
	// whatever.
	lock sync.RWMutex

	// live is the live set of patterns.
	live State

	stats Stats

	// rebuildTrigger, if not nil, determines when a mutation
	// triggers a rebuild.
	rebuildTrigger RebuildTrigger
}

// NewMatcher does what you'd expect.
//
// The State defaults to MemState, and the rebuildTrigger defaults to
// DefaultRebuildTrigger.
func NewMatcher(s State) *Matcher {
	if s == nil {
		s = NewMemState()
	}
	return &Matcher{
		Matcher:        quamina.NewMatcher(),
		live:           s,
		rebuildTrigger: DefaultRebuildTrigger,
	}
}
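
// maybeRebuild consults the rebuild trigger and, if it fires, rebuilds
// synchronously. It assumes the caller holds the lock.
func (m *Matcher) maybeRebuild(added bool) error {
	if m.rebuildTrigger == nil {
		return nil
	}
	if m.rebuildTrigger.Rebuild(added, &m.stats) {
		// Automatic rebuilds are not fearless: keep the old
		// matcher until the new one is complete.
		return m.rebuild(false)
	}
	return nil
}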
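
// AddPattern records pat as live for x and adds it to the underlying
// matcher. It may trigger a synchronous rebuild.
func (m *Matcher) AddPattern(x quamina.X, pat string) error {
	m.lock.Lock()
	defer m.lock.Unlock()
	if err := m.live.Add(x, pat); err != nil {
		return err
	}
	if err := m.Matcher.AddPattern(x, pat); err != nil {
		// Best-effort rollback so the live set doesn't claim a
		// pattern the underlying matcher never accepted.
		_, _ = m.live.Rem(x)
		return err
	}
	m.stats.Added++
	return m.maybeRebuild(true)
}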

// MatchesForJSONEvent returns the Xs whose patterns match event,
// omitting any whose patterns have been removed.
func (m *Matcher) MatchesForJSONEvent(event []byte) ([]quamina.X, error) {
	// Read the underlying matcher under the lock, since a rebuild
	// can replace it concurrently.
	m.lock.RLock()
	inner := m.Matcher
	m.lock.RUnlock()

	xs, err := inner.MatchesForJSONEvent(event)
	if err != nil {
		return nil, err
	}
	// Remove any X that isn't in the live set.
	acc := make([]quamina.X, 0, len(xs))
	// We're updating stats.Filtered, so we need a write lock. If
	// we forget about stats.Filtered, we can live with only a
	// read lock here.
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, x := range xs {
		have, err := m.live.Get(x)
		if err != nil {
			return nil, err
		}
		if have != "" {
			acc = append(acc, x)
		} else {
			m.stats.Filtered++
		}
	}
	return acc, nil
}
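
// RemPattern removes the pattern for x from the live set and reports
// whether there was one. It may trigger a synchronous rebuild.
func (m *Matcher) RemPattern(x quamina.X) (bool, error) {
	m.lock.Lock()
	defer m.lock.Unlock()
	had, err := m.live.Rem(x)
	if err != nil || !had {
		return had, err
	}
	m.stats.Removed++
	return true, m.maybeRebuild(false)
}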

// Rebuild rebuilds the matcher state based on only live patterns.
//
// If calling fearlessly, then the old matcher is released before
// building the new one.
//
// This method resets the Stats.
func (m *Matcher) Rebuild(fearlessly bool) error {
	m.lock.Lock()
	err := m.rebuild(fearlessly)
	m.lock.Unlock()
	return err
}

// rebuild is Rebuild but assumes having the lock.
func (m *Matcher) rebuild(fearlessly bool) error {
	// Nothing fancy here now.
	var (
		then = time.Now()
		m1   = quamina.NewMatcher()
	)
	if fearlessly {
		// Let the GC reduce heap requirements?
		m.Matcher = nil
	}
	count := 0
	err := m.live.Iterate(func(x quamina.X, p string) error {
		err := m1.AddPattern(x, p)
		if err == nil {
			count++
		}
		return err
	})
	if err == nil {
		m.Matcher = m1
		m.stats.RebuildPurged = m.stats.Removed
		m.stats.Added = count
		m.stats.Removed = 0
		m.stats.Filtered = 0
		m.stats.LastRebuilt = then
		m.stats.RebuildDuration = time.Since(then)
	}
	return err
}

// Stats returns a copy of the current statistics.
func (m *Matcher) Stats() Stats {
	m.lock.RLock()
	s := m.stats // Copies
	m.lock.RUnlock()
	return s
}

package rem

import "testing"

func TestBasic(t *testing.T) {
	var (
		pat   = `{"likes":["tacos"]}`
		id    = 1
		event = []byte(`{"likes":"tacos"}`)
		m     = NewMatcher(nil)
		stats = func() {
			t.Logf("%#v", m.Stats())
		}
	)
	if err := m.AddPattern(id, pat); err != nil {
		t.Fatal(err)
	}
	got, err := m.MatchesForJSONEvent(event)
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != 1 {
		t.Fatalf("got %v; want one match", got)
	}
	stats()
	if have, err := m.RemPattern(id); err != nil {
		t.Fatal(err)
	} else if !have {
		t.Fatal("first RemPattern reported no pattern")
	}
	if have, err := m.RemPattern(id); err != nil {
		t.Fatal(err)
	} else if have {
		t.Fatal("second RemPattern reported a pattern")
	}
	stats()
	got, err = m.MatchesForJSONEvent(event)
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != 0 {
		t.Fatalf("got %v after RemPattern; want none", got)
	}
	stats()
	if err = m.Rebuild(true); err != nil {
		t.Fatal(err)
	}
	stats()
	got, err = m.MatchesForJSONEvent(event)
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != 0 {
		t.Fatalf("got %v after Rebuild; want none", got)
	}
	stats()
}

package rem

import (
	"errors"

	quamina "quamina/lib"
)

// ErrExists reports that a pattern is already recorded for an X.
var ErrExists = errors.New("pattern already exists for that X")

// State is the set of live patterns, keyed by X.
type State interface {
	// Add records pattern for x. It should fail (MemState returns
	// ErrExists) if x already has a pattern.
	Add(x quamina.X, pattern string) error
	// Rem forgets the pattern for x and reports whether there
	// was one.
	Rem(x quamina.X) (bool, error)
	// Iterate calls the function for each live pattern and stops
	// at the first error, which it returns.
	Iterate(func(x quamina.X, pattern string) error) error
	// Get returns the pattern for the given X.
	//
	// Since a pattern can't be the empty string, that zero value
	// indicates no corresponding pattern.
	Get(x quamina.X) (string, error)
}

// MemState is the default State: an in-memory map.
type MemState map[quamina.X]string

// NewMemState returns an empty MemState.
func NewMemState() MemState {
	// Accept initial size as a parameter?
	return make(map[quamina.X]string)
}

func (s MemState) Add(x quamina.X, pattern string) error {
	if _, have := s[x]; have {
		return ErrExists
	}
	s[x] = pattern
	return nil
}

func (s MemState) Get(x quamina.X) (string, error) {
	return s[x], nil
}

func (s MemState) Rem(x quamina.X) (bool, error) {
	_, had := s[x]
	if had {
		delete(s, x)
	}
	return had, nil
}

func (s MemState) Iterate(f func(x quamina.X, pattern string) error) error {
	for x, p := range s {
		if err := f(x, p); err != nil {
			return err
		}
	}
	return nil
}