Skip to content

Instantly share code, notes, and snippets.

@bmyerz
Created November 21, 2015 17:50
Show Gist options
  • Save bmyerz/a5819e49d8256d143f40 to your computer and use it in GitHub Desktop.
Save bmyerz/a5819e49d8256d143f40 to your computer and use it in GitHub Desktop.
DecomposeGroupBy before and after changes
class DecomposeGroupBy(rules.Rule):
    """Rewrite a logical group by as a local/remote pair of group bys.

    The local group by aggregates before the network transfer and the
    remote group by aggregates after it.  When any aggregate is not
    decomposable, the operator is instead copied into a single
    MyriaGroupBy that sits behind the transfer.

    TODO: omit this optimization if the data is already shuffled, or
    if the cardinality of the grouping keys is high.
    """

    @staticmethod
    def do_transfer(op):
        """Insert a network transfer beneath a groupby operation.

        The input of ``op`` is replaced in place: by a Shuffle on the
        grouping columns when there are any, otherwise by a Collect.
        """
        # Grouping columns as positional references into the child scheme.
        input_scheme = op.input.scheme()
        shuffle_keys = [expression.toUnnamed(key, input_scheme)
                        for key in op.grouping_list]

        if shuffle_keys:
            op.input = algebra.Shuffle(op.input, shuffle_keys)
        else:
            # No grouping columns: every tuple has to end up in one place.
            op.input = algebra.Collect(op.input)

    def fire(self, op):
        """Apply the rule to ``op`` and return the resulting operator.

        Operators whose class is not exactly ``algebra.GroupBy`` are
        returned untouched.  Otherwise the result is either a single
        MyriaGroupBy behind a transfer (non-decomposable aggregates), or
        a local MyriaGroupBy feeding a transfer feeding a remote
        MyriaGroupBy, wrapped in an Apply when any aggregate supplies a
        finalizer.
        """
        # Exact class comparison on purpose: only a plain logical GroupBy
        # is rewritten, so group bys that were already converted are
        # skipped.
        if op.__class__ != algebra.GroupBy:
            return op

        # A single non-decomposable aggregate rules out the two-phase plan.
        if any(not agg.is_decomposable() for agg in op.aggregate_list):
            single_gb = MyriaGroupBy()
            single_gb.copy(op)
            DecomposeGroupBy.do_transfer(single_gb)
            return single_gb

        key_count = len(op.grouping_list)

        local_emitters, local_statemods = [], []
        remote_emitters, remote_statemods = [], []
        finalizer_exprs = []
        requires_finalizer = False

        # Column at which the current aggregate's outputs begin in the
        # local and remote group bys; the grouping columns come first.
        local_pos = key_count
        remote_pos = key_count

        for agg in op.aggregate_list:
            # One decomposition rule may contribute several emit
            # arguments; they are all gathered together here.
            state = agg.get_decomposable_state()
            assert state

            # Local half: emitters and statemods are taken as they are.
            local_part = state.get_local_emitters()
            local_emitters.extend(local_part)
            local_statemods.extend(state.get_local_statemods())

            # Remote half: expressions are rebased so that they no longer
            # contain instances of LocalAggregateOutput.
            remote_part = [
                rebase_local_aggregate_output(emitter, local_pos)
                for emitter in state.get_remote_emitters()
            ]
            remote_emitters.extend(remote_part)
            for statemod in state.get_remote_statemods():
                rebased_update = rebase_local_aggregate_output(
                    statemod.update_expr, local_pos)
                remote_statemods.append(
                    StateVar(statemod.name, statemod.init_expr,
                             rebased_update))

            # Finalizer: rebased to remove instances of
            # RemoteAggregateOutput.  Without one, the remote outputs are
            # simply passed through by position.
            finalizer = state.get_finalizer()
            if finalizer is None:
                finalizer_exprs.extend(
                    [UnnamedAttributeRef(remote_pos + offset)
                     for offset in range(len(remote_part))])
            else:
                requires_finalizer = True
                finalizer_exprs.append(
                    rebase_finalizer(finalizer, remote_pos))

            local_pos += len(local_part)
            remote_pos += len(remote_part)

        # Assemble the plan:
        # Local => transfer => Remote => (optional) Finalizer.
        local_gb = MyriaGroupBy(op.grouping_list, local_emitters, op.input,
                                local_statemods)
        remote_keys = [UnnamedAttributeRef(pos) for pos in range(key_count)]
        remote_gb = MyriaGroupBy(remote_keys, remote_emitters, local_gb,
                                 remote_statemods)
        DecomposeGroupBy.do_transfer(remote_gb)

        if not requires_finalizer:
            return remote_gb

        # The Apply forwards the grouping columns unchanged, followed by
        # the finalizer expressions.
        passthrough = [(None, UnnamedAttributeRef(pos))
                       for pos in range(key_count)]
        finalized = [(None, expr) for expr in finalizer_exprs]
        return algebra.Apply(passthrough + finalized, remote_gb)
class DecomposeGroupBy(Rule):
    """Rewrite a logical group by as a local/remote pair of group bys.

    The local group by aggregates before the network transfer and the
    remote group by aggregates after it.  The concrete group by class is
    supplied by the caller at construction time.

    TODO: omit this optimization if the data is already shuffled, or
    if the cardinality of the grouping keys is high.
    """

    def __init__(self, partition_groupby_class, only_fire_on_multi_key=None):
        """Configure the classes this rule instantiates.

        partition_groupby_class -- class used to build the local and
            remote group bys; it is also instantiated without arguments
            when the aggregates are not decomposable.
        only_fire_on_multi_key -- optional callable; when truthy, a group
            by with no grouping keys is copied into a no-argument
            instance of it instead of being decomposed.
        """
        self._gb_class = partition_groupby_class
        self._only_fire_on_multi_key = only_fire_on_multi_key
        super(DecomposeGroupBy, self).__init__()

    @staticmethod
    def do_transfer(op):
        """Insert a network transfer beneath a groupby operation.

        The input of ``op`` is replaced in place: by a Shuffle on the
        grouping columns when there are any, otherwise by a Collect.
        """
        # Grouping columns as positional references into the child scheme.
        input_scheme = op.input.scheme()
        shuffle_keys = [expression.toUnnamed(key, input_scheme)
                        for key in op.grouping_list]

        if shuffle_keys:
            op.input = algebra.Shuffle(op.input, shuffle_keys)
        else:
            # No grouping columns: every tuple has to end up in one place.
            op.input = algebra.Collect(op.input)

    def fire(self, op):
        """Apply the rule to ``op`` and return the resulting operator.

        Operators whose class is not exactly ``algebra.GroupBy`` are
        returned untouched.  A group by without grouping keys is handed
        to the ``only_fire_on_multi_key`` fallback when one was
        configured; in that case no transfer is inserted.  Otherwise the
        result is either a single ``self._gb_class`` instance behind a
        transfer (non-decomposable aggregates), or a local group by
        feeding a transfer feeding a remote group by, wrapped in an Apply
        when any aggregate supplies a finalizer.
        """
        # Exact class comparison on purpose: only a plain logical GroupBy
        # is rewritten, so group bys that were already converted are
        # skipped.
        if op.__class__ != algebra.GroupBy:
            return op

        fallback_class = self._only_fire_on_multi_key
        if fallback_class and len(op.grouping_list) == 0:
            keyless_gb = fallback_class()
            keyless_gb.copy(op)
            return keyless_gb

        # A single non-decomposable aggregate rules out the two-phase plan.
        if any(not agg.is_decomposable() for agg in op.aggregate_list):
            single_gb = self._gb_class()
            single_gb.copy(op)
            DecomposeGroupBy.do_transfer(single_gb)
            return single_gb

        key_count = len(op.grouping_list)

        local_emitters, local_statemods = [], []
        remote_emitters, remote_statemods = [], []
        finalizer_exprs = []
        requires_finalizer = False

        # Column at which the current aggregate's outputs begin in the
        # local and remote group bys; the grouping columns come first.
        local_pos = key_count
        remote_pos = key_count

        for agg in op.aggregate_list:
            # One decomposition rule may contribute several emit
            # arguments; they are all gathered together here.
            state = agg.get_decomposable_state()
            assert state

            # Local half: emitters and statemods are taken as they are.
            local_part = state.get_local_emitters()
            local_emitters.extend(local_part)
            local_statemods.extend(state.get_local_statemods())

            # Remote half: expressions are rebased so that they no longer
            # contain instances of LocalAggregateOutput.
            remote_part = [
                rebase_local_aggregate_output(emitter, local_pos)
                for emitter in state.get_remote_emitters()
            ]
            remote_emitters.extend(remote_part)
            for statemod in state.get_remote_statemods():
                rebased_update = rebase_local_aggregate_output(
                    statemod.update_expr, local_pos)
                remote_statemods.append(
                    StateVar(statemod.name, statemod.init_expr,
                             rebased_update))

            # Finalizer: rebased to remove instances of
            # RemoteAggregateOutput.  Without one, the remote outputs are
            # simply passed through by position.
            finalizer = state.get_finalizer()
            if finalizer is None:
                finalizer_exprs.extend(
                    [UnnamedAttributeRef(remote_pos + offset)
                     for offset in range(len(remote_part))])
            else:
                requires_finalizer = True
                finalizer_exprs.append(
                    rebase_finalizer(finalizer, remote_pos))

            local_pos += len(local_part)
            remote_pos += len(remote_part)

        # Assemble the plan:
        # Local => transfer => Remote => (optional) Finalizer.
        local_gb = self._gb_class(op.grouping_list, local_emitters,
                                  op.input, local_statemods)
        remote_keys = [UnnamedAttributeRef(pos) for pos in range(key_count)]
        remote_gb = self._gb_class(remote_keys, remote_emitters, local_gb,
                                   remote_statemods)
        DecomposeGroupBy.do_transfer(remote_gb)

        if not requires_finalizer:
            return remote_gb

        # The Apply forwards the grouping columns unchanged, followed by
        # the finalizer expressions.
        passthrough = [(None, UnnamedAttributeRef(pos))
                       for pos in range(key_count)]
        finalized = [(None, expr) for expr in finalizer_exprs]
        return algebra.Apply(passthrough + finalized, remote_gb)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment