Last active
January 15, 2020 17:23
-
-
Save Viveckh/1d8b9f151bc76f45421bdb5ddd038b23 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from metaflow import FlowSpec, step, catch, retry, IncludeFile, Parameter | |
class GenreStatsFlow(FlowSpec):
    """
    A flow to generate some statistics about the movie genres.

    The flow is intended to perform the following steps:
    1) Ingest a CSV into a Pandas Dataframe.
    2) Compute quartiles for each genre in parallel.
    3) Save a dictionary of genre specific statistics.

    NOTE(review): this class is currently a skeleton. The data loading,
    the per-genre computation and the merge in ``join`` are all marked
    TODO below; only the step graph (start -> compute_statistics ->
    join -> end) is wired up.
    """

    @step
    def start(self):
        """
        The start step:
        1) Loads the movie metadata into pandas dataframe.
        2) Finds all the unique genres.
        3) Launches parallel statistics computation for each genre.

        Only item 3 is implemented so far: ``self.genres`` is set to an
        empty list and used as the foreach variable.
        """
        # TODO: Loading the CSV and getting unique genres
        self.genres = []
        # Fan out: `foreach` names the attribute whose items each get their
        # own `compute_statistics` task.
        self.next(self.compute_statistics, foreach='genres')

    # NOTE(review): per Metaflow's decorator docs, @retry re-runs a failing
    # task and @catch stores the final exception in the named artifact
    # ('compute_failed') so the flow can continue -- confirm that `join`
    # checks this artifact once it is implemented.
    @catch(var='compute_failed')
    @retry(times=1)
    @step
    def compute_statistics(self):
        """Compute statistics for a single genre. Run in cloud"""
        # `self.input` is read as this branch's genre; presumably it is the
        # current item of the foreach over `genres` -- verify against Metaflow.
        self.genre = self.input
        # TODO: Computing statistics for a genre
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Join our parallel branches and merge results into a dictionary.

        `inputs` is the join-step argument supplied by Metaflow; it is not
        used yet because the merge itself is still TODO.
        """
        # TODO: Joining the results
        self.next(self.end)

    @step
    def end(self):
        """End the flow."""
        pass
if __name__ == '__main__':
    # Script entry point: instantiating the flow class is the only action
    # taken here. NOTE(review): in Metaflow this is the documented way to
    # hand control to its command-line runner -- confirm for the version used.
    GenreStatsFlow()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment