// Gist sjrusso8/5fde3f9ca8ea986cf7311a4c7c19e045 (last active May 29, 2024).
// NOTE: GitHub flagged this file as containing bidirectional Unicode text, which may be
// interpreted or compiled differently than it appears. Review it in an editor that
// reveals hidden Unicode characters.
use spark_connect_rs::{SparkSession, SparkSessionBuilder}; | |
use spark_connect_rs::dataframe::SaveMode; | |
use spark_connect_rs::functions as F; | |
use spark_connect_rs::types::*; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/") | |
.build() | |
.await?; | |
let paths = ["/opt/spark/examples/src/main/resources/people.csv"]; | |
let schema = StructType::new(vec![ | |
StructField { | |
name: "name", | |
data_type: DataType::String, | |
nullable: false, | |
metadata: None, | |
}, | |
StructField { | |
name: "age", | |
data_type: DataType::Integer, | |
nullable: false, | |
metadata: None, | |
}, | |
StructField { | |
name: "job", | |
data_type: DataType::String, | |
nullable: false, | |
metadata: None, | |
}, | |
]); | |
// Load a CSV file from the spark server | |
let df = spark | |
.clone() | |
.read() | |
.format("csv") | |
.schema(schema) | |
.option("header", "True") | |
.option("delimiter", ";") | |
.load(paths)?; | |
// write as a delta table and register it as a table | |
df.write() | |
.format("delta") | |
.mode(SaveMode::Overwrite) | |
.saveAsTable("default.people_delta") | |
.await?; | |
// view the history of the table | |
spark | |
.clone() | |
.sql("DESCRIBE HISTORY default.people_delta") | |
.await? | |
.show(Some(1), None, Some(true)) | |
.await?; | |
// create another dataframe | |
let df = spark | |
.clone() | |
.sql("SELECT 'john' as name, 40 as age, 'engineer' as job") | |
.await?; | |
// append to the delta table | |
df.write() | |
.format("delta") | |
.mode(SaveMode::Append) | |
.saveAsTable("default.people_delta") | |
.await?; | |
// view the history of the table | |
let df = spark | |
.clone() | |
.sql("DESCRIBE HISTORY default.people_delta") | |
.await?; | |
df.show(Some(1), None, Some(true)).await?; | |
let df = spark.read().table("default.people_delta", None)?; | |
let df = df | |
.select(["job", "age"]) | |
.groupBy(Some("job")) | |
.agg(F::min("age").alias("min_age")); | |
df.show(Some(2), None, None).await?; | |
Ok(()) | |
} |
// (End of gist. On the original GitHub page, sign in or sign up to join the conversation.)