Skip to content

Instantly share code, notes, and snippets.

View al102964's full-sized avatar

Arturo Gonzalez al102964

  • Data Scientist ITAM
  • Mexico City
View GitHub Profile
name: CI
# Controls when the action will run.
on:
  # Triggers the workflow on push or pull request events but only for the master branch.
  # Fixed: the pasted fragment had all keys flattened to one indentation level, which
  # leaves `on:` empty and creates duplicate `branches` keys — invalid for GitHub Actions.
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]
name: CI
# Controls when the action will run.
on:
  # Triggers the workflow on push or pull request events but only for the master branch.
  # Fixed: the pasted fragment had all keys flattened to one indentation level, which
  # leaves `on:` empty and creates duplicate `branches` keys — invalid for GitHub Actions.
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]
Skip to content
Pull requests
Issues
Marketplace
Explore
@al102964
al102964 /
react-app-netlify
@al102964
al102964 / SparkCopyPostgres.scala
Created April 6, 2021 04:58 — forked from longcao/SparkCopyPostgres.scala
COPY Spark DataFrame rows to PostgreSQL (via JDBC)
import java.io.InputStream
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
// JDBC target for the COPY; real host/credentials are deliberately elided in the gist.
val jdbcUrl = s"jdbc:postgresql://..." // db credentials elided
// NOTE(review): truncated — the body of `connectionProperties` is missing from this
// paste (the lines that follow belong to an unrelated JSON manifest). Recover the
// full block from the original gist (forked from longcao/SparkCopyPostgres.scala).
val connectionProperties = {
{
"fileLocations":[
{"URIPrefixes":["s3://bucket-name/archivo.csv"]}
],
"globalUploadSettings":{
"format": "CSV",
"delimiter":",",
"containsHeader":true
}
}
from pyspark.sql import Window
import pyspark.sql.functions as f

# Window ordered by the interval start time.
# NOTE(review): `w` is unused in the visible snippet — presumably consumed by a later
# lag/cumulative step that was cut from this paste; confirm against the original.
w = Window.orderBy("start")

# Hourly peak consumption for the 'eta' device, per tariff: bucket readings into
# 60-minute windows and keep the maximum `valor` observed in each bucket.
consumo = df.select("central", "id_dispositivo", "valor", "timestamp", "tarifa") \
    .filter("id_dispositivo = 'eta'") \
    .groupBy(f.window("timestamp", "60 minutes").alias("intervalo"), "tarifa") \
    .agg(f.max("valor").alias("consumo_max_hora")) \
    .select("intervalo.start", "tarifa", "consumo_max_hora")
# Fixed: the original chain ended with a dangling "\" line-continuation, which made
# the following (unrelated) import line a syntax error.
from pyspark.sql.types import *

# Explicit read schema for the IoT sensor readings.
schema = StructType([
    StructField("central", StringType()),
    StructField("gateway", StringType()),
    StructField("board", StringType()),
    StructField("id_dispositivo", StringType()),  # device id (e.g. 'eta' elsewhere in this gist)
    StructField("valor", FloatType()),            # sensor reading value
    StructField("timestamp", TimestampType()),
    StructField("tarifa", StringType()),          # tariff label
])
# Fixed: the pasted fragment was missing the closing "])" of the StructType,
# leaving the statement unterminated.
# List the files under the mounted S3 path (Databricks notebook display helper).
display(dbutils.fs.ls('/mnt/s3data/data/'))
import urllib
# Access keys for the working bucket (placeholder values here).
# NOTE(review): never hard-code real AWS credentials in a notebook — prefer a
# Databricks secret scope or an instance profile.
ACCESS_KEY = "LLAVE"
SECRET_KEY = "SECRET"
AWS_BUCKET_NAME = "al102964-iot-data"
# URL-encode the secret so it can be embedded in the mount URI; "" = no safe chars.
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY,"")
MOUNT_NAME = "s3data"
# Overall signal strength in the dataset: mean of the deduplicated absolute pairwise
# scores, for both the predictive-power-score matrix and the correlation matrix.
pps_scores = pps.matrix(df).abs().unstack().sort_values(ascending=False).drop_duplicates()
print("Mean predictive power score value: {}".format(pps_scores.mean()))
corr_scores = df.corr().abs().unstack().sort_values(ascending=False).drop_duplicates()
print("Mean correlation value: {}".format(corr_scores.mean()))