Skip to content

Instantly share code, notes, and snippets.

@onderkalaci
Last active August 23, 2016 08:38
Show Gist options
  • Save onderkalaci/25fc519a1d6501a65db1827dcbf76689 to your computer and use it in GitHub Desktop.
Save onderkalaci/25fc519a1d6501a65db1827dcbf76689 to your computer and use it in GitHub Desktop.
#
# Pseudocode for creating hash-partitioned tables.
# Algorithm:
# When the first table with a given shard count and replication factor
# is created, generate a new colocation ID for it.
# If a colocation group with the given shard count and replication
# factor already exists, create the new table's placements colocated
# with an existing table in that group.
#
def create_hash_partitioned_table(tableName, partitionColumn):
    """Distribute tableName by hash on partitionColumn inside a colocation group.

    If a colocation group already exists for the shard count and replication
    factor, the new table's shards are placed on the same workers as that
    group's tables. Otherwise a new colocation group is created, and the
    shards are placed with the existing round-robin policy.

    NOTE(review): shardCount and replicationFactor are not defined in this
    sketch; presumably they are the configured defaults -- confirm.
    NOTE(review): hash shard creation may require the table's distribution
    metadata to exist already; if so, CreateDistributedTable must move before
    shard creation (and getFirstCollocatedTable must then skip this table).
    """
    # Look for an existing colocation group with this shard count and
    # replication factor.
    colocationId = GetColocationId(shardCount, replicationFactor)

    if colocationId != InvalidOid:
        # Create the placements on the same worker nodes as the tables that
        # are already in the colocation group.
        CreateWorkerShardsForGivenCollocationId(tableName, colocationId)
    else:
        # Keep the new ID: the table is recorded as a member of this group
        # below. Otherwise it would be registered with InvalidOid.
        colocationId = CreateACollocationId(shardCount, replicationFactor)
        # Use the existing round-robin placement policy.
        CreateWorkerShards(tableName, shardCount, replicationFactor)

    # Pass the given table name itself. A quoted 'tableName' would refer to a
    # table literally named "tableName".
    CreateDistributedTable(tableName, DISTRIBUTED_BY_HASH, partitionColumn, colocationId)
#
# Creates the shards of tableName colocated with the tables in the
# colocation group collocationId: same hash ranges, same worker nodes.
#
def CreateWorkerShardsForGivenCollocationId(tableName, collocationId):
    """Create tableName's shards colocated with an existing group member.

    Every shard placement of the first table in the colocation group is
    mirrored. The new table gets a shard with the same hash range
    (min/max value) on the same worker host and port.

    Args:
        tableName: table whose shards are created.
        collocationId: colocation group whose layout is copied.
    """
    colocatedTable = getFirstCollocatedTable(collocationId)

    # Sort by shardId, worker name and worker port. This keeps placements of
    # the same shard adjacent and makes the creation order deterministic.
    colocatedPlacements = sorted(
        colocatedTable.WorkerShardPlacements,
        key=lambda placement: (placement.shardId, placement.workerName, placement.workerPort),
    )

    # NOTE(review): the sketch used an undefined ddlEvents. Presumably it is
    # the DDL needed to create tableName on a worker -- confirm the helper.
    ddlEvents = GetTableDDLEvents(tableName)

    # With a replication factor > 1, several placements belong to one shard.
    # All replicas must share one new shard ID, so allocate one ID per
    # source shard rather than one per placement.
    newShardIds = {}
    for colocatedPlacement in colocatedPlacements:
        sourceShardId = colocatedPlacement.shardId
        if sourceShardId not in newShardIds:
            newShardIds[sourceShardId] = GenerateNewShardId()

        WorkerCreateShard(
            ddlEvents,
            colocatedPlacement.workerName,
            colocatedPlacement.workerPort,
            ShardMinValue(colocatedPlacement),
            ShardMaxValue(colocatedPlacement),
            newShardIds[sourceShardId],
        )
#
# Pseudocode for rebalancing colocated tables (simpler, blocking variant).
#
# Input is a list of table names. If a single table is given, every table
# in its colocation group is rebalanced. Colocated tables have to be
# rebalanced together so that they stay colocated.
#
# Algorithm: ensure that queries that rely on colocation information are
# blocked until the rebalance of all colocated tables is done.
#
# With this, we can reuse all the infrastructure that the shard rebalancer
# already provides.
#
def rebalance_colocated_tables_simpler_but_blocks_all_collocated_queries(inputTableList):
    """Rebalance colocated tables while blocking queries that rely on colocation.

    If exactly one table is given, every table in its colocation group is
    rebalanced; otherwise only the given tables are. A lock on the colocation
    group is held until all tables are rebalanced. This prevents colocated
    queries from being planned in between, and lets the existing per-table
    rebalancer machinery be reused.

    Args:
        inputTableList: tables to rebalance. An empty list is a no-op.
    """
    if not inputTableList:
        return  # nothing to rebalance

    # The colocation ID must be known before it is used to look up the group.
    colocationId = ColocationId(inputTableList[0])

    if len(inputTableList) == 1:
        # Rebalance all colocated tables with the same colocation ID.
        colocatedTableList = ColocatedTableList(colocationId)
    else:
        # Rebalance only the given tables.
        colocatedTableList = inputTableList

    # Prevent colocated queries from being planned until every table is done.
    AcquireAccessExclusiveLock(colocationId)
    try:
        # Plan every table against the same set of workers before executing
        # any move.
        # NOTE(review): TableShardPlacementList is a placeholder for the
        # table's placement list; the sketch called
        # shard_placement_rebalance_array() with no arguments.
        responsiveWorkerNodeList = GetResponsiveWorkerList()
        rebalanceOperations = {
            colocatedTable: shard_placement_rebalance_array(
                responsiveWorkerNodeList, TableShardPlacementList(colocatedTable)
            )
            for colocatedTable in colocatedTableList
        }

        # TODO: Could it be an Assert instead?
        # All (source -> target) moves must be the same for colocated tables.
        ErrorIfNotAllRebalanceOperationsAreSameForAllTables(rebalanceOperations)

        for tableOid, tableOperations in rebalanceOperations.items():
            # Lock each table, as rebalance_table_shards() already does.
            AcquireAccessExclusiveLock(tableOid)
            try:
                for placement_update_data in tableOperations:
                    update_shard_placement(placement_update_data)
            finally:
                ReleaseLock(tableOid)
    finally:
        # Release the colocation lock even if a move fails.
        ReleaseLock(colocationId)
#
# Pseudocode for rebalancing colocated tables (non-blocking variant).
#
# Input is a list of table names. If a single table is given, every table
# in its colocation group is rebalanced. Colocated tables have to be
# rebalanced together so that they stay colocated.
#
# Algorithm: move colocated placements one group at a time, first
# acquiring locks on all placements in the group.
#
# With this, we need to rewrite some of the fundamental functions that the
# shard rebalancer already provides.
#
def rebalance_colocated_tables_more_complicated_but_does_not_block_collocated_queries(inputTableList):
    """Rebalance colocated tables one colocated placement group at a time.

    If exactly one table is given, every table in its colocation group is
    rebalanced; otherwise only the given tables are. Instead of locking the
    whole colocation group, only the shards moved in one step are locked:
    the corresponding move of every table.

    Args:
        inputTableList: tables to rebalance. An empty list is a no-op.
    """
    if not inputTableList:
        return  # nothing to rebalance

    # The colocation ID must be known before it is used to look up the group.
    colocationId = ColocationId(inputTableList[0])

    if len(inputTableList) == 1:
        # Rebalance all colocated tables with the same colocation ID.
        colocatedTableList = ColocatedTableList(colocationId)
    else:
        # Rebalance only the given tables.
        colocatedTableList = inputTableList

    # Plan every table against the same set of workers before executing.
    # NOTE(review): TableShardPlacementList is a placeholder for the table's
    # placement list; the sketch called shard_placement_rebalance_array()
    # with no arguments.
    responsiveWorkerNodeList = GetResponsiveWorkerList()
    rebalanceOperations = {
        colocatedTable: shard_placement_rebalance_array(
            responsiveWorkerNodeList, TableShardPlacementList(colocatedTable)
        )
        for colocatedTable in colocatedTableList
    }

    # TODO: Could it be an Assert instead?
    # All (source -> target) moves must be the same for colocated tables.
    ErrorIfNotAllRebalanceOperationsAreSameForAllTables(rebalanceOperations)

    # Group the i-th move of every table: those moves relocate colocated
    # placements and must happen together.
    # NOTE(review): assumes the check above also guarantees the same order
    # and length per table; zip() silently truncates otherwise.
    for colocatedMoves in zip(*rebalanceOperations.values()):
        lockedShardIds = []
        try:
            # Lock every placement in the group before moving any of them.
            for move in colocatedMoves:
                AcquireAccessExclusiveLock(move.shardId)
                lockedShardIds.append(move.shardId)
            # Presumably moves the whole colocated group in one step.
            move_shard_placements(colocatedMoves)
        finally:
            # Release only the locks actually taken, even if a move fails.
            for shardId in lockedShardIds:
                ReleaseAccessExclusiveLock(shardId)
#
# We probably want to keep rebalance_table_shards(). However, if a user
# calls it on a colocated table, the table has to be removed from its
# colocation group, because its placements are moved independently of
# the rest of the group.
#
def rebalance_table_shards(tableName):
    """Rebalance one table's shards and detach it from its colocation group.

    The rebalancing part already exists in the shard rebalancer repository.
    The new step removes the table from its colocation group, since its
    placements are moved independently of the other tables in the group.

    Args:
        tableName: table whose shards are rebalanced.
    """
    # Lock the given table itself; a quoted 'tableName' would refer to a
    # table literally named "tableName".
    AcquireAccessExclusiveLock(tableName)
    try:
        responsiveWorkerNodeList = GetResponsiveWorkerList()
        # NOTE(review): placeholder for the table's placement list; the
        # sketch referenced an undefined shardPlacementList.
        shardPlacementList = TableShardPlacementList(tableName)
        rebalanceUpdateCommandList = shard_placement_rebalance_array(
            responsiveWorkerNodeList, shardPlacementList
        )

        # Now the table's colocation ID has to be updated: once its
        # placements move on their own it is no longer colocated with the
        # group.
        # NOTE(review): placeholder helper; the sketch left this step
        # unwritten.
        UpdateRelationColocationGroup(tableName, InvalidOid)

        for placement_update_data in rebalanceUpdateCommandList:
            update_shard_placement(placement_update_data)
    finally:
        ReleaseLock(tableName)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment