// Script to test the BigQuery write from AWS (run in spark-shell), going through
// the platform's outbound HTTP proxy with inline GCP service-account credentials.
import com.box.data.platform.model.Env
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.{Field, JobInfo, Schema, StandardSQLTypeName, TableDefinition, TableId, TimePartitioning}
import com.box.data.platform.model.bigquery.ingestion.BigQueryTableId
import com.box.data.platform.model.spark._
import com.box.dataplatform.spark.context.internal.bigquery.BigQueryWriterImpl.{DS, HOUR, STRINGTYPE, UNIX_EPOCH_MICROSECONDS, UNIX_EPOCH_MILLISECONDS, UNIX_EPOCH_SECONDS}
import com.box.dataplatform.spark.context.internal.vault.{MigrationCredentialsProvider, MigrationCredentialsProviderFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, DateType, DecimalType, DoubleType, IntegerType, StringType, TimestampType}
import org.json4s.DefaultFormats
import org.slf4s.{Logger, LoggerFactory, Logging}
import org.spark_project.guava.base.Charsets
import org.spark_project.guava.io.BaseEncoding
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import java.util.function.Consumer
import scala.collection.JavaConversions.asScalaIterator
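// NOTE: intended to be pasted into an interactive spark-shell session, where the
// `spark` SparkSession and `spark.implicits._` (needed for `.toDF()`) are already in scope.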
spark.sparkContext.setLogLevel("INFO")
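// Empty single-column DataFrame: enough to exercise the BigQuery write path end to end.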
val df = Seq.empty[String].toDF()
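// Inline GCP service-account key (JSON) for the autolog ingester; parsed below to
// extract the fields the GCS and BigQuery connectors need.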
val credentials = "{\n \"type\": \"service_account\",\n \"project_id\": \"box-prod-dp-autolog\",\n \"private_key_id\": \"5a2a2cd0da8709729d487a571fb6687314d69941\",\n \"private_key\": \"-----BEGIN PRIVATE KEY-----\\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCFuJjGSJNPv1q0\\nm/EBRmw+0cZTr7k/R9T0fCeuShvoWlE7ssQyyx8PVKF4ZGy56cijkqXqvYWcZ3td\\nD+Ra993S79STkLiY+Suo8pSau45uLU5cnG6WK/hkP4ZY9ksU8cXGyceQmvLnuX3e\\ncG3XdogNf7JKCoJJhTpo/tt2wk8Ycr8vZE1ofVb0wYhelY1zTul16iTdphk9OE3J\\nah4+0jbkxCloacuTwR9jpF+Rz8fTUKfP96Aqqzuclfvatn4qbxxmwHeghUjL+0TN\\nXAHBdWPVCOb/1aMi+ggsHNBRRPmQOC+c76KhVYOhru1rGpH1YRenNidLAtLFIbue\\nXDUo7ip3AgMBAAECggEAQV79fM3FClLjPnEPsE5jo5A80vOQbbGplz3MzkUXgI8K\\nsFMT3Dr1e7KlX2xExNxXxDNwXiH5m/zwz41x1tDexfyN4Pf6bno9UFIKuNwg5JbM\\nfLTS0SI8w5FUldAN43bQd350S6qcPhEtyd0PraoaF/NJzy11BQkXBXY2Bsg2D55w\\nFnJ1yQWZdFlKH44y4VyfMPT+VzivAJeH9Rwxbyrr27ErrZL3AzEcfv286EZOTtyO\\nHxXNO5mA0KVNgq6DXXaCTGFjGkOegj+R2ef0ZfOTTVd+F1nLkOKCLpJE4z6rl7+l\\nrFJ3ujcY1M3kqsAtkl/6QagILc8LvzdyMN//0oI54QKBgQC5mQSlqS/RKWblPwwY\\n4/+zKcg5u/9ni7bl93Bvr4/Ru4cjs2CqEmH9Cneg17Qujvoeklp71TwHSZAA+469\\nV+w3h2yLjsC0vdE+pgQQ4PWL6rYPpF+yrKbEjgfwfJZeq/8XmqPPYgcu0QX1UTMi\\nMfVA2IilAJdT8nzlKjEPQusIIQKBgQC4cfP9rhalFMSMzJ49I1PLTjRboFrBdMpP\\nN3jVIVxZBM9Z7YneB39MR/QhRLD9hFmg5irrugP3Ug00vgDDhO39uChPjXD6C7HK\\nubJt+7HjoUDOtoIQvOlnKHm1l+9GyxdiCkcH0pQE41X1DkJhbZ230MfrIkMt8Slh\\nBT+cHsR/lwKBgQCC8C2TIlWV3vu06FIEjXYsVGHqOXPq/Wq/u14brWRttzuUOE0O\\nL4HqH6ReWU/JOR3Ey8QCcWG0UnXB+7esZYdFdclifAgAe5pP9JgRwGP7nk9roa/S\\nQnQ1X27Cf2Mmz3lckcDX4nTm1pi5hdvpWA2K/jkJD+2zT/RFSQrrlW5zgQKBgEGr\\noPnDkCB6wnh3KigOefk7AP1WnLmggZ+u2oDX/e9b/Q/BuYKqHLLmM4/cyz+YbekR\\nylpfwqW+OyF6PP9gRvQ9K20XqQ/+NfE+KL9sDCxF77Mp3ClleZzXFm6HiBQmn06y\\n5hNs8HLFFofH6e6SfsNBvMyFSHIFGXyHx9woQEJFAoGBAIL2jQxFuB+jI+CW+fjh\\nPaZNuTrna0zpXk3NDkSq1rJk9DoCvTSv/klE+RQXtYQi7a+KG4cLZ4w7T0w9PiPw\\nIOYWcLZrlsUj7EJDzKCF1aIvT37oMp0goyIhfyC0mS0w/WW/JYENQJ+wMlyyhzlM\\nrBTKFjyTz/BrSGjb8rK9/NyY\\n-----END PRIVATE KEY-----\\n\",\n \"client_email\": \"autolog-ingester@box-prod-dp-autolog.iam.gserviceaccount.com\",\n \"client_id\": \"104580600433292242021\",\n \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\n \"token_uri\": \"https://oauth2.googleapis.com/token\",\n \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\n \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/autolog-ingester%40box-prod-dp-autolog.iam.gserviceaccount.com\"\n}"
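// Target BigQuery table, the outbound proxy used to reach GCP from AWS, and the
// proxy password (corresponding to the Vault path referenced in proxyConfig).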
val bigQueryTableId = BigQueryTableId(com.box.data.platform.model.tenant.TenantId("autolog"),"online_sales","online_sales_transaction-compact")
val proxyConfig = com.box.data.platform.model.spark.ProxyConfig("data-platform","/global/prod/generic/aic/migration/private/outbound-proxy/password/1","web-proxy-vip.prod.box.net",3128)
val migrationPassword = "VU027vxElNk4sQ4O6jr0fd5eJekBIPB"
implicit val formats: DefaultFormats.type = DefaultFormats
val serviceAccountJson = parse(credentials)
val privateKeyId = (serviceAccountJson \ "private_key_id").extract[String]
val privateKey = (serviceAccountJson \ "private_key").extract[String]
val clientEmail = (serviceAccountJson \ "client_email").extract[String]
val projectId = (serviceAccountJson \ "project_id").extract[String]
// `env` was undefined in the original script; pinning it to "prod" here to match
// the box-prod project in the credentials above.
val env = "prod"
val tempStagingBucket = s"box-${Env.fromValue(env)}-dp-${bigQueryTableId.tenantId.name}-bq-${bigQueryTableId.dataset}-staging"
// Hadoop configuration so Spark can resolve the gs:// filesystem, plus the
// service-account credentials for it. The original set each property on both the
// Spark runtime conf and the Hadoop configuration (and again under a
// spark.hadoop. prefix), presumably to cover whichever location the connector
// reads; a small helper keeps that behavior without the repetition.
val conf = spark.sparkContext.hadoopConfiguration
def setBoth(key: String, value: String): Unit = {
  spark.conf.set(key, value)
  conf.set(key, value)
}
setBoth("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
setBoth("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
setBoth("fs.gs.auth.service.account.enable", "true")
conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
setBoth("fs.gs.auth.service.account.email", clientEmail)
setBoth("fs.gs.project.id", projectId)
setBoth("fs.gs.system.bucket", tempStagingBucket)
setBoth("fs.gs.auth.service.account.private.key.id", privateKeyId)
setBoth("fs.gs.auth.service.account.private.key", privateKey)
setBoth("google.cloud.auth.service.account.enable", "true")
setBoth("google.cloud.auth.service.account.email", clientEmail)
setBoth("google.cloud.project.id", projectId)
setBoth("google.cloud.system.bucket", tempStagingBucket)
setBoth("google.cloud.auth.service.account.private.key.id", privateKeyId)
setBoth("google.cloud.auth.service.account.private.key", privateKey)
setBoth("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
setBoth("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
setBoth("spark.hadoop.google.cloud.auth.service.account.enable", "true")
setBoth("spark.hadoop.google.cloud.auth.service.account.email", clientEmail)
setBoth("spark.hadoop.google.cloud.project.id", projectId)
setBoth("spark.hadoop.google.cloud.system.bucket", tempStagingBucket)
setBoth("spark.hadoop.google.cloud.auth.service.account.private.key.id", privateKeyId)
setBoth("spark.hadoop.google.cloud.auth.service.account.private.key", privateKey)
// Credentials and proxy details for the BigQuery connector itself; every GCP
// request from AWS hops through the authenticated HTTP proxy. (The original
// referenced an undefined `serviceAccountCredentials`; it encodes the
// `credentials` string defined above.)
val encodedCredentials = BaseEncoding.base64.encode(credentials.getBytes(Charsets.UTF_8))
val proxyAddress = s"http://${proxyConfig.proxyHost}:${proxyConfig.proxyPort}"
setBoth("credentials", encodedCredentials)
setBoth("proxyAddress", proxyAddress)
setBoth("proxyUsername", proxyConfig.proxyUser)
setBoth("proxyPassword", migrationPassword)
spark.conf.set("google.cloud.proxy.address", proxyAddress)
spark.conf.set("google.cloud.proxy.username", proxyConfig.proxyUser)
spark.conf.set("google.cloud.proxy.password", migrationPassword)
setBoth("spark.hadoop.fs.gs.proxy.address", proxyAddress)
setBoth("spark.hadoop.fs.gs.proxy.username", proxyConfig.proxyUser)
setBoth("spark.hadoop.fs.gs.proxy.password", migrationPassword)
// Generous retries and connect timeout, since every request goes via the proxy.
conf.set("fs.gs.http.max.retry", "20")
conf.set("fs.gs.http.connect-timeout", "40000")
setBoth("temporaryGcsBucket", s"box-prod-dp-${bigQueryTableId.tenantId.name}-bq-${bigQueryTableId.dataset}-staging")
df.write.format("bigquery").mode(SaveMode.Append).option("proxyAddress", s"http://${proxyConfig.proxyHost}:${proxyConfig.proxyPort}").option("proxyUsername", s"${proxyConfig.proxyUser}").option("proxyPassword", migrationPassword).option("credentials", BaseEncoding.base64.encode(credentials.getBytes(Charsets.UTF_8))).option("parentProject",projectId).option("fs.gs.proxy.address", s"${proxyConfig.proxyHost}:${proxyConfig.proxyPort}").option("fs.gs.proxy.user", s"${proxyConfig.proxyUser}").option("fs.gs.proxy.password", migrationPassword).save("online_sales.online_sales_transaction-compact")