job rework
The code below produces the following jobs for the following scenarios:
# part 0: 3C1/hash/xxx-1.data  <-- job: sync_only - partners (FI 1)
#                 /xxx.durable <-- included in earlier job (FI 1)
#         061/hash/xxx-1.data  <-- included in earlier job (FI 1)
#                 /xxx.durable <-- included in earlier job (FI 1)
#                 /xxx-2.data  <-- job: sync_revert to index 2
# part 1: 3C1/hash/xxx-0.data  <-- job: sync_only - partners (FI 0)
#                 /xxx-1.data  <-- job: sync_revert to index 1
#                 /xxx.durable <-- included in earlier jobs (FI 0, 1)
#         061/hash/xxx-1.data  <-- included in earlier job (FI 1)
#                 /xxx.durable <-- included in earlier job (FI 1)
# part 2: 3C1/hash/xxx-2.data  <-- job: sync_revert to index 2
#                 /xxx.durable <-- included in earlier job (FI 2)
#         061/hash/xxx-0.data  <-- job: sync_revert to index 0
#                 /xxx.durable <-- included in earlier job (FI 0)
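# Below is a minimal sketch of the classification rule illustrated by the
# scenarios above (a hypothetical helper, not part of this change): a frag
# index (FI) that matches the local node's index is synced to partners,
# while anything else is reverted to the primary that owns that index.
# Tombstones/durables (frag_index None) are handled separately in the code.
def _classify_frag_example(frag_index, local_index):
    if frag_index is not None and int(frag_index) == local_index:
        return 'sync_only'    # the data is where it belongs
    return 'sync_revert'      # move the data to its home node
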
def _get_job_info(self, local_dev, part_path, partition, policy):
    """
    Helper function for build_reconstruction_jobs(); handles the
    common work that jobs will need to do so they don't duplicate
    the effort in each one.

    :param local_dev: the local device
    :param part_path: full path to the partition
    :param partition: partition number
    :param policy: the storage policy
    :returns: a list of job info dicts for processing in a thread
    """
    jobs = []
    local_id = local_dev['id']
    # get the suffix hashes and frag indexes in this part
    hashes = self.tpool_get_info(policy, part_path, do_listdir=True)
    # part_nodes represents all potential nodes we may need to sync to
    part_nodes = policy.object_ring.get_part_nodes(int(partition))
    local_node = [n for n in part_nodes if n['id'] == local_id]
    # these are the partners we'll sync to when we're a primary
    partners = self._get_partners(local_id, policy.object_ring, partition)
    # decide what work needs to be done based on:
    # 1) part dir with all FIs matching the local node index
    #    this is the case where everything is where it belongs
    #    and we just need to compare hashes and sync if needed;
    #    here we sync with our partners
    # 2) part dir with one local FI and a mix of others
    #    here we need to sync with our partners where the FI matches
    #    the local node index; all others are sync'd to their home
    #    nodes and then killed
    # 3) part dir with no local FI and just one or more others
    #    here we sync with just the FIs that exist, nobody else,
    #    and then all the local FAs are killed
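    # For illustration only (hypothetical values, not from this change):
    # with hashes shaped like {'3c1': {1: '...', 2: '...'}} and a local
    # node index of 1, case 2 applies: the FI 1 files sync to partners
    # while FI 2 becomes a revert job to the primary at index 2.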
    processed_frags = []
    # there will be one job per FI for each partition, each job
    # including a list of suffix dirs that need to be sync'd
    for suffix in hashes:
        suffixes_per_fi = {}
        for frag_index in hashes[suffix]:
            job_info = {}
            # build up an FI-keyed dict of 'affected' suffixes
            suffixes_per_fi.setdefault(frag_index, []).append(suffix)
            # once we've created a job for an FI we don't need another
            # one, since they will all sync to the same place; we will
            # add the complete list of suffix dirs to the job after
            # we've processed all of the FIs and know what they are
            if frag_index in processed_frags:
                continue
            processed_frags.append(frag_index)
            job_info['frag_index'] = frag_index
            job_info['sync_to'] = []
            # tombstones/durables have no FI encoded, so when we build
            # a job for them we can't know where to send them when
            # we're reverting; in that case we talk to all part_nodes,
            # otherwise we talk to just the partners
            if frag_index is None:
                other_nodes = \
                    [n for n in part_nodes if n['id'] != local_id]
                # if no part node matched our local id then we're a
                # handoff for this part, so the data is reverted to
                # all of the primaries
                revert = len(other_nodes) > len(part_nodes) - 1
                if revert:
                    job_info['sync_to'] = part_nodes
                    job_info['sync_type'] = REVERT
                else:
                    job_info['sync_to'] = partners
                    job_info['sync_type'] = SYNC
            else:
                # if the current FI belongs here, we sync this suffix
                # dir with partners for all files matching the FI
                if partners and int(frag_index) == local_node[0]['index']:
                    job_info['sync_to'] = partners
                    job_info['sync_type'] = SYNC
                else:
                    # otherwise revert to the primary node that is
                    # responsible for this FI
                    job_info['sync_type'] = REVERT
                    fi_node = [n for n in part_nodes
                               if n['index'] == int(frag_index)]
                    job_info['sync_to'] = fi_node
            # and now the rest of the job info needed to process...
            job_info['partition'] = partition
            job_info['path'] = part_path
            job_info['hashes'] = hashes
            job_info['local_dev'] = local_dev
            job_info['policy'] = policy
            job_info['device'] = local_dev['device']
            job_info['suffix_list'] = []
            if local_node:
                job_info['local_index'] = local_node[0]['index']
            else:
                # no local node means we're a handoff for this part;
                # use an int sentinel so the type matches real indexes
                job_info['local_index'] = -1
            jobs.append(job_info)
        # we don't need the 'None' job unless there aren't any
        # "FI jobs", but we don't know that until now (after the
        # frag index loop), so rip it out if it's in there.
        # note that the job itself is not a duplicate, but the work
        # it's doing is already covered if there are other jobs
        if len(jobs) > 1:
            jobs = [job for job in jobs
                    if job['frag_index'] is not None]
        # now we know all of the suffixes affected by this FI,
        # update the jobs with the suffixes they need to work on
        for fi in suffixes_per_fi:
            for job in jobs:
                if job['frag_index'] == fi:
                    job['suffix_list'].extend(suffixes_per_fi[fi])
    # return the list of jobs for this part
    return jobs
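
For context, here is a hedged sketch of how a caller might consume the
returned list. build_reconstruction_jobs() is named in the docstring above,
but the worker-pool details (run_pool, process_job) are assumptions for
illustration, not part of this gist:

def build_reconstruction_jobs(self, local_dev, part_path, partition,
                              policy):
    # hypothetical caller: hand each job dict to a worker; every job
    # carries the FI, sync_type, sync_to nodes and suffix_list it needs
    jobs = self._get_job_info(local_dev, part_path, partition, policy)
    for job in jobs:
        self.run_pool.spawn(self.process_job, job)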