Skip to content

Instantly share code, notes, and snippets.

@lhlyu
Last active February 5, 2022 02:58
Show Gist options
  • Save lhlyu/4d959afc024bb5840f1c1d586794633d to your computer and use it in GitHub Desktop.
Save lhlyu/4d959afc024bb5840f1c1d586794633d to your computer and use it in GitHub Desktop.
并发顺序
package main
import (
"fmt"
"log"
"strconv"
"sync"
"time"
)
// main runs OrderSub with 10 partitions and a per-partition buffer of 2,
// collects every delivered message into a map keyed by partition index, and
// prints each partition's messages in the order they were delivered.
func main() {
	var l sync.Mutex
	m := make(map[int][]string)
	OrderSub(10, 2, func(index int, v string) {
		// The callback is invoked from several goroutines; the mutex guards m.
		l.Lock()
		defer l.Unlock()
		// append works on the nil slice returned for a missing key.
		m[index] = append(m[index], v)
	})
	// Grace period for any callbacks that are still in flight after OrderSub
	// has returned.
	time.Sleep(time.Second)
	// Read under the same mutex the callbacks write under, so the read is
	// ordered after their writes instead of racing with them.
	l.Lock()
	defer l.Unlock()
	for index, arr := range m {
		fmt.Println(index, arr)
	}
}
// OrderSub processes the messages produced by Recv concurrently while keeping
// per-key ordering: each message is routed to a partition chosen by
// getHash(message, limit), and every partition is drained by exactly one
// goroutine, so messages that land in the same partition reach fn in the
// order they were received.
//
// limit is the number of partitions (queues); 0 is treated as 1.
// size is the buffer capacity of each partition's channel.
// fn is called with the partition index and the message. It is invoked from
// multiple goroutines (one per partition), so it must be safe for that.
//
// OrderSub returns only after Recv's channel has been closed and every queued
// message has been handed to fn.
func OrderSub(limit, size uint, fn func(index int, v string)) {
	if limit == 0 {
		limit = 1
	}
	queues := make([]chan string, limit)
	var wg sync.WaitGroup
	for i := range queues {
		q := make(chan string, size)
		queues[i] = q
		wg.Add(1)
		go func(i int, q <-chan string) {
			defer wg.Done()
			for message := range q {
				fn(i, message)
			}
		}(i, q)
	}
	log.Println("====>start")
	total := 0
	for message := range Recv() {
		queues[getHash(message, limit)] <- message
		total++
	}
	// Closing the queues lets each worker's range loop finish once its
	// backlog is drained; without this the workers would block forever.
	for _, q := range queues {
		close(q)
	}
	// Wait so callers see all fn side effects by the time OrderSub returns.
	wg.Wait()
	log.Println("====>end:", total)
}
// Recv returns an unbuffered channel fed by a background goroutine. After a
// one-millisecond pause the goroutine sends the decimal strings "0" through
// "99" in ascending order and then closes the channel.
func Recv() chan string {
	out := make(chan string)
	produce := func() {
		// Close on exit so that range loops over the channel terminate.
		defer close(out)
		time.Sleep(time.Millisecond)
		for n := 0; n != 100; n++ {
			out <- strconv.Itoa(n)
		}
	}
	go produce()
	return out
}
// getHash maps key to a partition index in [0, limit) by reducing its fnv32
// hash modulo limit. The same key always yields the same index for a given
// limit.
//
// A limit of 0 is treated as a single partition and yields 0, rather than
// panicking with an integer divide by zero.
func getHash(key string, limit uint) uint {
	if limit == 0 {
		return 0
	}
	return uint(fnv32(key)) % limit
}
// fnv32 returns the 32-bit FNV-1 hash of key: starting from the offset basis,
// each byte is folded in by multiplying the running hash by the FNV prime and
// then XORing in the byte. Arithmetic wraps modulo 2^32.
func fnv32(key string) uint32 {
	const (
		offset32 uint32 = 2166136261
		prime32  uint32 = 16777619
	)
	h := offset32
	for _, b := range []byte(key) {
		h = (h * prime32) ^ uint32(b)
	}
	return h
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment