Shuffler IR Design

UUID is probably a string

ShuffleStart(
  keyFields Array<String>,
  rowType (virtual) Struct,
  codecAndEType TypedCodecSpec
): id UUID

ShuffleWrite(
  id UUID,
  partitionId PInt64,
  attemptId PInt64,
  rows Stream<Struct>
): Unit

ShuffleWritingFinished(
  id UUID,
  outPartitions PInt64
): Array<rowType.select(keyFields)>

ShuffleRead(
  id UUID,
  partitionId PInt64
): Stream<Struct>

ShuffleDelete(
  id UUID
): Unit

How does the IR manage resources? Should Shuffle be a new type that performs a network call when it is freed? My current thinking is that ShuffleDelete can be inserted by the TableIR lowerer, since the lowerer sees all potential reads of the shuffle (effectively, the transitive closure of the parents of the TableKeyBy or TableOrderBy being lowered into a pair of stages via a ShuffleWrite and a ShuffleRead).
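
As a rough sketch of how this might play out (the let-binding syntax and readInputPartition are illustrative, not actual IR syntax), a TableKeyBy or TableOrderBy lowered through the shuffler could sequence the nodes like this, with ShuffleDelete appended by the lowerer once every read has been emitted:

let id     = ShuffleStart(keyFields, rowType, codecAndEType)
// stage 1: one ShuffleWrite per input partition p, attempt a, run on workers
ShuffleWrite(id, p, a, readInputPartition(p))
let bounds = ShuffleWritingFinished(id, outPartitions)
// stage 2: one ShuffleRead per output partition i in [0, outPartitions), run on workers
ShuffleRead(id, i)    // rows with keys in [bounds[i], bounds[i+1])
ShuffleDelete(id)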

ShuffleWritingFinished returns an array of length outPartitions + 1. Partition i's bounds are [result[i], result[i+1]).
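
For example (the keys shown are purely illustrative), with outPartitions = 3 the result might be [k0, k1, k2, k3]: partition 0 holds keys in [k0, k1), partition 1 holds [k1, k2), and partition 2 holds [k2, k3).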

A ShuffleRead may pass any partitionId in [0, outPartitions). The same partition may be read multiple times, possibly by multiple actors. The stream returned for each partition contains only records whose keys fall within that partition's bounds (left-inclusive, right-exclusive) as returned by ShuffleWritingFinished. Each partition will hold roughly the same number of records.
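
For illustration only (partitionFor is not one of the IR nodes above), the bounds array returned by ShuffleWritingFinished lets a client map a key to its partition by binary search, consistent with the left-inclusive, right-exclusive convention:

partitionFor(bounds, key):          // assumes bounds[0] <= key < bounds[last]
  lo = 0; hi = length(bounds) - 2   // candidate partitions 0 .. outPartitions - 1
  while lo < hi:                    // find the largest i with bounds[i] <= key
    mid = (lo + hi + 1) / 2         // integer division
    if bounds[mid] <= key: lo = mid
    else: hi = mid - 1
  return lo                         // key falls in [bounds[lo], bounds[lo+1])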

danking commented Mar 2, 2020

  • can the leader ask the shuffler for statistics & metadata about the shuffle?
  • can I read the shuffle twice?

danking commented Mar 2, 2020

  • should we have a separate "start read" node that permits multiple reads (without permitting more writes)?

danking commented Mar 2, 2020

  • a shuffle read that accepts a key interval instead of a partition number (one possible shape is sketched below)
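
One possible shape for such a node, in the notation above (the name ShuffleReadInterval and the Interval type are illustrative, not settled):

ShuffleReadInterval(
  id UUID,
  keyRange Interval<rowType.select(keyFields)>   // left-inclusive, right-exclusive
): Stream<Struct>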

danking commented Mar 2, 2020

  • Q: what are the shuffle primitives we want? How much should the lowerer specify the partitioner? Not at all? Number of partitions? The whole partitioning?

danking commented Mar 2, 2020

  • maybe the leader should specify the successful attempt ids? Cotton: no, we should make it so that which attempt succeeds doesn't matter; only guarantee atomicity at the level of a partition.

danking commented Mar 2, 2020

Tim TODO:

  • try to implement a Spark version of this IR or formulate a way to do both
  • draft the structure of the lowerer for the shuffler
