Skip to content

Instantly share code, notes, and snippets.

@KentShikama
Created February 6, 2018 00:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KentShikama/8b1c83a88e0a87785c3c70fb3605a3fb to your computer and use it in GitHub Desktop.
Save KentShikama/8b1c83a88e0a87785c3c70fb3605a3fb to your computer and use it in GitHub Desktop.
# Python implementation of Rchain Tuplespace
from typing import Tuple, List, Any, Callable, Dict, Optional
import dill
# Any value stored on a channel.
DataT = Any
# (index into a subspace's data_list, the datum found at that index).
ConsumeCandidateT = Tuple[int, DataT]
# (channel, index into that channel's data_list); an index of -1 marks the
# datum currently being produced rather than one already stored.
ProduceCandidateT = Tuple[str, int]
# Continuation body; produce() invokes it as code(env, *products).
CodeT = Callable
# Environment passed to the continuation as its first argument by produce().
EnvT = Dict
# Persistence flag stored alongside a continuation; not acted on yet
# (see the "Persistent" TODO in Namespace.produce).
PersistentT = bool
# TODO: Modify serialize/deserialize methods to be more robust (Potentially use json.dumps and loads)
class Pattern:
    """Matcher for a single datum on a channel.

    A plain ``Pattern`` matches a datum only when the datum equals the
    wrapped value. Subclasses override :meth:`is_match` to change that.
    """

    def __init__(self, pattern: Any = ""):
        self.__pattern = pattern

    def is_match(self, data):
        """Return the result of comparing the wrapped value with *data*."""
        return self.__pattern == data


class Wildcard(Pattern):
    """Pattern that matches every datum."""

    def is_match(self, data):
        """Return True for any *data*."""
        # Decided by type, not by identity with the WILDCARD singleton, so a
        # second instance (e.g. a copy produced by a serialize/deserialize
        # round trip) still behaves as a wildcard.
        return True


# Shared wildcard instance for callers that do not need their own.
WILDCARD = Wildcard()
class WaitingContinuation:
    """A continuation stored until matching data is produced.

    ``patterns`` is indexed by channel position; ``context`` is the
    ``(code, env, persistent)`` triple needed to run the continuation.
    """

    def __init__(self, patterns: List[Pattern], context: Tuple[CodeT, EnvT, PersistentT]):
        self.patterns, self.context = patterns, context
class Subspace:
    """Storage held under one tuplespace key: data plus waiting continuations."""

    def __init__(self, data_list: List[DataT], waiting_continuation_list: List[WaitingContinuation]):
        # The lists are kept by reference, not copied.
        self.data_list = data_list
        self.waiting_continuation_list = waiting_continuation_list

    def append_data(self, product: DataT):
        """Add *product* to the end of the stored data."""
        stored = self.data_list
        stored.append(product)

    def append_waiting_continuation(self, waiting_continuation: WaitingContinuation):
        """Add *waiting_continuation* to the end of the waiting list."""
        waiting = self.waiting_continuation_list
        waiting.append(waiting_continuation)

    def remove_data_at_index(self, index: int):
        """Remove and return the datum stored at *index*."""
        removed = self.data_list.pop(index)
        return removed

    def remove_waiting_continuation_at_index(self, index: int):
        """Remove and return the waiting continuation stored at *index*."""
        removed = self.waiting_continuation_list.pop(index)
        return removed
class Tuplespace(Dict[Any, Subspace]):
    """Dictionary mapping a channel key to the Subspace stored under it."""

    def serialize(self, obj):
        """Return *obj* encoded with ``dill.dumps``."""
        encoded = dill.dumps(obj)
        return encoded

    def deserialize(self, serial):
        """Return the object decoded from *serial* with ``dill.loads``.

        NOTE(review): dill builds on pickle, so loading can execute arbitrary
        code. Only pass data from a trusted source.
        """
        decoded = dill.loads(serial)
        return decoded
# The consume/produce methods mimic the logic of the RBL code,
# and thus some choices may feel non-pythonic.
class Namespace:
    """Consume/produce front end over a tuplespace.

    Data produced on a channel is stored under that channel's key.
    Continuations waiting on a join of channels are stored under the whole
    ``channels`` key that was passed to :meth:`consume`.

    NOTE(review): the positional logic assumes a channel appears at most once
    in a ``channels`` tuple; repeated channels are not handled.
    """

    # TODO: Locking/Threads
    # TODO: Persistence
    # TODO: Reimplement with dictionary instead of positional logic
    def __init__(self, t: Tuplespace):
        self._t = t

    def consume(self, channels: Tuple[str, ...], patterns: Tuple[Pattern, ...], code: CodeT, env: EnvT, persistent: bool) -> None:
        """Run *code* on matching stored data, or park it until data arrives.

        For every channel the first stored datum matching the pattern at the
        same position is selected. If every channel yields one, those data
        are removed and ``code(env, *products)`` is called -- the same call
        shape :meth:`produce` uses. Otherwise the continuation is stored
        under ``channels``.
        """
        candidates = self.__extract_data_candidates(channels, patterns)
        chosen_candidates = self.__select_best_products(candidates)
        if self.__candidates_exist(chosen_candidates):
            products = self.__consume_products(channels, chosen_candidates)
            code(env, *products)
        else:
            self.__store_waiting_continuation(channels, patterns, code, env, persistent)

    def __extract_data_candidates(self, channels: Tuple[str, ...], patterns: Tuple[Pattern, ...]) -> List[Optional[List[Optional[ConsumeCandidateT]]]]:
        """Return, per channel, ``None`` (no subspace) or a list with one entry
        per stored datum: ``(index, datum)`` on a match, ``None`` otherwise."""
        candidates = []
        for channel_index, channel in enumerate(channels):
            subspace = self._t.get(channel)
            if subspace is None:
                candidates.append(None)
                continue
            candidates.append([
                (data_index, data) if patterns[channel_index].is_match(data) else None
                for data_index, data in enumerate(subspace.data_list)
            ])
        return candidates

    def __select_best_products(self, candidates: List[Optional[List[Optional[ConsumeCandidateT]]]]) -> List[Optional[ConsumeCandidateT]]:
        """Return, per channel, the first matching candidate or ``None``."""
        # The default passed to next() covers a channel that has data but no
        # match; without it next() would raise StopIteration.
        return [
            next((candidate for candidate in (subspace_candidates or ()) if candidate is not None), None)
            for subspace_candidates in candidates
        ]

    def __candidates_exist(self, chosen_candidates):
        """Return True when every channel has a chosen candidate."""
        return all(candidate is not None for candidate in chosen_candidates)

    def __consume_products(self, channels: Tuple[str, ...], chosen_candidates: List[ConsumeCandidateT]) -> List[DataT]:
        """Remove each chosen datum from its channel and return the data in
        channel order."""
        products = []
        for channel, (data_index, data) in zip(channels, chosen_candidates):
            self._t[channel].remove_data_at_index(data_index)
            products.append(data)
        return products

    def __store_waiting_continuation(self, channels, patterns, code, env, persistent):
        """Store the continuation under the ``channels`` key."""
        # A list cannot be used as a dictionary key, and produce() recognises
        # joins by tuple keys, so normalise lists to tuples.
        key = tuple(channels) if isinstance(channels, list) else channels
        waiting_continuation = WaitingContinuation(patterns, (code, env, persistent))
        subspace = self._t.get(key)
        if subspace is None:
            subspace = Subspace([], [])
            self._t[key] = subspace
        subspace.append_waiting_continuation(waiting_continuation)

    def produce(self, channel: str, product: DataT) -> None:
        """Hand *product* to a waiting continuation, or store it on *channel*.

        The first waiting continuation whose pattern for *channel* matches
        *product*, and whose other channels all hold a matching datum, is
        removed and run as ``code(env, *products)``. If there is none, the
        product is appended to the channel's data.
        """
        # Tuple keys are joins: test membership. Any other key names a single
        # channel: test equality (``in`` on a string key would be a substring
        # test).
        candidate_channels_key = [
            key for key in self._t
            if (channel in key if isinstance(key, tuple) else key == channel)
        ]
        candidates = self.__extract_consume_candidates(candidate_channels_key, channel, product)
        if candidates:
            chosen_candidate = candidates[0]
            consumed_continuation, products = self.__consume_continuation(chosen_candidate, product)
            code, env, _persistent = consumed_continuation.context
            code(env, *products)
            # TODO: Persistent
        else:
            self.__store_product(channel, product)

    def __extract_consume_candidates(self, candidate_channels_key: Any, channel: Any, product: DataT) -> List[Tuple[List[ProduceCandidateT], Tuple[Any, int]]]:
        """Return every ``(produce_candidates, (key, continuation_index))``
        whose continuation can fire now that *product* is available."""
        candidates = []
        for candidate_channel_key in candidate_channels_key:
            if isinstance(candidate_channel_key, tuple):
                candidate_channels = list(candidate_channel_key)
            else:
                # A non-tuple key is one channel, not a sequence of them.
                candidate_channels = [candidate_channel_key]
            channel_position = candidate_channels.index(channel)
            subspace = self._t[candidate_channel_key]
            for waiting_continuation_index, waiting_continuation in enumerate(subspace.waiting_continuation_list):
                product_patterns = waiting_continuation.patterns
                if not product_patterns[channel_position].is_match(product):
                    continue
                produce_candidates = self.__fetch_produce_candidates(
                    candidate_channels, channel_position, product_patterns)
                if all(candidate is not None for candidate in produce_candidates):
                    candidates.append(
                        (produce_candidates, (candidate_channel_key, waiting_continuation_index)))
        return candidates

    def __fetch_produce_candidates(self, candidate_channels: Any, channel_position: int, product_patterns: List[Pattern]):
        """Return exactly one entry per channel, in channel order.

        The entry is ``(channel, -1)`` at *channel_position* (the product
        being produced), ``(channel, index)`` of the first stored datum that
        matches the channel's pattern, or ``None`` when the channel has no
        matching datum.
        """
        produce_candidates = []
        for candidate_channel_index, candidate_channel in enumerate(candidate_channels):
            if candidate_channel_index == channel_position:
                produce_candidates.append((candidate_channel, -1))
                continue
            # A channel nothing has been produced on yet has no subspace.
            candidate_subspace = self._t.get(candidate_channel)
            data_list = candidate_subspace.data_list if candidate_subspace is not None else []
            pattern = product_patterns[candidate_channel_index]
            # Stop at the first match so each channel contributes one entry.
            data_index = next(
                (index for index, data in enumerate(data_list) if pattern.is_match(data)),
                None,
            )
            produce_candidates.append(
                None if data_index is None else (candidate_channel, data_index))
        return produce_candidates

    def __consume_continuation(self, chosen_candidate: Tuple[List[ProduceCandidateT], Tuple[Any, int]], product: DataT) -> Tuple[WaitingContinuation, List[DataT]]:
        """Remove the chosen continuation and the stored data it consumes.

        Returns the continuation and the products in channel order, with
        *product* at the position marked by index ``-1``.
        """
        produce_candidates, (candidate_channel_key, waiting_continuation_index) = chosen_candidate
        products = []
        for produce_channel, data_index in produce_candidates:
            if data_index == -1:
                products.append(product)
            else:
                products.append(self._t[produce_channel].remove_data_at_index(data_index))
        chosen_subspace = self._t[candidate_channel_key]
        consumed_continuation = chosen_subspace.remove_waiting_continuation_at_index(waiting_continuation_index)
        return consumed_continuation, products

    def __store_product(self, channel, product):
        """Append *product* to the data stored under *channel*."""
        subspace = self._t.get(channel)
        if subspace is None:
            subspace = Subspace([], [])
            self._t[channel] = subspace
        subspace.append_data(product)
@KentShikama
Copy link
Author

Line 174 needs a `break` statement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment