Skip to content

Instantly share code, notes, and snippets.

@alexclifford
Created October 20, 2017 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexclifford/4680b83e28acbab4b31e707d6ab28136 to your computer and use it in GitHub Desktop.
Save alexclifford/4680b83e28acbab4b31e707d6ab28136 to your computer and use it in GitHub Desktop.
// Package ddblock provides a...
// TODO:
package ddblock
import (
"errors"
"fmt"
//"runtime"
"strconv"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"golang.org/x/net/context"
)
var (
// ErrConflict is returned when trying to get a lock, but
// someone else already has it. The caller should wait and try again.
ErrConflict = errors.New("ddbmutex: conflict, lock held by another")
)
// default values set when creating a the Mutex.
var (
DefaultTableName = "locks"
DefaultTTL = time.Minute
//DefaultTTL = 10 * time.Second
nameString = "name"
uuidString = "uuid"
expiresString = "expires"
)
// Mutex creates a lock using aws dynamodb. It uses
// credential and region information from the standard sources
// such as a config file or env variables.
type Mutex struct {
lk sync.Mutex
ctx context.Context
cancel func()
TableName string
TTL time.Duration
name string
fullname string
uuid string
}
// New creates a new mutex using dynamodb as the distributed store.
// If context is canceled the lock will be released.
func New(ctx context.Context, name string) *Mutex {
if ctx == nil {
ctx = context.Background()
}
ctx, cancel := context.WithCancel(ctx)
return &Mutex{
ctx: ctx,
cancel: cancel,
TableName: DefaultTableName,
TTL: DefaultTTL,
name: name,
fullname: "ddblock-" + name,
uuid: fmt.Sprintf("%d", time.Now().UnixNano()),
}
}
// Name returns the name of the mutex which should uniquely identify
// it on dynamodb.
func (m *Mutex) Name() string {
return m.name
}
// Lock creates the lock item on dynamodb. The lock is renewed every TTL/2
// to make sure the lock is kept. A nil error indicates success. An error
// of ErrConflict means someone else already has the lock. Another error
// indicates an network or dynamo error.
func (m *Mutex) Lock() error {
//var GoNum int = runtime.NumGoroutine()
go func() {
for m.ctx.Err() == nil {
select {
case <-time.After(m.cleanTTL() / 2):
//fmt.Printf("Renewing... " + strconv.Itoa(GoNum) + "\n")
//m.ctx.Done()
case <-m.ctx.Done():
//fmt.Printf("GOROUTINE LOCK RELEASED " + strconv.Itoa(GoNum) + "\n")
m.Unlock()
return
}
//fmt.Printf("Num Goroutines At Update: " + strconv.Itoa(GoNum) + "\n")
m.update()
}
}()
//fmt.Printf("Num Goroutines At Create: " + strconv.Itoa(GoNum) + "\n")
return m.create()
}
// Unlock deletes the lock from dynamodb and allows other go get it.
func (m *Mutex) Unlock() error {
m.cancel()
return m.delete()
}
func (m *Mutex) create() error {
m.lk.Lock()
defer m.lk.Unlock()
now := time.Now()
params := &dynamodb.PutItemInput{
TableName: &m.TableName,
Item: map[string]*dynamodb.AttributeValue{
"name": {
S: &m.fullname,
},
"expires": {
N: aws.String(strconv.FormatInt(now.Add(m.cleanTTL()).UnixNano(), 10)),
},
"uuid": {
S: &m.uuid,
},
},
ConditionExpression: aws.String("#name <> :name OR (#name = :name AND #exp < :exp)"),
ExpressionAttributeNames: map[string]*string{
"#name": &nameString,
"#exp": &expiresString,
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":name": {
S: &m.fullname,
},
":exp": {
N: aws.String(strconv.FormatInt(now.UnixNano(), 10)),
},
},
}
_, err := getSvc().PutItem(params)
return err
}
func (m *Mutex) update() error {
m.lk.Lock()
defer m.lk.Unlock()
if m.uuid == "" {
// has already been unlocked
return nil
}
now := time.Now()
params := &dynamodb.PutItemInput{
TableName: &m.TableName,
Item: map[string]*dynamodb.AttributeValue{
"name": {
S: &m.fullname,
},
"expires": {
N: aws.String(strconv.FormatInt(now.Add(m.cleanTTL()).UnixNano(), 10)),
},
"uuid": {
S: &m.uuid,
},
},
ConditionExpression: aws.String("#name = :name AND #uuid = :uuid"),
ExpressionAttributeNames: map[string]*string{
"#name": &nameString,
"#uuid": &uuidString,
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":name": {
S: &m.fullname,
},
":uuid": {
S: &m.uuid,
},
},
}
_, err := getSvc().PutItem(params)
if err != nil {
//if e, ok := err.(awserr.Error); ok {
// //return e.Code() == "ConditionalCheckFailedException"
// fmt.Println(e.Code())
//}
panic(err)
}
return err
}
func (m *Mutex) delete() error {
m.lk.Lock()
defer m.lk.Unlock()
if m.uuid == "" {
// has already been unlocked successfully
return nil
}
params := &dynamodb.DeleteItemInput{
TableName: &m.TableName,
Key: map[string]*dynamodb.AttributeValue{
"name": {
S: &m.fullname,
},
},
ConditionExpression: aws.String("#name = :name AND #uuid = :uuid"),
ExpressionAttributeNames: map[string]*string{
"#name": aws.String("name"),
"#uuid": aws.String("uuid"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":name": {
S: &m.fullname,
},
":uuid": {
S: &m.uuid,
},
},
}
_, err := getSvc().DeleteItem(params)
if IsAquireError(err) || err == nil {
m.uuid = ""
return nil
}
return err
}
// IsAquireError checks to see if the error returned by Lock
// is the result of someone else holding the lock. If false
// and err != nil, there was some sort of config or network issue.
func IsAquireError(err error) bool {
if e, ok := err.(awserr.Error); ok {
return e.Code() == "ConditionalCheckFailedException"
}
return false
}
func (m *Mutex) cleanTTL() time.Duration {
ttl := m.TTL
if ttl == 0 {
ttl = DefaultTTL
}
if ttl == 0 {
panic("ttl can not be zero")
}
return ttl
}
var (
svc *dynamodb.DynamoDB
svcLk sync.Mutex
)
// getSvc enables the initialization on first read (ie. after config has been parsed),
// kind of like a singleton class.
func getSvc() *dynamodb.DynamoDB {
svcLk.Lock()
defer svcLk.Unlock()
if svc == nil {
c := aws.NewConfig().
WithMaxRetries(3).
WithRegion("us-east-1")
svc = dynamodb.New(session.New(c))
}
return svc
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment