Skip to content

Instantly share code, notes, and snippets.

@TheArcherST
Last active June 19, 2023 17:09
Show Gist options
  • Save TheArcherST/71bad07a98b363960f0b160aa647e70a to your computer and use it in GitHub Desktop.
Save TheArcherST/71bad07a98b363960f0b160aa647e70a to your computer and use it in GitHub Desktop.
"""Event generation system
Assumes that we have some stream of data which we want to synchronize
with a remote server with minimal network load. The chosen solution is
to convert the data stream into a stream of data change events. This module
describes the base units of the conversion system.
Specific features:
1. Data sources are not eternal. When a source expires, we must
accept the result of its latest revision.
Supported events:
1. CREATE (new record appeared in the source)
2. DELETE (record disappeared from the source)
3. PING (informational event: proof that the record still exists in the source)
DELETE events are produced only after the second revision of the
records source has been accepted.
"""
from __future__ import annotations
import asyncio
import itertools
import random
from typing import Protocol, Type, Optional, Union
from enum import Enum
from dataclasses import dataclass
from hashlib import md5
# ##### protocol for implementing specific data and source #####
class Record(Protocol):
    """Record protocol

    One record extracted from a records source. Implementations only
    have to provide :meth:`hash`.
    """

    def hash(self) -> str:
        """Return an identifier derived from the record's data."""
        ...
class SourceExpired(Exception):
    """Signals that a records source has expired and is no longer available."""
class RecordsSource(Protocol):
    """Records source protocol

    An asynchronously iterable provider of records. The object is its own
    asynchronous iterator: ``__aiter__`` returns ``self`` and records are
    pulled one by one through ``__anext__``.
    """

    def hash(self) -> str:
        """Hash method

        :returns:
            string identifier of the records source
        """
        ...

    async def __anext__(self) -> Record:
        """Next method

        Return the next fetched record. The call may perform IO requests;
        alternatively an implementation may fetch all data up front in
        `load` when a single request is the better strategy.

        :raise StopIteration:
            when the records run out. Note that a ``StopIteration`` raised
            inside a coroutine reaches the awaiting caller as a
            ``RuntimeError``.
        :raise SourceExpired:
            on source expiration, i.e. the source is no longer available
            and the last revision must be accepted.
        :returns:
            Awaitable[Record]
        """
        ...

    async def load(self, *, reload=False):
        """Load method

        Load the records source, including the meta information about it.
        Optionally all records of the source may be loaded here as well
        (see the `__anext__` documentation).

        :raise SourceExpired:
            on source expiration, i.e. the source is no longer available
            and the last revision must be accepted.
        :returns:
            Awaitable[NoneType]
        """
        ...

    def __aiter__(self):
        # the source acts as its own asynchronous iterator
        return self
# ##### protocol for simple storage #####
class Storage(Protocol):
    """Simple storage protocol

    A named container able to persist one ``dict`` or ``list`` value.
    """

    def set_name(self, name: str):
        """Set the name of the storage."""
        ...

    async def save(self, data: Union[dict, list]):
        """Store `data` as the current content of the storage."""
        ...

    async def load(self, default: Union[dict, list] = None) -> Union[dict, list]:
        """Return the stored content; `default` is the fallback value."""
        ...
# ##### events generation implementation #####
class EventType(Enum):
    """Kinds of events an event stream can produce."""

    # a new record appeared in the source
    CREATE = 'CREATE'
    # a record disappeared from the source
    DELETE = 'DELETE'
    # informational: the record still exists in the source
    PING = 'PING'
@dataclass
class Event:
    """A single data change event.

    `record` carries the record the event is about and `record_hash` its
    hash; both default to ``None`` when not supplied.
    """

    type: EventType
    record: Optional[Record] = None
    record_hash: Optional[str] = None
class EventStream:
    """Event stream object

    Wraps the record source into an event stream.

    Every record produced by the source becomes a CREATE event (its hash
    is not in the storage yet) or a PING event (its hash is already
    stored). When the source runs out, each stored hash that was not seen
    during the finished revision is reported as a DELETE event, one per
    call; afterwards the source is reloaded and a new revision starts.

    The stream never finishes on its own. Exceptions of the source other
    than the end-of-records signal (e.g. ``SourceExpired``) propagate to
    the caller. A source that stays empty while nothing is left to delete
    keeps this stream reloading without ever producing an event.
    """

    def __init__(self,
                 storage: Storage,
                 records_source: RecordsSource):
        """Bind the stream to its storage and records source.

        :param storage:
            keeps the list of hashes of the records known so far.
        :param records_source:
            asynchronous source the records are pulled from.
        """
        self._storage = storage
        self._records_source = records_source
        self._last_revision_cache = set()  # to identify lost
        # Becomes True once a record was received or the stream loaded the
        # source itself. Until then an empty pass is not a revision, so it
        # must not turn every stored hash into a DELETE event.
        self._revision_started = False

    async def _resolve_next_lost_record(self) -> Optional[str]:
        """Pop one stored hash that is missing from the last revision.

        The hash is removed from the storage before it is returned.

        :returns:
            the lost record hash, or ``None`` when nothing is lost.
        """
        data: list[str] = await self._storage.load([])
        # first match in storage order keeps the DELETE order deterministic
        seen = self._last_revision_cache
        result = next((item for item in data if item not in seen), None)
        if result is None:
            return None
        data.remove(result)
        await self._storage.save(data)
        return result

    async def _get_event_type(
            self,
            record_hash: str,
    ) -> EventType:
        """Classify a record hash and register it if it is new.

        :returns:
            ``EventType.PING`` when the hash is already stored, otherwise
            ``EventType.CREATE`` (the hash is then saved to the storage).
        """
        data: list[str] = await self._storage.load([])
        if record_hash in data:
            return EventType.PING
        data.append(record_hash)
        await self._storage.save(data)
        return EventType.CREATE

    async def __anext__(self) -> Event:
        """Produce the next event.

        :raise RuntimeError:
            re-raised from the source unless it is the wrapper Python
            creates for a ``StopIteration`` raised inside a coroutine.
        :returns:
            Awaitable[Event]
        """
        # A loop instead of recursion: an empty source must not end in a
        # RecursionError.
        while True:
            try:
                current_record = await anext(self._records_source)
            except StopAsyncIteration:
                pass  # regular end of an asynchronous iterator
            except RuntimeError as exc:
                # A StopIteration raised inside a coroutine arrives here as
                # a RuntimeError whose cause is that StopIteration. Any
                # other RuntimeError is a genuine failure of the source.
                if not isinstance(exc.__cause__, StopIteration):
                    raise
            else:
                self._revision_started = True
                record_hash = current_record.hash()
                self._last_revision_cache.add(record_hash)
                event_type = await self._get_event_type(record_hash)
                return Event(event_type, current_record, record_hash)

            # The source ran out: report lost records one at a time.
            if self._revision_started:
                lost_hash = await self._resolve_next_lost_record()
                # compare with None so that a falsy hash is still reported
                if lost_hash is not None:
                    return Event(EventType.DELETE, record_hash=lost_hash)

            # Nothing (more) to delete: start the next revision.
            await self._records_source.load(reload=True)
            self._last_revision_cache.clear()
            self._revision_started = True
            # Give other tasks a chance to run if the source stays empty.
            await asyncio.sleep(0)

    def __aiter__(self):
        # the stream acts as its own asynchronous iterator
        return self
class EventStreamsManager:
def __init__(self,
storage_type: Type[Storage]):
self._storage_type = storage_type
self._record_sources = []
self._event_streams = []
self._event_streams_iterator = iter([])
def add_source(self, source: RecordsSource):
self._record_sources.append(source)
self._event_streams.append(self._build_event_stream(self._storage_type, source))
self._event_streams_iterator = itertools.cycle(self._event_streams)
@staticmethod
def _build_event_stream(storage_type: Type[Storage], source: RecordsSource):
exact_storage = storage_type()
exact_storage.set_name(source.hash())
return EventStream(exact_storage, source)
async def __anext__(self):
current_stream = next(self._event_streams_iterator)
try:
result = await anext(current_stream)
except SourceExpired:
# todo: implement source remove
return self.__anext__()
return result
def __aiter__(self):
return self
# ##### example #####
@dataclass
class MyRecord(Record):
    """Example record that consists of a name only."""

    name: str

    def hash(self):
        """Return the hexadecimal MD5 digest of the encoded name."""
        digest = md5(self.name.encode())
        return digest.hexdigest()
class MyRecordsSource(RecordsSource):
    """Example in-memory records source.

    `load` takes a fresh snapshot of the data passed to the constructor;
    `__anext__` walks through that snapshot.
    """

    def __init__(self, name: str, data):
        """
        :param name:
            name of the source; its identifier is derived from it.
        :param data:
            list of record names a snapshot is taken from on every load.
        """
        self._data = []
        self._scr = data
        self._current_index = 0
        self._name = name

    def hash(self) -> str:
        """Return the hexadecimal MD5 digest of the source name.

        Implements the ``hash`` method of the records source protocol.
        Without it the inherited protocol stub would answer ``None``.
        """
        return md5(self._name.encode()).hexdigest()

    async def load(self, *, reload=False):
        """Take a fresh snapshot of the data and rewind to its start."""
        self._data = self._scr.copy()
        self._current_index = 0

    async def __anext__(self) -> Record:
        """Return the next record of the current snapshot.

        :raise StopIteration:
            when the snapshot is exhausted.
        """
        if self._current_index >= len(self._data):
            # Raised inside a coroutine, so the awaiting caller receives
            # it wrapped in a RuntimeError.
            raise StopIteration
        result = self._data[self._current_index]
        self._current_index += 1
        return MyRecord(result)

    def __hash__(self):
        return hash(self._name)
class MyStorage(Storage):
    """Example storage that keeps its content in memory."""

    def __init__(self):
        self._data = None
        self._name = None

    def set_name(self, name: str):
        """Remember the name of the storage."""
        self._name = name

    async def load(self, default: Union[dict, list] = None) -> Union[dict, list]:
        """Return the saved content, or `default` when nothing was saved yet."""
        # `is None` rather than truthiness: a saved empty list or dict is
        # valid content and must not be replaced by the default.
        if self._data is None:
            return default
        return self._data

    async def save(self, data: Union[dict, list]):
        """Replace the content of the storage with `data`."""
        self._data = data
async def main():
    """Demo: print the events of two in-memory sources that keep changing."""
    src1 = MyRecordsSource("test1", ["1", "2", "3"])
    src2 = MyRecordsSource("test2", ["10", "20", "30"])
    sources = (src1, src2)

    manager = EventStreamsManager(MyStorage)
    for source in sources:
        manager.add_source(source)

    async for event in manager:
        print(event)

        # emulate realworld changes: add a random record to every source,
        # then drop a random one from every source
        for source in sources:
            source._data.append(str(random.randint(0, 9)))
        for source in sources:
            source._data.pop(random.randint(0, len(source._data)-1))

        await asyncio.sleep(0.5)
# run the demo only when the module is executed as a script
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment