Last active
December 5, 2015 14:01
-
-
Save giffels/ac1529e3453f8abf83d9 to your computer and use it in GitHub Desktop.
Multi-threaded PhEDEx stage-in scripts for dCache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from threading import Thread | |
import logging, os, sys | |
class LessThanFilter(logging.Filter):
    """Logging filter that passes only records strictly below a given level."""

    def __init__(self, level):
        """Remember ``level`` as the exclusive upper bound for accepted records."""
        logging.Filter.__init__(self)
        self._level = level

    def filter(self, rec):
        """Return True when ``rec.levelno`` is lower than the configured bound."""
        upper_bound = self._level
        return upper_bound > rec.levelno
def check_status(*pfns):
    """Report every pfn for which the ``dccp`` status probe succeeds.

    Each pfn is passed to ``dccp -P -t -1`` with its output discarded; when
    the command exits with status 0 the pfn is emitted through
    ``logging.info``. A pfn that does not start with ``/grid`` is reported
    through ``logging.error`` and skipped, and processing continues with the
    remaining pfns.

    :param pfns: file names to probe, given as positional arguments.
    """
    for pfn in pfns:
        # check here the right pnfs syntax for your site
        if not pfn.startswith('/grid'):
            logging.error('%s: wrong pfn skipped: %s\n' % (sys.argv[0], pfn))
            # Skip only this entry: returning here would silently drop all
            # remaining pfns assigned to this thread.
            continue
        # NOTE(review): presumably "-P -t -1" asks dCache whether the file is
        # already online -- confirm against the dccp documentation.
        # NOTE(review): pfn is interpolated into a shell command unquoted;
        # names containing blanks or shell metacharacters will misbehave.
        if not os.system("dccp -P -t -1 %s > /dev/null 2>&1" % pfn):
            logging.info(pfn)
if __name__ == '__main__':
    # Logger setup: records below WARNING go to stdout, WARNING and above
    # go to stderr.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.NOTSET)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.addFilter(LessThanFilter(logging.WARNING))
    root_logger.addHandler(stdout_handler)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.WARNING)
    root_logger.addHandler(stderr_handler)

    # degree of parallelism
    number_of_threads = 25

    # pfns are passed blank-separated
    pfns = sys.argv[1:]
    if not pfns:
        logging.error("%s: No pfns given\n" % sys.argv[0])
        sys.exit(1)

    # Distribute pfns almost equally to threads: slot k receives
    # (len(pfns) + k) // number_of_threads entries, so the slot sizes add up
    # to len(pfns) and differ by at most one. Empty slots get no thread.
    total = len(pfns)
    offset = 0
    workers = []
    for slot in range(number_of_threads):
        size = (total + slot) // number_of_threads
        chunk = pfns[offset:offset + size]
        offset += size
        if chunk:
            workers.append(Thread(target=check_status, args=chunk))

    # Start every worker before waiting on any of them.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from threading import Thread | |
import logging, os, sys | |
def stage_files(*pfns):
    """Run ``dccp -P`` on every given pfn and log the failures.

    A pfn that does not start with ``/grid`` is reported through
    ``logging.error`` and skipped. For every other pfn ``dccp -P <pfn>`` is
    executed via ``os.system``; a non-zero exit status is reported through
    ``logging.error``.

    :param pfns: file names to stage, given as positional arguments.
    """
    for path in pfns:
        # check here the right pfn syntax for your site
        if path.startswith('/grid'):
            exit_status = os.system("dccp -P %s" % path)
            if exit_status:
                logging.error('%s: dccp -P %s returns error\n' % (sys.argv[0], path))
        else:
            logging.error('%s: wrong pfn skipped: %s\n' % (sys.argv[0], path))
if __name__ == '__main__':
    # Logger setup: bare messages, INFO and above.
    logging.basicConfig(format='%(message)s')
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # degree of parallelism
    number_of_threads = 10

    # pfns are passed blank-separated
    pfns = sys.argv[1:]
    if not pfns:
        logging.error("%s: No pfns given\n" % sys.argv[0])
        sys.exit(1)

    # Distribute pfns almost equally to threads: slot k receives
    # (len(pfns) + k) // number_of_threads entries, so the slot sizes add up
    # to len(pfns) and differ by at most one.
    total = len(pfns)
    offset = 0
    pfn_chunks = []
    for slot in range(number_of_threads):
        size = (total + slot) // number_of_threads
        pfn_chunks.append(pfns[offset:offset + size])
        offset += size

    # Threads are created from the last chunk to the first; empty chunks
    # get no thread.
    workers = []
    for chunk in reversed(pfn_chunks):
        if chunk:
            workers.append(Thread(target=stage_files, args=chunk))

    # Start every worker before waiting on any of them.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment