Skip to content

Instantly share code, notes, and snippets.

@usbuild
Last active April 2, 2018 07:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save usbuild/aa507785480daf7e8b00043683de2b56 to your computer and use it in GitHub Desktop.
Save usbuild/aa507785480daf7e8b00043683de2b56 to your computer and use it in GitHub Desktop.
a naïve implmentation of raft consensus algorithm
import sys
from flask import Flask, jsonify, request
from threading import Thread, Lock
from collections import defaultdict
import time
import urllib
import random
import logging
import json
import os.path
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(threadName)s %(process)d %(message)s")
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
app = Flask(__name__)
info = type('Info', (object,), {})()
info.nodename = sys.argv[2]
info.other_nodes_list = sys.argv[3].split(",")
info.ROLE_FOLLOWER = 1
info.ROLE_CANDIDATE = 2
info.ROLE_LEADER = 3
info.leader = None
info.role = info.ROLE_FOLLOWER
info.current_term = 0
info.voted_for = None
info.entries = []
info.commit_index = -1
info.last_applied = -1
filename = "./%s.json"%(info.nodename, )
def persist():
with open(filename, "w") as f:
json.dump({
"current_term": info.current_term,
"voted_for": info.voted_for,
"entries" : info.entries,
"commit_index": info.commit_index,
"last_applied": info.last_applied
},f)
def load():
with open(filename, "r") as f:
t = json.load(f)
for k, v in t.items():
setattr(info, k, v)
if os.path.isfile(filename):
load()
info.next_index = {}
info.match_index = {}
info.lock = Lock()
def reset_election_timeout():
global info
info.election_timeout = random.randint(3, 5)
reset_election_timeout()
@app.route('/')
def name():
global info
return info.nodename
@app.route("/vote_me")
def vote_me():
global info
name = request.args.get("name")
term = int(request.args.get("term"))
if term < info.current_term:
return "FAIL"
if term > info.current_term:
logging.info("vote: %s %s"%(term, name))
info.current_term = term
info.role = info.ROLE_FOLLOWER
info.voted_for = None
reset_election_timeout()
return "OK"
if info.voted_for is None or info.voted_for == name:
info.voted_for = name
reset_election_timeout()
return "OK"
return "FAIL"
def start_new_election():
global info
info.current_term += 1
if info.role == info.ROLE_FOLLOWER:
info.role = info.ROLE_CANDIDATE
elif info.role == info.ROLE_CANDIDATE:
pass
info.lock.release()
time.sleep(random.random())
info.lock.acquire()
mytickets = 1
info.voted_for = info.nodename
for node in info.other_nodes_list:
try:
info.lock.release()
ret = urllib.urlopen("http://%s/vote_me?term=%d&name=%s"%(node, info.current_term, info.nodename)).read().strip()
if info.role != info.ROLE_CANDIDATE:
return
if ret == "OK":
mytickets += 1
except :
logging.info("Unexpected error: %s" % sys.exc_info()[0])
finally:
info.lock.acquire()
logging.info("tickets %s"%(mytickets, ))
if mytickets >= len(info.other_nodes_list) / 2 + 1:
logging.info("name %s become leader"%(info.nodename, ))
x = len(info.entries) - 1
info.next_index = defaultdict(lambda: x + 1)
info.match_index = defaultdict(lambda: 0)
info.role = info.ROLE_LEADER
leader_anounce()
def commit(commit_index):
global info
# commit from info.commit_index to commit_index
# blah blah blah
info.commit_index = commit_index
@app.route("/append_log")
def leader_live():
global info
reset_election_timeout()
obj = json.loads(request.args.get("param"))
term = obj["term"]
leader_id = obj["leader_id"]
prev_log_index = obj["prev_log_index"]
prev_log_term = obj["prev_log_term"]
entries = obj["entries"]
leader_commit = obj["leader_commit"]
if info.role != info.ROLE_FOLLOWER:
info.role = info.ROLE_FOLLOWER
logging.info("%s become follower"%(info.nodename, ))
info.leader = request.args.get("name")
if term < info.current_term:
logging.info("3 term %s %s"%(term, info.current_term))
return str(current_term)
else:
info.current_term = term
if prev_log_index == -1:
info.entries = entries
else:
if len(info.entries) <= prev_log_index:
logging.info("1 %s %s"%(len(info.entries), prev_log_index))
return "FAIL"
myterm = info.entries[prev_log_index][0]
if myterm != prev_log_term:
logging.info("2 %s %s"%(len(myterm, prev_log_term)))
return "FAIL"
info.entries[prev_log_index + 1:] = entries
commit(leader_commit)
persist()
return "OK"
@app.route("/post")
def post():
if info.ROLE_LEADER != info.role:
return "FAIL"
msg = request.args.get("msg")
info.entries.append((info.current_term, msg))
cnt = leader_anounce()
if info.ROLE_LEADER != info.role:
info.entries.pop(-1)
return "FAIL"
if cnt < len(info.other_nodes_list) / 2:
info.entries.pop(-1)
return "FAIL"
commit(len(info.entries) - 1)
persist()
return "OK"
def leader_anounce():
global info
cnt = 0
for node in info.other_nodes_list:
try:
while True:
param = {}
param["term"] = info.current_term
param["leader_id"] = info.nodename
if not info.entries:
param["prev_log_index"] = -1
param["prev_log_term"] = -1
param["entries"] = []
else:
if info.next_index[node] == 0:
param["prev_log_index"] = -1
param["prev_log_term"] = -1
param["entries"] = info.entries
else:
param["prev_log_index"] = info.next_index[node] - 1
param["prev_log_term"] = info.entries[info.next_index[node] - 1][0]
param["entries"] = info.entries[info.next_index[node]:]
param["leader_commit"] = info.commit_index
ret = urllib.urlopen("http://%s/append_log?%s"%(node, urllib.urlencode({"param": json.dumps(param)}))).read().strip()
if ret == "OK":
info.next_index[node] = len(info.entries)
cnt += 1
break
if ret == "FAIL":
info.next_index[node] -= 1
else:
info.role = info.ROLE_FOLLOWER
info.current_term = int(ret)
except:
logging.info("Unexpected error: %s" % sys.exc_info()[0])
return cnt
@app.before_request
def before_request(*args, **kwargs):
global info
info.lock.acquire()
@app.after_request
def after_request(f):
global info
info.lock.release()
return f
def timeout():
while True:
persist()
time.sleep(1)
global info
info.lock.acquire()
if info.role != info.ROLE_LEADER:
info.election_timeout -= 1
if info.election_timeout == 0:
reset_election_timeout()
start_new_election()
if info.role == info.ROLE_LEADER:
leader_anounce()
info.lock.release()
if __name__ == '__main__':
thd = Thread(target=timeout)
thd.start()
app.run(port=int(sys.argv[1]), debug=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment