Skip to content

Instantly share code, notes, and snippets.

@Viveckh
Last active January 15, 2020 17:23
Show Gist options
  • Save Viveckh/1d8b9f151bc76f45421bdb5ddd038b23 to your computer and use it in GitHub Desktop.
Save Viveckh/1d8b9f151bc76f45421bdb5ddd038b23 to your computer and use it in GitHub Desktop.
from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter
class GenreStatsFlow(FlowSpec):
    """
    A flow to generate some statistics about the movie genres.

    The flow performs the following steps:
    1) Ingests a CSV into a Pandas Dataframe.
    2) Computes quartiles for each genre in parallel.
    3) Saves a dictionary of genre-specific statistics.

    This is a skeleton: the data loading, per-genre computation and
    result merging are still marked as TODOs in the individual steps.
    """

    @step
    def start(self):
        """
        The start step:
        1) Loads the movie metadata into a pandas dataframe.
        2) Finds all the unique genres.
        3) Launches parallel statistics computation for each genre.
        """
        # TODO: Loading the CSV and getting unique genres
        # Placeholder until the TODO above is implemented.
        # NOTE(review): the foreach below fans out over this (currently
        # empty) list -- confirm how Metaflow treats a zero-width foreach.
        self.genres = []

        # Fan out: one compute_statistics task per element of self.genres.
        self.next(self.compute_statistics, foreach='genres')

    # Decorators apply bottom-up: the step is retried once on failure and,
    # per the Metaflow docs, @catch then records a remaining failure in the
    # 'compute_failed' artifact so the flow can continue.
    @catch(var='compute_failed')
    @retry(times=1)
    @step
    def compute_statistics(self):
        """Compute statistics for a single genre. Run in cloud"""
        # self.input is the element of the foreach list assigned to this task.
        self.genre = self.input
        # TODO: Computing statistics for a genre
        self.next(self.join)

    @step
    def join(self, inputs):
        """Join our parallel branches and merge results into a dictionary."""
        # `inputs` gives access to the parallel compute_statistics branches.
        # TODO: Joining the results
        self.next(self.end)

    @step
    def end(self):
        """End the flow."""
        pass
if __name__ == '__main__':
    # Instantiating the flow hands control to Metaflow's command-line
    # interface (e.g. `python <file> run`); only do so when executed as a
    # script, not when imported.
    GenreStatsFlow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment