# Created July 25, 2022 21:41
# GitHub gist reynoldsm88/457118fc6c733b81a25a7f03a2a94e49 (can be saved locally via GitHub Desktop).
# prefect 2.0 task analysis example
# NOTE: GitHub reports that this file contains bidirectional Unicode text that may be
# interpreted or compiled differently than it appears. To review it, open the file in an
# editor that reveals hidden Unicode characters (see GitHub's notes on bidirectional
# Unicode characters).
########################################################################
# Source interfaces
########################################################################
@dataclass
class Source(metaclass=ABCMeta):
    """Abstract interface for anything the flow can poll for messages.

    Concrete sources must override :meth:`poll`; instantiating ``Source``
    itself (or a subclass that does not override ``poll``) raises TypeError.
    """

    @abstractmethod
    def poll(self) -> List[Tuple[str, str]]:
        """Return the messages currently available from this source.

        Returns:
            A list of ``(str, str)`` pairs.

        Raises:
            NotImplementedError: if a subclass delegates here via ``super()``.
        """
        # NotImplementedError (the exception), not NotImplemented (a non-callable
        # sentinel for binary operators) - calling the latter raises TypeError.
        raise NotImplementedError()
@dataclass
class MySource(Source):
    """Source that reads messages from a ``MyStream`` connection.

    Attributes:
        host: host passed to ``MyStream.connect``.
        port: port passed to ``MyStream.connect``.
    """

    host: str
    port: int

    def __post_init__(self) -> None:
        # Connect eagerly at construction time. ``stream`` is set here rather
        # than declared as a field so it stays out of __init__/__repr__/__eq__.
        self.stream = MyStream.connect(self.host, self.port)

    def poll(self) -> List[Tuple[str, str]]:
        """Poll the connected stream and return its result."""
        # NOTE(review): the meaning of 100 (batch size? timeout?) depends on
        # MyStream.poll, which is not shown in this file - confirm.
        return self.stream.poll(100)
########################################################################
# Sink interfaces
########################################################################
@dataclass
class Sink(metaclass=ABCMeta):
    """Abstract interface for a destination that persists processed items.

    Concrete sinks must override :meth:`do_sink`; instantiating ``Sink``
    itself (or a subclass that does not override ``do_sink``) raises TypeError.
    """

    @abstractmethod
    def do_sink(self, data: List[dict]) -> int:
        """Persist ``data`` and return an integer result (e.g. an item count).

        Raises:
            NotImplementedError: if a subclass delegates here via ``super()``.
        """
        # NotImplementedError is the exception class; NotImplemented is a
        # non-callable sentinel, so ``NotImplemented()`` raises TypeError.
        raise NotImplementedError()
@dataclass
class MySink(Sink):
    """Sink that batch-saves items into a ``MyDatastore`` connection.

    Attributes:
        host: host passed to ``MyDatastore.connect``.
        port: port passed to ``MyDatastore.connect``.
    """

    host: str
    port: int

    def __post_init__(self) -> None:
        # Connect eagerly at construction time. ``datastore`` is set here rather
        # than declared as a field so it stays out of __init__/__repr__/__eq__.
        self.datastore = MyDatastore.connect(self.host, self.port)

    def do_sink(self, data: List[dict]) -> int:
        """Hand ``data`` to the datastore's ``batch_save`` in a single call.

        Returns:
            ``len(data)`` - the number of items submitted, not a count
            confirmed by the datastore.
        """
        self.datastore.batch_save(data)
        return len(data)
@task(name='read-source')
async def read_source(source: Source):
    """Prefect task that pulls pending messages from ``source``.

    Args:
        source: the source to poll.

    Returns:
        Whatever ``source.poll()`` returns.
    """
    pending = source.poll()
    return pending
@task(name='process-items')
async def process_items(data: List[dict]) -> List[dict]:
    """Prefect task that runs ``process`` over one batch of items.

    ``process`` is not defined in this file; its result is returned unchanged.

    NOTE(review): ``Source.poll`` is annotated to return ``(str, str)`` tuples,
    but this task is annotated to take dicts - confirm what ``process`` expects.
    """
    transformed = process(data)
    return transformed
@task(name='sink-results')
async def sink_results(data: List[dict], sink: Sink):
    """Prefect task that writes ``data`` to ``sink``.

    Returns:
        Whatever ``sink.do_sink(data)`` returns.
    """
    outcome = sink.do_sink(data)
    return outcome
@flow(name='myflow')
async def run_flow(sources: List[Source], sinks: List[Sink]):
    """Prefect flow that drains every source, processes the messages and writes them to every sink.

    Sources are polled and processed one after another. Their processed items
    are concatenated into a single flat list, and that list is written to
    each sink.

    The flow must be ``async`` because it awaits the async tasks.

    Args:
        sources: sources to poll, one ``read-source`` task run per source.
        sinks: sinks that each receive the full aggregated batch.
    """
    aggregated: List[dict] = []
    for src in sources:
        msgs = await read_source(src)
        results = await process_items(msgs)
        # extend, not append: sink_results takes a flat List[dict], not one
        # nested list per source.
        aggregated.extend(results)
    for sink in sinks:
        # Pass the individual sink (not the whole list). Awaiting the call
        # makes the write actually run; an un-awaited coroutine never executes.
        await sink_results(aggregated, sink)


def run():
    """Build the example source and sink, then run ``run_flow`` to completion."""
    import asyncio  # run_flow is async, so it needs an event loop to drive it

    sources = [MySource('host', 1010)]
    sinks = [MySink('host', 1011)]
    asyncio.run(run_flow(sources, sinks))
# (GitHub page footer: sign up for free, or sign in if you already have an account,
# to comment on this gist.)