@peluse
Last active August 29, 2015 14:18
job rework
The code below produces the following jobs for the following scenarios:
# part 0: 3C1/hash/xxx-1.data    <-- job: sync_only - partners (FI 1)
#                  /xxx.durable  <-- included in earlier job (FI 1)
#          061/hash/xxx-1.data   <-- included in earlier job (FI 1)
#                  /xxx.durable  <-- included in earlier job (FI 1)
#                  /xxx-2.data   <-- job: sync_revert to index 2
# part 1: 3C1/hash/xxx-0.data    <-- job: sync_only - partners (FI 0)
#                  /xxx-1.data   <-- job: sync_revert to index 1
#                  /xxx.durable  <-- included in earlier jobs (FI 0, 1)
#          061/hash/xxx-1.data   <-- included in earlier job (FI 1)
#                  /xxx.durable  <-- included in earlier job (FI 1)
# part 2: 3C1/hash/xxx-2.data    <-- job: sync_revert to index 2
#                  /xxx.durable  <-- included in earlier job (FI 2)
#          061/hash/xxx-0.data   <-- job: sync_revert to index 0
#                  /xxx.durable  <-- included in earlier job (FI 0)
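#
# As a rough, hand-written illustration (not output captured from this
# code), the "sync_revert to index 2" entry in part 0 above would end up
# as a job dict shaped roughly like the hypothetical literal below; the
# key names match what _get_job_info() builds, the values are made up:
#
#   {'frag_index': 2,              # the FI this job is responsible for
#    'sync_type': REVERT,          # revert the frag to its home node
#    'sync_to': [<node whose ring index is 2>],
#    'partition': 0,
#    'path': '/srv/node/sda1/objects-1/0',    # hypothetical part_path
#    'suffix_list': ['3C1'],       # suffix dirs holding the affected FI
#    'local_index': <this node's ring index, or '-1' on a handoff>,
#    ...}                          # plus hashes/local_dev/policy/device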
def _get_job_info(self, local_dev, part_path, partition, policy):
    """
    Helper function for build_reconstruction_jobs(); handles the
    common work that jobs need to do so they don't duplicate the
    effort in each one.

    :param local_dev: the local device
    :param part_path: full path to the partition
    :param partition: partition number
    :param policy: the policy
    :returns: a list of job info dicts for processing in a thread
    """
    jobs = []
    local_id = local_dev['id']
    # get the suffix hashes and frag indexes in this part
    hashes = self.tpool_get_info(policy, part_path, do_listdir=True)
    # part_nodes represents all potential nodes we may need to sync to
    part_nodes = policy.object_ring.get_part_nodes(int(partition))
    local_node = [n for n in part_nodes if n['id'] == local_id]
    # these are the partners we'll sync to when we're a primary
    partners = self._get_partners(local_id, policy.object_ring, partition)
    # decide what work needs to be done based on:
    # 1) part dir with all FIs matching the local node index
    #    this is the case where everything is where it belongs and we
    #    just need to compare hashes and sync if needed; here we sync
    #    with our partners
    # 2) part dir with one local FI and a mix of others
    #    here we sync with our partners where the FI matches the local
    #    node's index; all others are sync'd with their home nodes and
    #    then killed
    # 3) part dir with no local FI and just one or more others
    #    here we sync with just the FIs that exist, nobody else,
    #    and then all the local FAs are killed
    processed_frags = []
    # there will be one job per FI for each partition that includes
    # a list of suffix dirs that need to be sync'd
    for suffix in hashes:
        suffixes_per_fi = {}
        for frag_index in hashes[suffix]:
            job_info = {}
            # build up an FI-keyed dict of 'affected' suffixes
            suffixes_per_fi.setdefault(frag_index, []).append(suffix)
            # once we've created a job for an FI, we don't need another
            # one since they will all sync to the same place; we will
            # add the complete list of suffix dirs to the job after
            # we've processed all of the FIs and know what they are
            if frag_index in processed_frags:
                continue
            processed_frags.append(frag_index)
            job_info['frag_index'] = frag_index
            job_info['sync_to'] = []
            # tombstones/durables have no FI encoded, so when we build a
            # job for them we can't know where to send them when we're
            # reverting.  In that case we talk to all part_nodes,
            # otherwise we talk to just the partners
            if frag_index is None:
                other_nodes = \
                    [n for n in part_nodes if n['id'] != local_id]
                revert = len(other_nodes) > len(part_nodes) - 1
                if revert:
                    job_info['sync_to'] = part_nodes
                    job_info['sync_type'] = REVERT
                else:
                    job_info['sync_to'] = partners
                    job_info['sync_type'] = SYNC
            else:
                # if the current FI belongs here, we sync this suffix dir
                # with partners for all files matching the FI
                if partners and int(frag_index) == local_node[0]['index']:
                    job_info['sync_to'] = partners
                    job_info['sync_type'] = SYNC
                else:
                    # otherwise this is a revert to the corresponding
                    # node for this FI
                    job_info['sync_type'] = REVERT
                    fi_node = [n for n in part_nodes if n['index'] ==
                               int(frag_index)]
                    job_info['sync_to'] = fi_node
            # and now the rest of the job info needed to process...
            job_info['partition'] = partition
            job_info['path'] = part_path
            job_info['hashes'] = hashes
            job_info['local_dev'] = local_dev
            job_info['policy'] = policy
            job_info['device'] = local_dev['device']
            job_info['suffix_list'] = []
            if local_node:
                job_info['local_index'] = local_node[0]['index']
            else:
                job_info['local_index'] = '-1'
            jobs.append(job_info)
        # we don't need the 'None' job unless there aren't any "FI
        # jobs", but we don't know that until now (after the frag
        # index loop), so let's rip it out if it's in there.  Note
        # that the job itself is not a duplicate, but the work it's
        # doing is already covered if there are other jobs
        if len(jobs) > 1:
            remove = False
            for job in jobs:
                if job['frag_index'] is None:
                    remove = True
                    break
            if remove:
                jobs.remove(job)
        # now that we know all of the suffixes affected by each FI,
        # update the jobs with the suffixes they need to work on
        for fi in suffixes_per_fi:
            for job in jobs:
                if job['frag_index'] == fi:
                    job['suffix_list'].extend(suffixes_per_fi[fi])
    # return the list of jobs for this part
    return jobs
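
For context, a minimal sketch of how a caller such as build_reconstruction_jobs() (referenced in the docstring) might drive this helper. The loop below is an assumption, not part of the gist: names like self.devices_dir and get_data_dir() are borrowed from Swift's replicator conventions and stand in for however the reconstructor resolves the local objects-N directory; only the _get_job_info() call reflects the code above.

def build_reconstruction_jobs(self, local_dev, policy):
    # hypothetical driver: walk every partition dir for this policy on
    # the local device and collect the per-partition jobs built by
    # _get_job_info()
    jobs = []
    obj_path = os.path.join(self.devices_dir, local_dev['device'],
                            get_data_dir(policy))
    for partition in os.listdir(obj_path):
        part_path = os.path.join(obj_path, partition)
        jobs.extend(self._get_job_info(local_dev, part_path,
                                       partition, policy))
    return jobs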