Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Concurrent or single-process data access to a ONE-writer, MULTIPLE-readers store with Ray
"""
This example shows how you can handle a data store that can have only ONE WRITER but MULTIPLE READERS.
KV stores and SQLite come to mind. Also text files, right?
There are three scenarios, not two:
1. Writer writes directly
2. Reader uses Writer in the same process
3. Reader uses Writer in the remote process
"""
class Store(object):
    """Key-value store wrapper that funnels every write through one writer.

    An instance is either the writer (it opens the backing store read-write
    and writes directly) or a reader (it opens the store read-only and
    forwards ``set`` calls to its writer). A reader can be given its writer
    as a plain object in the same process, as a Ray actor handle, or as the
    name of a named Ray actor.
    """

    def __init__(self, store_name, root, is_writer=False, writer=None):
        """Record the role of this instance and open the backing store.

        Args:
            store_name: Name of the store; used in log messages.
            root: Base path; the data path is ``root + '/data'``.
            is_writer: True when this instance is the single writer.
            writer: Readers only. Either the writer itself (local object or
                Ray actor handle) or the name of a named Ray actor, which is
                resolved with ``ray.get_actor``.

        Raises:
            ValueError: If the arguments describe neither a writer
                (``is_writer`` set, no ``writer``) nor a reader
                (``is_writer`` unset, ``writer`` given).
        """
        # Explicit raise instead of `assert`: asserts are stripped under -O,
        # which would let an inconsistent role through silently.
        has_writer = writer is not None
        if bool(is_writer) == has_writer:
            raise ValueError('Reader or Writer')

        self.root = root
        self.data = self.root + '/data'
        self.store_name = store_name
        self.is_writer = is_writer

        # Always define writer_name so the attribute exists even when the
        # writer was passed as an object/handle rather than by name.
        self.writer_name = None
        if isinstance(writer, str):
            # A string is the name of a named actor, not a reference to it.
            self.writer_name = writer
            self.writer = ray.get_actor(self.writer_name)
        else:
            self.writer = writer

        if self.is_writer:
            log('kv', f'ReadWrite store : {self.store_name} ...')
        else:
            log('kv', f'ReadOnly store : {self.store_name} ...')
        self.store = self._open_store(read_only=not self.is_writer)

    def _open_store(self, read_only):
        """Open and return the backing store handle.

        Extension point standing in for the "open read-write" /
        "open read-only" placeholders of the sketch: override it with the
        real open call for the chosen backend.

        Args:
            read_only: True for readers, False for the writer.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            'override _open_store() to open the backing store')

    # Single writer logic
    def set(self, key, value):
        """Store ``value`` under ``key``, always going through the writer.

        The writer writes directly to its own store. A reader forwards the
        call to its writer: through ``.remote`` when the writer is a Ray
        actor handle, or as a plain method call when the writer is an object
        in the same process.
        """
        if self.is_writer:
            # Direct write; kv_set is the backing store's own method.
            log('kvset', f'local-writer-set> {key} => {value}')
            self.store.kv_set(key, value)
        elif isinstance(self.writer, ray.actor.ActorHandle):
            # Writer lives in a remote process. Checking the handle type is
            # sturdier than sniffing the 'Actor' prefix of its repr.
            log('kvset', f'remote-set> {key} => {value}')
            self.writer.set.remote(key, value)
        else:
            # Writer is a local object in the same process.
            log('kvset', f'local-reader-set> {key} => {value}')
            self.writer.set(key, value)
@ray.remote
class RemoteStore(Store):
    """Ray actor flavour of ``Store``; it adds no behaviour of its own."""
#------------- later ------------------------------------
# in this example both are Remote.
# if you use many actors you should probably create the Writer as remote,
# because you need to use Named-writer to pass the name around
class Runner(object):
    """Create one named writer actor plus a pool of reader actors.

    The writer is registered under ``writer_name``; each reader receives
    that name (not a handle) and resolves the writer itself.
    """

    def __init__(self, store_name, nreaders=2, writer_name='writer', root='.'):
        """Start the writer and reader actors and pool the readers.

        Args:
            store_name: Name passed to every store.
            nreaders: Number of reader actors to create.
            writer_name: Name under which the writer actor is registered.
            root: Base path handed to every store. ``Store.__init__``
                requires it as its second positional argument; the default
                keeps existing three-argument callers working.
        """
        # `root` must be forwarded: without it the Store constructor is
        # missing a required positional argument.
        self.writer = RemoteStore.options(name=writer_name).remote(
            store_name, root, is_writer=True)
        self.readers = [
            RemoteStore.remote(
                store_name, root, is_writer=False, writer=writer_name)
            for _ in range(nreaders)
        ]
        self.pool = ActorPool(self.readers)
        # ... do something ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment