Skip to content

Instantly share code, notes, and snippets.

@Speccles96
Forked from kingfischer16/get_sas_as_dask.py
Created August 29, 2020 17:01
Show Gist options
  • Save Speccles96/867b9b8622beebee056ae45a5a745d5c to your computer and use it in GitHub Desktop.
Save Speccles96/867b9b8622beebee056ae45a5a745d5c to your computer and use it in GitHub Desktop.
Functionality to read SAS data from a SAS server (or locally) and return dask.dataframe.
"""
GET_SAS_AS_DASK.PY
2019-05-02
kingfischer16
Functionality to read SAS data from a SAS server (or locally) and return
dask.dataframe.
General idea: Using SASPY, build a list of pandas.DataFrames that are blocks
called via a SAS session. These blocks then make up the dask.DataFrame. Helper
functions are included to automate the choice of number of blocks that
should comprise the dask.DataFrame. Dask.delayed ends up calling a number
of parallel sessions to get each of the blocks. Transmission time (getting data
from the SAS servers) was the limiting factor in my case.
This script is provided with no warranty. It has been shown to work
on the systems on which it was developed, but may require some
tweaking to work on your system. Good luck :)
"""
import saspy
import dask.dataframe as dd
from dask import delayed
from psutil import virtual_memory
from multiprocessing import cpu_count
from math import ceil, floor
def sasdata_type_dict(sdata):
    """
    Returns the datatypes to set each column in the dataframe.

    The mapping is derived from the frame returned by ``sdata.columnInfo()``,
    using its 'Variable', 'Type' and 'Format' columns:

    * 'Char' variables map to ``str``.
    * 'Num' variables whose format name (the text before the first '.')
      contains 'date' (case-insensitive) map to ``str``.
    * Every other 'Num' variable, including one whose format cell is not a
      string (e.g. a missing value), maps to ``float``.
    * Any other variable type maps to ``str``.

    :param sdata: (SASData object) The active SASData object whose column
        information is read.
    :return: (dict) The datatype dictionary, {variable name: python type}.
    """
    df_info = sdata.columnInfo()
    # Build the mapping directly instead of going through
    # DataFrame.to_dict(orient='default'): 'default' is not a valid orient
    # value and is rejected by current pandas releases.
    d_conv = {}
    for variable, var_type, var_format in zip(df_info['Variable'],
                                              df_info['Type'],
                                              df_info['Format']):
        if var_type == 'Num':
            # Only inspect the format name when there is one; a missing
            # format is not a string.
            is_date = (isinstance(var_format, str)
                       and 'date' in var_format.split('.')[0].lower())
            d_conv[variable] = str if is_date else float
        else:
            # 'Char' and any unrecognised type are kept as text.
            d_conv[variable] = str
    return d_conv
def sasdata_get_num_obs(sdata):
    """
    Returns the number of observations in a saspy.SASData object.

    The count is read from the 'Attributes' table of ``sdata.contents()``:
    the 'nValue2' entry of the first row whose 'Label2' is 'Observations'.

    :param sdata: (SASData object) The active SASData object to get the
        row count.
    :return: (int) The row count.
    """
    attributes = sdata.contents()['Attributes']
    is_obs_row = attributes['Label2'] == 'Observations'
    obs_values = attributes.loc[is_obs_row, 'nValue2'].values
    return int(obs_values[0])
def sasdata_get_table_disk_size(sdata):
    """
    Returns the table size-on-disk in bytes. Operates on a
    saspy.SASData object.

    The size is read from the 'Enginehost' table of ``sdata.contents()``:
    the 'nValue1' entry of the first row whose 'Label1' is
    'File Size (bytes)'.

    :param sdata: (saspy.SASData) The active SASData object to get the
        size on disk.
    :return: (int) Table size in bytes.
    """
    engine_host = sdata.contents()['Enginehost']
    is_size_row = engine_host['Label1'] == 'File Size (bytes)'
    size_values = engine_host.loc[is_size_row, 'nValue1'].values
    return int(size_values[0])
def sasdata_calculate_num_parts(sdata):
    """
    Calculates the required number of partitions for the given
    saspy.SASData object.

    The table's size on disk is divided by the per-core share of the
    currently available memory (available bytes / CPU count) and rounded
    up. The result is never lower than the CPU count.

    :param sdata: (saspy.SASData) The active SASData object to get the
        size on disk.
    :return: (int) Suggested number of partitions for this data set.
    """
    table_bytes = sasdata_get_table_disk_size(sdata)
    # Largest partition that lets every core hold one partition in memory.
    bytes_per_core = int(floor(virtual_memory().available / cpu_count()))
    parts_needed = int(ceil(table_bytes / bytes_per_core))
    # Use at least one partition per core.
    return max(parts_needed, cpu_count())
def calculate_list_block_tuples(total_num_obs, n_partitions=1,
                                max_rows=None):
    """
    Returns a list of tuples (start, stop) of row numbers that divide
    the dataset.

    Row numbers are 1-based and both ends are inclusive. The blocks are
    contiguous, do not overlap, and together cover rows 1..total_num_obs.
    Bounds are computed with integer arithmetic, so no row can be lost or
    duplicated through floating point rounding. If more partitions are
    requested than there are rows, the empty blocks are dropped, so the
    returned list may be shorter than the requested partition count.

    :param total_num_obs: (int) The total number of observations expected
        in the dataset.
    :param n_partitions: (int) The number of chunks in which to split the
        dataset. Default is 1 (i.e. no split).
    :param max_rows: (int) The maximum number of rows to have per
        sub-dataset. Default is None, which will cause the function to rely
        on the n_partitions argument. If max_rows is provided, then the
        max_rows parameter overrides anything set by n_partitions.
    :return: (list, tuple) A list of tuples specifying (start, stop) indices
        for splitting the dataset. For a dataset with no rows the single
        placeholder block (1, 0) is returned.
    :raises ValueError: If the dataset has rows but the resulting number of
        partitions is smaller than 1.
    """
    tot = int(total_num_obs)
    if max_rows is not None:
        n_partitions = ceil(tot / max_rows)
    n_p = int(n_partitions)
    if tot < 1:
        # Same block the original formula yields for an empty table with a
        # single partition; keeps the returned list non-empty.
        return [(1, 0)]
    if n_p < 1:
        raise ValueError('The number of partitions must be at least 1.')
    # Block p_i spans floor(tot*(p_i-1)/n_p)+1 .. floor(tot*p_i/n_p).
    bounds = (((tot * (p_i - 1)) // n_p + 1, (tot * p_i) // n_p)
              for p_i in range(1, n_p + 1))
    # Blocks with start > stop contain no rows (more partitions than rows).
    tup_list = [(start, stop) for start, stop in bounds if start <= stop]
    return tup_list
def sas_get_block_indices(sascfg_file, sascfg_profile, libpath, table_name,
                          limit_obs=None, autoexec=None, return_dtypes=False):
    """
    An encapsulation function for returning the list of index blocks
    for a SAS data object.

    A SAS session is opened, the target table is inspected for its row
    count, suggested partition count and column types, and the session is
    released again (also when one of the SAS calls raises).

    :param sascfg_file: (string) The filepath to the SASCFG file used by
        SASPY.
    :param sascfg_profile: (string) The profile name to use for connecting.
        Must be found in the SASCFG_FILE.
    :param libpath: (string) The complete string to the folder containing
        the target table, or None to use the 'work' library. Backslashes
        must be escaped in Python, i.e. a doubled backslash in the Python
        string for each single backslash in the SAS path.
    :param table_name: (string) The name of the table to be referenced using
        a dask.DataFrame. Table must exist within the 'libpath' location
        provided.
    :param limit_obs: (int) Default 'None' will add all observations from the
        target table into the dask DataFrame. Providing a number will only
        add the first 'limit_obs' number of observations to the DataFrame.
    :param autoexec: (string) A string of SAS code that will be submitted
        upon establishing a connection.
    :param return_dtypes: (bool) Option to return a dictionary of the data
        types from the SAS table, {variable: dtype}. Default is False.
    :return: (list) A list of tuples specifying (start, stop) indices for
        splitting the dataset. If 'return_dtypes' is True, a tuple of that
        list and the data type dictionary is returned instead.
    """
    librefname = 'fpath'
    sess = saspy.SASsession(cfgfile=sascfg_file,
                            cfgname=sascfg_profile,
                            autoexec=autoexec)
    try:
        if isinstance(limit_obs, int):
            # Copy the first 'limit_obs' rows into work.dfile and measure
            # that copy instead of the full table.
            worktable = 'dfile'
            worklib = 'work'
            if libpath is None:
                librefname = 'work'
                sas_code = ''
            else:
                sas_code = f'libname {librefname} "{libpath}";'
            sas_code += f'data {worktable}; '
            sas_code += f'set {librefname}.{table_name}(obs={limit_obs});'
            sas_code += 'run;'
            sess.submit(sas_code)
        else:
            worktable = table_name
            if libpath is None:
                worklib = 'work'
            else:
                # Assign the libref so the table can be addressed by it.
                worklib = librefname
                sas_code = f'libname {librefname} "{libpath}";'
                sess.submit(sas_code)
        sdat = sess.sasdata(table=worktable, libref=worklib)
        num_total_obs = sasdata_get_num_obs(sdat)
        num_parts = sasdata_calculate_num_parts(sdat)
        d_types = sasdata_type_dict(sdat)
    finally:
        # Always release the session, even when a SAS call above raises.
        sess.disconnect()
    block_tuples = calculate_list_block_tuples(total_num_obs=num_total_obs,
                                               n_partitions=num_parts)
    if return_dtypes:
        return block_tuples, d_types
    return block_tuples
def sas_get_block_as_dataframe(sascfg_file, sascfg_profile, table_name,
                               block_tup=None, libpath=None, autoexec=None):
    """
    Gets the specified block of rows as a pandas.DataFrame.

    A new SAS session is opened for the call and released afterwards (also
    when one of the SAS calls raises). The returned frame is cast to the
    column types reported by 'sasdata_type_dict' for the table.

    :param sascfg_file: (string) The filepath to the SASCFG file used by
        SASPY.
    :param sascfg_profile: (string) The profile name to use for connecting.
        Must be found in the 'sascfg_file'.
    :param table_name: (string) The name of the data table to be accessed.
        Must reside either in the 'libpath' or the 'work' folder (if SAS
        code is passed via 'autoexec').
    :param block_tup: (tuple) A tuple containing the first and last row
        numbers that bound the block which is to be returned. Default is
        'None' which attempts to return the entire SAS table as a
        pandas.DataFrame.
    :param libpath: (string) The library reference to where the target table
        is stored. Default is 'None', which will use 'work' as a library
        reference. Backslashes must be escaped in Python, i.e. a doubled
        backslash in the Python string for each single backslash in the
        SAS path.
    :param autoexec: (string) A string of SAS code that will be submitted
        upon establishing a connection.
    :return: (pandas.DataFrame) The requested block of the SAS dataset as a
        pandas.DataFrame.
    """
    # Connect to remote session
    sess = saspy.SASsession(cfgfile=sascfg_file,
                            cfgname=sascfg_profile,
                            autoexec=autoexec)
    try:
        # set libref
        if libpath is None:
            lib_ref = 'work'
        else:
            lib_ref = 'libref'
            sess.saslib(libref=lib_ref, path=libpath)
        # pull chunk of SAS dataset to pandas.DataFrame
        if block_tup is None:
            df_sas = sess.sd2df_CSV(table=table_name, libref=lib_ref)
        else:
            df_sas = sess.sd2df_CSV(table=table_name, libref=lib_ref,
                                    dsopts={'firstobs': block_tup[0],
                                            'obs': block_tup[1]})
        sdat = sess.sasdata(table=table_name, libref=lib_ref)
        d_types = sasdata_type_dict(sdat)
    finally:
        # Always release the session, even when a SAS call above raises.
        sess.disconnect()
    # DataFrame.astype takes no 'inplace' argument; it returns a new frame.
    df_sas = df_sas.astype(dtype=d_types)
    return df_sas
def get_sas_as_dask_dataframe(sascfg_file, sascfg_profile, libpath, table_name,
                              limit_obs=None, autoexec=None):
    """
    Returns the target SAS table as a dask.DataFrame.

    The table is first split into row blocks by 'sas_get_block_indices'.
    Each block then becomes one delayed call to
    'sas_get_block_as_dataframe', and the delayed calls are combined with
    'dask.dataframe.from_delayed'.

    :param sascfg_file: (string) The filepath to the SASCFG file used by
        SASPY.
    :param sascfg_profile: (string) The profile name to use for connecting.
        Must be found in the SASCFG_FILE.
    :param libpath: (string) The complete string to the folder containing
        the target table. Backslashes must be escaped in Python, i.e. a
        doubled backslash in the Python string for each single backslash
        in the SAS path.
    :param table_name: (string) The name of the table to be accessed. Table
        must exist within the 'libpath' location provided.
    :param limit_obs: (int) Default 'None' will add all observations from
        the target table into the dask.DataFrame. Providing a number will
        only add the first 'limit_obs' number of observations to the
        DataFrame. Useful for creating smaller and faster-executing
        dask.DataFrames.
    :param autoexec: (string) A string of SAS code that will be submitted
        upon establishing a connection.
    :return: (dask.DataFrame, int, dict) The dask.DataFrame computational
        graph that references the SAS table as specified by the arguments,
        an integer specifying the number of partitions, and a dictionary of
        the data types.
    """
    # Row ranges of the target table, plus its column types.
    block_tuples, d_types = sas_get_block_indices(
        sascfg_file=sascfg_file,
        sascfg_profile=sascfg_profile,
        libpath=libpath,
        table_name=table_name,
        limit_obs=limit_obs,
        autoexec=autoexec,
        return_dtypes=True,
    )
    # One delayed session call per row range.
    delayed_blocks = []
    for bounds in block_tuples:
        fetch_block = delayed(sas_get_block_as_dataframe)
        delayed_blocks.append(fetch_block(sascfg_file=sascfg_file,
                                          sascfg_profile=sascfg_profile,
                                          table_name=table_name,
                                          block_tup=bounds,
                                          libpath=libpath,
                                          autoexec=autoexec))
    ddf = dd.from_delayed(delayed_blocks)
    return ddf, len(block_tuples), d_types
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment