-
-
Save KentShikama/8b1c83a88e0a87785c3c70fb3605a3fb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python implementation of Rchain Tuplespace | |
from typing import Tuple, List, Any, Callable, Dict, Optional | |
import dill | |
DataT = Any | |
ConsumeCandidateT = Tuple[int, DataT] | |
ProduceCandidateT = Tuple[str, int] | |
CodeT = Callable | |
EnvT = Dict | |
PersistentT = bool | |
# TODO: Modify serialize/deserialize methods to be more robust (Potentially use json.dumps and loads) | |
class Pattern: | |
def __init__(self, pattern: Any = ""): | |
self.__pattern = pattern | |
def is_match(self, data): | |
if self == WILDCARD: | |
return True | |
else: | |
return self.__pattern == data | |
class Wildcard(Pattern): | |
pass | |
WILDCARD = Wildcard() | |
class WaitingContinuation(object): | |
def __init__(self, patterns: List[Pattern], context: Tuple[CodeT, EnvT, PersistentT]): | |
self.patterns = patterns | |
self.context = context | |
class Subspace(object): | |
def __init__(self, data_list: List[DataT], waiting_continuation_list: List[WaitingContinuation]): | |
self.data_list = data_list | |
self.waiting_continuation_list = waiting_continuation_list | |
def remove_data_at_index(self, index: int): | |
return self.data_list.pop(index) | |
def remove_waiting_continuation_at_index(self, index: int): | |
return self.waiting_continuation_list.pop(index) | |
def append_waiting_continuation(self, waiting_continuation: WaitingContinuation): | |
self.waiting_continuation_list.append(waiting_continuation) | |
def append_data(self, product: DataT): | |
self.data_list.append(product) | |
class Tuplespace(Dict[Any, Subspace]): | |
def serialize(self, obj): | |
return dill.dumps(obj) | |
def deserialize(self, serial): | |
return dill.loads(serial) | |
# The consume/produce methods mimic the logic of the RBL code, | |
# and thus some choices may feel non-pythonic. | |
class Namespace: | |
# TODO: Locking/Threads | |
# TODO: Persistence | |
# TODO: Reimplement with dictionary instead of positional logic | |
def __init__(self, t: Tuplespace): | |
self._t = t | |
def consume(self, channels: Tuple[str], patterns: Tuple[Pattern], code: CodeT, env: EnvT, persistent: bool) -> None: | |
candidates = self.__extract_data_candidates(channels, patterns) | |
chosen_candidates = self.__select_best_products(candidates) | |
if self.__candidates_exist(chosen_candidates): | |
products = self.__consume_products(channels, chosen_candidates) | |
code(products) | |
else: | |
self.__store_waiting_continuation(channels, patterns, code, env, persistent) | |
def __extract_data_candidates(self, channels: Tuple[str], patterns: Tuple[Pattern]) -> List[Optional[List[Optional[ConsumeCandidateT]]]]: | |
candidates = [] | |
for channel_index, channel in enumerate(channels): | |
subspace_candidates = Optional[List[Optional[ConsumeCandidateT]]] | |
subspace = self._t.get(channel) | |
if subspace: | |
for data_index, data in enumerate(subspace.data_list): | |
if patterns[channel_index].is_match(data): | |
subspace_candidates.append((data_index, data)) | |
else: | |
subspace_candidates.append(None) | |
else: | |
subspace_candidates = None | |
candidates.append(subspace_candidates) | |
return candidates | |
def __select_best_products(self, candidates: List[Optional[List[Optional[ConsumeCandidateT]]]]) -> List[Optional[ConsumeCandidateT]]: | |
chosen_candidates = [] | |
for subspace_candidates in candidates: | |
if subspace_candidates: | |
chosen_candidate = next(filter(None.__ne__, subspace_candidates)) | |
chosen_candidates.append(chosen_candidate) | |
else: | |
chosen_candidates.append(None) | |
return chosen_candidates | |
def __candidates_exist(self, chosen_candidates): | |
return all(_ for _ in chosen_candidates) | |
def __consume_products(self, channels: Tuple[str], chosen_candidates: List[ConsumeCandidateT]) -> List[DataT]: | |
products = [] # type = List[DataT] | |
for channel_index, candidate in enumerate(chosen_candidates): | |
data_index, data = candidate | |
channel = channels[channel_index] | |
subspace = self._t[channel] | |
subspace.remove_data_at_index(data_index) | |
products.append(data) | |
return products | |
def __store_waiting_continuation(self, channels, patterns, code, env, persistent): | |
waiting_continuation = WaitingContinuation(patterns, (code, env, persistent)) | |
subspace = self._t.get(channels) | |
if subspace: | |
subspace.append_waiting_continuation(waiting_continuation) | |
else: | |
subspace = Subspace([],[]) | |
subspace.append_waiting_continuation(waiting_continuation) | |
self._t[channels] = subspace | |
def produce(self, channel: str, product: DataT) -> None: | |
candidate_channels_key = [] | |
for key in self._t.keys(): | |
if channel in key: | |
candidate_channels_key.append(key) | |
candidates = self.__extract_consume_candidates(candidate_channels_key, channel, product) | |
if candidates: | |
chosen_candidate = candidates[0] | |
consumed_continuation, products = self.__consume_continuation(chosen_candidate, product) | |
code, env, persistent = consumed_continuation.context | |
code(env, *products) | |
# TODO: Persistent | |
else: | |
self.__store_product(channel, product) | |
def __extract_consume_candidates(self, candidate_channels_key: Any, channel: Any, product: DataT) -> List[Tuple[List[ProduceCandidateT], Tuple[Any, int]]]: | |
candidates = [] # type = List[Tuple[List[ProduceCandidateT], Tuple[Any, int]]] | |
if candidate_channels_key: | |
for candidate_channel_key in candidate_channels_key: | |
if isinstance(candidate_channel_key, tuple): | |
candidate_channels = list(candidate_channel_key) | |
else: | |
candidate_channels = candidate_channel_key | |
channel_position = candidate_channels.index(channel) | |
subspace = self._t.get(candidate_channel_key) | |
for waiting_continuation_index, waiting_continuation in enumerate(subspace.waiting_continuation_list): | |
product_patterns = waiting_continuation.patterns | |
if product_patterns[channel_position].is_match(product): | |
produce_candidates = self.__fetch_produce_candidates(candidate_channels, channel_position, | |
product_patterns) | |
if all(produce_candidates): | |
candidate = (produce_candidates, (candidate_channel_key, waiting_continuation_index)) | |
candidates.append(candidate) | |
return candidates | |
def __fetch_produce_candidates(self, candidate_channels: Any, channel_position: int, product_patterns: List[Pattern]): | |
produce_candidates = [] # type = List[Optional[ProduceCandidateT]] | |
for candidate_channel_index, candidate_channel in enumerate(candidate_channels): | |
if channel_position == candidate_channel_index: | |
produce_candidates.append((candidate_channel, -1)) | |
else: | |
candidate_subspace = self._t[candidate_channel] | |
if candidate_subspace.data_list: | |
for data_index, data in candidate_subspace.data_list: | |
if product_patterns[candidate_channel_index].is_match(data): | |
produce_candidates.append((candidate_channel, data_index)) | |
else: | |
produce_candidates.append(None) | |
return produce_candidates | |
def __consume_continuation(self, chosen_candidate: Tuple[List[ProduceCandidateT], Tuple[Any, int]], product: DataT) -> Tuple[WaitingContinuation, List[DataT]]: | |
produce_candidates, (candidate_channel_key, waiting_continuation_index) = chosen_candidate | |
products = [] # type = List[DataT] | |
for produce_channel, data_index in produce_candidates: | |
if data_index == -1: | |
products.append(product) | |
else: | |
subspace = self._t[produce_channel] | |
consumed_product = subspace.remove_data_at_index(data_index) | |
products.append(consumed_product) | |
chosen_subspace = self._t[candidate_channel_key] | |
consumed_continuation = chosen_subspace.remove_waiting_continuation_at_index(waiting_continuation_index) | |
return consumed_continuation, products | |
def __store_product(self, channel, product): | |
subspace = self._t.get(channel) | |
if subspace: | |
subspace.append_data(product) | |
else: | |
subspace = Subspace([], []) | |
subspace.append_data(product) | |
self._t[channel] = subspace |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Line 174 needs to break.