Last active
June 13, 2024 01:20
-
-
Save jweinst1/883b78616819e4132d1917e141a0a7dd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import socket | |
import os | |
import json | |
class Lock:
    """Plain record pairing a lock's owner with a member set."""

    def __init__(self, owner, member_set):
        # Both values are stored exactly as given; no copy is made.
        self.owner, self.member_set = owner, member_set
class Member:
    """One node of a small lock cluster wired together with Unix stream sockets.

    Each member listens on ``path`` and keeps one non-blocking connection per
    peer in ``connections`` (keyed by the peer's path).  Messages are JSON
    objects written back-to-back on the stream:

    * ``{'op': 'lock', 'key', 'owner', 'observ'}`` - a lock request, where
      ``observ`` is the member set the requester saw when it asked.
    * ``{'op': 'lock_resp', 'state', 'key'}`` - the answer, where ``state`` is
      ``'acq'`` (recorded), ``'neq'`` (key held with a different ``observ``)
      or ``'ocu'`` (key held with the same ``observ``).

    Attributes:
        locks: key -> the lock request this member has recorded for that key.
        connections: peer path -> connected socket.
        reqs: key -> {member path: 'sent' | 'acq'} for requests this member
            issued and is still tracking.
        self_reqs: this member's own requests not yet applied locally.
    """

    @staticmethod
    def create_cluster(*paths):
        """Create one member per path and connect every distinct pair once.

        Returns the members in the same order as ``paths``.
        """
        mems = [Member(path) for path in paths]
        for mem in mems:
            for memb in mems:
                # connect_two ignores self-pairs and pairs already linked.
                Member.connect_two(mem, memb)
        return mems

    @staticmethod
    def connect_two(a, b):
        """Link members ``a`` and ``b`` with one socket pair.

        ``a`` connects to ``b``'s listening socket and ``b`` accepts right
        away.  Does nothing when both are the same member or a link between
        them is already recorded on either side.
        """
        if a.path == b.path or a.path in b.connections or b.path in a.connections:
            return
        client_a = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            client_a.setblocking(False)
            client_a.connect(b.path)
            client_conn, _ = b.socket.accept()
        except OSError:
            # Do not leak the half-opened client socket when wiring fails.
            client_a.close()
            raise
        client_conn.setblocking(False)
        a.connections[b.path] = client_a
        b.connections[a.path] = client_conn

    def __init__(self, path):
        """Bind a non-blocking listening socket at ``path``.

        Any existing filesystem entry at ``path`` is removed first.
        """
        self.locks = {}
        self.connections = {}
        self.reqs = {}
        self.self_reqs = []
        self.path = path
        try:
            os.unlink(path)
        except (OSError, ValueError):
            # Nothing removable at that path (missing file, or a name that is
            # not a filesystem path); bind() below reports any real problem.
            pass
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.socket.setblocking(False)
            self.socket.bind(self.path)
            self.socket.listen(10)
        except Exception:
            self.socket.close()
            raise

    def close(self):
        """Close all peer connections and the listener, then remove the socket path."""
        for conn in self.connections.values():
            conn.close()
        self.connections.clear()
        self.socket.close()
        try:
            os.unlink(self.path)
        except (OSError, ValueError):
            pass

    def get_member_set(self):
        """Return ``{path: True}`` for every connected peer plus this member."""
        members = dict.fromkeys(self.connections, True)
        members[self.path] = True
        return members

    def lock_req(self, key):
        """Send a lock request for ``key`` to every peer and start tracking it.

        The request is also queued in ``self_reqs`` so this member can apply
        it to itself later via ``process_self_requests``.
        """
        mem_set = self.get_member_set()
        req = {'key': key, 'owner': self.path, 'observ': mem_set, 'op': 'lock'}
        req_dat = json.dumps(req).encode()
        for memb, conn in self.connections.items():
            print("Sending req={} to memb={}".format(req, memb))
            conn.sendall(req_dat)
        self.self_reqs.append(req)
        self.reqs[key] = dict.fromkeys(mem_set, 'sent')

    def _evaluate_lock(self, req):
        """Record ``req`` if its key is free and build the ``lock_resp`` message."""
        key = req['key']
        if key not in self.locks:
            self.locks[key] = req
            state = 'acq'
        elif self.locks[key]['observ'] != req['observ']:
            state = 'neq'
        else:
            state = 'ocu'
        return {'op': 'lock_resp', 'state': state, 'key': key}

    def process_lock(self, req, conn=None):
        """Handle a lock request and deliver the answer.

        Args:
            req: the decoded ``'lock'`` message.
            conn: socket to answer on.  When ``None`` the request is this
                member's own, so the answer is applied locally through
                ``process_lock_resp`` instead of being sent.
        """
        resp = self._evaluate_lock(req)
        if conn is None:
            self.process_lock_resp(resp, self.path, None)
        else:
            conn.sendall(json.dumps(resp).encode())

    def process_lock_resp(self, req, memb, conn):
        """Apply member ``memb``'s answer to a lock request this member issued.

        Answers for keys that are not (or no longer) tracked in ``reqs`` are
        reported and ignored.  An unknown ``state`` terminates the process
        with exit code 4.
        """
        key = req['key']
        # Guard on reqs, the table updated below; a key dropped after an
        # earlier 'neq' must not raise KeyError when late answers arrive.
        if key not in self.reqs:
            print("{} recevied unexpected lock_resp for lock={}".format(self.path, key))
            return
        state = req['state']
        if state in ('acq', 'ocu'):
            # NOTE(review): 'ocu' is recorded as 'acq', as before - confirm
            # that an occupied lock should count as acquired.
            self.reqs[key][memb] = 'acq'
        elif state == 'neq':
            print("{} got an neq from {}, for lock={}".format(self.path, memb, key))
            del self.reqs[key]
        else:
            print("{} Unexpected state={} for lock op on {}".format(self.path, state, key))
            sys.exit(4)

    def process_self_requests(self):
        """Apply this member's own queued requests locally, most recent first."""
        while self.self_reqs:
            popped = self.self_reqs.pop()
            if popped['op'] == 'lock':
                self.process_lock(popped)

    @staticmethod
    def _decode_messages(data):
        """Yield each JSON value found back-to-back in ``data`` (bytes).

        A stream socket does not preserve message boundaries, so one read may
        hold several messages.  Raises ``json.JSONDecodeError`` on malformed
        or truncated input.
        """
        text = data.decode()
        decoder = json.JSONDecoder()
        pos, end = 0, len(text)
        while pos < end:
            if text[pos].isspace():
                # raw_decode does not accept leading whitespace.
                pos += 1
                continue
            obj, pos = decoder.raw_decode(text, pos)
            yield obj

    def process_requests(self):
        """Read at most one chunk from every peer and dispatch its messages.

        Peers with nothing to read are skipped; other errors are printed and
        do not stop the remaining peers from being polled.
        """
        for memb, conn in list(self.connections.items()):
            try:
                cdata = conn.recv(1024)
                if not cdata:
                    # An empty read on a stream socket means the peer closed.
                    print("{} peer {} closed its connection".format(self.path, memb))
                    continue
                for cdict in self._decode_messages(cdata):
                    if cdict['op'] == 'lock':
                        self.process_lock(cdict, conn)
                    elif cdict['op'] == 'lock_resp':
                        self.process_lock_resp(cdict, memb, conn)
            except BlockingIOError:
                pass
            except Exception as exc:
                print("Unexpecter err={} when reading data from memb={}".format(exc, memb))

    def __repr__(self):
        return "({}, {}, {}, {})".format(self.path, self.locks, self.connections, self.reqs)
if __name__ == '__main__':
    # Demo: build a three-member cluster, have the first member request a
    # lock, then print every member with the peers it is connected to.
    cluster = Member.create_cluster("foo", "bar", "poo")
    requester = cluster[0]
    requester.lock_req('abc')
    for member in cluster:
        peer_paths = list(member.connections)
        print(member, peer_paths)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment