Skip to content

Instantly share code, notes, and snippets.

@reynoldsm88
Created July 25, 2022 21:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reynoldsm88/457118fc6c733b81a25a7f03a2a94e49 to your computer and use it in GitHub Desktop.
Save reynoldsm88/457118fc6c733b81a25a7f03a2a94e49 to your computer and use it in GitHub Desktop.
Prefect 2.0 task analysis example
########################################################################
# Source interfaces
########################################################################
@dataclass
class Source(metaclass=ABCMeta):
    """Abstract interface for an object that can be polled for messages.

    Concrete sources must override :meth:`poll`.
    """

    # Python 3 ignores a ``__metaclass__`` class attribute; only the
    # ``metaclass=`` keyword makes ``@abstractmethod`` actually enforced.

    @abstractmethod
    def poll(self) -> List[Tuple[str, str]]:
        """Return the next batch of messages as pairs of strings.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        # ``NotImplemented`` is a comparison sentinel and is not callable;
        # the exception for an unimplemented method is NotImplementedError.
        raise NotImplementedError
@dataclass
class MySource(Source):
    """``Source`` implementation that reads from a ``MyStream`` connection."""

    # NOTE(review): no fields are annotated at class level, so ``@dataclass``
    # sees zero fields here; the hand-written ``__init__`` below is kept as is.

    def __init__(self, host: str, port: int):
        """Store the endpoint and open the stream via ``MyStream.connect``.

        Args:
            host: Host passed to ``MyStream.connect``.
            port: Port passed to ``MyStream.connect``.
        """
        self.host = host
        self.port = port
        self.stream = MyStream.connect(host, port)

    def poll(self) -> List[Tuple[str, str]]:
        """Return whatever ``self.stream.poll(100)`` yields.

        The literal ``100`` is forwarded unchanged; its meaning (timeout,
        batch size, ...) is defined by ``MyStream`` -- TODO confirm.
        """
        return self.stream.poll(100)
########################################################################
# Sink interfaces
########################################################################
@dataclass
class Sink(metaclass=ABCMeta):
    """Abstract interface for an object that can persist a batch of records.

    Concrete sinks must override :meth:`do_sink`.
    """

    # Python 3 ignores a ``__metaclass__`` class attribute; only the
    # ``metaclass=`` keyword makes ``@abstractmethod`` actually enforced.

    @abstractmethod
    def do_sink(self, data: List[dict]) -> int:
        """Write ``data`` to the sink and return an integer result.

        Args:
            data: Records to write.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        # ``NotImplemented`` is not callable (and not an exception), so the
        # original ``raise NotImplemented()`` failed with a TypeError instead.
        raise NotImplementedError
@dataclass
class MySink(Sink):
    """``Sink`` implementation that writes to a ``MyDatastore`` connection."""

    # Derives from ``Sink`` so instances match the ``List[Sink]`` annotation
    # used by the flow, mirroring how ``MySource`` derives from ``Source``.

    def __init__(self, host: str, port: int):
        """Store the endpoint and connect via ``MyDatastore.connect``.

        Args:
            host: Host passed to ``MyDatastore.connect``.
            port: Port passed to ``MyDatastore.connect``.
        """
        self.host = host
        self.port = port
        self.datastore = MyDatastore.connect(host, port)

    def do_sink(self, data: List[dict]) -> int:
        """Save ``data`` with ``batch_save`` and return how many items were passed.

        Args:
            data: Records handed to ``self.datastore.batch_save``.

        Returns:
            ``len(data)``.
        """
        self.datastore.batch_save(data)
        return len(data)
@task(name='read-source')
async def read_source(source: Source):
    """Prefect task that polls ``source`` once and returns the polled value."""
    polled = source.poll()
    return polled
@task(name='process-items')
async def process_items(data: List[dict]) -> List[dict]:
    """Prefect task that runs ``process`` over ``data`` and returns its result.

    NOTE(review): ``Source.poll`` is annotated as returning tuples while this
    task is annotated as taking dicts -- confirm which shape ``process`` expects.
    """
    processed = process(data)
    return processed
@task(name='sink-results')
async def sink_results(data: List[dict], sink: Sink):
    """Prefect task that hands ``data`` to ``sink.do_sink`` and returns its result."""
    outcome = sink.do_sink(data)
    return outcome
@flow(name='myflow')
def run_flow(sources: List[Source], sinks: List[Sink]):
    """Read every source, process what was read, and write it to every sink.

    Args:
        sources: Sources to poll, one ``read_source`` task run each.
        sinks: Sinks that each receive the full aggregated result list.

    The flow stays synchronous so existing callers can keep invoking it as a
    plain function. ``await`` is a SyntaxError outside ``async def``, so the
    task calls are made directly.
    NOTE(review): this relies on Prefect 2 resolving async tasks that are
    called from a synchronous flow -- confirm against the installed version.
    """
    aggregated = []
    for src in sources:
        # The loop variable is ``src``; the original passed an undefined ``source``.
        msgs = read_source(src)
        results = process_items(msgs)
        # ``extend`` keeps ``aggregated`` a flat list, matching the
        # ``List[dict]`` that ``sink_results`` is annotated to accept.
        aggregated.extend(results)
    for sink in sinks:
        # Pass the current sink, not the whole ``sinks`` list.
        sink_results(aggregated, sink)
def run():
    """Build the example source and sink and execute ``run_flow`` with them."""
    # The source is constructed before the sink, as in the original ordering.
    example_sources = [MySource('host', 1010)]
    example_sinks = [MySink('host', 1011)]
    run_flow(example_sources, example_sinks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment