Skip to content

Instantly share code, notes, and snippets.

@huichen
Created September 7, 2019 15:12
Show Gist options
  • Save huichen/fed0ce62409f865695509d493b414b75 to your computer and use it in GitHub Desktop.
Save huichen/fed0ce62409f865695509d493b414b75 to your computer and use it in GitHub Desktop.
采用多线程和采样算法较快地统计阿里云 tablestore 中满足某个条件的记录数
package main
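// Assumes your Tablestore table has two primary-key columns, one of which is the
// 128-bit MD5 hash (as a hex string) of the other.
// This program counts the rows that satisfy some condition fairly quickly:
// by scanning in parallel and sampling only part of the key space it can be
// about 100x faster than a full scan.
// In my experience, with sampleRatio = 0.01 a table of about 1 million rows
// takes roughly 10 seconds to count.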
import (
"fmt"
"log"
"math/big"
"sync/atomic"
"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
)
// Shared state written by the worker goroutines and read by main.
var (
	// counter is the running total of matching rows; the workers bump it
	// with sync/atomic.
	counter uint64
	// tsClient is the Tablestore client; main assigns it before starting
	// the workers.
	tsClient *tablestore.TableStoreClient
)

// Connection and table settings: replace the "xxx" placeholders with your own.
var (
	endpoint  = "xxx"
	instance  = "xxx"
	keyId     = "xxx"
	keySecret = "xxx"
	tableName = "xxx"
	// Names of the two primary-key columns: the key itself and its hash.
	pk     = "key"
	pkHash = "key_hash"
)

// Tuning parameters.
var (
	// threads is how many goroutines scan the table in parallel.
	threads = 10
	// sampleRatio is the fraction of its hash range each goroutine scans
	// before stopping. 0.01 is a reasonable value; raise it if you can
	// afford a longer run.
	sampleRatio = 0.01
)
func main() {
// 这里替换成你的接入配置信息
tsClient = tablestore.NewClient(endpoint, instance, keyId, keySecret)
// 得到 md5 128bit hash 的区间上限 fffff... (32个f)
max := big.NewInt(1)
two := big.NewInt(2)
for i := 0; i < 128; i++ {
max.Mul(max, two)
}
max.Sub(max, big.NewInt(1))
// delta = max / threads
delta := big.NewInt(0)
delta.Set(max)
delta.Div(delta, big.NewInt(int64(threads)))
// 用于从线程中返回比例值
backChan := make(chan float64, threads)
// 启动 threads 个线程,每个线程处理 [0, max] 的 threads 分之一区间
start := big.NewInt(0)
for i := 0; i < threads; i++ {
// end = start + delta
end := big.NewInt(0)
if i == threads-1 {
// 最后一个线程,end = max
end.Set(max)
} else {
end.Set(start)
end.Add(end, delta)
}
// 复制一份 start, end 出来传参,Set 操作是深拷贝
iStart := big.NewInt(0)
iEnd := big.NewInt(0)
iStart.Set(start)
iEnd.Set(end)
// 启动线程
go count(iStart, iEnd, backChan)
// start = end
start.Set(end)
start.Add(start, big.NewInt(1))
}
// 等待线程返回
ratio := 0.0
for i := 0; i < threads; i++ {
ratio = ratio + <-backChan
}
// 计算记录个数
ratio = ratio / float64(threads)
log.Printf("%f", float64(counter)/ratio)
return
}
// bigIntToHex formats num as lowercase hexadecimal, left-padded with zeros to
// at least 32 digits (the hex width of a 128-bit value). num is expected to be
// non-negative; values needing more than 32 digits are returned unpadded.
// NOTE(review): range queries built from this string only line up with stored
// keys if the stored hashes are also lowercase hex — confirm for your table.
func bigIntToHex(num *big.Int) string {
	// The 0 flag plus width 32 zero-pads in one step, instead of prepending
	// one "0" per missing digit.
	return fmt.Sprintf("%032x", num)
}
func count(iStart, iEnd *big.Int, backChan chan float64) {
start := bigIntToHex(iStart)
end := bigIntToHex(iEnd)
getRangeRequest := &tablestore.GetRangeRequest{}
rangeRowQueryCriteria := &tablestore.RangeRowQueryCriteria{}
rangeRowQueryCriteria.TableName = tableName
// 添加查询范围,这里假设只有 pkHashName,和 pk
startPK := new(tablestore.PrimaryKey)
startPK.AddPrimaryKeyColumn(pkHash, start)
startPK.AddPrimaryKeyColumnWithMinValue(pk)
endPK := new(tablestore.PrimaryKey)
endPK.AddPrimaryKeyColumn(pkHash, end)
endPK.AddPrimaryKeyColumnWithMaxValue(pk)
rangeRowQueryCriteria.StartPrimaryKey = startPK
rangeRowQueryCriteria.EndPrimaryKey = endPK
// 其他参数
rangeRowQueryCriteria.Direction = tablestore.FORWARD
rangeRowQueryCriteria.MaxVersion = 1
rangeRowQueryCriteria.Limit = 100
getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
// ratio = nu / de
// nu = 结束值 - iStart
// de = iEnd - iStart
de := big.NewInt(0)
de.Set(iEnd)
de.Sub(de, iStart)
var ratioValue float64
for {
getRangeResp, err := tsClient.GetRange(getRangeRequest)
// 异常情况处理
if err != nil {
backChan <- ratioValue
return
}
// 查询到头的情况处理
if len(getRangeResp.Rows) == 0 || getRangeResp.NextStartPrimaryKey == nil {
backChan <- ratioValue
return
}
for _, _ = range getRangeResp.Rows {
if true { // 换成你的条件
atomic.AddUint64(&counter, 1)
}
}
// nu = 下个主键对应的整数 - iStart
nu := big.NewInt(1)
nu.SetString(getRangeResp.NextStartPrimaryKey.PrimaryKeys[0].Value.(string), 16)
nu.Sub(nu, iStart)
// ratio = nu / de
ratio := big.NewRat(1, 1)
ratio.SetFrac(nu, de)
v, _ := ratio.Float64()
log.Printf("ratio = %f", v)
if v > sampleRatio {
backChan <- v
return
}
getRangeRequest.RangeRowQueryCriteria.StartPrimaryKey = getRangeResp.NextStartPrimaryKey
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment