Skip to content

Instantly share code, notes, and snippets.

@kurochan
Created July 11, 2021 16:52
Show Gist options
  • Save kurochan/f8f826559a0ed2d33facb0282619358d to your computer and use it in GitHub Desktop.
Save kurochan/f8f826559a0ed2d33facb0282619358d to your computer and use it in GitHub Desktop.
investigation of Snowpark runtime environment
package org.kurochan.snowpark_test
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import collection.JavaConverters._
object Main {
/** Opens a Snowpark session from an inline set of connection settings.
  *
  * NOTE(review): the URL and password values appear to be masked placeholders;
  * real credentials would have to be substituted before this can connect.
  *
  * @return a session created by `Session.builder` from the settings below
  */
def createSession(): Session = {
  val connectionSettings: Map[String, String] = Map(
    "URL" -> "https://xxxxxxx.ap-northeast-1.aws.snowflakecomputing.com:443",
    "USER" -> "TEST_USER1",
    "PASSWORD" -> "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "ROLE" -> "SYSADMIN",
    "WAREHOUSE" -> "COMPUTE_WH",
    "DB" -> "SANDBOX",
    "SCHEMA" -> "PUBLIC"
  )
  val builder = Session.builder.configs(connectionSettings)
  builder.create
}
/** Renders every JVM system property as `name: value`, joined with ", ".
  *
  * The previous implementation folded from an empty seed, which produced a
  * spurious leading ", " in front of the first property; `mkString` only puts
  * the separator between entries.
  *
  * @param s ignored; the method takes one argument so that `main` can wrap it
  *          as a single-column UDF
  * @return all system properties on one line, e.g. `a: 1, b: 2`
  */
def getProps(s: String): String = {
  val props = System.getProperties
  props.propertyNames().asScala
    .map { key =>
      val name = key.toString
      s"$name: ${props.getProperty(name)}"
    }
    .mkString(", ")
}
/** Entry point: runs `getProps` as a UDF over a one-row range and prints the
  * first row that comes back.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val session = createSession()
  val propsUdf = udf(getProps _)
  // The UDF is applied to the ID column of a single-row range, so it is
  // evaluated by the query rather than called directly here.
  val firstRow = session
    .range(1)
    .select(propsUdf(col("ID")))
    .collect()
    .head
  println(firstRow.toString())
}
}
package org.kurochan.snowpark_test
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
object Main {
/** Builds a Snowpark session from the connection settings listed inline.
  *
  * NOTE(review): URL and PASSWORD look like masked placeholders — replace them
  * with real values before running.
  *
  * @return the session produced by `Session.builder`
  */
def createSession(): Session =
  Session.builder
    .configs(
      Map(
        "URL" -> "https://xxxxxxx.ap-northeast-1.aws.snowflakecomputing.com:443",
        "USER" -> "TEST_USER1",
        "PASSWORD" -> "xxxxxxxxxxxxxxxxxxxxxxxxxx",
        "ROLE" -> "SYSADMIN",
        "WAREHOUSE" -> "COMPUTE_WH",
        "DB" -> "SANDBOX",
        "SCHEMA" -> "PUBLIC"
      )
    )
    .create
/** Ten random alphanumeric characters, generated once when the enclosing
  * object is initialised and never changed afterwards.
  */
val randomId: String = {
  val chars = scala.util.Random.alphanumeric.take(10)
  chars.mkString
}

/** Returns the identifier stored in `randomId`.
  *
  * @param s ignored; present so the method has a single-argument shape
  * @return the same value on every call within one initialised object
  */
def getRandomId(s: String): String = randomId
/** Number of `incrementAndGet` calls seen so far by this object. */
var counter = 0

/** Bumps `counter` by one and returns the new value.
  *
  * NOTE(review): the update is not synchronised; presumably that is deliberate
  * so that `main`'s `conflict_count` can expose duplicate values — confirm.
  *
  * @param s ignored; present so the method has a single-argument shape
  * @return the counter value after the increment, widened to `Long`
  */
def incrementAndGet(s: String): Long = {
  counter = counter + 1
  counter.toLong
}
/** Reports how many processors the current JVM says are available.
  *
  * @param s ignored; present so the method has a single-argument shape
  * @return `Runtime.availableProcessors()` widened to `Long`
  */
def getProcessors(s: String): Long = {
  val cpuCount: Int = Runtime.getRuntime.availableProcessors()
  cpuCount.toLong
}
/** Reports the current JVM's maximum memory as given by `Runtime.maxMemory()`.
  *
  * @param s ignored; present so the method has a single-argument shape
  * @return the value of `Runtime.getRuntime.maxMemory()`
  */
def getMemory(s: String): Long =
  Runtime.getRuntime.maxMemory()
/** Reports the current JVM's total memory as given by `Runtime.totalMemory()`.
  *
  * @param s ignored; present so the method has a single-argument shape
  * @return the value of `Runtime.getRuntime.totalMemory()` at the time of the call
  */
def getCurrentMemory(s: String): Long = {
  val runtime = Runtime.getRuntime
  runtime.totalMemory()
}
/** Entry point: applies the probe UDFs to up to 1,000,000 rows of
  * `large_log_table` and prints a per-`static_id` summary of what they
  * reported.
  *
  * @param args command-line arguments (unused)
  */
def main(args: Array[String]): Unit = {
  val session = createSession()

  // Wrap each probe as a UDF, in the same order as they are used below.
  val idFn = udf(getRandomId _)
  val counterFn = udf(incrementAndGet _)
  val processorsFn = udf(getProcessors _)
  val maxMemoryFn = udf(getMemory _)
  val totalMemoryFn = udf(getCurrentMemory _)

  // Every probe ignores its argument; the "name" column is only there to
  // drive one invocation per input row.
  val input = col("name")
  val source = session.table("large_log_table").limit(1000000)
  val probed = source.select(
    idFn(input).as("static_id"),
    counterFn(input).as("counter_value"),
    processorsFn(input).as("processors"),
    maxMemoryFn(input).as("memory"),
    totalMemoryFn(input).as("current_memory")
  )

  // First pass: one row per (static_id, counter_value) pair; "count" is how
  // many input rows produced that exact pair.
  val perCounterValue = probed
    .groupBy(col("static_id"), col("counter_value"))
    .agg(
      min(col("processors")).as("min_processors"),
      max(col("processors")).as("max_processors"),
      min(col("memory")).as("min_memory"),
      max(col("memory")).as("max_memory"),
      max(col("current_memory")).as("current_memory"),
      count(lit(1)).as("count")
    )

  // Second pass: collapse to one row per static_id. Memory figures are
  // divided by 1024 three times; conflict_count is the largest number of rows
  // that shared one counter value, exec_count the total rows for that id.
  val perStaticId = perCounterValue
    .groupBy(col("static_id"))
    .agg(
      min(col("min_processors")).as("min_processors"),
      max(col("max_processors")).as("max_processors"),
      (min(col("min_memory")) / 1024 / 1024 / 1024).as("min_memory"),
      (max(col("max_memory")) / 1024 / 1024 / 1024).as("max_memory"),
      (max(col("current_memory")) / 1024 / 1024 / 1024).as("current_memory"),
      max(col("count")).as("conflict_count"),
      sum(col("count")).as("exec_count")
    )
    .sort(col("exec_count").desc)

  perStaticId.show(100)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment