Skip to content

Instantly share code, notes, and snippets.

@TechGeeky
Created November 7, 2022 19:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TechGeeky/6d1d1e4e5a0c8332e06586eedab05534 to your computer and use it in GitHub Desktop.
Save TechGeeky/6d1d1e4e5a0c8332e06586eedab05534 to your computer and use it in GitHub Desktop.
/*
Modified from https://github.com/orcaman/concurrent-map
The MIT License (MIT)
Copyright (c) 2014 streamrail
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
package cmap_int
import (
	"encoding/binary"
	"encoding/json"
	"hash/maphash"
	"sync"
)
// SHARD_COUNT is the number of shards New allocates for a map.
// It is read by New at the moment a map is created.
var SHARD_COUNT = 32

// seed is the maphash seed shared by every GetShard call, so that a given key
// always hashes to the same value for the lifetime of the process.
var seed = maphash.MakeSeed()

// ConcurrentMap is a "thread" safe map from int64 keys to values of type V.
// To avoid lock bottlenecks the map is divided into several shards, each of
// which is guarded by its own lock.
type ConcurrentMap[V any] []*ConcurrentMapShared[V]

// ConcurrentMapShared is one shard of a ConcurrentMap: a plain map together
// with the lock that guards it.
type ConcurrentMapShared[V any] struct {
	items        map[int64]V
	sync.RWMutex // Read Write mutex, guards access to internal map.
}

// New creates a new concurrent map made of SHARD_COUNT empty shards.
func New[V any]() ConcurrentMap[V] {
	m := make(ConcurrentMap[V], SHARD_COUNT)
	for i := range m {
		m[i] = &ConcurrentMapShared[V]{items: make(map[int64]V)}
	}
	return m
}

// GetShard returns the shard responsible for the given key.
//
// The key is hashed as its 8-byte big-endian encoding, and the shard index is
// the hash modulo the number of shards this map actually has, so the result is
// always a valid element of m even if SHARD_COUNT changed after m was created.
// It panics if m has no shards (i.e. it was not created with New).
func (m ConcurrentMap[V]) GetShard(key int64) *ConcurrentMapShared[V] {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(key))

	var h maphash.Hash
	h.SetSeed(seed)
	// maphash.Hash.Write is documented to never fail.
	_, _ = h.Write(buf[:])
	return m[h.Sum64()%uint64(len(m))]
}
// MSet stores every key/value pair of data in the map. Each pair is written
// under the lock of the shard that owns its key; the lock is taken and
// released once per pair.
func (m ConcurrentMap[V]) MSet(data map[int64]V) {
	put := func(k int64, v V) {
		owner := m.GetShard(k)
		owner.Lock()
		defer owner.Unlock()
		owner.items[k] = v
	}
	for k, v := range data {
		put(k, v)
	}
}
// Set stores value under key, replacing whatever was stored there before.
func (m ConcurrentMap[V]) Set(key int64, value V) {
	owner := m.GetShard(key)
	owner.Lock()
	defer owner.Unlock()
	owner.items[key] = value
}
// UpsertCb is the callback Upsert uses to compute the element to be stored in
// the map. exist reports whether the key is already present, valueInMap is the
// value currently stored under it (the zero value when absent) and newValue is
// the value passed to Upsert; the returned value is what gets stored.
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWMutex is not reentrant.
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V
// Upsert inserts or updates the element stored under key.
//
// cb is called with whether the key exists, the value currently in the map
// (the zero value if absent) and value; whatever cb returns is stored under
// key and also returned as res. cb runs while the shard's write lock is held,
// so it must not access the same map.
func (m ConcurrentMap[V]) Upsert(key int64, value V, cb UpsertCb[V]) (res V) {
	shard := m.GetShard(key)
	shard.Lock()
	// Deferred so the shard is unlocked even if cb panics; otherwise every
	// later access to this shard would block forever.
	defer shard.Unlock()

	v, ok := shard.items[key]
	res = cb(ok, v, value)
	shard.items[key] = res
	return res
}
// SetIfAbsent stores value under key only when the key has no value yet.
// It reports whether the value was stored.
func (m ConcurrentMap[V]) SetIfAbsent(key int64, value V) bool {
	owner := m.GetShard(key)
	owner.Lock()
	defer owner.Unlock()

	if _, present := owner.items[key]; present {
		return false
	}
	owner.items[key] = value
	return true
}
// Get looks up key and returns the value stored under it together with a flag
// telling whether the key was present. The lookup is done under the owning
// shard's read lock.
func (m ConcurrentMap[V]) Get(key int64) (V, bool) {
	owner := m.GetShard(key)
	owner.RLock()
	defer owner.RUnlock()

	found, present := owner.items[key]
	return found, present
}
// Count returns the number of elements within the map.
//
// The shards are read-locked and counted one after another, so the total is
// not an atomic snapshot when other goroutines modify the map concurrently.
func (m ConcurrentMap[V]) Count() int {
	count := 0
	// Range over the shards this map really has instead of indexing up to the
	// package-level SHARD_COUNT, which may differ from len(m).
	for _, shard := range m {
		shard.RLock()
		count += len(shard.items)
		shard.RUnlock()
	}
	return count
}
// Has reports whether a value is stored under key.
func (m ConcurrentMap[V]) Has(key int64) bool {
	owner := m.GetShard(key)
	owner.RLock()
	defer owner.RUnlock()

	_, present := owner.items[key]
	return present
}
// Remove deletes the element stored under key; it does nothing when the key
// is not present.
func (m ConcurrentMap[V]) Remove(key int64) {
	owner := m.GetShard(key)
	owner.Lock()
	defer owner.Unlock()
	delete(owner.items, key)
}
// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held.
// It receives the key, the value currently stored under it (the zero value
// when absent) and whether the key exists.
// If it returns true, the element will be removed from the map.
type RemoveCb[V any] func(key int64, v V, exists bool) bool
// RemoveCb locks the shard containing the key, retrieves its current value and
// calls the callback with those params.
// If callback returns true and element exists, it will remove it from the map.
// Returns the value returned by the callback (even if element was not present
// in the map).
func (m ConcurrentMap[V]) RemoveCb(key int64, cb RemoveCb[V]) bool {
	shard := m.GetShard(key)
	shard.Lock()
	// Deferred so the shard is unlocked even if cb panics; otherwise every
	// later access to this shard would block forever.
	defer shard.Unlock()

	v, ok := shard.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(shard.items, key)
	}
	return remove
}
// Pop deletes the element stored under key and hands it back to the caller.
// exists is false (and v the zero value) when the key was not present.
func (m ConcurrentMap[V]) Pop(key int64) (v V, exists bool) {
	owner := m.GetShard(key)
	owner.Lock()
	defer owner.Unlock()

	v, exists = owner.items[key]
	delete(owner.items, key)
	return v, exists
}
// IsEmpty reports whether the map currently holds no elements, as measured by
// Count.
func (m ConcurrentMap[V]) IsEmpty() bool {
	total := m.Count()
	return total == 0
}
// Tuple pairs a key with its value so that both can be sent together over a
// channel; it is the element type of the channels returned by Iter and
// IterBuffered.
type Tuple[V any] struct {
// Key is the map key of the entry.
Key int64
// Val is the value stored under Key.
Val V
}
// Iter returns an unbuffered channel that yields every key/value pair of the
// map and is closed afterwards, so it can be used in a for range loop.
//
// Deprecated: using IterBuffered() will get a better performance
func (m ConcurrentMap[V]) Iter() <-chan Tuple[V] {
	out := make(chan Tuple[V])
	// The arguments of a go statement are evaluated in the calling goroutine,
	// so snapshot(m) still runs (and may panic) before Iter returns.
	go fanIn(snapshot(m), out)
	return out
}
// IterBuffered returns a channel that yields every key/value pair of the map
// and is closed afterwards, so it can be used in a for range loop. The channel
// is buffered with the combined capacity of the per-shard snapshot channels.
func (m ConcurrentMap[V]) IterBuffered() <-chan Tuple[V] {
	perShard := snapshot(m)

	capacity := 0
	for i := range perShard {
		capacity += cap(perShard[i])
	}

	out := make(chan Tuple[V], capacity)
	go fanIn(perShard, out)
	return out
}
// Clear removes from the map every key reported by IterBuffered.
func (m ConcurrentMap[V]) Clear() {
	entries := m.IterBuffered()
	for entry := range entries {
		m.Remove(entry.Key)
	}
}
// snapshot returns one buffered channel per shard of m, each of which receives
// the key/value pairs of its shard and is then closed.
//
// It returns once the size of each buffered channel is determined, before all
// the channels are populated by the per-shard goroutines. Every channel is
// created with room for all items of its shard while the shard's read lock is
// held, so filling it never blocks.
//
// It panics if m has no shards, i.e. it was not created with New.
func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
	// Guard against use of a map that was never initialized.
	if len(m) == 0 {
		panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
	}
	// Size everything by the shards m really has: the loop below ranges over
	// m, so using the package-level SHARD_COUNT here could write out of range
	// or leave the WaitGroup counter unbalanced if the two ever differ.
	chans = make([]chan Tuple[V], len(m))
	wg := sync.WaitGroup{}
	wg.Add(len(m))
	for index, shard := range m {
		go func(index int, shard *ConcurrentMapShared[V]) {
			shard.RLock()
			chans[index] = make(chan Tuple[V], len(shard.items))
			// The channel exists now; let snapshot return while it is filled.
			wg.Done()
			for key, val := range shard.items {
				chans[index] <- Tuple[V]{Key: key, Val: val}
			}
			shard.RUnlock()
			close(chans[index])
		}(index, shard)
	}
	wg.Wait()
	return chans
}
// fanIn forwards every element received on the channels in chans to out, using
// one goroutine per input channel, and closes out once all inputs have been
// drained. It blocks until then.
func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
	var pending sync.WaitGroup
	pending.Add(len(chans))
	for i := range chans {
		go func(src <-chan Tuple[V]) {
			defer pending.Done()
			for item := range src {
				out <- item
			}
		}(chans[i])
	}
	pending.Wait()
	close(out)
}
// Items returns all items as map[int64]V, copied into a freshly allocated map
// from the pairs delivered by IterBuffered.
func (m ConcurrentMap[V]) Items() map[int64]V {
	result := make(map[int64]V)
	for entry := range m.IterBuffered() {
		result[entry.Key] = entry.Val
	}
	return result
}
// IterCb is the iterator callback called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sees consistent view of a shard,
// but not across the shards.
type IterCb[V any] func(key int64, v V)
// IterCb is a callback based iterator, the cheapest way to read all elements
// in a map. fn is called for every key/value pair, shard by shard, while the
// read lock of the shard being visited is held.
func (m ConcurrentMap[V]) IterCb(fn IterCb[V]) {
	for _, shard := range m {
		// Each shard is visited inside its own function so the deferred
		// RUnlock runs as soon as that shard is done, and also when fn panics
		// (which would otherwise leave the shard read-locked forever).
		func() {
			shard.RLock()
			defer shard.RUnlock()
			for key, value := range shard.items {
				fn(key, value)
			}
		}()
	}
}
// Keys returns all keys as []int64, in no particular order.
//
// The keys of every shard are collected concurrently, one goroutine per shard,
// each holding its shard's read lock while it sends the keys.
func (m ConcurrentMap[V]) Keys() []int64 {
	count := m.Count()
	ch := make(chan int64, count)
	go func() {
		wg := sync.WaitGroup{}
		// One Done per goroutine started below; the loop ranges over m, so
		// the counter must be len(m) rather than the package-level
		// SHARD_COUNT, which may differ and would unbalance the WaitGroup.
		wg.Add(len(m))
		for _, shard := range m {
			go func(shard *ConcurrentMapShared[V]) {
				shard.RLock()
				for key := range shard.items {
					ch <- key
				}
				shard.RUnlock()
				wg.Done()
			}(shard)
		}
		wg.Wait()
		close(ch)
	}()

	// Drain the channel until the producer side closes it.
	keys := make([]int64, 0, count)
	for k := range ch {
		keys = append(keys, k)
	}
	return keys
}
// MarshalJSON makes the map's unexported contents available to JSON encoding:
// all items spread across the shards are gathered into one plain map[int64]V,
// which is then marshalled.
func (m ConcurrentMap[V]) MarshalJSON() ([]byte, error) {
	flat := make(map[int64]V)
	entries := m.IterBuffered()
	for entry := range entries {
		flat[entry.Key] = entry.Val
	}
	return json.Marshal(flat)
}
// Concurrent map accepts any type as its value, therefore JSON Unmarshal
// probably won't know which type to unmarshal into. In such a case
// we'd end up with a value of type map[string]V. In most cases this isn't
// our value type, which is why we've decided to remove this functionality.
// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
// // Reverse process of Marshal.
// tmp := make(map[string]V)
// // Unmarshal into a single map.
// if err := json.Unmarshal(b, &tmp); err != nil {
// return nil
// }
// // foreach key,value pair in temporary map insert into our concurrent map.
// for key, val := range tmp {
// m.Set(key, val)
// }
// return nil
// }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment