Skip to content

Instantly share code, notes, and snippets.

@upbit
Created May 12, 2020 03:49
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save upbit/1b8acd2c200fc127ee7201c656953bb6 to your computer and use it in GitHub Desktop.
Save upbit/1b8acd2c200fc127ee7201c656953bb6 to your computer and use it in GitHub Desktop.
Filter API中对象池的
// Filter 注册的filter,用于在server端过滤数据并提供高性能查询
type Filter struct {
KeyFileds []string
FilterStr string // 更新无极数据时用到的,注册的filter条件
Template interface{} // 返回数据类型模板
// objStore 用于Filter后数据的存储
objStore map[string]interface{}
objStoreMutex *sync.RWMutex
dataVersion int // 当前内存中无极数据的版本
stat *Stat
}
// Get 查询Filter中指定Key的数据
//
// "key" 用于查询数据。例如注册时keyFields为
// []string{"video_type"}
// 当我们需要查询 video_type 取值为 1 的数据,可以填
// "video_type=1"
func (filter *Filter) Get(key string) interface{} {
filter.objStoreMutex.RLock()
obj, ok := filter.objStore[key]
filter.objStoreMutex.RUnlock()
filter.stat.Gets.Add(1)
if !ok {
return nil
}
filter.stat.Hits.Add(1)
return obj
}
// 解析filter后无极数据到存储,供 Filter.Get 函数查询
func (filter *Filter) unmarshalToCache(data []map[string]interface{}) {
filter.objStoreMutex.Lock()
defer filter.objStoreMutex.Unlock()
for _, item := range data {
filter.stat.Reloads.Add(1)
bs, _ := json.Marshal(item)
// 还原出json数据后,再按照用户期望的sturct结构进行 Unmarshal
obj := reflect.New(reflect.TypeOf(filter.Template)).Interface()
decoder := json.NewDecoder(bytes.NewReader(bs))
if err := decoder.Decode(obj); err != nil {
log.Errorf("polling(%s): json.Decode() error: %s", filter.FilterStr, err.Error())
filter.stat.ReloadErrs.Add(1)
continue
}
key := ""
hasError := false
for _, fieldName := range filter.KeyFileds {
if value, ok := item[fieldName]; !ok {
log.Errorf("polling(%s): item[%s] not exist", filter.FilterStr, fieldName)
hasError = true
filter.stat.ReloadErrs.Add(1)
break
} else {
// 按顺序拼接成查询key
key += fmt.Sprintf("%s=%v&", fieldName, value)
}
}
if hasError {
// key找不到,跳过整行数据
continue
}
// 截断末尾的 & 并保存obj供直接查询
key = key[0 : len(key)-1]
filter.objStore[key] = obj
}
}
// AddFilter 注册新的Filter
//
// "keyFields" 用于指定filter查询时的名称和顺序,如
// []string{"field1", "field2"}
// 则Get时key为
// field1=%s&field2=%s
// "filterStr" 为过滤无极表的方法,具体写法参见 http://v.oa.com/open-wuji/documents/api-query.html 数据过滤一节
// 参数无需转义,不筛选可以留空
// video_type=1
// "template" 无极数据的数据结构模板
//
// 注: 必须设置 `EnableFilter=true` 此API才可用
func (client *Client) AddFilter(keyFields []string, filterStr string, template interface{}) (*Filter, error) {
if !client.opts.EnableFilter {
return nil, ErrFilterNotEnable
}
filter := &Filter{
KeyFileds: keyFields,
FilterStr: filterStr,
Template: template,
objStore: make(map[string]interface{}, client.opts.EntriesCapacity),
objStoreMutex: new(sync.RWMutex),
stat: client.stat,
}
client.filters[filterStr] = filter
_ = client.initFromCache(filter) // 从Cache读取数据 (如果有)
go client.polling(filter) // 注册后台更新
return filter, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment