@salomartin
Created February 22, 2024 12:45
"""
__author__ = "Martin Salo"
__email__ = "martin@yummy.eu"
The `sdmx_source` package facilitates the retrieval of statistical data from various SDMX
(Statistical Data and Metadata eXchange) data sources. It uses the `sdmx` Python library to query and
fetch data and passes it on as dlt resources.
## Installation
To use this package, you need the `sdmx` library, which is published on PyPI as `sdmx1`. You can install it using pip:
pip install sdmx1
## Usage
Here is a basic example of how to use the `sdmx_source` function to retrieve three dataflows from two
data sources (Eurostat and the ECB). The second dataflow uses a string as key and the other two use a dict:
```python
source = sdmx_source(
    [
        {
            "data_source": "ESTAT",
            "dataflow": "PRC_PPP_IND",
            "key": {
                "freq": "A",
                "na_item": "PLI_EU28",
                "ppp_cat": "A0101",
                "geo": ["EE", "FI"],
            },
            "table_name": "food_price_index",  # Optionally override table name for clarity
        },
        {
            "data_source": "ESTAT",
            "dataflow": "sts_inpr_m",
            "key": "M.PROD.B-D+C+D.CA.I15+I10.EE",
        },
        {
            "data_source": "ECB",
            "dataflow": "EXR",
            "key": {"FREQ": "A", "CURRENCY": "USD"},
        },
    ]
)
print(list(source))
```
In this example, the first dataflow is fetched from the Eurostat
[PRC_PPP_IND](https://ec.europa.eu/eurostat/databrowser/view/prc_ppp_ind$defaultview/default/table?lang=en) dataset.
The key specifies the frequency (`freq`), the item (`na_item`), the PPP category (`ppp_cat`), and the
geographical areas (`geo`) for which data is requested.
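The string form of a key, as used for `sts_inpr_m` above, follows the SDMX REST convention: one position
per dimension separated by `.`, with `+` joining several values for the same dimension and an empty
position matching every value. The sketch below shows how a dict key maps onto that form. The helper
`key_dict_to_string` and the dimension order are hypothetical and only for illustration; they are not part
of this package or of the `sdmx` library, and the real dimension order comes from the dataflow's data
structure definition.

```python
from typing import Dict, List, Sequence, Union

KeyValue = Union[str, Sequence[str]]


def key_dict_to_string(key: Dict[str, KeyValue], dimension_order: List[str]) -> str:
    # Hypothetical helper: render a dict key in SDMX REST string form.
    unknown = set(key) - set(dimension_order)
    if unknown:
        raise ValueError(f"Unknown dimensions: {sorted(unknown)}")
    parts = []
    for dim in dimension_order:
        value = key.get(dim, "")  # an empty position matches every value
        if isinstance(value, str):
            parts.append(value)
        else:
            parts.append("+".join(value))  # several values for one dimension
    return ".".join(parts)


# Assumed dimension order, for illustration only.
order = ["freq", "na_item", "ppp_cat", "geo"]
key = {"freq": "A", "na_item": "PLI_EU28", "ppp_cat": "A0101", "geo": ["EE", "FI"]}
print(key_dict_to_string(key, order))  # prints A.PLI_EU28.A0101.EE+FI
```

Either form can be passed as `key`; the dict form is usually easier to read because it names each dimension.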
## Documentation and Resources
- For more detailed information on the `sdmx` library and its capabilities, visit the GitHub repository:
[sdmx library on GitHub](https://github.com/khaeru/sdmx).
- The `sdmx` library documentation can be found here: [sdmx documentation](https://sdmx1.readthedocs.io/en/latest/).
- For a list of available SDMX data sources that can be queried using this library, refer to:
[List of SDMX data sources](https://sdmx1.readthedocs.io/en/latest/sources.html).
## Roadmap
The implementation is kept simple on purpose, so that flat, single-table data can be retrieved quickly.
A potential enhancement is to also load the code lists as separate tables with foreign key references.
"""
import logging
from typing import Any, Dict, List, Optional, TypedDict, Union

import dlt
import pandas as pd
import sdmx

logging.basicConfig(
    format="%(levelname)s [%(asctime)s] %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)


class DataflowDict(TypedDict, total=False):
    data_source: str
    dataflow: str
    key: Optional[Union[str, Dict[str, Any]]]
    params: Optional[Dict[str, Any]]
    table_name: Optional[str]


@dlt.defer
def sdmx_resource(
    data_source: str,
    dataflow: str,
    key: Union[str, Dict[str, Any]] = "",
    params: Optional[Dict[str, Any]] = None,
    time_period_column: Optional[str] = "TIME_PERIOD",
):
    """
    Fetches data from a designated SDMX data source and yields it as pandas DataFrames, one per dataset in the response.
    Parameters:
    - data_source (str): Identifier of the SDMX data source to be queried (e.g., "ESTAT"). For a list of available SDMX data sources
    that can be queried using this library, refer to: [List of SDMX data sources](https://sdmx1.readthedocs.io/en/latest/sources.html).
    - dataflow (str): Identifier of the specific dataflow to be accessed.
    - key (Union[str, Dict[str, Any]]): Query key, defined either as a string or a dictionary, specifying the data to be retrieved.
    - params (Optional[Dict[str, Any]]): Dictionary of additional query parameters, such as 'startPeriod' and 'endPeriod'.
    - time_period_column (Optional[str]): Name of the column holding the time period. If the column is present, it is converted
    to datetime; values that cannot be parsed become NaT.
    Yields:
    - pandas.DataFrame: DataFrame containing the data retrieved from the specified SDMX data source.
    Raises:
    - ValueError: If the dataflow is not found under either its given or its case-flipped identifier.
    """
    # Avoid a shared mutable default argument; treat None as "no extra parameters"
    params = params or {}
    client = sdmx.Client(data_source)
    try:
        response = client.data(dataflow, key=key, params=params)
        datasets = response.data
    except KeyError:
        # Dataflow identifiers are case sensitive, so retry once with the case flipped
        new_dataflow = dataflow.upper() if dataflow.islower() else dataflow.lower()
        logging.warning(
            f"Key mismatch for dataflow {dataflow}. Trying again with case change to {new_dataflow}."
        )
        try:
            response = client.data(new_dataflow, key=key, params=params)
            datasets = response.data
        except KeyError as err:
            raise ValueError(
                f"Dataflow {dataflow} and {new_dataflow} not found in {data_source}."
            ) from err
    for dataset in datasets:
        df = sdmx.to_pandas(dataset, attributes="o").reset_index()
        # Cast AttributeValue columns to strings
        for col in df.columns:
            if df[col].apply(lambda x: isinstance(x, sdmx.model.AttributeValue)).any():
                df[col] = df[col].astype(str)
        # Convert the time period column, if present; on failure the column is left unchanged
        if time_period_column in df.columns:
            try:
                df[time_period_column] = pd.to_datetime(
                    df[time_period_column], errors="coerce"
                )
            except Exception as e:
                logging.warning(f"Error converting {time_period_column} to datetime: {e}")
        yield df
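

@dlt.source()
def sdmx_source(dataflows: List[DataflowDict]):
    """Yields one dlt resource per dataflow, named after `table_name` if given, otherwise after the dataflow."""
    for flow in dataflows:
        yield dlt.resource(
            # `table_name` only names the resource; all other entries are passed to `sdmx_resource`
            sdmx_resource(**{k: v for k, v in flow.items() if k != "table_name"}),
            name=flow.get("table_name", flow.get("dataflow")),
        )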