Created
November 21, 2015 17:50
-
-
Save bmyerz/a5819e49d8256d143f40 to your computer and use it in GitHub Desktop.
DecomposeGroupBy: before and after changes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DecomposeGroupBy(rules.Rule):
    """Convert a logical group by into a two-phase group by.

    The local half of the aggregate runs before the shuffle step, whereas the
    remote half runs after the shuffle step.

    TODO: omit this optimization if the data is already shuffled, or
    if the cardinality of the grouping keys is high.
    """

    @staticmethod
    def do_transfer(op):
        """Introduce a network transfer before a groupby operation.

        Mutates ``op`` in place: its input is wrapped in ``algebra.Collect``
        when ``op`` has no grouping terms, otherwise in ``algebra.Shuffle``
        keyed on the grouping columns (as positional references into the
        child's scheme).
        """
        # Get an array of position references to columns in the child scheme
        child_scheme = op.input.scheme()
        group_fields = [expression.toUnnamed(ref, child_scheme)
                        for ref in op.grouping_list]
        if not group_fields:
            # No grouping keys: need to Collect all tuples in one place
            op.input = algebra.Collect(op.input)
        else:
            # Need to Shuffle on the grouping columns
            op.input = algebra.Shuffle(op.input, group_fields)

    def fire(self, op):
        """Rewrite a logical ``algebra.GroupBy`` into a distributed plan.

        Returns ``op`` unchanged unless its class is exactly
        ``algebra.GroupBy``. If any aggregate is not decomposable, returns a
        single ``MyriaGroupBy`` copied from ``op`` with a network transfer
        inserted below it. Otherwise returns
        local MyriaGroupBy => transfer => remote MyriaGroupBy, wrapped in an
        ``algebra.Apply`` when at least one aggregate needs a finalizer.
        """
        # Punt if it's not a group by or we've already converted this into
        # an instance of MyriaGroupBy (hence the exact class comparison).
        if op.__class__ != algebra.GroupBy:
            return op

        # Bail early if we have any non-decomposable aggregates: evaluate the
        # whole group by in a single phase after the network transfer.
        if not all(x.is_decomposable() for x in op.aggregate_list):
            out_op = MyriaGroupBy()
            out_op.copy(op)
            DecomposeGroupBy.do_transfer(out_op)
            return out_op

        num_grouping_terms = len(op.grouping_list)
        local_emitters = []
        local_statemods = []
        remote_emitters = []
        remote_statemods = []
        finalizer_exprs = []

        # The starting positions for the current local, remote aggregate.
        # Aggregate outputs follow the grouping terms in both group bys.
        local_output_pos = num_grouping_terms
        remote_output_pos = num_grouping_terms
        requires_finalizer = False

        for agg in op.aggregate_list:
            # Multiple emit arguments can be associated with a single
            # decomposition rule; coalesce them all together.
            state = agg.get_decomposable_state()
            # Expected to be non-empty given the is_decomposable() check above
            assert state

            ################################
            # Extract the set of emitters and statemods required for the
            # local aggregate.
            ################################
            laggs = state.get_local_emitters()
            local_emitters.extend(laggs)
            local_statemods.extend(state.get_local_statemods())

            ################################
            # Extract the set of emitters and statemods required for the
            # remote aggregate. Remote expressions must be rebased to
            # remove instances of LocalAggregateOutput.
            ################################
            raggs = [rebase_local_aggregate_output(x, local_output_pos)
                     for x in state.get_remote_emitters()]
            remote_emitters.extend(raggs)
            for sm in state.get_remote_statemods():
                update_expr = rebase_local_aggregate_output(
                    sm.update_expr, local_output_pos)
                remote_statemods.append(
                    StateVar(sm.name, sm.init_expr, update_expr))

            ################################
            # Extract any required finalizers. These must be rebased to remove
            # instances of RemoteAggregateOutput. Aggregates without a
            # finalizer simply pass their remote outputs through.
            ################################
            finalizer = state.get_finalizer()
            if finalizer is not None:
                requires_finalizer = True
                finalizer_exprs.append(
                    rebase_finalizer(finalizer, remote_output_pos))
            else:
                finalizer_exprs.extend(
                    UnnamedAttributeRef(pos) for pos in
                    range(remote_output_pos, remote_output_pos + len(raggs)))

            local_output_pos += len(laggs)
            remote_output_pos += len(raggs)

        ################################
        # Glue together the local and remote aggregates:
        # Local => Shuffle/Collect => Remote => (optional) Finalizer.
        ################################
        local_gb = MyriaGroupBy(op.grouping_list, local_emitters, op.input,
                                local_statemods)
        # The local group by's grouping terms occupy its leading columns.
        grouping_fields = [UnnamedAttributeRef(i)
                           for i in range(num_grouping_terms)]
        remote_gb = MyriaGroupBy(grouping_fields, remote_emitters, local_gb,
                                 remote_statemods)
        DecomposeGroupBy.do_transfer(remote_gb)

        if not requires_finalizer:
            return remote_gb

        # Pass through grouping terms, then evaluate the finalizers.
        gmappings = [(None, UnnamedAttributeRef(i))
                     for i in range(num_grouping_terms)]
        fmappings = [(None, fx) for fx in finalizer_exprs]
        return algebra.Apply(gmappings + fmappings, remote_gb)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DecomposeGroupBy(Rule):
    """Convert a logical group by into a two-phase group by.

    The local half of the aggregate runs before the shuffle step, whereas the
    remote half runs after the shuffle step.

    TODO: omit this optimization if the data is already shuffled, or
    if the cardinality of the grouping keys is high.
    """

    def __init__(self, partition_groupby_class, only_fire_on_multi_key=None):
        """Configure the operator classes the rule emits.

        Args:
            partition_groupby_class: group-by operator class used for the
                local and remote halves, and for the single-phase fallback
                when some aggregate is not decomposable. It is called either
                with no arguments (then ``copy``-ed from the logical group
                by) or as ``cls(grouping_list, emitters, input, statemods)``.
            only_fire_on_multi_key: optional callable (e.g. an operator
                class). Despite the name, it is not a boolean: when it is
                truthy and a group by has no grouping terms, it is called
                with no arguments, the result is ``copy``-ed from the logical
                group by, and that result is returned without decomposition
                or an added network transfer.
        """
        self._gb_class = partition_groupby_class
        self._only_fire_on_multi_key = only_fire_on_multi_key
        super(DecomposeGroupBy, self).__init__()

    @staticmethod
    def do_transfer(op):
        """Introduce a network transfer before a groupby operation.

        Mutates ``op`` in place: its input is wrapped in ``algebra.Collect``
        when ``op`` has no grouping terms, otherwise in ``algebra.Shuffle``
        keyed on the grouping columns (as positional references into the
        child's scheme).
        """
        # Get an array of position references to columns in the child scheme
        child_scheme = op.input.scheme()
        group_fields = [expression.toUnnamed(ref, child_scheme)
                        for ref in op.grouping_list]
        if not group_fields:
            # No grouping keys: need to Collect all tuples in one place
            op.input = algebra.Collect(op.input)
        else:
            # Need to Shuffle on the grouping columns
            op.input = algebra.Shuffle(op.input, group_fields)

    def fire(self, op):
        """Rewrite a logical ``algebra.GroupBy`` into a distributed plan.

        Returns ``op`` unchanged unless its class is exactly
        ``algebra.GroupBy``. For a key-less group by with
        ``only_fire_on_multi_key`` configured, returns a copy built from that
        class. If any aggregate is not decomposable, returns a single
        ``partition_groupby_class`` copy of ``op`` with a network transfer
        inserted below it. Otherwise returns local group by => transfer =>
        remote group by, wrapped in an ``algebra.Apply`` when at least one
        aggregate needs a finalizer.
        """
        # Punt if it's not a group by or we've already converted this into
        # an instance of self._gb_class (hence the exact class comparison).
        if op.__class__ != algebra.GroupBy:
            return op

        # No grouping terms and a dedicated class configured: skip the
        # two-phase decomposition and do not add a network transfer.
        if self._only_fire_on_multi_key and not op.grouping_list:
            out_op = self._only_fire_on_multi_key()
            out_op.copy(op)
            return out_op

        # Bail early if we have any non-decomposable aggregates: evaluate the
        # whole group by in a single phase after the network transfer.
        if not all(x.is_decomposable() for x in op.aggregate_list):
            out_op = self._gb_class()
            out_op.copy(op)
            DecomposeGroupBy.do_transfer(out_op)
            return out_op

        num_grouping_terms = len(op.grouping_list)
        local_emitters = []
        local_statemods = []
        remote_emitters = []
        remote_statemods = []
        finalizer_exprs = []

        # The starting positions for the current local, remote aggregate.
        # Aggregate outputs follow the grouping terms in both group bys.
        local_output_pos = num_grouping_terms
        remote_output_pos = num_grouping_terms
        requires_finalizer = False

        for agg in op.aggregate_list:
            # Multiple emit arguments can be associated with a single
            # decomposition rule; coalesce them all together.
            state = agg.get_decomposable_state()
            # Expected to be non-empty given the is_decomposable() check above
            assert state

            ################################
            # Extract the set of emitters and statemods required for the
            # local aggregate.
            ################################
            laggs = state.get_local_emitters()
            local_emitters.extend(laggs)
            local_statemods.extend(state.get_local_statemods())

            ################################
            # Extract the set of emitters and statemods required for the
            # remote aggregate. Remote expressions must be rebased to
            # remove instances of LocalAggregateOutput.
            ################################
            raggs = [rebase_local_aggregate_output(x, local_output_pos)
                     for x in state.get_remote_emitters()]
            remote_emitters.extend(raggs)
            for sm in state.get_remote_statemods():
                update_expr = rebase_local_aggregate_output(
                    sm.update_expr, local_output_pos)
                remote_statemods.append(
                    StateVar(sm.name, sm.init_expr, update_expr))

            ################################
            # Extract any required finalizers. These must be rebased to remove
            # instances of RemoteAggregateOutput. Aggregates without a
            # finalizer simply pass their remote outputs through.
            ################################
            finalizer = state.get_finalizer()
            if finalizer is not None:
                requires_finalizer = True
                finalizer_exprs.append(
                    rebase_finalizer(finalizer, remote_output_pos))
            else:
                finalizer_exprs.extend(
                    UnnamedAttributeRef(pos) for pos in
                    range(remote_output_pos, remote_output_pos + len(raggs)))

            local_output_pos += len(laggs)
            remote_output_pos += len(raggs)

        ################################
        # Glue together the local and remote aggregates:
        # Local => Shuffle/Collect => Remote => (optional) Finalizer.
        ################################
        local_gb = self._gb_class(op.grouping_list, local_emitters, op.input,
                                  local_statemods)
        # The local group by's grouping terms occupy its leading columns.
        grouping_fields = [UnnamedAttributeRef(i)
                           for i in range(num_grouping_terms)]
        remote_gb = self._gb_class(grouping_fields, remote_emitters, local_gb,
                                   remote_statemods)
        DecomposeGroupBy.do_transfer(remote_gb)

        if not requires_finalizer:
            return remote_gb

        # Pass through grouping terms, then evaluate the finalizers.
        gmappings = [(None, UnnamedAttributeRef(i))
                     for i in range(num_grouping_terms)]
        fmappings = [(None, fx) for fx in finalizer_exprs]
        return algebra.Apply(gmappings + fmappings, remote_gb)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment