Last active
April 4, 2023 15:06
-
-
Save haakonvt/880626055673a4a2a4a750fbe2281c6c to your computer and use it in GitHub Desktop.
A first draft of a `kedro` `DataSet` for the Cognite Data Fusion (CDF) resource type `Datapoints`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Union

from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes import ClientCredentials
from cognite.client.exceptions import CogniteNotFoundError
from kedro.io.core import AbstractDataSet, DataSetError

if TYPE_CHECKING:
    import pandas as pd
class CDFDatapointsDataSet(AbstractDataSet[NoReturn, "pd.DataFrame"]):
    """``CDFDatapointsDataSet`` loads time series data from Cognite Data Fusion (CDF).

    Saving is not supported.

    A few notes: when the parameter *aggregates* is not specified in a query, raw
    datapoints are fetched. ``start`` defaults to 1970 unless specified, and ``end``
    defaults to *now* unless specified.

    Example usage:

    .. code-block:: yaml

        time_series_dataset:
          type: CDFDatapointsDataSet
          project: customer-prod
          cluster: api
          tenant_id: 00000000-aaaa-bbbb-cccc-000000000000
          credentials: cdf_client_credentials
          queries:
            - external_id: my-xid
              start: 10w-ago
              end: now
              aggregates: average
              granularity: 1m
            - external_id: another
              aggregates:
                - average
                - count
              granularity: 1m
            - external_id: last-one

    The referenced credentials entry must provide ``client_id`` and ``client_secret``.
    """

    def __init__(
        self,
        project: str,
        cluster: str,
        tenant_id: str,
        credentials: Union[ClientCredentials, Dict[str, Any]],
        queries: List[Dict[str, Any]],
        client_name: str = "virtual-flow-meter-pipeline",
    ) -> None:
        """Create a new ``CDFDatapointsDataSet`` that fetches time series data from CDF.

        Args:
            project: The CDF project, typically the name of your company + prod/dev.
            cluster: The CDF cluster, typically 'api' or 'westeurope-1'. It is used to
                build the base URL ``https://{cluster}.cognitedata.com``.
            tenant_id: Tenant ID of your organisation (from Azure Active Directory).
            credentials: Client credentials to authenticate towards CDF. Pass either an
                object with ``client_id``/``client_secret`` attributes, or a mapping
                with those keys (e.g. an entry from kedro's credentials config).
            queries: List of query dictionaries, specifying details about what to fetch.
                Each query must contain an ``external_id`` key.
            client_name: Value passed as ``client_name`` to the CDF ``ClientConfig``.

        Raises:
            DataSetError: If ``credentials`` is a mapping lacking ``client_id`` or
                ``client_secret``.
        """
        super().__init__()
        # Copy the list so later changes to the caller's config list do not alter
        # what this data set loads or reports.
        self._queries = list(queries)
        self._description = {
            "project": project,
            "cluster": cluster,
            "tenant_id": tenant_id,
            "queries": list(queries),
        }
        self._client = self._setup_client(project, cluster, tenant_id, credentials, client_name)

    @staticmethod
    def _credential_field(credentials: Union[ClientCredentials, Dict[str, Any]], field: str) -> Any:
        """Read ``field`` from credentials given as a mapping or as an attribute object.

        Kedro resolves a ``credentials: <name>`` catalog entry to the matching value in the
        credentials config, which is normally a plain dict rather than a
        ``ClientCredentials`` instance, so both forms are supported.

        Raises:
            DataSetError: If ``credentials`` is a mapping without ``field``.
        """
        if isinstance(credentials, Mapping):
            try:
                return credentials[field]
            except KeyError as err:
                raise DataSetError(f"CDF credentials are missing the required key '{field}'") from err
        return getattr(credentials, field)

    @staticmethod
    def _setup_client(
        project: str,
        cluster: str,
        tenant_id: str,
        credentials: Union[ClientCredentials, Dict[str, Any]],
        client_name: str = "virtual-flow-meter-pipeline",
    ) -> CogniteClient:
        """Build a ``CogniteClient`` using the OAuth client-credentials flow against Azure AD.

        The token is requested from the tenant's Microsoft login endpoint. The requested
        scope is ``<base_url>/.default`` for the given cluster.
        """
        base_url = f"https://{cluster}.cognitedata.com"
        return CogniteClient(
            ClientConfig(
                client_name=client_name,
                base_url=base_url,
                project=project,
                credentials=OAuthClientCredentials(
                    token_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
                    client_id=CDFDatapointsDataSet._credential_field(credentials, "client_id"),
                    client_secret=CDFDatapointsDataSet._credential_field(credentials, "client_secret"),
                    scopes=[f"{base_url}/.default"],
                ),
            )
        )

    def _describe(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
        """Return project, cluster, tenant ID and queries; no credentials are included."""
        description = self._description.copy()
        # A shallow dict copy would still share the stored ``queries`` list with the caller.
        description["queries"] = list(description["queries"])
        return description

    def _load(self) -> "pd.DataFrame":
        """Fetch all configured queries in one call and return them as a DataFrame.

        Columns are named by external ID, with the aggregate name appended when
        aggregates are requested. Unknown IDs are not ignored, so a missing time series
        raises rather than being silently dropped.
        """
        return self._client.time_series.data.retrieve_dataframe(
            external_id=self._queries,
            limit=None,
            include_outside_points=False,
            ignore_unknown_ids=False,
            uniform_index=False,
            include_aggregate_name=True,
            include_granularity_name=False,
            column_names="external_id",
        )

    def _save(self, data: Any) -> NoReturn:
        """Always raise: this data set is read-only."""
        raise DataSetError(f"{type(self).__name__} is a read-only data set type")

    def _exists(self) -> bool:
        """Return True if every queried time series exists in CDF, False if any is missing.

        Errors other than ``CogniteNotFoundError`` (e.g. auth or network failures)
        propagate to the caller.
        """
        # Deduplicate while keeping query order; the same series may be queried several times.
        external_ids = list(dict.fromkeys(query["external_id"] for query in self._queries))
        try:
            # Note: the TimeSeriesAPI keyword is plural (``external_ids``).
            self._client.time_series.retrieve_multiple(
                external_ids=external_ids,
                ignore_unknown_ids=False,
            )
        except CogniteNotFoundError:
            return False
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment