Skip to content

Instantly share code, notes, and snippets.

@mstump
Last active April 23, 2019 11:39
Show Gist options
  • Save mstump/147fc4a29bd265f4328dfdd6096a7336 to your computer and use it in GitHub Desktop.
Save mstump/147fc4a29bd265f4328dfdd6096a7336 to your computer and use it in GitHub Desktop.
Python Faust with an expiring cache reading from historical data, also using protobuf3 as the serializer
import asyncio
import calendar
import faust
import itertools
import random
import signal
import string
import time
import uuid
from contextlib import suppress
from datetime import datetime, timedelta
from faust import windows
from faust.serializers import codecs
from google.protobuf.timestamp_pb2 import Timestamp
from threading import Thread, RLock
from typing import Any
import page_views_pb2
def copy_attributes(source, dest):
    """Copy attribute values from ``source`` onto ``dest`` and return ``dest``.

    When ``source`` is a dict, every key that already exists as an attribute
    on ``dest`` is assigned the matching value; other keys are ignored.

    For any other object the candidate names come from
    ``source._meta.fields`` when a ``_meta`` attribute is present, otherwise
    from ``dir(source)``. Every name that does not start with an underscore
    is copied, whether or not ``dest`` already has it.
    """
    if isinstance(source, dict):
        for key, item in source.items():
            if hasattr(dest, key):
                setattr(dest, key, item)
        return dest

    names = source._meta.fields if hasattr(source, "_meta") else dir(source)
    for attr in names:
        # Skip private and dunder names.
        if attr.startswith('_'):
            continue
        setattr(dest, attr, getattr(source, attr))
    return dest
class PageView(faust.Record, serializer='page_view_pb'):
    """A page-view event record, serialized with the 'page_view_pb' codec."""

    id: str = None
    user: str = None
    occurred_at: datetime = None

    @staticmethod
    def id_hash(pv):
        """Return the ``id`` of ``pv``, for use as a key function."""
        return pv.id

    @property
    def occurred_at_posix(self):
        """``occurred_at`` converted to whole seconds since the epoch.

        ``calendar.timegm`` treats the fields of the time tuple as UTC.
        """
        time_fields = self.occurred_at.timetuple()
        return calendar.timegm(time_fields)
def _pbSupport(*args):
    """Define the protobuf codec for page views and register it.

    The codec instance is registered under the name 'page_view_pb'.
    ``args`` is accepted but not used.
    """

    class PBSupport(codecs.Codec):
        """Codec converting page-view data to and from protobuf bytes."""

        def _dumps(self, obj: Any) -> bytes:
            """Build a protobuf ``PageView`` from ``obj`` and serialize it.

            ``obj`` is indexed by the field names "id", "user" and
            "occurred_at".
            """
            message = page_views_pb2.PageView()
            message.id = obj["id"]
            message.user = obj["user"]
            message.occurred_at.FromDatetime(obj["occurred_at"])
            return message.SerializeToString()

        def _loads(self, s: bytes) -> Any:
            """Parse protobuf bytes ``s`` and return a ``PageView`` record."""
            message = page_views_pb2.PageView()
            message.ParseFromString(s)

            record = PageView()
            record.id = message.id
            record.user = message.user
            record.occurred_at = message.occurred_at.ToDatetime()
            return record

    codecs.register('page_view_pb', PBSupport())


# Register the codec at import time.
_pbSupport()
class CacheEntry():
    """A cached value paired with an absolute expiry time.

    Args:
        value: The object to cache.
        ttl: Lifetime in seconds, added to ``time.time()`` when no explicit
            ``expires_at`` is supplied.
        expires_at: Absolute expiry time. When given (including ``0``), it
            is used as-is and ``ttl`` is ignored.
    """

    def __init__(self, value, ttl=20, expires_at=None):
        self.value = value
        # Compare against None rather than truthiness so that an explicit
        # expiry of 0 is honoured instead of being replaced by now + ttl.
        if expires_at is None:
            expires_at = time.time() + ttl
        self.expires_at = expires_at
        # Manual override flag; nothing in this class sets it to True.
        self._expired = False

    def expired(self, expire_clock=None):
        """Return whether this entry has expired as of ``expire_clock``.

        Args:
            expire_clock: The time to compare ``expires_at`` against.
                Defaults to ``time.time()`` only when omitted (``None``);
                an explicit ``0`` is a valid clock value.

        Returns:
            ``True`` when ``expires_at`` is strictly earlier than the clock.
            If ``_expired`` has been set to anything other than ``False``,
            that value is returned instead.
        """
        if expire_clock is None:
            expire_clock = time.time()
        if self._expired is False:
            return self.expires_at < expire_clock
        return self._expired
class CacheList():
    """A lock-guarded list of expiring cache entries, kept in insertion order."""

    def __init__(self):
        self.entries = []
        self.lock = RLock()

    def add_entry(self, value, ttl=20, expires_at=None):
        """Append ``value`` wrapped in a ``CacheEntry``.

        ``ttl`` and ``expires_at`` are passed through to ``CacheEntry``.
        """
        with self.lock:
            self.entries.append(CacheEntry(value, ttl, expires_at))

    def read_entries(self, expire_clock=None):
        """Discard expired entries and return the values that remain.

        Args:
            expire_clock: Forwarded to each entry's ``expired()`` check.

        Returns:
            The values of all unexpired entries, in insertion order.
        """
        with self.lock:
            # Filter every entry. Entries are not guaranteed to be ordered
            # by expiry (callers choose ttl / expires_at per entry), so
            # dropping only a leading run of expired entries would leave
            # expired ones behind a live one.
            self.entries = [
                entry for entry in self.entries
                if not entry.expired(expire_clock)
            ]
            return [entry.value for entry in self.entries]
class CacheSet():
    """A lock-guarded mapping of expiring cache entries, one per key.

    Args:
        hash_function: Callable that maps a value to its key. A value whose
            key is already present replaces the existing entry.
    """

    def __init__(self, hash_function=hash):
        self.entries = dict()
        self.lock = RLock()
        self.hash_function = hash_function

    def add_entry(self, value, ttl=20, expires_at=None):
        """Store ``value`` in a ``CacheEntry`` under ``hash_function(value)``.

        ``ttl`` and ``expires_at`` are passed through to ``CacheEntry``.
        """
        with self.lock:
            self.entries[self.hash_function(value)] = CacheEntry(value, ttl, expires_at)

    def read_entries(self, expire_clock=None):
        """Discard expired entries and return the values that remain.

        Args:
            expire_clock: Forwarded to each entry's ``expired()`` check.

        Returns:
            The values of all unexpired entries, in dict order.
        """
        with self.lock:
            # Filter every entry. Dict order is insertion order of the keys
            # (an overwritten key keeps its old position), not expiry order,
            # so dropping only a leading run of expired entries would leave
            # expired ones behind a live one.
            self.entries = {
                key: entry for key, entry in self.entries.items()
                if not entry.expired(expire_clock)
            }
            return [entry.value for entry in self.entries.values()]
# Seconds added to an event's timestamp to form its cache expiry; also the
# size passed to the tumbling window below.
ttl = 10

# Cache of page views, keyed by page-view id.
cache = CacheSet(PageView.id_hash)

app = faust.App('page_views', broker='kafka://localhost:9092', topic_partitions=4)

page_view_topic = app.topic('page_views', value_type=PageView)

# Tumbling-window table positioned relative to each event's occurred_at field.
active_users_table = (
    app.Table('active_users', default=None)
    .tumbling(ttl, expires=timedelta(seconds=30), key_index=True)
    .relative_to_field(PageView.occurred_at)
)
# @app.timer(interval=2, on_leader=True)
# async def generator():
# user = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
# page_view = PageView(str(uuid.uuid4()), user, datetime.utcnow())
# await page_view_topic.send(value=page_view)
@app.agent(page_view_topic)
async def print_windowed_events(stream):
    """Cache each page view from ``stream`` and print the live cache contents.

    Each event is stored with an expiry of its own timestamp plus ``ttl``,
    and the cache is then read using that same event timestamp as the clock,
    so expiry follows event time rather than wall-clock time.
    """
    async for page_view in stream:
        event_clock = page_view.occurred_at_posix
        cache.add_entry(page_view, expires_at=event_clock + ttl)
        cache_entries = cache.read_entries(event_clock)
        print(f"seconds in the past: {datetime.utcnow() - page_view.occurred_at}")
        print(f"{len(cache_entries)}, {repr(cache_entries)}")
async def custom_sleep():
    """Print a timestamped "SLEEP" marker, then pause for one second.

    Written as a native coroutine: the generator-based
    ``@asyncio.coroutine`` decorator was removed in Python 3.11. Callers
    that ``await custom_sleep()`` are unaffected.
    """
    print("SLEEP", datetime.now())
    await asyncio.sleep(1)
async def historical_producer():
    """Send 100 back-dated page views to the page-view topic.

    Timestamps start 100 seconds before the current UTC time and advance by
    one second per event. Each event gets a random 7-character user name and
    a fresh UUID as its id, and the producer sleeps after every send.
    """
    alphabet = string.ascii_uppercase + string.digits
    start = datetime.utcnow() - timedelta(seconds=100)
    for offset in range(100):
        user = ''.join(random.choices(alphabet, k=7))
        occurred_at = start + timedelta(seconds=offset)
        page_view = PageView(str(uuid.uuid4()), user, occurred_at)
        await page_view_topic.send(value=page_view)
        await custom_sleep()
if __name__ == "__main__":
    # Script entry point: run the historical producer to completion.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(historical_producer())
    finally:
        # Once run_until_complete() has returned the loop is no longer
        # running, so stop() would do nothing; close() releases the loop's
        # resources.
        loop.close()
// Protobuf (proto3) schema for the page-view message.
syntax = "proto3";
// Provides google.protobuf.Timestamp, used by occurred_at below.
import "google/protobuf/timestamp.proto";
package page_views;
// One page-view event: an id, a user, and the time it occurred.
message PageView {
string id = 1;
string user = 2;
google.protobuf.Timestamp occurred_at = 3;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment