Skip to content

Instantly share code, notes, and snippets.

@giffels
Last active December 5, 2015 14:01
Show Gist options
  • Save giffels/ac1529e3453f8abf83d9 to your computer and use it in GitHub Desktop.
Save giffels/ac1529e3453f8abf83d9 to your computer and use it in GitHub Desktop.
Multi-threaded PhEDEx stage-in scripts for dCache
#!/usr/bin/env python
from threading import Thread
import logging, os, sys
class LessThanFilter(logging.Filter):
    """Logging filter that only passes records below a given level."""

    def __init__(self, level):
        """Store *level* as the exclusive upper bound for accepted records."""
        logging.Filter.__init__(self)
        self._level = level

    def filter(self, rec):
        """Return True when the level of *rec* is strictly below the bound."""
        return self._level > rec.levelno
def check_status(*pfns):
    """Run ``dccp -P -t -1`` on each pfn and log the ones for which it succeeds.

    A pfn whose ``dccp`` call exits with status 0 is logged at INFO level;
    the output of the command itself is discarded. A pfn that does not start
    with ``/grid`` is reported at ERROR level and skipped, and the remaining
    pfns are still processed.

    Args:
        *pfns: File names to check, given as positional arguments.
    """
    # Imported locally so the function does not depend on a file-level import.
    import subprocess

    with open(os.devnull, 'wb') as devnull:
        for pfn in pfns:
            # check here the right pfn syntax for your site
            if not pfn.startswith('/grid'):
                logging.error('%s: wrong pfn skipped: %s\n' % (sys.argv[0], pfn))
                # Skip only this entry: returning here would silently drop
                # every remaining pfn handed to this thread.
                continue
            try:
                # An argument list (no shell) keeps blanks or shell
                # metacharacters in a pfn from altering the command.
                exit_code = subprocess.call(
                    ['dccp', '-P', '-t', '-1', pfn],
                    stdout=devnull, stderr=devnull)
            except OSError:
                # dccp could not be started; treat it like a failed check,
                # which is not logged.
                continue
            if exit_code == 0:
                logging.info(pfn)
if __name__ == '__main__':
    # Logging setup: records below WARNING are written to stdout, records at
    # WARNING and above are written to stderr.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.NOTSET)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(logging.DEBUG)
    stdout_handler.addFilter(LessThanFilter(logging.WARNING))
    root_logger.addHandler(stdout_handler)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.WARNING)
    root_logger.addHandler(stderr_handler)

    # Degree of parallelism.
    number_of_threads = 25

    # The pfns are passed as blank-separated command-line arguments.
    pfns = sys.argv[1:]
    if len(pfns) == 0:
        logging.error("%s: No pfns given\n" % sys.argv[0])
        sys.exit(1)

    # Cut the pfn list into number_of_threads consecutive slices of almost
    # equal length; the slice lengths add up to len(pfns).
    total = len(pfns)
    pfn_chunks = []
    stop = 0
    for slot in range(number_of_threads):
        start = stop
        stop = start + (total + slot) // number_of_threads
        pfn_chunks.append(pfns[start:stop])

    # One worker thread per non-empty slice.
    workers = []
    for chunk in pfn_chunks:
        if chunk:
            workers.append(Thread(target=check_status, args=chunk))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
#!/usr/bin/env python
from threading import Thread
import logging, os, sys
def stage_files(*pfns):
    """Run ``dccp -P`` on each pfn and log an error for every failure.

    A pfn that does not start with ``/grid`` is reported at ERROR level and
    skipped. For every other pfn ``dccp -P <pfn>`` is executed; a non-zero
    exit status, or a failure to start ``dccp`` at all, is reported at ERROR
    level.

    Args:
        *pfns: File names to stage, given as positional arguments.
    """
    # Imported locally so the function does not depend on a file-level import.
    import subprocess

    for pfn in pfns:
        # check here the right pfn syntax for your site
        if not pfn.startswith('/grid'):
            logging.error('%s: wrong pfn skipped: %s\n' % (sys.argv[0], pfn))
            continue
        try:
            # An argument list (no shell) keeps blanks or shell
            # metacharacters in a pfn from altering the command or masking
            # its exit status.
            failed = subprocess.call(['dccp', '-P', pfn]) != 0
        except OSError:
            # dccp could not be started at all.
            failed = True
        if failed:
            logging.error('%s: dccp -P %s returns error\n' % (sys.argv[0], pfn))
if __name__ == '__main__':
    # Logging setup: message-only format, root logger at INFO.
    logging.basicConfig(format='%(message)s')
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Degree of parallelism.
    number_of_threads = 10

    # The pfns are passed as blank-separated command-line arguments.
    pfns = sys.argv[1:]
    if len(pfns) == 0:
        logging.error("%s: No pfns given\n" % sys.argv[0])
        sys.exit(1)

    # Cut the pfn list into number_of_threads consecutive slices of almost
    # equal length; the slice lengths add up to len(pfns).
    total = len(pfns)
    pfn_chunks = []
    stop = 0
    for slot in range(number_of_threads):
        start = stop
        stop = start + (total + slot) // number_of_threads
        pfn_chunks.append(pfns[start:stop])

    # One worker thread per non-empty slice, created from the last slice to
    # the first.
    workers = []
    for chunk in reversed(pfn_chunks):
        if chunk:
            workers.append(Thread(target=stage_files, args=chunk))

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment