Skip to content

Instantly share code, notes, and snippets.

@sjrusso8
Last active May 29, 2024 19:36
Show Gist options
  • Save sjrusso8/5fde3f9ca8ea986cf7311a4c7c19e045 to your computer and use it in GitHub Desktop.
Save sjrusso8/5fde3f9ca8ea986cf7311a4c7c19e045 to your computer and use it in GitHub Desktop.
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::dataframe::SaveMode;
use spark_connect_rs::functions as F;
use spark_connect_rs::types::*;
/// Example: round-trip a CSV file through a Delta table over Spark Connect.
///
/// Steps, in order:
/// 1. connect to the Spark Connect endpoint at `sc://127.0.0.1:15002/`;
/// 2. read `people.csv` with an explicit three-column schema;
/// 3. overwrite the table `default.people_delta` with that data;
/// 4. print the table history;
/// 5. append one extra row produced by a SQL `SELECT`;
/// 6. print the table history again;
/// 7. read the table back and print the minimum `age` per `job`.
///
/// # Errors
///
/// Any error raised by a `?` below (connecting, reading, writing, querying or
/// showing) is returned from `main` as a boxed error.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let builder = SparkSessionBuilder::remote("sc://127.0.0.1:15002/");
    let spark: SparkSession = builder.build().await?;

    // Path of the sample file handed to the CSV reader.
    let csv_paths = ["/opt/spark/examples/src/main/resources/people.csv"];

    // All three columns share `nullable: false` and `metadata: None`;
    // only the name and the data type vary.
    let required = |name, data_type| StructField {
        name,
        data_type,
        nullable: false,
        metadata: None,
    };
    let people_schema = StructType::new(vec![
        required("name", DataType::String),
        required("age", DataType::Integer),
        required("job", DataType::String),
    ]);

    // Read the semicolon-delimited CSV (with header row) from the Spark server.
    // NOTE(review): the session is cloned before each use except the last one,
    // presumably because `read`/`sql` take the session by value — confirm
    // against the crate version in use.
    let people = spark
        .clone()
        .read()
        .format("csv")
        .schema(people_schema)
        .option("header", "True")
        .option("delimiter", ";")
        .load(csv_paths)?;

    // Persist the CSV rows as a Delta table, replacing any previous contents.
    people
        .write()
        .format("delta")
        .mode(SaveMode::Overwrite)
        .saveAsTable("default.people_delta")
        .await?;

    // Print the table history after the overwrite.
    let history_after_overwrite = spark
        .clone()
        .sql("DESCRIBE HISTORY default.people_delta")
        .await?;
    history_after_overwrite
        .show(Some(1), None, Some(true))
        .await?;

    // Build a single-row dataframe from a literal SELECT ...
    let extra_row = spark
        .clone()
        .sql("SELECT 'john' as name, 40 as age, 'engineer' as job")
        .await?;

    // ... and append it to the existing Delta table.
    extra_row
        .write()
        .format("delta")
        .mode(SaveMode::Append)
        .saveAsTable("default.people_delta")
        .await?;

    // Print the table history again, now that the append has run.
    spark
        .clone()
        .sql("DESCRIBE HISTORY default.people_delta")
        .await?
        .show(Some(1), None, Some(true))
        .await?;

    // Read the table back and compute the minimum age for each job.
    // This is the final use of `spark`, so no clone is taken here.
    let min_age_by_job = spark
        .read()
        .table("default.people_delta", None)?
        .select(["job", "age"])
        .groupBy(Some("job"))
        .agg(F::min("age").alias("min_age"));
    min_age_by_job.show(Some(2), None, None).await?;

    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment