Skip to content

Instantly share code, notes, and snippets.

@haakonvt
Last active April 4, 2023 15:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save haakonvt/880626055673a4a2a4a750fbe2281c6c to your computer and use it in GitHub Desktop.
Save haakonvt/880626055673a4a2a4a750fbe2281c6c to your computer and use it in GitHub Desktop.
A first draft of a `kedro` `DataSet` for the CDF resource type `Datapoints`
from typing import Any, Dict, List, NoReturn, TYPE_CHECKING, Union
from kedro.io.core import AbstractDataSet, DataSetError
from cognite.client import CogniteClient
from cognite.client.config import ClientConfig
from cognite.client.data_classes import ClientCredentials
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.exceptions import CogniteNotFoundError
if TYPE_CHECKING:
import pandas as pd
class CDFDatapointsDataSet(AbstractDataSet[NoReturn, "pd.DataFrame"]):
    # NOTE: "pd.DataFrame" is a quoted forward reference on purpose: pandas is
    # imported only under TYPE_CHECKING, so an unquoted `pd.DataFrame` here (and
    # in the `_load` return annotation) would raise NameError at import time.
    """``CDFDatapointsDataSet`` loads time series data from Cognite Data Fusion (CDF).

    Saving is not supported.

    A few notes: When the parameter *aggregates* is not specified, raw datapoints
    are fetched. ``start`` defaults to 1970 unless specified. ``end`` defaults to
    *now*, unless specified.

    Example usage:

    .. code-block:: yaml

        time_series_dataset:
          type: CDFDatapointsDataSet
          project: customer-prod
          cluster: api
          tenant_id: 00000000-aaaa-bbbb-cccc-000000000000
          credentials: cdf_client_credentials
          queries:
            - external_id: my-xid
              start: 10w-ago
              end: now
              aggregates: average
              granularity: 1m
            - external_id: another
              aggregates:
                - average
                - count
              granularity: 1m
            - external_id: last-one
    """

    def __init__(
        self,
        project: str,
        cluster: str,
        tenant_id: str,
        credentials: ClientCredentials,
        queries: List[Dict[str, Any]],
    ) -> None:
        """Creates a new instance of ``CDFDatapointsDataSet`` to fetch time series data from CDF.

        Args:
            project: The CDF project, typically the name of your company + prod/dev.
            cluster: The CDF cluster, typically 'api' or 'westeurope-1'.
            tenant_id: Tenant ID of your organisation (from Azure Active Directory).
            credentials: Client credentials to authenticate towards CDF.
            queries: List of query dictionaries, specifying details about what to fetch.
                Each dict must contain at least an ``external_id`` key.
        """
        super().__init__()
        self._queries = queries
        # Shallow-copy queries so later mutation of the caller's list does not
        # silently change what `_describe` reports.
        self._description = {"project": project, "cluster": cluster, "tenant_id": tenant_id, "queries": queries[:]}
        self._client = self._setup_client(project, cluster, tenant_id, credentials)

    @staticmethod
    def _setup_client(
        project: str, cluster: str, tenant_id: str, credentials: ClientCredentials
    ) -> CogniteClient:
        """Builds a ``CogniteClient`` authenticating via OAuth client credentials (Azure AD)."""
        return CogniteClient(
            ClientConfig(
                client_name="virtual-flow-meter-pipeline",
                base_url=f"https://{cluster}.cognitedata.com",
                project=project,
                credentials=OAuthClientCredentials(
                    token_url=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
                    client_id=credentials.client_id,
                    client_secret=credentials.client_secret,
                    scopes=[f"https://{cluster}.cognitedata.com/.default"],
                ),
            )
        )

    def _describe(self) -> Dict[str, Union[str, List[Dict[str, Any]]]]:
        """Returns a copy of the dataset description (project, cluster, tenant_id, queries)."""
        return self._description.copy()

    def _load(self) -> "pd.DataFrame":
        """Fetches all requested datapoints and returns them as a single wide DataFrame.

        Columns are named by external_id (plus aggregate name when aggregates are
        requested); unknown external_ids raise rather than being silently dropped.
        """
        return self._client.time_series.data.retrieve_dataframe(
            external_id=self._queries,
            limit=None,
            include_outside_points=False,
            ignore_unknown_ids=False,
            uniform_index=False,
            include_aggregate_name=True,
            include_granularity_name=False,
            column_names="external_id",
        )

    def _save(self, data: None) -> NoReturn:
        """Always raises: this data set is read-only."""
        raise DataSetError(f"{type(self).__name__} is a read-only data set type")

    def _exists(self) -> bool:
        """Returns True if every queried external_id exists in CDF, False otherwise.

        Any error other than "not found" propagates to the caller.
        """
        try:
            self._client.time_series.retrieve_multiple(
                # De-duplicate: the same external_id may appear in several queries.
                external_id=list({dct["external_id"] for dct in self._queries}),
                ignore_unknown_ids=False,
            )
            return True
        except CogniteNotFoundError:
            return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment