Skip to content

Instantly share code, notes, and snippets.

@FabienHenon
Created February 1, 2017 14:53
Show Gist options
  • Save FabienHenon/b07b4dbeba0aff0d6b44016821036a09 to your computer and use it in GitHub Desktop.
Save FabienHenon/b07b4dbeba0aff0d6b44016821036a09 to your computer and use it in GitHub Desktop.
retrieve_subscriptions() # Returns a stream with active subscriptions and users
|> Flow.from_enumerable(max_demand: 2) # Start flow
|> Flow.partition(max_demand: 5, stages: 10, hash: fn {u, s} -> {{u, s}, rem(u.id, 10)} end) # Partition dispatching by user id to be able to keep unique users
|> Flow.uniq_by(fn {user, _} -> user.id end) # Keep unique users
|> Flow.partition(max_demand: 5, stages: 10) # Partition
|> Flow.map(&retrieve_best_place/1) # Retrieve best place for user
|> Flow.partition(window: Flow.Window.count(100), stages: 1) # Partition and creating window bound for reduce
|> Flow.reduce(fn -> [] end, fn item, list -> [item | list] end) # Concat places to prepare for batch insert
|> Flow.emit(:state) # Continue with new state from reduce
|> Flow.partition(max_demand: 20, stages: 2) # Partition
|> Flow.map(&save_offers/1) # Save offers with batch places
|> Flow.run # Run flow
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment