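# Prerequisites assumed from earlier snippets in this workflow (not shown here):
#   * the SDK classes used below (GCSDatastoreDef, Datastore, FileFormat,
#     DatastoreCollection, ClassNodeDescription, PickStrategy, DataType,
#     Definition, Structure, Stream) are already imported
#   * `dtl` is an authenticated SDK client
#   * `my_gcs_credentials` is a GCS credential previously registered through the client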
# Attach a datastore: European Sample Dataset
gcsdef_2 = GCSDatastoreDef(
    bucket="dtl-handset-telemetry",
    file_name="5K_eu_telemetry.csv",
    file_format=FileFormat.Csv,
)
EU_sample_handset_data = dtl.datastore.create(
    Datastore(
        name="European Handset Telemetry Sample",
        definition=gcsdef_2,
        credential_id=my_gcs_credentials.id,
    )
)
# Attach a datastore: European Fullscale Dataset
gcsdef_3 = GCSDatastoreDef(
    bucket="dtl-handset-telemetry",
    file_name="gtc_europe.csv",
    file_format=FileFormat.Csv,
)
EU_fullscale_handset_data = dtl.datastore.create(
    Datastore(
        name="European Handset Telemetry",
        definition=gcsdef_3,
        credential_id=my_gcs_credentials.id,
    )
)
# Attach a datastore: US Fullscale Dataset
gcsdef_4 = GCSDatastoreDef(
    bucket="dtl-handset-telemetry",
    file_name="us_340M.csv",
    file_format=FileFormat.Csv,
)
US_fullscale_handset_data = dtl.datastore.create(
    Datastore(
        name="US Handset Telemetry",
        definition=gcsdef_4,
        credential_id=my_gcs_credentials.id,
    )
)
# Collect into a datastore collection
dtl.datastore_collection.create(
    DatastoreCollection(
        name="Handset telemetry data for inference",
        description="Handset training data for use in model creation",
        storeIds=[
            EU_sample_handset_data.id,
            EU_fullscale_handset_data.id,
            US_fullscale_handset_data.id,
        ],
    )
)
BOT.server_summary()
# Define source
source1 = EU_sample_handset_data
# Define destination
gcsdef_5 = GCSDatastoreDef(
    bucket="dtl-handset-telemetry",
    file_name="cleaned_handset_telemetry_data.csv",
    file_format=FileFormat.Csv,
)
target1 = dtl.datastore.create(
    Datastore(
        name="Cleaned Handset Telemetry data",
        definition=gcsdef_5,
        credential_id=my_gcs_credentials.id,
    )
)
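# `leaves` is assumed to be defined in an earlier snippet as the leaf nodes of the
# ontology used for classification; only the `.name` attribute is used below.
# If running this section on its own, a minimal hypothetical stand-in could be:
# from collections import namedtuple
# Leaf = namedtuple("Leaf", ["name"])
# leaves = [Leaf("imei"), Leaf("latitude"), Leaf("longitude")]  # hypothetical field names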
# Create list of class nodes for use in the structure transformation
class_nodes_list = []
# Populate the list with leaves
for leaf in leaves:
    class_nodes_list.append(
        ClassNodeDescription(
            path=[leaf.name],
            tag=leaf.name,
            pick_strategy=PickStrategy.HighScore,
            data_type=DataType.String,
        )
    )
# Define pipeline
definition1 = Definition(
    transformations=[
        # Classify(None, True),  # Not needed if the analyze step above has already run;
        # reusing that analysis is much more efficient than classifying again here.
        Structure(class_nodes_list),
    ],
    pipelines=[],
    target=target1,
)
# Define stream
my_stream = Stream(source1, [definition1])
# Push
stream_collection = dtl.stream_collection.create(
    [my_stream], "Telemetry classification pipeline"
)
# Define source
source2 = EU_fullscale_handset_data
# Define destination
gcsdef_6 = GCSDatastoreDef(
    bucket="dtl-handset-telemetry",
    file_name="cleaned_EU_handset_telemetry_data.csv",
    file_format=FileFormat.Csv,
)
target2 = dtl.datastore.create(
    Datastore(
        name="Cleaned EU Handset Telemetry data",
        definition=gcsdef_6,
        credential_id=my_gcs_credentials.id,
    )
)
# Define pipeline
definition2 = Definition(
    transformations=[
        # Classify(None, True),  # Not needed if the analyze step above has already run;
        # reusing that analysis is much more efficient than classifying again here.
        Structure(class_nodes_list),
    ],
    pipelines=[],
    target=target2,
)
# Define stream
my_stream2 = Stream(source2, [definition2])
# Push
stream_collection2 = dtl.stream_collection.create(
    [my_stream2], "EU Telemetry classification pipeline"
)
# Define source
source3 = US_fullscale_handset_data
# Define destination
gcsdef_7 = GCSDatastoreDef(
    bucket="dtl-handset-telemetry",
    file_name="cleaned_US_handset_telemetry_data.csv",
    file_format=FileFormat.Csv,
)
target3 = dtl.datastore.create(
    Datastore(
        name="Cleaned US Handset Telemetry data",
        definition=gcsdef_7,
        credential_id=my_gcs_credentials.id,
    )
)
# Define pipeline
definition3 = Definition(
    transformations=[
        # Classify(None, True),  # Not needed if the analyze step above has already run;
        # reusing that analysis is much more efficient than classifying again here.
        Structure(class_nodes_list),
    ],
    pipelines=[],
    target=target3,
)
# Define stream
my_stream3 = Stream(source3, [definition3])
# Push
stream_collection3 = dtl.stream_collection.create(
    [my_stream3], "US Telemetry classification pipeline"
)
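# The three blocks above repeat the same source -> target -> Structure -> push pattern.
# A sketch of the same work as a helper function; it only reuses the calls already
# shown above, and the helper name and parameters are illustrative, not from the SDK docs.
def push_classification_pipeline(source, cleaned_file_name, target_name, collection_name):
    # Destination datastore for the cleaned output
    target = dtl.datastore.create(
        Datastore(
            name=target_name,
            definition=GCSDatastoreDef(
                bucket="dtl-handset-telemetry",
                file_name=cleaned_file_name,
                file_format=FileFormat.Csv,
            ),
            credential_id=my_gcs_credentials.id,
        )
    )
    # Pipeline definition reusing the class nodes built earlier
    definition = Definition(
        transformations=[Structure(class_nodes_list)],
        pipelines=[],
        target=target,
    )
    # Push a stream from the source through the definition
    return dtl.stream_collection.create([Stream(source, [definition])], collection_name)

# Example usage, equivalent to the EU full-scale block above:
# push_classification_pipeline(
#     EU_fullscale_handset_data,
#     "cleaned_EU_handset_telemetry_data.csv",
#     "Cleaned EU Handset Telemetry data",
#     "EU Telemetry classification pipeline",
# )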