Skip to content

Instantly share code, notes, and snippets.

@jweinst1
Last active June 13, 2024 01:20
Show Gist options
  • Save jweinst1/883b78616819e4132d1917e141a0a7dd to your computer and use it in GitHub Desktop.
Save jweinst1/883b78616819e4132d1917e141a0a7dd to your computer and use it in GitHub Desktop.
import sys
import socket
import os
import json
class Lock:
def __init__(self, owner, member_set):
self.owner = owner
self.member_set = member_set
class Member:
"""def connect_to(self, path):
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.setblocking(False)
client.connect(path)
client.sendall(json.dumps({"host":self.path}).encode())
self.state = 'connect_wait'
connect_resp = client.recv(1024)
connect_resp_dict = json.loads(connect_resp)
if connect_resp_dict['status'] == 'ok':
self.connections[path] = client"""
"""def process(self):
try:
client_conn, client_address = self.socket.accept()
new_cli_data = client_conn.recv(1024)
new_cli_dict = json.loads(new_cli_data)
new_host = new_cli_dict['host']
client_conn.sendall(json.dumps({"status":"ok"}).encode())
client_conn.setblocking(False)
self.connections[new_host] = client_conn
except BlockingIOError:
print("{} got would block on accept".format(self.path))
except Exception as exc:
print("{} Unexpected err {} ".format(self.path, exc))
sys.exit(1)"""
@staticmethod
def create_cluster(*paths):
mems = [Member(path) for path in paths]
for mem in mems:
for memb in mems:
Member.connect_two(mem, memb)
return mems
@staticmethod
def connect_two(a, b):
if a.path == b.path or a.path in b.connections or b.path in a.connections:
return # self connection
client_a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client_a.setblocking(False)
client_a.connect(b.path)
client_conn, client_address = b.socket.accept()
client_conn.setblocking(False)
a.connections[b.path] = client_a
b.connections[a.path] = client_conn
def __init__(self, path):
self.locks = {}
self.connections = {}
self.reqs = {}
self.self_reqs = []
self.path = path
try:
os.unlink(path)
except Exception as exc:
pass
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.setblocking(False)
self.socket.bind(self.path)
self.socket.listen(10)
def get_member_set(self):
return {memb:True for memb in self.connections.keys()} | {self.path:True}
def lock_req(self, key):
mem_set = self.get_member_set()
sent_set = {memb:'sent' for memb in mem_set.keys()}
req = {'key':key, 'owner':self.path, 'observ':mem_set, 'op':'lock'}
req_dat = json.dumps(req).encode()
for k, v in self.connections.items():
print("Sending req={} to memb={}".format(req, k))
v.sendall(req_dat)
self.self_reqs.append(req)
self.reqs[key] = sent_set
def process_lock(self, req, conn):
if req['key'] not in self.locks:
self.locks[req['key']] = req
conn.sendall(json.dumps({'op':'lock_resp', 'state':'acq', 'key':req['key']}).encode())
elif self.locks[req['key']]['observ'] != req['observ']:
conn.sendall(json.dumps({'op':'lock_resp', 'state':'neq', 'key':req['key']}).encode())
else:
conn.sendall(json.dumps({'op':'lock_resp', 'state':'ocu', 'key':req['key']}).encode())
def process_lock_resp(self, req, memb, conn):
if req['key'] not in self.locks:
print("{} recevied unexpected lock_resp for lock={}".format(self.path, req['key']))
return
if req['state'] == 'acq':
self.reqs[req['key']][memb] = 'acq'
elif req['state'] == 'neq':
print("{} got an neq from {}, for lock={}".format(self.path, memb, req['key']))
del self.reqs[req['key']]
elif req['state'] == 'ocu':
self.reqs[req['key']][memb] = 'acq'
else:
print("{} Unexpected state={} for lock op on {}".format(self.path, req['state'], req['key']))
sys.exit(4)
def process_self_requests(self):
while len(self.self_reqs) > 0:
popped = self.self_reqs.pop()
if popped['op'] == 'lock':
self.process_lock(popped)
# to do, switch conn and v to self interface
def process_requests(self):
for k, v in self.connections:
try:
cdata = v.recv(1024)
cdict = json.loads(cdata)
if cdict['op'] == 'lock':
self.process_lock(cdict, v)
elif cdict['op'] == 'lock_resp':
self.process_lock_resp(cdict, k, v)
except BlockingIOError:
pass
except Exception as exc:
print("Unexpecter err={} when reading data from memb={}".format(exc, k))
def __repr__(self):
return "({}, {}, {}, {})".format(self.path, self.locks, self.connections, self.reqs)
if __name__ == '__main__':
lst = Member.create_cluster("foo", "bar", "poo")
lst[0].lock_req('abc')
for elem in lst:
print(elem, list(elem.connections.keys()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment