Skip to content

Instantly share code, notes, and snippets.

@ljjjustin
Last active April 1, 2024 08:29
Show Gist options
  • Save ljjjustin/f2213ac9b9b8c31df746f8b56095ea32 to your computer and use it in GitHub Desktop.
Save ljjjustin/f2213ac9b9b8c31df746f8b56095ea32 to your computer and use it in GitHub Desktop.
Leader election using MySQL
-- Lease table for leader election: one row per election `name`, recording
-- which `node` currently holds leadership and when it last refreshed it.
-- The shell script in this gist reads and writes it as gotest.election.
create table election (
id int(10) unsigned not null auto_increment,
-- name identifies the election (the shell script uses 'monitor').
name varchar(128) not null default '',
-- node is the name of the current leader.
node varchar(128) not null default '',
-- last_seen is the time of the leader's most recent refresh.
last_seen timestamp not null default CURRENT_TIMESTAMP,
primary key (id),
-- The unique key on name is what makes "insert ... on duplicate key update"
-- hit the existing row instead of adding a second one for the same election.
unique key uniq_idx_name(name)
) engine=InnoDB;
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/go-xorm/xorm"
)
// electionKey is the private type for context keys set by this package, so
// they cannot collide with keys defined by other packages.
type electionKey string

var (
	// engineKey is the context key under which the *xorm.Engine is stored.
	engineKey electionKey = "engine"

	// defaultElectionName is the election_name value used for every query.
	defaultElectionName = "themisLeader"

	// ErrElectionNoEngine is returned when the context carries no *xorm.Engine
	// under engineKey.
	ErrElectionNoEngine = errors.New("election: no engine")
)
// Election represents one candidate taking part in the leader election.
type Election struct {
// LeaderName is the name this candidate writes into the election record
// when it campaigns, and the name IsLeader compares against.
LeaderName string
}
// ElectionRecord is the xorm model synced by RegisterModel. The raw SQL in
// this file addresses it as table election_record with columns
// election_name, leader_name and last_update.
type ElectionRecord struct {
// Id is the auto-increment primary key.
Id uint32 `xorm:"INT notnull autoincr pk"`
// ElectionName identifies the election; the unique constraint is what the
// ON DUPLICATE KEY UPDATE clause in campaign relies on.
ElectionName string `xorm:"VARCHAR(32) notnull default '' unique"`
// LeaderName is the name of the candidate currently recorded as leader.
LeaderName string `xorm:"VARCHAR(32) notnull default ''"`
// LastUpdate is the timestamp written by the leader's most recent campaign.
LastUpdate time.Time `xorm:"TIMESTAMP notnull default 'CURRENT_TIMESTAMP'"`
}
// NewElection returns an Election whose candidate is identified by name.
func NewElection(name string) *Election {
	candidate := new(Election)
	candidate.LeaderName = name
	return candidate
}
// Campaign puts e.LeaderName forward as a candidate for the election.
//
// It blocks until the candidate is elected, an error occurs, or ctx is
// cancelled. ctx must carry an *xorm.Engine under engineKey, otherwise
// ErrElectionNoEngine is returned. While not elected it retries every
// 20 seconds.
//
// On success it returns nil and leaves a background goroutine running that
// refreshes the election record every 6 seconds. That goroutine stops when
// ctx is cancelled or a refresh fails, so callers must cancel ctx to give up
// leadership.
func (e *Election) Campaign(ctx context.Context) error {
	const retryInterval = 20 * time.Second

	if _, ok := ctx.Value(engineKey).(*xorm.Engine); !ok {
		return ErrElectionNoEngine
	}

	for {
		// Do not touch the database once the caller has given up.
		if err := ctx.Err(); err != nil {
			return err
		}

		elected, err := campaign(ctx, e.LeaderName)
		if err != nil {
			return err
		}
		if elected {
			go renewLeadership(ctx, e.LeaderName)
			return nil
		}

		// Wait before campaigning again, but wake up at once on cancellation
		// instead of sleeping through it.
		wait := time.NewTimer(retryInterval)
		select {
		case <-ctx.Done():
			wait.Stop()
			return ctx.Err()
		case <-wait.C:
		}
	}
}

// renewLeadership re-runs campaign for leader every 6 seconds until ctx is
// cancelled or campaign reports an error, which is logged rather than dropped.
func renewLeadership(ctx context.Context, leader string) {
	const renewInterval = 6 * time.Second

	ticker := time.NewTicker(renewInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		if _, err := campaign(ctx, leader); err != nil {
			log.Printf("election: renewing leadership of %q: %v", leader, err)
			return
		}
	}
}
// IsLeader reports whether the election record currently names e.LeaderName
// as leader of defaultElectionName.
//
// ctx must carry an *xorm.Engine under engineKey, otherwise
// ErrElectionNoEngine is returned. Query errors are returned unchanged.
func (e *Election) IsLeader(ctx context.Context) (bool, error) {
	engine, ok := ctx.Value(engineKey).(*xorm.Engine)
	if !ok {
		return false, ErrElectionNoEngine
	}

	// Select the matching row itself rather than COUNT(*): an aggregate always
	// yields exactly one result row (holding 0 or 1), so testing the number of
	// rows returned would report every candidate as leader.
	query := `SELECT 1 AS is_leader FROM election_record WHERE election_name=? AND leader_name=? LIMIT 1`
	rows, err := engine.Query(query, defaultElectionName, e.LeaderName)
	if err != nil {
		return false, err
	}
	return len(rows) > 0, nil
}
// campaign makes one attempt to acquire or refresh leadership for leader.
//
// It issues a single upsert against election_record: the row is inserted if
// absent; otherwise leader_name is replaced only when the stored last_update
// is more than 30 seconds older than the supplied time, and last_update is
// refreshed only when leader_name (after that assignment) equals leader.
//
// It returns true when the statement reports at least one affected row, and
// ErrElectionNoEngine when ctx carries no *xorm.Engine under engineKey.
func campaign(ctx context.Context, leader string) (bool, error) {
	db, found := ctx.Value(engineKey).(*xorm.Engine)
	if !found {
		return false, ErrElectionNoEngine
	}

	upsert := `INSERT IGNORE INTO election_record (election_name, leader_name, last_update) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
leader_name = IF(last_update < DATE_SUB(VALUES(last_update), INTERVAL 30 SECOND), VALUES(leader_name), leader_name),
last_update = IF(leader_name = VALUES(leader_name), VALUES(last_update), last_update)`

	result, err := db.Exec(upsert, defaultElectionName, leader, time.Now())
	if err != nil {
		return false, err
	}

	changed, err := result.RowsAffected()
	if err != nil {
		return false, err
	}
	return changed >= 1, nil
}
// RegisterModel syncs the ElectionRecord model to the database through
// engine.Sync2 and terminates the process via log.Fatal if that fails.
func RegisterModel(engine *xorm.Engine) {
	if syncErr := engine.Sync2(&ElectionRecord{}); syncErr != nil {
		log.Fatal(syncErr)
	}
}
// main runs one election candidate named by the first command-line argument.
// It campaigns, does its work while the election record still names it as
// leader, and campaigns again once leadership is lost.
func main() {
	if len(os.Args) < 2 {
		// Printf, not Println: the usage line contains a format directive.
		fmt.Printf("usage: %s <name>\n", os.Args[0])
		os.Exit(-1)
	}
	leaderName := os.Args[1]

	engine, err := xorm.NewEngine("mysql", "gotest:passw0rd@tcp(localhost:3306)/gotest?charset=utf8")
	if err != nil {
		log.Fatal(err)
	}
	defer engine.Close()
	RegisterModel(engine)

	election := NewElection(leaderName)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = context.WithValue(ctx, engineKey, engine)

	for {
		// Each term gets its own context so the renewal goroutine started by
		// Campaign is stopped when the term ends, rather than piling up across
		// repeated campaigns.
		termCtx, endTerm := context.WithCancel(ctx)

		if err := election.Campaign(termCtx); err != nil {
			endTerm()
			fmt.Println(err)
			break
		}
		log.Printf("Now %s am the leader.\n", leaderName)

		for {
			isLeader, err := election.IsLeader(termCtx)
			if err != nil {
				// log.Fatal exits without running defers, so cancel explicitly.
				endTerm()
				cancel()
				log.Fatal(err)
			}
			if !isLeader {
				break
			}
			// do something
			log.Println("do something... sleep 20")
			time.Sleep(20 * time.Second)
		}

		endTerm()
	}
}
#!/bin/bash
# Require exactly one argument: the name this node campaigns under.
if [ $# -ne 1 ]; then
    echo "usage:$0 <node name>" >&2
    # Exit non-zero so callers can tell a usage error from a normal run.
    exit 1
fi
node="$1"
# get_leader prints the node currently recorded as leader of the 'monitor'
# election (nothing if no row exists yet).
# -N suppresses the column-name header, so the output needs no text filter;
# filtering with "grep -v node" would also drop leaders whose name contains
# "node" (e.g. "node1").
get_leader() {
    mysql -N -B -e "select node from gotest.election where name='monitor'"
}
# get_time prints the database server's current time, so every node stamps
# its lease with the same clock.
# -N suppresses the column-name header instead of filtering it out by text.
get_time() {
    mysql -N -B -e "select now()"
}
# Campaign once per second for 300 rounds and report every leadership change.
for i in $(seq 1 300)
do
    old_leader=$(get_leader)
    now=$(get_time)
    # Take over only when the stored lease is more than 30 seconds old;
    # otherwise keep the row's own current node. Referring to the column
    # rather than the previously read ${old_leader} keeps the decision atomic
    # inside the statement, so a concurrent takeover is never overwritten
    # with a stale value. last_seen is refreshed only when this node is the
    # leader after that assignment.
    mysql -e "insert ignore into gotest.election (name, node, last_seen) values ('monitor', '${node}', '${now}') on duplicate key update node = if(last_seen < DATE_SUB(values(last_seen), INTERVAL 30 SECOND), values(node), node), last_seen = if(node = values(node), values(last_seen), last_seen)"
    new_leader=$(get_leader)
    if [ "${old_leader}" != "${new_leader}" ]; then
        echo "leader switched to: ${new_leader}"
    fi
    sleep 1
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment