Created
May 12, 2020 03:49
-
-
Save upbit/1b8acd2c200fc127ee7201c656953bb6 to your computer and use it in GitHub Desktop.
Filter API中对象池的
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Filter 注册的filter,用于在server端过滤数据并提供高性能查询 | |
type Filter struct { | |
KeyFileds []string | |
FilterStr string // 更新无极数据时用到的,注册的filter条件 | |
Template interface{} // 返回数据类型模板 | |
// objStore 用于Filter后数据的存储 | |
objStore map[string]interface{} | |
objStoreMutex *sync.RWMutex | |
dataVersion int // 当前内存中无极数据的版本 | |
stat *Stat | |
} | |
// Get 查询Filter中指定Key的数据 | |
// | |
// "key" 用于查询数据。例如注册时keyFields为 | |
// []string{"video_type"} | |
// 当我们需要查询 video_type 取值为 1 的数据,可以填 | |
// "video_type=1" | |
func (filter *Filter) Get(key string) interface{} { | |
filter.objStoreMutex.RLock() | |
obj, ok := filter.objStore[key] | |
filter.objStoreMutex.RUnlock() | |
filter.stat.Gets.Add(1) | |
if !ok { | |
return nil | |
} | |
filter.stat.Hits.Add(1) | |
return obj | |
} | |
// 解析filter后无极数据到存储,供 Filter.Get 函数查询 | |
func (filter *Filter) unmarshalToCache(data []map[string]interface{}) { | |
filter.objStoreMutex.Lock() | |
defer filter.objStoreMutex.Unlock() | |
for _, item := range data { | |
filter.stat.Reloads.Add(1) | |
bs, _ := json.Marshal(item) | |
// 还原出json数据后,再按照用户期望的sturct结构进行 Unmarshal | |
obj := reflect.New(reflect.TypeOf(filter.Template)).Interface() | |
decoder := json.NewDecoder(bytes.NewReader(bs)) | |
if err := decoder.Decode(obj); err != nil { | |
log.Errorf("polling(%s): json.Decode() error: %s", filter.FilterStr, err.Error()) | |
filter.stat.ReloadErrs.Add(1) | |
continue | |
} | |
key := "" | |
hasError := false | |
for _, fieldName := range filter.KeyFileds { | |
if value, ok := item[fieldName]; !ok { | |
log.Errorf("polling(%s): item[%s] not exist", filter.FilterStr, fieldName) | |
hasError = true | |
filter.stat.ReloadErrs.Add(1) | |
break | |
} else { | |
// 按顺序拼接成查询key | |
key += fmt.Sprintf("%s=%v&", fieldName, value) | |
} | |
} | |
if hasError { | |
// key找不到,跳过整行数据 | |
continue | |
} | |
// 截断末尾的 & 并保存obj供直接查询 | |
key = key[0 : len(key)-1] | |
filter.objStore[key] = obj | |
} | |
} | |
// AddFilter 注册新的Filter | |
// | |
// "keyFields" 用于指定filter查询时的名称和顺序,如 | |
// []string{"field1", "field2"} | |
// 则Get时key为 | |
// field1=%s&field2=%s | |
// "filterStr" 为过滤无极表的方法,具体写法参见 http://v.oa.com/open-wuji/documents/api-query.html 数据过滤一节 | |
// 参数无需转义,不筛选可以留空 | |
// video_type=1 | |
// "template" 无极数据的数据结构模板 | |
// | |
// 注: 必须设置 `EnableFilter=true` 此API才可用 | |
func (client *Client) AddFilter(keyFields []string, filterStr string, template interface{}) (*Filter, error) { | |
if !client.opts.EnableFilter { | |
return nil, ErrFilterNotEnable | |
} | |
filter := &Filter{ | |
KeyFileds: keyFields, | |
FilterStr: filterStr, | |
Template: template, | |
objStore: make(map[string]interface{}, client.opts.EntriesCapacity), | |
objStoreMutex: new(sync.RWMutex), | |
stat: client.stat, | |
} | |
client.filters[filterStr] = filter | |
_ = client.initFromCache(filter) // 从Cache读取数据 (如果有) | |
go client.polling(filter) // 注册后台更新 | |
return filter, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment