@rstofi
Created July 5, 2022 14:35
Shell script to simulate mock data and perform grid transfer tests between two servers using the GridFTP protocol via the globus-url-copy tool
#!/bin/bash
#Generate random files for simple grid transfer tests and perform the selected tests
#NOTE: only the data generation and the last test
#(RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST) should be performed
#
#This test performs transfers with all combinations of the given:
# - data sizes (Ns)
# - number of threads (Np)
# - block sizes (Nbs)
#
#So in total Ns*Np*Nbs transfer tests are performed, covering this 3D parameter space
#
#
#There are pre-defined tests covering only the single-thread or the default
#block size slice of the aforementioned parameter space.
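#
#Example: with the default arrays defined below (3 file sizes x 3 thread
#counts x 3 block sizes) the complex test performs 3*3*3 = 27 transfers.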
#*******************************************************************************
#=== Set up what the script should do ===
DRY_RUN="True"
GENERATE_DATA="True"
RUN_SIMPLE_GRID_TRANSFER_TEST="False"
RUN_PARALLEL_GRID_TRANSFER_TEST="False"
RUN_BLOCK_SIZE_GRID_TRANSFER_TEST="False"
RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST="True"
#*******************************************************************************
#=== Set up environment ===
WORKING_DIR="$(pwd)/"
TEMP_LOGFILE_NAME="temp_logfile.log"
FNAME_BASE="transfer_test_file_size_"
STATFILE_FNAME_BASE="transfer_stats_file_size_"
SOURCE_DIR="cosmogw.kosmo.physik.uni-muenchen.de/home/rozgonyi/grid_transfer_tests/blob/"
DEST_DIR="globus.ilifu.ac.za/idia/users/krozgonyi/grid_transfer_test_results/blob_cosmogw_to_ilifu/"
#*******************************************************************************
#=== Set timer commands ===
function start_timer () {
echo "Transfer started: $(date '+%Y-%m-%d %H:%M:%S')" >> "$1"
}
function end_timer () {
echo "Transfer finished: $(date '+%Y-%m-%d %H:%M:%S')" >> "$1"
}
#*******************************************************************************
#=== Functions ===
function generate_random_file () {
#Generate a file named $1 with size $2 in MB (!)
#openssl rand -out $1 $2 #This does not work for large files for some reason
#Here the dd block size (bs) is set to 1 MB, so count is the size in MB
dd if=/dev/urandom bs=1000000 count="${2}" of="$1"
}
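#Illustrative usage (not executed; assumes the blob/ target dir is writable):
#generate_random_file "${WORKING_DIR}blob/test.dat" 10 #-> 10 MB of random bytes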
function GB_to_MB() {
#Convert GB to MB
#S=$(echo $1*1000000000 | bc) #This would convert to bytes instead
S=$(echo $1*1000 | bc)
echo ${S%.*}
}
function MB_to_byte() {
S=$(echo $1*1000000 | bc)
echo ${S%.*}
}
function float_string_to_int() {
#printf rounds the float to the nearest integer; it can print a warning for
#some inputs, which is ignored for now
printf "%.0f" "${1}"
}
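#Illustrative values (not executed): GB_to_MB 2.0 -> 2000,
#MB_to_byte 1.0 -> 1000000, float_string_to_int 7.6 -> 8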
#*******************************************************************************
#=== MAIN ===
#*******************************************************************************
#=== Set up test parameter space ===
declare -a FILE_SIZES=(2.0 4.0 8.0)
#declare -a FILE_SIZES=(1.0 4.0 16.0 64.0 256.0 1024.0)
declare -a TRANSFER_THREADS=(2 4 8)
#declare -a TRANSFER_THREADS=(1 2 4 16 32 64 128 256)
declare -a TRANSFER_BLOCK_SIZE=(1.0 10.0 100.0) #Block size in MB
#declare -a TRANSFER_BLOCK_SIZE=(1.0 10.0 100.0 1000.0 2000.0 4000.0)
#=== Set up basic options for globus-url-copy ===
BASE_TRANSFER_OPTIONS="-v -c -vb -fast"
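#Flag summary (a sketch based on `globus-url-copy -help`; verify on your install):
#-v verbose, -vb show byte count and transfer rate, -c continue on errors,
#-fast reuse data channels; -p (parallel streams) and -bs (block size in bytes)
#are appended per test below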
#*******************************************************************************
#=== Generate random data from list
if [ $GENERATE_DATA = 'True' ]; then
for S in ${FILE_SIZES[@]}; do
DATAFILE_NAME="${WORKING_DIR}blob/${FNAME_BASE}${S}_GB.dat"
S_IN_MB=$(float_string_to_int $(GB_to_MB $S))
echo "Generate file with size ${S}GB as ${DATAFILE_NAME}..."
if [ $DRY_RUN = 'False' ]; then
generate_random_file $DATAFILE_NAME $S_IN_MB
fi
echo "...done"
done
fi
#*******************************************************************************
#=== Run simple grid transfer test ===
if [ $RUN_SIMPLE_GRID_TRANSFER_TEST = 'True' ]; then
for S in ${FILE_SIZES[@]}; do
DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB.dat"
STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
echo "Moving file ${DATAFILE_NAME}..."
if [ $DRY_RUN = 'False' ]; then
#Perform grid test
echo "globus-url-copy options: ${BASE_TRANSFER_OPTIONS}" >> ${TEMP_LOGFILE}
start_timer $TEMP_LOGFILE
#echo "globus-url-copy -v -c -vb gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR}"
globus-url-copy ${BASE_TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}
end_timer $TEMP_LOGFILE
#Convert logfile to readable format > statfile
sed -e "s/\r/\n/g" $TEMP_LOGFILE > $STATFILE_PATH
#Move the statfile NOTE that the stats/ dir should exist
mv $STATFILE_PATH "${WORKING_DIR}stats/"
#less $TEMP_LOGFILE
rm $TEMP_LOGFILE
else
echo "globus-url-copy options: ${BASE_TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
echo "start_timer ${TEMP_LOGFILE}"
echo "globus-url-copy ${BASE_TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
echo "end_timer ${TEMP_LOGFILE}"
echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
echo "rm $TEMP_LOGFILE"
fi
echo "...done"
echo ""
done
fi
#*******************************************************************************
#=== Run the same grid transfer test but using increasing levels of parallel transfer ===
if [ $RUN_PARALLEL_GRID_TRANSFER_TEST = 'True' ]; then
for S in ${FILE_SIZES[@]}; do
DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
echo "Moving file ${DATAFILE_NAME}..."
for P in ${TRANSFER_THREADS[@]}; do
STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB_${P}_threads.dat"
STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
#NOTE: the file is overwritten in the destination and so the time
# it takes globus to check this and delete the file will appear in
# the time measured via the time stamps!
TRANSFER_OPTIONS="${BASE_TRANSFER_OPTIONS} -p ${P}"
if [ $DRY_RUN = 'False' ]; then
echo "...using ${P} threads..."
#Perform grid test
echo "globus-url-copy options: ${TRANSFER_OPTIONS}" >> ${TEMP_LOGFILE}
start_timer $TEMP_LOGFILE
globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> $TEMP_LOGFILE
end_timer $TEMP_LOGFILE
#Convert logfile to readable format > statfile
sed -e "s/\r/\n/g" $TEMP_LOGFILE > $STATFILE_PATH
#Move the statfile NOTE that the stats/ dir should exist
mv $STATFILE_PATH "${WORKING_DIR}stats/"
#less $TEMP_LOGFILE
rm $TEMP_LOGFILE
else
echo ""
echo "...using ${P} threads..."
echo "globus-url-copy options: ${TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
echo "start_timer ${TEMP_LOGFILE}"
echo "globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
echo "end_timer ${TEMP_LOGFILE}"
echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
echo "rm $TEMP_LOGFILE"
#echo "globus delete ${DEST_DIR}/${DATAFILE_NAME}" #This delete the file on the remote end
fi
done
echo "... done"
echo ""
done
fi
#*******************************************************************************
#=== Run the same grid transfer test but using increasing block sizes ===
if [ $RUN_BLOCK_SIZE_GRID_TRANSFER_TEST = 'True' ]; then
for S in ${FILE_SIZES[@]}; do
DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
echo "Moving file ${DATAFILE_NAME}..."
for BS_MB in ${TRANSFER_BLOCK_SIZE[@]}; do
BS=$(float_string_to_int $(MB_to_byte $BS_MB))
STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB_${BS_MB}_MB_block_size.dat"
STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
#NOTE: the file is overwritten in the destination and so the time
# it takes globus to check this and delete the file will appear in
# the time measured via the time stamps!
TRANSFER_OPTIONS="${BASE_TRANSFER_OPTIONS} -bs ${BS}"
if [ $DRY_RUN = 'False' ]; then
echo "...using ${BS_MB} MB block size..."
#Perform grid test
echo "globus-url-copy options: ${TRANSFER_OPTIONS}" >> ${TEMP_LOGFILE}
start_timer $TEMP_LOGFILE
globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}
end_timer $TEMP_LOGFILE
#Convert logfile to readable format > statfile
sed -e "s/\r/\n/g" $TEMP_LOGFILE > $STATFILE_PATH
#Move the statfile NOTE that the stats/ dir should exist
mv $STATFILE_PATH "${WORKING_DIR}stats/"
#less $TEMP_LOGFILE
rm $TEMP_LOGFILE
else
echo ""
echo "...using ${BS_MB} MB block size..."
echo "globus-url-copy options: ${TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
echo "start_timer ${TEMP_LOGFILE}"
echo "globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
echo "end_timer ${TEMP_LOGFILE}"
echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
echo "rm $TEMP_LOGFILE"
#echo "globus delete ${DEST_DIR}/${DATAFILE_NAME}" #This delete the file on the remote end
fi
done
echo "... done"
echo ""
done
fi
#*******************************************************************************
#=== Run grid transfer test with increasing levels of block size and parallelism ===
if [ $RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST = 'True' ]; then
for S in ${FILE_SIZES[@]}; do
DATAFILE_NAME="${FNAME_BASE}${S}_GB.dat"
DATAFILE_PATH="${WORKING_DIR}blob/${DATAFILE_NAME}"
TEMP_LOGFILE="${WORKING_DIR}${TEMP_LOGFILE_NAME}"
echo "Moving file ${DATAFILE_NAME}..."
for P in ${TRANSFER_THREADS[@]}; do
for BS_MB in ${TRANSFER_BLOCK_SIZE[@]}; do
BS=$(float_string_to_int $(MB_to_byte $BS_MB))
STATFILE_NAME="${STATFILE_FNAME_BASE}${S}_GB_${P}_threads_${BS_MB}_MB_block_size.dat"
STATFILE_PATH="${WORKING_DIR}${STATFILE_NAME}"
#NOTE: the file is overwritten in the destination and so the time
# it takes globus to check this and delete the file will appear in
# the time measured via the time stamps!
TRANSFER_OPTIONS="${BASE_TRANSFER_OPTIONS} -p ${P} -bs ${BS}"
if [ $DRY_RUN = 'False' ]; then
echo "... using ${P} threads with ${BS_MB} MB block size..."
#Perform grid test
echo "globus-url-copy options: ${TRANSFER_OPTIONS}" >> ${TEMP_LOGFILE}
start_timer $TEMP_LOGFILE
globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}
end_timer $TEMP_LOGFILE
#Convert logfile to readable format > statfile
sed -e "s/\r/\n/g" $TEMP_LOGFILE > $STATFILE_PATH
#Move the statfile NOTE that the stats/ dir should exist
mv $STATFILE_PATH "${WORKING_DIR}stats/"
#less $TEMP_LOGFILE
rm $TEMP_LOGFILE
else
echo ""
echo "... using ${P} threads with ${BS_MB} MB block size..."
echo "globus-url-copy options: ${TRANSFER_OPTIONS} >> ${TEMP_LOGFILE}"
echo "start_timer ${TEMP_LOGFILE}"
echo "globus-url-copy ${TRANSFER_OPTIONS} gsiftp://${SOURCE_DIR}${DATAFILE_NAME} gsiftp://${DEST_DIR} >> ${TEMP_LOGFILE}"
echo "end_timer ${TEMP_LOGFILE}"
echo "sed -e \"s/\r/\n/g\" ${TEMP_LOGFILE} > ${STATFILE_PATH}"
echo "mv ${STATFILE_PATH} \"${WORKING_DIR}stats/\""
echo "rm $TEMP_LOGFILE"
#echo "globus delete ${DEST_DIR}/${DATAFILE_NAME}" #This delete the file on the remote end
fi
done
echo ""
done
echo "... done"
echo ""
done
fi
"""Utility functions to extract info and tomplot the transfer stats files
generated from the globus-url-copy tests.
The stats file has to have the following structure:
--------------------------------------------------------------------------------
globus-url-copy options: -v -c -vb -p 1
Transfer started: 2022-03-15 15:18:55
Source: gsiftp://cosmogw.kosmo.physik.uni-muenchen.de/home/rozgonyi/grid_transfer_tests/
Dest: gsiftp://globus.ilifu.ac.za/users/krozgonyi/grid_transfer_tests/
test.dat
29360128 bytes 5.60 MB/sec avg 5.60 MB/sec inst
...
1000000000 bytes 8.02 MB/sec avg 9.15 MB/sec inst
Transfer finished: 2022-03-15 15:20:58
--------------------------------------------------------------------------------
Extracting the info from this file relies on hard-coded line numbers, so deviation
from this format will break the code!
"""
import os
from os import listdir
from os.path import isfile, join
import copy
import logging
import numpy as np
import numpy.ma as ma
from datetime import datetime
from linecache import getline
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.offsetbox import AnchoredText
from matplotlib.patheffects import withStroke
#=== Globals ===
#RCparams for plotting
matplotlib.rcParams['xtick.direction'] = 'in'
matplotlib.rcParams['ytick.direction'] = 'in'
matplotlib.rcParams['xtick.major.size'] = 9
matplotlib.rcParams['ytick.major.size'] = 9
matplotlib.rcParams['xtick.major.width'] = 3
matplotlib.rcParams['ytick.major.width'] = 3
matplotlib.rcParams['xtick.minor.size'] = 6
matplotlib.rcParams['ytick.minor.size'] = 6
matplotlib.rcParams['xtick.minor.width'] = 2
matplotlib.rcParams['ytick.minor.width'] = 2
matplotlib.rcParams['axes.linewidth'] = 2
plt.rcParams['xtick.labelsize']=16
plt.rcParams['ytick.labelsize']=16
plt.rcParams['legend.fontsize']=16
plt.rcParams['legend.framealpha']=1.0
plt.rcParams['legend.edgecolor']='black'
#4 sampled colors from viridis
c0 = '#440154';#Purple
c1 = '#30678D';#Blue
c2 = '#35B778';#Greenish
c3 = '#FDE724';#Yellow
outlier_color = 'grey'
#=== Globals II ===
_VALID_AXES = ['p', 'bs', 'S']
#*******************************************************************************
#=== Functions ===
def byte_to_MB(s):
"""Convert bytes to MB
"""
return float(s / 1000000)
def MB_to_GB(s):
"""Convert MB to GB
"""
return float(s / 1000)
def MB_per_s_to_Gbps(v):
"""Convert MB/s to Gps (Giga bit per second) i.e. using the byte to bit
conversion as well
"""
return float((v * 8) / 1000)
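#Illustrative values (decimal units, 1 MB = 10^6 bytes, as assumed above):
#byte_to_MB(1000000) -> 1.0, MB_to_GB(1000) -> 1.0, MB_per_s_to_Gbps(125.0) -> 1.0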
def s_to_hms(t):
"""Convert seconds to hours:minutes:seconds format string
"""
m, s = divmod(t, 60)
h, m = divmod(m, 60)
return "{0:02d}:{1:02d}:{2:02d}".format(h, m, s)
def split_date(date_string):
"""Split a date string formatted as yyyy-mm-dd to three variables of y m and d
"""
date_chunks = date_string.split('-') #Split by dashes
#Get values converted to int
year = int(date_chunks[0])
month = int(date_chunks[1])
day = int(date_chunks[2])
return year, month, day
def split_time(time_string):
"""Split a time string formatted as hh:mm:ss to three variables of h m and s
"""
time_chunks = time_string.split(':') #Split by colons
#Get values converted to int
hour = int(time_chunks[0])
minute = int(time_chunks[1])
second = int(time_chunks[2])
return hour, minute, second
def get_sfile_name_list_from_standardly_named_files(fsize_list,
Np_list=None,
bs_list=None,
name_base='transfer_stats_file_size_'):
"""Simple code to get a list of stats file names based on the parameters
provided and the naming scheme used in the `simple_grid_test.sh` script
Parameters:
===========
fsize_list: list
List containing the transferred file sizes. The elements can be float,
int or string type variables. These should be formatted as in the
`simple_grid_test.sh` script
Np_list: list of int, optional
List containing the number of parallel threads used
bs_list: list, optional
List containing the block sizes formatted as in the `simple_grid_test.sh`
script. The type can be float, int or str.
name_base: str, optional
Should be equivalent to the `STATFILE_FNAME_BASE` variable used in the
`simple_grid_test.sh` script
Returns:
========
sfile_list: list of str
A list containing the file names which then can be read in.
"""
sfile_list = []
#When only the file size is provided
if Np_list is None and bs_list is None:
for S in fsize_list:
sfile_list.append('{0:s}{1:s}_GB.dat'.format(name_base,str(S)))
#When the number of threads also provided
elif Np_list is not None and bs_list is None:
for S in fsize_list:
for p in Np_list:
sfile_list.append('{0:s}{1:s}_GB_{2:d}_threads.dat'.format(
name_base, str(S), p))
#When the block sizes (but not the number of parallel threads) are provided
elif Np_list is None and bs_list is not None:
for S in fsize_list:
for bs in bs_list:
sfile_list.append('{0:s}{1:s}_GB_{2:s}_MB_block_size.dat'.format(
name_base, str(S), str(bs)))
#When all params are provided
else:
for S in fsize_list:
for p in Np_list:
for bs in bs_list:
sfile_list.append('{0:s}{1:s}_GB_{2:d}_threads_{3:s}_MB_block_size.dat'.format(
name_base, str(S), p, str(bs)))
return sfile_list
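#Illustrative call (a sketch; names follow the shell script's scheme):
#get_sfile_name_list_from_standardly_named_files([2.0], Np_list=[2, 4])
#-> ['transfer_stats_file_size_2.0_GB_2_threads.dat',
#    'transfer_stats_file_size_2.0_GB_4_threads.dat']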
def check_stats_file_structure(sfile):
"""This routine checks if the stats file is structured as expected and raises
error fot the corresponding part if not
Should help with processing stats files
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
Returns True if the format is valid otherwise should raise an error
"""
#Just call a bunch of functions that should raise the corresponding error
_ = get_globus_url_copy_options(sfile) #First line
_ = get_start_and_end_timestamps(sfile) #Time stamps (2nd and last lines)
_ = get_source_and_dest_servers(sfile, raw_out=True) #Server names (3rd & 4th lines)
_ = get_values_from_stats_entry(getline(sfile, 7).strip()) #Test if the 7th line is a stats entry
_ = get_total_size_transferred(sfile) #Check last stats entry line if exist
return True
def get_globus_url_copy_options(sfile):
"""Get the first line from the stats file
This line contains the globus-url-copy command options
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
options_string: str
The first line of the file
"""
#Get the start time
with open(sfile, mode='r') as f:
#first_line = f.readline().strip() #The strip removes the endline character
options_string = f.readline().strip()
return options_string
def parser_parameters_from_options_string(options_string):
"""Parser the globus-url-copy options to hunam-readable and machine-readable
variables for later automatic processing.
NOTE that the run parameters e.g. number of parallel processes could be read
from the options string
Parameters:
===========
options_string: str
The first line of the stats file i.e. get by using `get_globus_url_copy_options`
Returns:
========
globus_url_copy_options_dict: dict
Dictionary containing the option switches and the values if any
"""
#Split string
line_chunks = options_string.split(' ')
#Check if we read the right string
if '{0:s} {1:s}'.format(line_chunks[0],line_chunks[1]) != 'globus-url-copy options:':
raise ValueError('Badly formatted globus-url-copy options string!')
#Initialise the output dict
globus_url_copy_options_dict = {}
#Loop through the options and populate the dict
for i in range(2,len(line_chunks)):
if line_chunks[i][0] == '-':
if i == len(line_chunks) - 1:
globus_url_copy_options_dict[line_chunks[i]] = None #If looking at last argument
else:
if line_chunks[i+1][0] != '-':
globus_url_copy_options_dict[line_chunks[i]] = line_chunks[i+1]
else:
globus_url_copy_options_dict[line_chunks[i]] = None
else:
pass
return globus_url_copy_options_dict
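#Illustrative parse (a sketch) of a header line written by the shell script:
#parser_parameters_from_options_string(
#    'globus-url-copy options: -v -c -vb -fast -p 4 -bs 1000000')
#-> {'-v': None, '-c': None, '-vb': None, '-fast': None, '-p': '4', '-bs': '1000000'}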
def get_p_and_bs(sfile):
"""Get the number of parallel threads and block size from the stats file header
NOTE the default values are:
- p = 1
- bs = 0 MB (because I have no idea of the default)
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
p: int
Number of parallel threads used in the test
bs: float
The block size used for the transfer in [MB]
"""
globus_url_copy_options_dict = \
parser_parameters_from_options_string(get_globus_url_copy_options(sfile))
#Set the defaults up front so both variables are always defined
p = int(1)
bs = float(0.0)
#Get p
if '-p' in globus_url_copy_options_dict:
p = int(globus_url_copy_options_dict['-p'])
elif '-parallel' in globus_url_copy_options_dict:
p = int(globus_url_copy_options_dict['-parallel'])
else:
p = int(1)
#Get block size
if '-bs' in globus_url_copy_options_dict:
bs = byte_to_MB(float(globus_url_copy_options_dict['-bs']))
elif '-block-size' in globus_url_copy_options_dict:
bs = byte_to_MB(float(globus_url_copy_options_dict['-block-size']))
else:
bs = float(0.0)
return p, bs
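#Illustrative result (a sketch): for a stats file whose header line is
#'globus-url-copy options: -v -c -vb -fast -p 4 -bs 1000000'
#get_p_and_bs returns p = 4 and bs = 1.0 (MB)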
def get_start_and_end_timestamps(sfile):
"""Get the start and end timestamps as strings from the stats file
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
start_time: str
The start date and time formatted as a string
end_time: str
The end date and time formatted as a string
"""
#Get the start time
#with open(sfile, mode='r') as f:
# first_line = f.readline().strip() #The strip removes the endline character
#Hardcoded the first timestamp line no
#NOTE this is not the first but the second line in the stats file !
first_line = getline(sfile, 2).strip() #The strip removes the endline character
#Extract date and time from string and convert to a python time object
line_chunks = first_line.split(' ') #Split by spaces
if line_chunks[0] + ' ' + line_chunks[1] != 'Transfer started:':
raise ValueError('The second line of the file contains a wrongly formatted time stamp!')
start_time = str(line_chunks[2] + ' ' + line_chunks[3])
#Get the end time
with open(sfile, mode='r') as f:
for line in f:
pass
last_line = line.strip() #Strip the trailing newline so it does not leak into the time string
#Extract date and time from string and convert to a python time object
line_chunks = last_line.split(' ') #Split by spaces
if line_chunks[0] + ' ' + line_chunks[1] != 'Transfer finished:':
raise ValueError('The last line of the file contains a wrongly formatted time stamp!')
end_time = str(line_chunks[2] + ' ' + line_chunks[3])
return start_time, end_time
def get_source_and_dest_servers(sfile, raw_out=False):
"""Get the string defining the start and end point servers used for the test
from the stats file
NOTE: this function returns the specific locations if raw_out is True!
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
raw_out: bool, opt
If True, the raw strings are returned, else the code attempts to extract the
server name from the string
Returns:
========
source_server: str
The line string containing the source server
dest_server: str
The line string containing the destination server
"""
#NOTE that the line numbers are hard-coded in the stats file generation (!)
#Get startpoint server and location
source_server = getline(sfile, 3).strip() #The strip removes the endline character
if source_server.split()[0] != 'Source:':
#print(source_server.split()[0])
raise ValueError('Wrongly formatted stats file!')
#Get endpoint server and location
dest_server = getline(sfile, 4).strip() #The strip removes the endline character
if dest_server.split()[0] != 'Dest:':
raise ValueError('Wrongly formatted stats file!')
if raw_out:
return source_server.split()[1], dest_server.split()[1]
else:
#Extract server names
source_server_name = source_server.split()[1].split('/')[2]
dest_server_name = dest_server.split()[1].split('/')[2]
return source_server_name, dest_server_name
def get_transfer_time(sfile):
"""Read the total transfer time from the time stamps
The time stamps should be formatted as:
`Transfer started: yyyy-mm-dd hh:mm:ss`
...file..
`Transfer finished: yyyy-mm-dd hh:mm:ss`
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
transfer_time: int
The total transfer time in seconds
"""
#=== Start time stamp
#Get start and end time strings
start_time_string, end_time_string = get_start_and_end_timestamps(sfile)
timestamps = []
for time_stamp in [start_time_string, end_time_string]:
line_chunks = time_stamp.split(' ') #Split by spaces
year, month, day = split_date(line_chunks[0])
hour, minute, second = split_time(line_chunks[1])
timestamps.append(datetime(year=year, month=month, day=day,
hour=hour, minute=minute, second=second,
microsecond=0)) #We only work with second precision
#Calculate time difference in seconds (!)
#By using the builtin datetime arithmetic I avoid hand-rolling calendar
#logic (month lengths, February the 29th...)
transfer_time = (timestamps[1]-timestamps[0]).total_seconds()
return int(transfer_time)
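#e.g. for the sample file in the module docstring (started 2022-03-15 15:18:55,
#finished 2022-03-15 15:20:58) get_transfer_time returns 123 seconds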
def get_values_from_stats_entry(stats_entry_line):
"""Get the values of:
- data transferred [MB] (converted from the raw byte count)
- average transfer rate [MB/s]
- instantaneous transfer rate [MB/s]
from a single entry line string read from a stats file
The input entry line thus should be formatted as:
` 769916928 bytes 7.73 MB/sec avg 9.40 MB/sec inst`
Parameters:
===========
stats_entry_line: str
String containing a single stats entry line from the stats file
Returns:
========
data_transferred: float
The data transferred in MB (!)
avg_transfer_rate: float
The average data transfer rate in MB/s
inst_transfer_rate: float
The instantaneous data transfer rate in MB/s
"""
#Split the line string
stat_chunks = stats_entry_line.split()
#Get data transferred
if stat_chunks[1] != 'bytes':
raise ValueError('Wrongly formatted stats line!')
data_transferred = byte_to_MB(int(stat_chunks[0]))
#Get average transfer rate
if stat_chunks[3] != 'MB/sec' or stat_chunks[4] != 'avg':
raise ValueError('Wrongly formatted stats line!')
avg_transfer_rate = float(stat_chunks[2])
#Get instantaneous transfer rate
if stat_chunks[6] != 'MB/sec' or stat_chunks[7] != 'inst':
raise ValueError('Wrongly formatted stats line!')
inst_transfer_rate = float(stat_chunks[5])
return data_transferred, avg_transfer_rate, inst_transfer_rate
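#Illustrative parse of the docstring's sample line:
#get_values_from_stats_entry('769916928 bytes 7.73 MB/sec avg 9.40 MB/sec inst')
#-> (769.916928, 7.73, 9.4)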
def get_total_size_transferred(sfile, unit='MB'):
"""Read from the file the total number of bytes transferred
NOTE: this might not equal the size of the original file if the transfer fails!
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
unit: str
The unit in which the data is returned. Valid options are:
['MB', 'GB']
Returns:
========
transfer_size: float
The total transferred size in the requested unit
"""
#Get the number of lines because negative indexing does not work for getline()
number_of_lines = len(open(sfile).readlines())
last_entry_line_no = int(number_of_lines - 2)
last_stat_entry = getline(sfile, last_entry_line_no).strip() #The strip removes the endline character
#print(last_stat_entry)
transfer_size, avg_transfer_rate, inst_transfer_rate = get_values_from_stats_entry(last_stat_entry)
if unit == 'MB':
return transfer_size
elif unit == 'GB':
return MB_to_GB(transfer_size)
else:
raise ValueError('Invalid unit is specified')
def get_avg_transfer_rate(sfile):
"""Get the average transfer rate from:
- manually computing from time stamp and overall transferred size
- from the last average transfer rate stats entry
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
manual_avg_transfer_rate: float
Average data transfer rate computed manually from file size and time stamps
[MB/s]
avg_transfer_rate: float
Average data transfer rate read from the stats file
[MB/s]
diff_avg_transfer_rate: float
Absolute difference between `manual_avg_transfer_rate` and `avg_transfer_rate`
[MB/s]
"""
#Compute the manual value
data_transferred = get_total_size_transferred(sfile, unit='MB')
transfer_time = get_transfer_time(sfile)
manual_avg_transfer_rate = data_transferred / transfer_time
#Get the stats value
number_of_lines = len(open(sfile).readlines())
last_entry_line_no = int(number_of_lines - 2)
last_stat_entry = getline(sfile, last_entry_line_no).strip() #The strip removes the endline character
#print(last_stat_entry)
transfer_size, avg_transfer_rate, inst_transfer_rate = get_values_from_stats_entry(last_stat_entry)
#Compute the (absolute) difference
diff_avg_transfer_rate = np.fabs(np.subtract(avg_transfer_rate, manual_avg_transfer_rate))
#Return both values
return manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate
def get_stat_arrays(sfile):
"""Get the time-dependent statistics from the file.
The main lines should have the following format:
` 769916928 bytes 7.73 MB/sec avg 9.40 MB/sec inst`
this function returns 4 arrays of:
- a relative measurement time array [arbitrary time units] (!)
- data transferred [MB]
- average transfer rate [MB/s]
- instantaneous transfer rate [MB/s] (measured over the arbitrary time unit)
Since I have no idea how frequently globus-url-copy updates its stats, I just
consider this interval an arbitrary time unit with a CONSTANT update rate.
As such, the unit of the first vector is meaningless, and should be used only
for qualitative analysis.
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
Returns:
========
time_array: numpy.ndarray
Array containing the relative measurement times [arbitrary time units] (!)
size_array: numpy.ndarray
Array containing the data transferred [MB]
avg_transfer_rate_array: numpy.ndarray
Array containing the average transfer rate [MB/s]
inst_transfer_rate_array: numpy.ndarray
Array containing the instantaneous transfer rate [MB/s]
"""
#Get the number of lines in the file
number_of_lines = len(open(sfile).readlines())
#The first 6 lines are the header
#The last 2 lines are the trailing empty line and the time stamp
#NOTE that I initialise the output vectors with 0-s, hence the extra entry
N_stats_entries = int(number_of_lines - 7)
#Initialise output arrays
time_array = np.zeros(N_stats_entries)
size_array = np.zeros(N_stats_entries)
avg_transfer_rate_array = np.zeros(N_stats_entries)
inst_transfer_rate_array = np.zeros(N_stats_entries)
line_no = 7 #Hard-coding the first line
for i in range(1,N_stats_entries):
stats_line = getline(sfile, line_no).strip() #The strip removes the endline character
data_transferred, avg_transfer_rate, inst_transfer_rate = get_values_from_stats_entry(stats_line)
time_array[i] = i
size_array[i] = data_transferred
avg_transfer_rate_array[i] = avg_transfer_rate
inst_transfer_rate_array[i] = inst_transfer_rate
line_no += 1
return time_array, size_array, avg_transfer_rate_array, inst_transfer_rate_array
#*******************************************************************************
#=== Plotting functions ===
def create_stats_summary_plot(sfile, ofname):
"""Create a summary plot from a stats file for a single grid dtansfer test
Parameters:
===========
sfile: str
The name and relative (or full) path to the stats file
ofname: str
The output file name and relative (or full) path
Returns:
========
Creates a summary image
"""
#Get the stats
transfer_time = s_to_hms(get_transfer_time(sfile)) #As an h:m:s string
data_transferred = get_total_size_transferred(sfile, unit='GB')
manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = get_avg_transfer_rate(sfile)
time_array, size_array, avg_transfer_rate_array, inst_transfer_rate_array = get_stat_arrays(sfile)
#Get some strings for the stats
#options_string = get_globus_url_copy_options(sfile)
p, bs = get_p_and_bs(sfile)
start_time_string, end_time_string = get_start_and_end_timestamps(sfile)
source_server_string, dest_server_string = get_source_and_dest_servers(sfile, raw_out=False)
#Create plot
fig, ax = plt.subplots(1, 1, figsize=(12.,6.5))
fig.suptitle("Avg transfer rate: {3:.2f} +/- {4:.2f} MB/s | Elapsed time: {7:s} \n\
File size: {0:.2f} GB | Parallel threads: {8:d} | Block size: {9:.2f} MB\n\
Transfer started: {1:s} | finished: {2:s}\
Source: {5:s} | Dest: {6:s}".format(
data_transferred, start_time_string, end_time_string,
avg_transfer_rate, diff_avg_transfer_rate,
source_server_string, dest_server_string,
transfer_time, p, bs),
fontsize=18)
#Note that after the time strings a line separator is missing on purpose
#Adjust for long title
fig.subplots_adjust(top=0.8)
#Original axis
#Average transfer rate
lns2 = ax.plot(time_array[1:], avg_transfer_rate_array[1:], lw=4, alpha=0.5, c=c0,
label='Avg transfer rate')
#Instantaneous transfer rate
lns3 = ax.step(time_array[1:], inst_transfer_rate_array[1:], lw=3, alpha=0.75, c=c2,
label='Inst transfer rate')
#Indicate the "error" based on the manual and stats average transfer rate
ax.fill_between(time_array[1:], avg_transfer_rate_array[1:] - diff_avg_transfer_rate,
avg_transfer_rate_array[1:] + diff_avg_transfer_rate, facecolor=c0, alpha=0.2)
#Second y axis
ax2 = ax.twinx()
lns1 = ax2.plot(time_array, size_array, lw=4, alpha=0.5, color=c1, zorder=0,
label='Data transferred')
ax2.set_yscale("log")
#Add legend
lns = lns1+lns2+lns3
labs = [l.get_label() for l in lns]
ax.legend(lns, labs, loc='best').get_frame().set_linewidth(2)
#Set zorder
ax.set_zorder(ax2.get_zorder()+1)
ax.patch.set_visible(False)
ax.grid(lw=1.5, linestyle='-', alpha=.4, color=outlier_color)
ax2.yaxis.grid(True, which='major', lw=1.5, linestyle='--', alpha=.4, color=c1)
ax2.set_ylabel(r'Data transferred [MB]', color=c1, fontsize = 18)
ax.set_xlabel(r'Time [arbitrary units]', fontsize = 18)
ax.set_ylabel(r'Transfer rate [MB/s]', fontsize = 18)
#plt.show()
plt.savefig(ofname, bbox_inches='tight')
plt.close()
def quick_and_dirty_simple_analysis_all_stats_results(working_dir_path=None,fig_extension='.png'):
"""Routine to generate summary plots from arbitary stats files.
Takes ALL stat files in the `working_dir`/stats/ directory and try to generate
the summary plots for all. Continues on faliure i.e. if the stats files are
formatted badly because the transfer is failed.
The output is generated to the `working_dir`/figures/ directory using the same
names as the stat files.
Parameters:
===========
working_dir_path: str, optional
Full path of the parent directory of the stats/ and figures/ directories
The figures directory should have a quick_and_dirty/ subdirectory!
fig_extension: str, optional
String defining the type of images created
Returns:
========
Creates summary plots for all stats files under figures/quick_and_dirty/
"""
if working_dir_path is None:
working_dir = './'
else:
working_dir = working_dir_path
stats_dir_path = working_dir + 'stats/'
#Get the stats file list
stats_file_list = [f for f in listdir(stats_dir_path) if isfile(join(stats_dir_path, f))]
if len(stats_file_list) == 0:
raise ValueError('No stat files found under {0:s} !'.format(stats_dir_path))
#Loop through the files and create summary plots
for stats_file in stats_file_list:
image_name = stats_file.rsplit(".",1)[0] + fig_extension
simage_path = working_dir + 'figures/quick_and_dirty/' + image_name
sfile_path = stats_dir_path + stats_file
#Try to generate the images
print('Generating summary image {0:s} ...'.format(image_name))
try:
check_stats_file_structure(sfile_path)
create_stats_summary_plot(sfile_path, ofname=simage_path)
print('...done')
except Exception:
#print(check_stats_file_structure(sfile_path))
#create_stats_summary_plot(sfile_path, ofname=simage_path)
print('... error reading the file ... proceeding')
def get_parameterspace_slice(stats_file_list,
axis_to_get='p',
working_dir_path=None,
continue_on_size_error=False):
"""The core function to generate a transfer rate -- number of threads
OR ransfer rate -- block size OR transfer rate -- file size DATA
The input list should contain stats files which has the same sized data
transferred and with the same block size or with the same number of threads,
or with both... i.e. this function gets a data cur in the parameterspace
Obviously the servers should be the same for all tests
NOTE the code only works if the `stats_file_list` contains only the proper
stas files! So be careful of the `axis_to_plot` value
Parameters:
===========
stats_file_list: list of str
The names of the stats files. Note that the files should be under the stats/
directory. The transferred file and block sizes should be equivalent
axis_to_get: str, opt
Whether to get the dependence on the number of parallel threads ('p'), on the
block size ('bs'), or on the file size ('S')
working_dir_path: str, optional
Full path of the parent directory of the stats/ and figures/ directories
continue_on_size_error: bool, optional
If a transfer fails (i.e. timed out), usually the sfile is generated, but
not all the data is transferred. In this case this code halts. However,
if this parameter is set to True, the code runs as normal and returns NaN
values for the failed transfers rather than the measured values
Returns:
========
dependence_array: numpy.ndarray
Either the number of parallel threads, or the block size values
avg_transfer_rate_array: numpy.ndarray
Array containing the average transfer rates
diff_avg_transfer_rate_array: numpy.ndarray
Array containing the 'error' for the average transfer rates
params_dict: dict
A dictionary containing some metadata shared between the tests as follows:
params_dict['p'] = p_baseline
params_dict['bs'] = bs_baseline
params_dict['source'] = source_server_string_baseline
params_dict['dest'] = dest_server_string_baseline
params_dict['S'] = data_transferred_baseline
"""
if axis_to_get not in _VALID_AXES:
raise ValueError('Invalid axis for generating dependence vectors is given!')
if working_dir_path is None:
working_dir = './'
else:
working_dir = working_dir_path
stats_dir_path = working_dir + 'stats/'
stats_file_list = [stats_dir_path + sfile for sfile in stats_file_list]
#Declare arrays to get
avg_transfer_rate_array = [] #Average transfer rate
diff_avg_transfer_rate_array = [] #Transfer rate uncertainty
params_dict = {}
if axis_to_get == 'p':
N_parallel_threads = [] #Number of parallel threads
elif axis_to_get == 'bs':
bs_array = [] #Block sizes
elif axis_to_get == 'S':
S_array = []
#Initialise the baseline p, block size, servers and transferred file size
p_baseline, bs_baseline = get_p_and_bs(stats_file_list[0])
source_server_string_baseline, dest_server_string_baseline = \
get_source_and_dest_servers(stats_file_list[0], raw_out=False)
data_transferred_baseline = get_total_size_transferred(stats_file_list[0], unit='GB')
params_dict['S'] = data_transferred_baseline
params_dict['p'] = p_baseline
params_dict['bs'] = bs_baseline
params_dict['source'] = source_server_string_baseline
params_dict['dest'] = dest_server_string_baseline
#Loop through the files and gather the relevant data
for sfile in stats_file_list:
flag = False #Per-file flag marking an incomplete transfer
#=== Perform some checks
#Get p and bs
p, bs = get_p_and_bs(sfile)
if axis_to_get == 'p' or axis_to_get == 'S':
if bs != bs_baseline:
raise ValueError('Different block size in the input data!')
if axis_to_get == 'bs' or axis_to_get == 'S':
if p != p_baseline:
raise ValueError('Different number of threads in the input data!')
#Get the size of the transferred data
data_transferred = get_total_size_transferred(sfile, unit='GB')
if axis_to_get == 'S':
S_array.append(data_transferred)
else:
if abs(data_transferred - data_transferred_baseline) > 2: #2 GB tolerance (sizes are in GB)
if continue_on_size_error:
flag = True
else:
raise ValueError('Different transferred data size in the input data!')
#Get the server locations
source_server_string, dest_server_string = \
get_source_and_dest_servers(sfile, raw_out=False)
if source_server_string != source_server_string_baseline:
raise ValueError('Different source servers in the input data!')
if dest_server_string != dest_server_string_baseline:
raise ValueError('Different destination servers in the input data!')
#Get the data for the plot
if axis_to_get == 'p':
N_parallel_threads.append(p)
elif axis_to_get == 'bs':
bs_array.append(bs)
#Use the value measured by the software; the flag can only be set when
#axis_to_get != 'S', so a single branch covers all cases
if flag:
avg_transfer_rate_array.append(np.nan)
diff_avg_transfer_rate_array.append(np.nan)
else:
manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = \
get_avg_transfer_rate(sfile)
avg_transfer_rate_array.append(avg_transfer_rate)
diff_avg_transfer_rate_array.append(diff_avg_transfer_rate)
if axis_to_get == 'p':
dependence_array = N_parallel_threads
elif axis_to_get == 'bs':
dependence_array = bs_array
elif axis_to_get == 'S':
dependence_array = S_array
return dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict
def plot_parameterspace_slice_dependence(dependence_array,
avg_transfer_rate_array,
diff_avg_transfer_rate_array,
params_dict,
axis_to_plot='p',
working_dir_path=None,
ofname_base='transfer_dependence_on_',
fig_extension='.png'):
"""The core function to generate a transfer rate -- number of threads plot
OR ransfer rate -- block size OR transfer rate -- data size PLOT
The input arrays should be generated using the function
`get_parameterspace_slice`
Parameters:
===========
dependence_array: numpy.ndarray
Either the number of parallel threads, or the block size values
avg_transfer_rate_array: numpy.ndarray
Array containing the average transfer rates
diff_avg_transfer_rate_array: numpy.ndarray
Array containing the 'error' for the average transfer rates
params_dict: dict
A dictionary containing some metadata shared between the tests as follows:
params_dict['p'] = p_baseline
params_dict['bs'] = bs_baseline
params_dict['source'] = source_server_string_baseline
params_dict['dest'] = dest_server_string_baseline
params_dict['S'] = data_transferred_baseline
axis_to_plot: str, opt
Whether to plot the dependence on the number of parallel threads ('p'), on
the block size ('bs'), or on the file size ('S')
working_dir_path: str, optional
Full path of the parent directory of the stats/ and figures/ directories
ofname_base: str, opt
The base of the output file name
fig_extension: str, optional
String defining the type of images created
Returns:
========
Creates the plot under the figures/ subdir of the `working_dir_path`
"""
if axis_to_plot not in _VALID_AXES:
raise ValueError('Invalid axis for generating dependence vectors is given!')
if working_dir_path is None:
working_dir = './'
else:
working_dir = working_dir_path
#=== Create the plot
fig, ax = plt.subplots(1, 1, figsize=(8.,8.5))
if axis_to_plot == 'p':
Np_string_list = np.array(dependence_array, dtype=str)
fig.suptitle("File size: {0:.2f} GB | Block size: {1:.2f} MB\n\
Source: {2:s}\nDest: {3:s}".format(
params_dict['S'], params_dict['bs'],
params_dict['source'], params_dict['dest']),
fontsize=16)
ax.bar(Np_string_list, avg_transfer_rate_array,
fc=c1, alpha=0.9)
ax.errorbar(Np_string_list, avg_transfer_rate_array,
yerr=diff_avg_transfer_rate_array, fmt=" ",
color=c2, lw=5, alpha=1.0)
ax.set_xlabel(r'Number of parallel threads', fontsize = 18)
ax.set_ylabel(r'Transfer rate [MB/s]', fontsize = 18)
#Get the name of the output file
ofname = ofname_base + 'number_of_threads_{0:.2f}_GB_{1:.1f}_MB_block_size{2:s}'.format(
params_dict['S'], params_dict['bs'], fig_extension)
elif axis_to_plot == 'bs':
bs_string_list = np.array(dependence_array, dtype=str)
fig.suptitle("File size: {0:.2f} GB | N threads: {1:d}\n\
Source: {2:s}\nDest: {3:s}".format(
params_dict['S'], params_dict['p'],
params_dict['source'], params_dict['dest']),
fontsize=16)
ax.bar(bs_string_list, avg_transfer_rate_array,
fc=c1, alpha=0.9)
ax.errorbar(bs_string_list, avg_transfer_rate_array,
yerr=diff_avg_transfer_rate_array, fmt=" ",
color=c2, lw=5, alpha=1.0)
ax.set_xlabel(r'Block size [MB]', fontsize = 18)
ax.set_ylabel(r'Transfer rate [MB/s]', fontsize = 18)
#Get the name of the output file
ofname = ofname_base + 'block_size_{0:.2f}_GB_{1:d}_threads{2:s}'.format(
params_dict['S'], int(params_dict['p']), fig_extension)
elif axis_to_plot == 'S':
S_string_list = np.array(dependence_array, dtype=str)
fig.suptitle("Parallel threads: {0:d} GB | Block size: {1:.2f} MB\n\
Source: {2:s}\nDest: {3:s}".format(
params_dict['p'], params_dict['bs'],
params_dict['source'], params_dict['dest']),
fontsize=16)
ax.bar(S_string_list, avg_transfer_rate_array,
fc=c1, alpha=0.9)
ax.errorbar(S_string_list, avg_transfer_rate_array,
yerr=diff_avg_transfer_rate_array, fmt=" ",
color=c2, lw=5, alpha=1.0)
ax.set_xlabel(r'Transferred data size [GB]', fontsize = 18)
ax.set_ylabel(r'Transfer rate [MB/s]', fontsize = 18)
#Get the name of the output file
ofname = ofname_base + 'file_size_{0:d}_threads_{1:.1f}_MB_block_size{2:s}'.format(
params_dict['p'], params_dict['bs'], fig_extension)
#plt.show()
plt.savefig(working_dir + 'figures/' + ofname, bbox_inches='tight')
plt.close()
def plot_complex_P_and_BS_results(fsize,
Np_list,
bs_list,
name_base='transfer_stats_file_size_',
fig_extension='.png'):
"""Script to create summary plots from the stats files created by the
`RUN_COMPLEX_P_AND_BS_GRID_TRANSFER_TEST` of the `simple_grid_test.sh` script
This scrip excepts all files exist under the stats/ dir following the conventional
naming scheme
If some of the transfers failed, we can skip these data by setting the
`continue_on_error` parameter to True
Parameters:
===========
Returns:
========
Create the plot defined under the figures/ subdir of the `working_dir_path`
"""
#Define the parameterspace shape and initialise p, bs vectors and the transfer rate matrix
bs_array = []
transfer_rate_matrix = np.zeros((len(bs_list),len(Np_list)))
#Now loop through the bs values and fill up the vectors and matrices to plot
for i, block_size in enumerate(bs_list):
#Get the file names
sfile_list = get_sfile_name_list_from_standardly_named_files(fsize_list=[fsize],
Np_list=Np_list,
bs_list=[block_size],
name_base=name_base)
#Get the values
dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
get_parameterspace_slice(sfile_list,
axis_to_get='p',
working_dir_path=working_dir, #NOTE: working_dir is a global set in __main__
continue_on_size_error=True)
bs_array.append(block_size)
p_array = np.array(dependence_array)
transfer_rate_matrix[i,:] = avg_transfer_rate_array
bs_array = np.array(bs_array)
#=== Create the plot
fig, ax = plt.subplots(1, 1, figsize=(12,8))
fig.suptitle("File size: {0:.2f} GB\n\
Source: {1:s}\nDest: {2:s}".format(
params_dict['S'],
params_dict['source'], params_dict['dest']),
fontsize=18)
ax.set_aspect(0.5) #Stretch the pixels to a 2:1 aspect ratio
img = ax.imshow(transfer_rate_matrix)
cb = plt.colorbar(img, aspect=30, fraction=0.04975, pad=0)
cb.ax.yaxis.get_offset_text().set_fontsize(18)
cb.ax.tick_params(labelsize=18)
cb.ax.tick_params(direction='in', length=6, width=2)
cb.ax.set_ylabel(r'Transfer rate [MB/s]', fontsize = 18)
ax.set_xticklabels(np.concatenate([[0],p_array]), fontsize=18)
ax.set_yticklabels(np.concatenate([[0],bs_array]), fontsize=18)
plt.xlabel(r'Parallel threads', fontsize = 18)
plt.ylabel(r'Block size [MB]', fontsize = 18)
#Save the fig
ofname = working_dir + 'figures/transfer_summary_{0:s}_GB{1:s}'.format(
str(fsize),fig_extension)
#plt.show()
plt.savefig(ofname, bbox_inches='tight')
plt.close()
#*******************************************************************************
#=== MAIN ===
if __name__ == "__main__":
#Decide what plots to create
quick_and_dirty_all = False
plot_1T_file_p_dependence = False
plot_1T_file_bs_dependence = False
plot_S_dependence_for_fixed_p_and_bs = False
plot_full_parameterspace_where_possible = True
#=== Set up environment
working_dir = '/home/krozgonyi/GLOW_cluster/grid_tests/'
test_sname = 'transfer_stats.dat'
test_spath = working_dir + 'test_stats/' + test_sname
test_summary_image_path = working_dir + 'figures/test.pdf'
#=== Testing some of the base code ===
#check_stats_file_structure(test_spath)
#transfer_time = get_transfer_time(test_spath)
#data_transferred = get_total_size_transferred(test_spath, unit='GB')
#manual_avg_transfer_rate, avg_transfer_rate, diff_avg_transfer_rate = get_avg_transfer_rate(test_spath)
#time_array, size_array, avg_transfer_rate_array, inst_transfer_rate_array = get_stat_arrays(test_spath)
#create_stats_summary_plot(test_spath, ofname=test_summary_image_path)
#exit()
#=====================================
#=== Real-life examples ===
#Quick and dirty diagnostics plots for all files
if quick_and_dirty_all:
quick_and_dirty_simple_analysis_all_stats_results()
#=== Get example thread dependence
#Get some plots on the dependence of parallel threads using the 1024GB data
if plot_1T_file_p_dependence:
sfile_list = get_sfile_name_list_from_standardly_named_files(
fsize_list=[1024.0],
Np_list=[2,4,16,32,64,128,256],
bs_list=[100.0])
dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
get_parameterspace_slice(sfile_list,
axis_to_get='p',
working_dir_path=working_dir)
plot_parameterspace_slice_dependence(dependence_array,
avg_transfer_rate_array,
diff_avg_transfer_rate_array,
params_dict,
axis_to_plot='p',
working_dir_path=working_dir)
#=== Get block size dependence
if plot_1T_file_bs_dependence:
sfile_list = get_sfile_name_list_from_standardly_named_files(
fsize_list=[1024.0],
Np_list=[256],
bs_list=[10.0, 100.0, 1000.0, 2000.0, 4000.0])
dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
get_parameterspace_slice(sfile_list,
axis_to_get='bs',
working_dir_path=working_dir)
plot_parameterspace_slice_dependence(dependence_array,
avg_transfer_rate_array,
diff_avg_transfer_rate_array,
params_dict,
axis_to_plot='bs',
working_dir_path=working_dir)
if plot_S_dependence_for_fixed_p_and_bs:
sfile_list = get_sfile_name_list_from_standardly_named_files(
fsize_list=[4.0, 16.0, 64.0, 256.0, 1024.0],
Np_list=[256],
bs_list=[100.0])
dependence_array, avg_transfer_rate_array, diff_avg_transfer_rate_array, params_dict = \
get_parameterspace_slice(sfile_list,
axis_to_get='S',
working_dir_path=working_dir)
plot_parameterspace_slice_dependence(dependence_array,
avg_transfer_rate_array,
diff_avg_transfer_rate_array,
params_dict,
axis_to_plot='S',
working_dir_path=working_dir)
#== Create summary plots for the p-bs parameterspace
if plot_full_parameterspace_where_possible:
fsize_list = [1.0, 4.0, 16.0, 64.0, 256.0]
for fsize in fsize_list:
plot_complex_P_and_BS_results(fsize=fsize,
Np_list=[1,2,4,16,32,64,128,256],
bs_list=[1.0,10.0,100.0,1000.0,2000.0,4000.0])