Skip to content

Instantly share code, notes, and snippets.

@cra
Created November 15, 2023 18:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cra/4ec345e4627f89352045796ead7fc20d to your computer and use it in GitHub Desktop.
Save cra/4ec345e4627f89352045796ead7fc20d to your computer and use it in GitHub Desktop.
import logging
from datetime import timedelta
import psycopg2 # pip install psycopg2-binary
# pip install bytewax==0.17.2
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.periodic import SimplePollingInput
from bytewax.dataflow import Dataflow
# Root logger setup: INFO and above, rendered as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class PGTableInput(SimplePollingInput):
    """Polling input that emits rows newly added to a PostgreSQL table.

    On every poll the table is queried for rows whose ``TS_FIELD`` is
    strictly greater than the largest value seen so far. The whole batch is
    returned as a single item: a list of row tuples with the timestamp as
    the first element of each tuple.

    NOTE(review): the comparison is strict, so a row that shows up later
    with a timestamp equal to ``latest_ts`` will not be picked up.
    """

    # These identifiers are formatted into the SQL text verbatim, so they
    # must only ever hold trusted, hard-coded names.
    TABLE = 'public.entries'
    TS_FIELD = 'timestamp'
    DATA_FIELDS = 'id,event_type'

    def __init__(self, interval):
        """Connect to the database and start without a watermark.

        Args:
            interval: polling period, forwarded to ``SimplePollingInput``.
        """
        # skip align_to argument
        super().__init__(interval, None)
        self.conn = psycopg2.connect(
            dbname='analytics',
            user='my_user',
            password='my_password',
            host='data.some.db',
            port=5433,
        )
        # This connection only ever runs SELECTs. Without autocommit,
        # psycopg2 opens a transaction on the first execute() and never
        # ends it: the session stays "idle in transaction" and a single
        # failed statement would make every later poll fail as well.
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        # you might want to pass some initial value for this one
        self.latest_ts = None

    @property
    def query(self):
        """SQL text for the next poll.

        When a watermark is set, the statement contains one ``%s``
        placeholder for it; the value itself is bound by ``_fetch_rows``
        instead of being formatted into the string (a datetime formatted
        into the text is not valid SQL).
        """
        # the timestamp is selected first so next_item() finds it at index 0
        clauses = [
            f'SELECT {self.TS_FIELD}, {self.DATA_FIELDS}',
            f'FROM {self.TABLE}',
        ]
        if self.latest_ts is not None:
            clauses.append(f'WHERE {self.TS_FIELD} > %s')
        clauses.append(f'ORDER BY {self.TS_FIELD}')
        return '\n'.join(clauses)

    def _fetch_rows(self):
        """Run the poll query and return every matching row as a list."""
        # Pass None (not an empty tuple) when there is nothing to bind, so
        # the statement is sent exactly as written.
        params = None if self.latest_ts is None else (self.latest_ts,)
        self.cur.execute(self.query, params)
        return self.cur.fetchall()

    def next_item(self):
        """Fetch the rows added since the previous poll.

        Returns:
            The list of new rows, ordered by ``TS_FIELD``; empty when
            nothing new was found.
        """
        rows = self._fetch_rows()
        # first field of last row is the latest ts
        if rows:
            self.latest_ts = rows[-1][0]
        # Logged after the update so "latest ts" is the current watermark.
        logging.info('%d new rows, latest ts=%s', len(rows), self.latest_ts)
        return rows
def do_the_thing(row):
    """Placeholder transformation applied to a single row.

    Args:
        row: any sized object, e.g. one row tuple fetched from the table.

    Returns:
        A ``(len(row), row)`` tuple: the number of elements in the row
        followed by the row itself, unchanged.
    """
    return len(row), row
# Dataflow wiring: the steps below are attached to `flow` in the order written.
flow = Dataflow()
# Source: PGTableInput constructed with a one-second interval.
inp = PGTableInput(interval=timedelta(seconds=1))
out = StdOutput()
flow.input('inp', inp)
flow.flat_map(lambda x: x) # each polled item is a list of rows: (a,b,c) -> a b c
flow.redistribute() # rebalances flow among workers
# Apply the placeholder transformation to every individual row.
flow.map(do_the_thing)
flow.output('out', out)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment