-
-
Save srikanthbojja/65a2e168c70dc5c11065f977f52ff7cc to your computer and use it in GitHub Desktop.
Functionality to read SAS data from a SAS server (or locally) and return dask.dataframe.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
GET_SAS_AS_DASK.PY
2019-05-02
kingfischer16

Functionality to read SAS data from a SAS server (or locally) and return
a dask.dataframe.

General idea: Using SASPY, build a list of pandas.DataFrames that are blocks
called via a SAS session. These blocks then make up the dask.DataFrame. Helper
functions are included to automate the choice of the number of blocks that
should comprise the dask.DataFrame. Dask.delayed ends up calling a number
of parallel sessions to get each of the blocks. Transmission time (getting data
from the SAS servers) was the limiting factor in my case.

This script is provided with no warranty. It has been shown to work
on the systems on which it was developed, but may require some
tweaking to work on your system. Good luck :)
""" | |
import saspy | |
import dask.dataframe as dd | |
from dask import delayed | |
from psutil import virtual_memory | |
from multiprocessing import cpu_count | |
from math import ceil, floor | |
def sasdata_type_dict(sdata):
    """
    Returns the datatypes to set each column in the dataframe.

    Mapping rules:
      * 'Char' variables map to ``str``.
      * 'Num' variables map to ``float``, unless the name part of their
        format (the text before the first '.') contains 'date', in which
        case they map to ``str``.
      * Any other variable type maps to ``str``.

    :param sdata: (SASData object) The active SASData object. Its
        ``columnInfo()`` table is read for the 'Variable', 'Type' and
        'Format' columns.
    :return: (dict) The datatype dictionary, {variable name: str or float}.
    """
    df_info = sdata.columnInfo()
    variables = df_info['Variable']
    var_types = df_info['Type']
    # NOTE(review): presumably the 'Format' column can be absent when no
    # variable carries a format - confirm against saspy output. In that
    # case every numeric variable is treated as unformatted.
    if 'Format' in df_info.columns:
        var_formats = df_info['Format']
    else:
        var_formats = [None] * len(df_info)

    d_conv = {}
    for name, var_type, var_format in zip(variables, var_types, var_formats):
        if var_type == 'Num':
            # A missing format is not a string (e.g. NaN); only a real
            # format string can mark the variable as a date/datetime.
            is_date = (isinstance(var_format, str)
                       and 'date' in var_format.split('.')[0].lower())
            d_conv[name] = str if is_date else float
        else:
            # 'Char' and any unrecognised type are both read as strings.
            d_conv[name] = str
    return d_conv
def sasdata_get_num_obs(sdata):
    """
    Looks up the observation (row) count of a saspy.SASData object.

    The count is read from the 'Attributes' table of ``sdata.contents()``:
    the 'nValue2' entry of the first row whose 'Label2' is 'Observations'.

    :param sdata: (SASData object) The active SASData object to inspect.
    :return: (int) The row count.
    """
    attributes = sdata.contents()['Attributes']
    is_obs_row = attributes['Label2'] == 'Observations'
    obs_values = attributes.loc[is_obs_row, 'nValue2'].values
    return int(obs_values[0])
def sasdata_get_table_disk_size(sdata):
    """
    Looks up the size-on-disk, in bytes, of a saspy.SASData table.

    The size is read from the 'Enginehost' table of ``sdata.contents()``:
    the 'nValue1' entry of the first row whose 'Label1' is
    'File Size (bytes)'.

    :param sdata: (saspy.SASData) The active SASData object to inspect.
    :return: (int) Table size in bytes.
    """
    engine_host = sdata.contents()['Enginehost']
    size_rows = engine_host[engine_host['Label1'] == 'File Size (bytes)']
    return int(size_rows['nValue1'].values[0])
def sasdata_calculate_num_parts(sdata):
    """
    Suggests how many partitions the given saspy.SASData table should be
    split into.

    The table's size on disk is divided by a per-partition budget of
    (currently available memory / CPU count). The result is never lower
    than the CPU count.

    :param sdata: (saspy.SASData) The active SASData object whose size on
        disk is used.
    :return: (int) Suggested number of partitions for this data set.
    """
    table_bytes = sasdata_get_table_disk_size(sdata)
    n_cpus = cpu_count()
    # Memory budget for one partition: an equal share of the free memory.
    bytes_per_part = int(floor(virtual_memory().available / n_cpus))
    parts_for_memory = int(ceil(table_bytes / bytes_per_part))
    # Use at least one partition per CPU.
    return max(parts_for_memory, n_cpus)
def calculate_list_block_tuples(total_num_obs, n_partitions=1,
                                max_rows=None):
    """
    Returns a list of tuples (start, stop) of row numbers that divide
    the dataset.

    Row numbers are 1-based and inclusive. The blocks are contiguous, do not
    overlap and together cover rows 1..total_num_obs. Boundaries are computed
    with integer arithmetic so that floating point rounding cannot drop or
    duplicate a row.

    :param total_num_obs: (int) The total number of observations expected
        in the dataset.
    :param n_partitions: (int) The number of chunks in which to split the
        dataset. Default is 1 (i.e. no split). If more partitions are
        requested than there are observations, one block per observation is
        returned instead, because a block cannot hold less than one row.
        A value of zero or less returns an empty list.
    :param max_rows: (int) The maximum number of rows to have per sub-dataset.
        Default is None, which will cause the function to rely on the
        n_partitions argument. If max_rows is provided, then the max_rows
        parameter overrides anything set by n_partitions.
    :return: (list, tuple) A list of tuples specifying (start, stop) indices
        for splitting the dataset. An empty dataset with a positive
        partition count yields the single block (1, 0).
    """
    if max_rows is not None:
        n_partitions = ceil(total_num_obs / max_rows)
    tot = int(total_num_obs)
    # Never create more blocks than rows: surplus blocks would be empty or
    # inverted (start > stop). Keep one block for an empty dataset so the
    # caller still has something to read the columns from.
    n_p = min(int(n_partitions), max(tot, 1))
    # Block i ends at floor(tot * i / n_p); the next block starts one row
    # later, so consecutive blocks always join up exactly.
    return [(tot * (p_i - 1) // n_p + 1, tot * p_i // n_p)
            for p_i in range(1, n_p + 1)]
def sas_get_block_indices(sascfg_file, sascfg_profile, libpath, table_name,
                          limit_obs=None, autoexec=None, return_dtypes=False):
    """
    An encapsulation function for returning the list of index blocks
    for a SAS data object.

    A SAS session is opened, the target table is located (or, when
    'limit_obs' is given, its first rows are copied to the work table
    'dfile'), and the row count, suggested partition count and column
    datatypes are read from it. The session is disconnected before
    returning, including when one of those steps raises.

    :param sascfg_file: (string) The filepath to the SASCFG file used by SASPY.
    :param sascfg_profile: (string) The profile name to use for connecting.
        Must be found in the SASCFG_FILE.
    :param libpath: (string) The complete string to the folder containing
        the target table, or None to use the 'work' library. Backslashes in
        the path must be escaped in the Python string.
    :param table_name: (string) The name of the table to be referenced using
        a dask.DataFrame. Table must exist within the 'libpath' location
        provided.
    :param limit_obs: (int) Default 'None' will add all observations from the
        target table into the dask DataFrame. Providing a number will only add
        the first 'limit_obs' number of observations to the DataFrame.
    :param autoexec: (string) A string of SAS code that will be submitted
        upon establishing a connection.
    :param return_dtypes: (bool) Option to return a dictionary of the data
        types from the SAS table, {variable: dtype}. Default is False.
    :return: (list) A list of tuples specifying (start, stop) indices for
        splitting the dataset. If 'return_dtypes' is True, a tuple of
        (that list, datatype dictionary) is returned instead.
    """
    librefname = 'fpath'
    sess = saspy.SASsession(cfgfile=sascfg_file,
                            cfgname=sascfg_profile,
                            autoexec=autoexec)
    # NOTE(review): saspy also offers endsas() to terminate a session;
    # confirm that disconnect() releases the server-side session for the
    # access method in use.
    try:
        if isinstance(limit_obs, int):
            # Copy the first 'limit_obs' rows to a work table and measure
            # that copy rather than the full table.
            worktable = 'dfile'
            worklib = 'work'
            if libpath is None:
                librefname = 'work'
                sas_code = ''
            else:
                sas_code = f'libname {librefname} "{libpath}";'
            sas_code += f'data {worktable}; '
            sas_code += f'set {librefname}.{table_name}(obs={limit_obs});'
            sas_code += 'run;'
            sess.submit(sas_code)
        else:
            worktable = table_name
            if libpath is None:
                worklib = 'work'
            else:
                worklib = librefname
                sas_code = f'libname {librefname} "{libpath}";'
                sess.submit(sas_code)
        sdat = sess.sasdata(table=worktable, libref=worklib)
        num_total_obs = sasdata_get_num_obs(sdat)
        num_parts = sasdata_calculate_num_parts(sdat)
        d_types = sasdata_type_dict(sdat)
    finally:
        # Always release the session, even if a lookup above failed.
        sess.disconnect()
    block_tuples = calculate_list_block_tuples(total_num_obs=num_total_obs,
                                               n_partitions=num_parts)
    if return_dtypes:
        return block_tuples, d_types
    return block_tuples
def sas_get_block_as_dataframe(sascfg_file, sascfg_profile, table_name,
                               block_tup=None, libpath=None, autoexec=None):
    """
    Gets the specified block of rows as a pandas.DataFrame.

    A dedicated SAS session is opened for the transfer and disconnected
    before returning, including when the transfer or the type conversion
    raises.

    :param sascfg_file: (string) The filepath to the SASCFG file used by SASPY.
    :param sascfg_profile: (string) The profile name to use for connecting.
        Must be found in the 'sascfg_file'.
    :param table_name: (string) The name of the data table to be accessed.
        Must reside either in the 'libpath' or the 'work' folder (if SAS code
        is passed via 'autoexec').
    :param block_tup: (tuple) A tuple containing the first and last row
        numbers that bound the block which is to be returned. Default is 'None'
        which attempts to return the entire SAS table as a pandas.DataFrame.
    :param libpath: (string) The path to the library where the target table
        is stored. Default is 'None', which will use 'work' as a library
        reference. Backslashes in the path must be escaped in the Python
        string.
    :param autoexec: (string) A string of SAS code that will be submitted
        upon establishing a connection.
    :return: (pandas.DataFrame) The requested block of the SAS dataset as a
        pandas.DataFrame, with columns cast according to sasdata_type_dict.
    """
    # Connect to remote session
    sess = saspy.SASsession(cfgfile=sascfg_file,
                            cfgname=sascfg_profile,
                            autoexec=autoexec)
    try:
        # set libref
        if libpath is None:
            lib_ref = 'work'
        else:
            lib_ref = 'libref'
            sess.saslib(libref=lib_ref, path=libpath)
        # pull chunk of SAS dataset to pandas.DataFrame
        if block_tup is None:
            df_sas = sess.sd2df_CSV(table=table_name, libref=lib_ref)
        else:
            df_sas = sess.sd2df_CSV(table=table_name, libref=lib_ref,
                                    dsopts={'firstobs': block_tup[0],
                                            'obs': block_tup[1]})
        sdat = sess.sasdata(table=table_name, libref=lib_ref)
        d_types = sasdata_type_dict(sdat)
        # astype() has no 'inplace' argument and already returns a new
        # frame; passing inplace raises TypeError on current pandas.
        df_sas = df_sas.astype(dtype=d_types)
    finally:
        # Always release the session, even if the transfer failed.
        sess.disconnect()
    return df_sas
def get_sas_as_dask_dataframe(sascfg_file, sascfg_profile, libpath, table_name,
                              limit_obs=None, autoexec=None):
    """
    Builds a dask.DataFrame that references the target SAS table.

    The table is first split into (start, stop) row blocks by
    sas_get_block_indices. One delayed call to sas_get_block_as_dataframe
    is then created per block, and the delayed calls are combined with
    dask.dataframe.from_delayed.

    :param sascfg_file: (string) The filepath to the SASCFG file used by SASPY.
    :param sascfg_profile: (string) The profile name to use for connecting.
        Must be found in the SASCFG_FILE.
    :param libpath: (string) The complete string to the folder containing
        the target table. Backslashes in the path must be escaped in the
        Python string.
    :param table_name: (string) The name of the table to be accessed. Table
        must exist within the 'libpath' location provided.
    :param limit_obs: (int) Default 'None' will add all observations from
        the target table into the dask.DataFrame. Providing a number will only
        add the first 'limit_obs' number of observations to the DataFrame.
        Useful for creating smaller and faster-executing dask.DataFrames.
    :param autoexec: (string) A string of SAS code that will be submitted
        upon establishing a connection.
    :return: (dask.DataFrame, int, dict) The dask.DataFrame computational graph
        that references the SAS table as specified by the arguments, an integer
        specifying the number of partitions, and a dictionary of the data types.
    """
    # Row blocks and column datatypes of the target table.
    blocks, column_types = sas_get_block_indices(
        sascfg_file=sascfg_file,
        sascfg_profile=sascfg_profile,
        libpath=libpath,
        table_name=table_name,
        limit_obs=limit_obs,
        autoexec=autoexec,
        return_dtypes=True,
    )

    # Arguments shared by every block fetch; only the block bounds differ.
    shared_kwargs = {
        'sascfg_file': sascfg_file,
        'sascfg_profile': sascfg_profile,
        'table_name': table_name,
        'libpath': libpath,
        'autoexec': autoexec,
    }
    fetch_block = delayed(sas_get_block_as_dataframe)
    delayed_frames = []
    for bounds in blocks:
        delayed_frames.append(fetch_block(block_tup=bounds, **shared_kwargs))

    return dd.from_delayed(delayed_frames), len(blocks), column_types
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment