Skip to content

Instantly share code, notes, and snippets.

View juri96's full-sized avatar

juri96

View GitHub Profile
-- Feature-selection CTE over the air-quality data: pollutant readings (pm25..o3)
-- plus weather columns (temp, press, dewp, rain) and the observation timestamp.
-- NOTE(review): fragment is truncated — the rest of the SELECT list, the FROM clause,
-- and the statement this CTE feeds are not visible in this excerpt.
WITH dataset AS (SELECT pm25,
pm10,
so2,
co,
o3,
temp,
press,
dewp,
rain,
timestamp,
-- Trains (or replaces) a BigQuery ML k-means clustering model on the pollution dataset.
-- Only model_type is set, so BigQuery ML defaults apply for the number of clusters etc.
-- NOTE(review): fragment is truncated — the CTE's SELECT list ends mid-statement and the
-- FROM clause / final SELECT are not visible in this excerpt.
CREATE OR REPLACE MODEL `GCP_PROJECT_NAME.DATASET_NAME.pollution_kmeans` OPTIONS(model_type='kmeans') AS
WITH dataset AS (SELECT pm25,
pm10,
so2,
co,
o3,
temp,
press,
dewp,
rain,
// Builds the BigQuery sink for the pipeline's output rows.
// The destination table is created if absent (CREATE_IF_NEEDED) and rows are
// appended on every run (WRITE_APPEND), so re-runs accumulate data.
// NOTE(review): excerpt looks truncated — the function's closing brace is not visible here.
fun writeToBigQuery(resultTableSpec: TableReference): BigQueryIO.Write<TableRow> {
return BigQueryIO.writeTableRows()
.to(resultTableSpec)
// Table layout comes from the shared RecordSchema definition.
.withSchema(RecordSchema.schema)
.withCreateDisposition(
BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED
)
.withWriteDisposition(
BigQueryIO.Write.WriteDisposition.WRITE_APPEND
)
// Maps one Record onto a BigQuery TableRow; the column names set here must match
// the fields declared in RecordSchema.
// NOTE(review): excerpt is truncated — fields after "temp" (press, dewp, rain, ...)
// and the closing brace are not visible in this fragment.
fun toTableRow(record: Record) = TableRow().also {
it.set("station", record.station)
// Timestamp is re-formatted into BigQuery's expected string representation.
it.set("timestamp", Utils.toTimeStampBQFormat(record.timestamp))
it.set("pm25", record.pm25)
it.set("pm10", record.pm10)
it.set("so2", record.so2)
it.set("no2", record.no2)
it.set("co", record.co)
it.set("o3", record.o3)
it.set("temp", record.temp)
// NOTE(review): "SpitedFileLine" is almost certainly a typo for "SplitFileLine";
// kept as-is because other fragments in this file reference the same alias.
typealias SpitedFileLine = List<String>
// Converts a split CSV line into a typed Record.
// Indices 1..4 hold the date parts fed to csvToLocalDateTime; measurement columns
// start at index 5 (index 0 is presumably the row number — see the "No" header column).
// NOTE(review): excerpt is truncated — fields after "co" and the closing braces are
// not visible. getDoubleOrNull presumably tolerates blank/non-numeric cells — confirm
// against its definition elsewhere in the project.
fun mapElementsToEntity(): MapElements<SpitedFileLine, Record> {
return MapElements.into(TypeDescriptor.of(Record::class.java))
.via(ProcessFunction<SpitedFileLine, Record> { r ->
Record(timestamp = Utils.csvToLocalDateTime(r.subList(1, 5)),
pm25 = r.getDoubleOrNull(5),
pm10 = r.getDoubleOrNull(6),
so2 = r.getDoubleOrNull(7),
no2 = r.getDoubleOrNull(8),
co = r.getDoubleOrNull(9),
// Aliases documenting the pipeline's intermediate shapes: a raw text line in,
// a list of cell values out. (Alias name "SpitedFileLine" is a typo kept for compatibility.)
typealias SpitedFileLine = List<String>
typealias FileLine = String
// Splits one raw CSV line on commas into its cell values.
// NOTE(review): naive split — a quoted field containing a comma would be split
// incorrectly; acceptable only if the dataset never quotes values.
// Excerpt is truncated — the closing "})" of this function is not visible here.
fun splitCsvLine(): MapElements<FileLine, SpitedFileLine> {
return MapElements.into(
TypeDescriptors.lists(
TypeDescriptor.of(String::class.java)
)
)
.via(ProcessFunction<FileLine, SpitedFileLine> {
it.split(",")
// Exact CSV header line emitted by each source file; any row beginning with it is dropped.
private const val headers = """"No","year","month","day","hour","PM2.5","PM10","SO2","NO2","CO","O3","TEMP","PRES","DEWP","RAIN","wd","WSPM","station""""

/**
 * Builds a Beam [Filter] that keeps only data rows, discarding every line that
 * starts with the known CSV header.
 */
fun filterHeaderRow(): Filter<String> {
    return Filter.by(ProcessFunction<String, Boolean> { line ->
        line.startsWith(headers).not()
    })
}
// Reads every object matching *.csv in the bucket as individual text lines.
// NOTE(review): free-standing fragment — in the assembled pipeline this same expression
// appears inside the "Read files" apply() step further down in this file.
TextIO.read().from("gs://BUCKET_NAME/*.csv")
// Fully-qualified BigQuery destination table; the placeholder values are
// substituted with real project/dataset/table names per deployment.
val resultTableSpec = TableReference().also { ref ->
    ref.projectId = "GCP_PROJECT_NAME"
    ref.datasetId = "BIGQUERY_DATASET_NAME"
    ref.tableId = "BIGQUERY_TABLE_NAME"
}
// Pipeline assembly: read raw CSV lines from the bucket, then drop header rows.
// NOTE(review): excerpt is truncated — the downstream transforms (split, map-to-record,
// BigQuery write) and the pipeline.run() call are not visible in this fragment.
pipeline
.apply("Read files", TextIO.read().from("gs://BUCKET_NAME/*.csv"))
.apply("Filter headers", PipelineSteps.filterHeaderRow())
// Dataflow runner configuration. Placeholder project/bucket values are replaced
// at deploy time; staging/temp locations must live in a writable GCS bucket.
val options = PipelineOptionsFactory.`as`(DataflowPipelineOptions::class.java).also { opts ->
    opts.project = "GCP_PROJECT_NAME"
    opts.stagingLocation = "gs://BUCKET_NAME/staging"
    opts.tempLocation = "gs://BUCKET_NAME/temp"
    opts.jobName = "etl-pipeline"
    opts.runner = DataflowRunner::class.java
}

// Root pipeline object that all transforms are attached to.
val pipeline = Pipeline.create(options)