Created
February 22, 2024 12:45
-
-
Save salomartin/d4ee7170f678b0b44554af46fe8efb3f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
__author__ = "Martin Salo" | |
__email__ = "martin@yummy.eu" | |
The `sdmx_source` package is designed to facilitate the retrieval of statistical data from various SDMX | |
(Statistical Data and Metadata eXchange) data sources. It leverages the `sdmx` Python library to query and | |
fetch data and pass it on as dlt resource. | |
## Installation | |
To use this package, you need to have the `sdmx` library installed. You can install it using pip: | |
pip install sdmx1 | |

## Usage
Here is a basic example of how to use the `sdmx_source` function to retrieve three dataflows, two of them
keyed with a dict and one keyed with a string:
```python
source = sdmx_source(
    [
        {
            "data_source": "ESTAT",
            "dataflow": "PRC_PPP_IND",
            "key": {
                "freq": "A",
                "na_item": "PLI_EU28",
                "ppp_cat": "A0101",
                "geo": ["EE", "FI"],
            },
            "table_name": "food_price_index",  # Optionally override table name for clarity
        },
        {
            "data_source": "ESTAT",
            "dataflow": "sts_inpr_m",
            "key": "M.PROD.B-D+C+D.CA.I15+I10.EE",
        },
        {
            "data_source": "ECB",
            "dataflow": "EXR",
            "key": {"FREQ": "A", "CURRENCY": "USD"},
        },
    ]
)
print(list(source))
```
In this example, the first bit of data is fetched from the Eurostat
[PRC_PPP_IND](https://ec.europa.eu/eurostat/databrowser/view/prc_ppp_ind$defaultview/default/table?lang=en) dataset.
The key specifies the frequency (`freq`), the item (`na_item`), the PPP category (`ppp_cat`), and the
geographical areas (`geo`) for which data is requested.

## Documentation and Resources
- For more detailed information on the `sdmx` library and its capabilities, visit the GitHub repository:
  [sdmx library on GitHub](https://github.com/khaeru/sdmx).
- The `sdmx` library documentation can be found here: [sdmx documentation](https://sdmx1.readthedocs.io/en/latest/).
- For a list of available SDMX data sources that can be queried using this library, refer to:
  [List of SDMX data sources](https://sdmx1.readthedocs.io/en/latest/sources.html).

## Roadmap
The implementation is kept simple on purpose to quickly retrieve flat single-table data.
A potential enhancement is to also support adding all the code lists as foreign key references and tables.
""" | |
import dlt | |
from typing import Dict, Any, List, TypedDict, Optional, Union | |
import logging | |
import sdmx | |
import pandas as pd | |
# Configure the root logger when this module is imported: INFO and above, with
# each record showing its level, a timestamp, the logger name and the message.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(levelname)s [%(asctime)s] %(name)s - %(message)s",
)
class DataflowDict(TypedDict, total=False):
    """Describe one SDMX dataflow request consumed by ``sdmx_source``.

    ``total=False`` makes every key optional for the type checker. All keys
    except ``table_name`` are forwarded to ``sdmx_resource`` as keyword
    arguments, so the keys mirror that function's parameters.
    """

    # Identifier of the SDMX data source, e.g. "ESTAT" or "ECB".
    data_source: str
    # Identifier of the dataflow to fetch from that source.
    dataflow: str
    # Query key: either a string or a dict of dimension -> value(s).
    key: Optional[Union[str, Dict[str, Any]]]
    # Additional query parameters passed through to the SDMX client.
    params: Optional[Dict[str, Any]]
    # Column that ``sdmx_resource`` converts to datetime; it is forwarded like
    # the other keys, so it is declared here to keep the two in sync.
    time_period_column: Optional[str]
    # Overrides the resource name; ``sdmx_source`` otherwise uses ``dataflow``.
    table_name: Optional[str]
def _fetch_datasets(
    client: Any,
    data_source: str,
    dataflow: str,
    key: Union[str, Dict[str, Any]],
    params: Dict[str, Any],
):
    """Return the datasets for *dataflow*, retrying once with flipped case on KeyError."""
    try:
        return client.data(dataflow, key=key, params=params).data
    except KeyError:
        # All-lowercase identifiers are retried in upper case; anything else
        # (upper or mixed case) is retried in lower case.
        new_dataflow = dataflow.upper() if dataflow.islower() else dataflow.lower()
        logging.warning(
            f"Key mismatch for dataflow {dataflow}. Trying again with case change to {new_dataflow}."
        )
        try:
            return client.data(new_dataflow, key=key, params=params).data
        except KeyError as err:
            raise ValueError(
                f"Dataflow {dataflow} and {new_dataflow} not found in {data_source}."
            ) from err


@dlt.defer
def sdmx_resource(
    data_source: str,
    dataflow: str,
    key: Union[str, Dict[str, Any]] = "",
    params: Optional[Dict[str, Any]] = None,
    time_period_column: Optional[str] = "TIME_PERIOD",
):
    """
    Fetches data from a designated SDMX data source and yields it as pandas DataFrames, one per returned dataset.
    Parameters:
    - data_source (str): Identifier of the SDMX data source to be queried (e.g., "ESTAT"). For a list of available SDMX data sources
    that can be queried using this library, refer to: [List of SDMX data sources](https://sdmx1.readthedocs.io/en/latest/sources.html).
    - dataflow (str): Identifier of the specific dataflow to be accessed. If the lookup raises KeyError, it is retried once
    with the identifier's case flipped.
    - key (Union[str, Dict[str, Any]]): Query key, defined either as a string or a dictionary, specifying the data to be retrieved.
    - params (Optional[Dict[str, Any]]): Dictionary of additional query parameters, such as 'startPeriod' and 'endPeriod'.
    None (the default) is treated as an empty dictionary.
    - time_period_column (Optional[str]): Name of the column to convert to datetime. When the column is absent
    (or None is given), the frame is yielded without conversion.
    Yields:
    - pandas.DataFrame: One DataFrame per dataset in the response, with the index reset to columns and
    AttributeValue cells cast to strings.
    Raises:
    - ValueError: If neither the given dataflow identifier nor its case-flipped variant is found.
    """
    # Work on a private copy so that neither the caller's dict nor a default
    # shared between calls can be modified by the client.
    query_params: Dict[str, Any] = dict(params) if params is not None else {}
    client = sdmx.Client(data_source)
    datasets = _fetch_datasets(client, data_source, dataflow, key, query_params)

    for dataset in datasets:
        df = sdmx.to_pandas(dataset, attributes="o").reset_index()

        # Cast AttributeValue columns to strings
        for col in df.columns:
            if df[col].apply(lambda x: isinstance(x, sdmx.model.AttributeValue)).any():
                df[col] = df[col].astype(str)

        if time_period_column in df.columns:
            # Best effort: with errors="coerce" unparseable values become NaT,
            # and any other failure leaves the column as it was.
            try:
                df[time_period_column] = pd.to_datetime(
                    df[time_period_column], errors="coerce"
                )
            except Exception as e:
                logging.warning(f"Error converting {time_period_column} to datetime: {e}")

        yield df
@dlt.source()
def sdmx_source(dataflows: List[DataflowDict]):
    """
    Builds one dlt resource per requested SDMX dataflow.
    Parameters:
    - dataflows (List[DataflowDict]): Dataflow descriptions. Every key except "table_name" is passed to
    `sdmx_resource` as a keyword argument.
    Yields:
    - One `dlt.resource` per entry, named after "table_name" when it is set and after "dataflow" otherwise.
    """
    for flow in dataflows:
        resource_kwargs = {k: v for k, v in flow.items() if k != "table_name"}
        # "table_name" is declared Optional, so a key that is present but None
        # (or empty) must still fall back to the dataflow identifier.
        resource_name = flow.get("table_name") or flow.get("dataflow")
        yield dlt.resource(
            sdmx_resource(**resource_kwargs),
            name=resource_name,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment