StackOverflow Problem Solving
sepal_length sepal_width petal_length petal_width label
5.1 3.5 1.4 0.2 Iris-setosa
4.9 3.0 1.4 0.2 Iris-setosa
4.7 3.2 1.3 0.2 Iris-setosa
4.6 3.1 1.5 0.2 Iris-setosa
5.0 3.6 1.4 0.2 Iris-setosa
5.4 3.9 1.7 0.4 Iris-setosa
4.6 3.4 1.4 0.3 Iris-setosa
5.0 3.4 1.5 0.2 Iris-setosa
4.4 2.9 1.4 0.2 Iris-setosa
4.9 3.1 1.5 0.1 Iris-setosa
5.4 3.7 1.5 0.2 Iris-setosa
4.8 3.4 1.6 0.2 Iris-setosa
4.8 3.0 1.4 0.1 Iris-setosa
4.3 3.0 1.1 0.1 Iris-setosa
5.8 4.0 1.2 0.2 Iris-setosa
5.7 4.4 1.5 0.4 Iris-setosa
5.4 3.9 1.3 0.4 Iris-setosa
5.1 3.5 1.4 0.3 Iris-setosa
5.7 3.8 1.7 0.3 Iris-setosa
5.1 3.8 1.5 0.3 Iris-setosa
5.4 3.4 1.7 0.2 Iris-setosa
5.1 3.7 1.5 0.4 Iris-setosa
4.6 3.6 1.0 0.2 Iris-setosa
5.1 3.3 1.7 0.5 Iris-setosa
4.8 3.4 1.9 0.2 Iris-setosa
5.0 3.0 1.6 0.2 Iris-setosa
5.0 3.4 1.6 0.4 Iris-setosa
5.2 3.5 1.5 0.2 Iris-setosa
5.2 3.4 1.4 0.2 Iris-setosa
4.7 3.2 1.6 0.2 Iris-setosa
4.8 3.1 1.6 0.2 Iris-setosa
5.4 3.4 1.5 0.4 Iris-setosa
5.2 4.1 1.5 0.1 Iris-setosa
5.5 4.2 1.4 0.2 Iris-setosa
4.9 3.1 1.5 0.1 Iris-setosa
5.0 3.2 1.2 0.2 Iris-setosa
5.5 3.5 1.3 0.2 Iris-setosa
4.9 3.1 1.5 0.1 Iris-setosa
4.4 3.0 1.3 0.2 Iris-setosa
5.1 3.4 1.5 0.2 Iris-setosa
5.0 3.5 1.3 0.3 Iris-setosa
4.5 2.3 1.3 0.3 Iris-setosa
4.4 3.2 1.3 0.2 Iris-setosa
5.0 3.5 1.6 0.6 Iris-setosa
5.1 3.8 1.9 0.4 Iris-setosa
4.8 3.0 1.4 0.3 Iris-setosa
5.1 3.8 1.6 0.2 Iris-setosa
4.6 3.2 1.4 0.2 Iris-setosa
5.3 3.7 1.5 0.2 Iris-setosa
5.0 3.3 1.4 0.2 Iris-setosa
7.0 3.2 4.7 1.4 Iris-versicolor
6.4 3.2 4.5 1.5 Iris-versicolor
6.9 3.1 4.9 1.5 Iris-versicolor
5.5 2.3 4.0 1.3 Iris-versicolor
6.5 2.8 4.6 1.5 Iris-versicolor
5.7 2.8 4.5 1.3 Iris-versicolor
6.3 3.3 4.7 1.6 Iris-versicolor
4.9 2.4 3.3 1.0 Iris-versicolor
6.6 2.9 4.6 1.3 Iris-versicolor
5.2 2.7 3.9 1.4 Iris-versicolor
5.0 2.0 3.5 1.0 Iris-versicolor
5.9 3.0 4.2 1.5 Iris-versicolor
6.0 2.2 4.0 1.0 Iris-versicolor
6.1 2.9 4.7 1.4 Iris-versicolor
5.6 2.9 3.6 1.3 Iris-versicolor
6.7 3.1 4.4 1.4 Iris-versicolor
5.6 3.0 4.5 1.5 Iris-versicolor
5.8 2.7 4.1 1.0 Iris-versicolor
6.2 2.2 4.5 1.5 Iris-versicolor
5.6 2.5 3.9 1.1 Iris-versicolor
5.9 3.2 4.8 1.8 Iris-versicolor
6.1 2.8 4.0 1.3 Iris-versicolor
6.3 2.5 4.9 1.5 Iris-versicolor
6.1 2.8 4.7 1.2 Iris-versicolor
6.4 2.9 4.3 1.3 Iris-versicolor
6.6 3.0 4.4 1.4 Iris-versicolor
6.8 2.8 4.8 1.4 Iris-versicolor
6.7 3.0 5.0 1.7 Iris-versicolor
6.0 2.9 4.5 1.5 Iris-versicolor
5.7 2.6 3.5 1.0 Iris-versicolor
5.5 2.4 3.8 1.1 Iris-versicolor
5.5 2.4 3.7 1.0 Iris-versicolor
5.8 2.7 3.9 1.2 Iris-versicolor
6.0 2.7 5.1 1.6 Iris-versicolor
5.4 3.0 4.5 1.5 Iris-versicolor
6.0 3.4 4.5 1.6 Iris-versicolor
6.7 3.1 4.7 1.5 Iris-versicolor
6.3 2.3 4.4 1.3 Iris-versicolor
5.6 3.0 4.1 1.3 Iris-versicolor
5.5 2.5 4.0 1.3 Iris-versicolor
5.5 2.6 4.4 1.2 Iris-versicolor
6.1 3.0 4.6 1.4 Iris-versicolor
5.8 2.6 4.0 1.2 Iris-versicolor
5.0 2.3 3.3 1.0 Iris-versicolor
5.6 2.7 4.2 1.3 Iris-versicolor
5.7 3.0 4.2 1.2 Iris-versicolor
5.7 2.9 4.2 1.3 Iris-versicolor
6.2 2.9 4.3 1.3 Iris-versicolor
5.1 2.5 3.0 1.1 Iris-versicolor
5.7 2.8 4.1 1.3 Iris-versicolor
6.3 3.3 6.0 2.5 Iris-virginica
5.8 2.7 5.1 1.9 Iris-virginica
7.1 3.0 5.9 2.1 Iris-virginica
6.3 2.9 5.6 1.8 Iris-virginica
6.5 3.0 5.8 2.2 Iris-virginica
7.6 3.0 6.6 2.1 Iris-virginica
4.9 2.5 4.5 1.7 Iris-virginica
7.3 2.9 6.3 1.8 Iris-virginica
6.7 2.5 5.8 1.8 Iris-virginica
7.2 3.6 6.1 2.5 Iris-virginica
6.5 3.2 5.1 2.0 Iris-virginica
6.4 2.7 5.3 1.9 Iris-virginica
6.8 3.0 5.5 2.1 Iris-virginica
5.7 2.5 5.0 2.0 Iris-virginica
5.8 2.8 5.1 2.4 Iris-virginica
6.4 3.2 5.3 2.3 Iris-virginica
6.5 3.0 5.5 1.8 Iris-virginica
7.7 3.8 6.7 2.2 Iris-virginica
7.7 2.6 6.9 2.3 Iris-virginica
6.0 2.2 5.0 1.5 Iris-virginica
6.9 3.2 5.7 2.3 Iris-virginica
5.6 2.8 4.9 2.0 Iris-virginica
7.7 2.8 6.7 2.0 Iris-virginica
6.3 2.7 4.9 1.8 Iris-virginica
6.7 3.3 5.7 2.1 Iris-virginica
7.2 3.2 6.0 1.8 Iris-virginica
6.2 2.8 4.8 1.8 Iris-virginica
6.1 3.0 4.9 1.8 Iris-virginica
6.4 2.8 5.6 2.1 Iris-virginica
7.2 3.0 5.8 1.6 Iris-virginica
7.4 2.8 6.1 1.9 Iris-virginica
7.9 3.8 6.4 2.0 Iris-virginica
6.4 2.8 5.6 2.2 Iris-virginica
6.3 2.8 5.1 1.5 Iris-virginica
6.1 2.6 5.6 1.4 Iris-virginica
7.7 3.0 6.1 2.3 Iris-virginica
6.3 3.4 5.6 2.4 Iris-virginica
6.4 3.1 5.5 1.8 Iris-virginica
6.0 3.0 4.8 1.8 Iris-virginica
6.9 3.1 5.4 2.1 Iris-virginica
6.7 3.1 5.6 2.4 Iris-virginica
6.9 3.1 5.1 2.3 Iris-virginica
5.8 2.7 5.1 1.9 Iris-virginica
6.8 3.2 5.9 2.3 Iris-virginica
6.7 3.3 5.7 2.5 Iris-virginica
6.7 3.0 5.2 2.3 Iris-virginica
6.3 2.5 5.0 1.9 Iris-virginica
6.5 3.0 5.2 2.0 Iris-virginica
6.2 3.4 5.4 2.3 Iris-virginica
5.9 3.0 5.1 1.8 Iris-virginica
package com.som.spark.learning;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkFiles;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.catalyst.ScalaReflection;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.plans.RightOuter;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.StructType;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.apache.spark.ml.linalg.*;
import org.apache.spark.ml.tuning.*;
import org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.*;
import scala.Serializable;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;
import scala.reflect.ClassTag;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static scala.collection.JavaConversions.*;
import static scala.collection.JavaConverters.*;
public class JavaProblemSolverTest implements Serializable {
private static SparkSession spark = SparkSession.builder().master("local[2]")
.appName("TestSuite")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate();
@BeforeClass
public void setupBeforeAllTests() {
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
}
@BeforeMethod
public void nameBefore(Method method) {
System.out.println("\n==========================================================================");
System.out.println("Test name: " + method.getName());
System.out.println("Stack Overflow Link: https://stackoverflow.com/questions/" +
method.getName()
.replaceFirst("test", ""));
System.out.println("===========================================================================\n");
}
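// Bridges a java.util.List to a Scala Buffer so it can be passed to Spark APIs
// that take a Scala Seq (e.g. the Seq overloads of select, array, concat_ws).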
<T> Buffer<T> toScalaSeq(List<T> list) {
return JavaConversions.asScalaBuffer(list);
}
// ############################################################################################################
@Test
public void test62015370() {
String data = "id Col_1 Col_2 Col_3 Col_4 Col_5\n" +
"1 A B C D E\n" +
"2 X Y Z P Q";
List<String> list = Arrays.stream(data.split(System.lineSeparator()))
.map(s -> s.replaceAll("\\s+", ","))
.collect(Collectors.toList());
Dataset<Row> df1 = spark.read()
.option("header", true)
.option("sep", ",")
.csv(spark.createDataset(list, Encoders.STRING()));
df1.show();
df1.printSchema();
/**
* +---+-----+-----+-----+-----+-----+
* | id|Col_1|Col_2|Col_3|Col_4|Col_5|
* +---+-----+-----+-----+-----+-----+
* | 1| A| B| C| D| E|
* | 2| X| Y| Z| P| Q|
* +---+-----+-----+-----+-----+-----+
*
* root
* |-- id: string (nullable = true)
* |-- Col_1: string (nullable = true)
* |-- Col_2: string (nullable = true)
* |-- Col_3: string (nullable = true)
* |-- Col_4: string (nullable = true)
* |-- Col_5: string (nullable = true)
*/
String data1 = "id Col_1 Col_2 Col_3 Col_4 Col_5\n" +
"1 A B6 C D E\n" +
"2 X Y Z8 P Q3";
List<String> list1 = Arrays.stream(data1.split(System.lineSeparator()))
.map(s -> s.replaceAll("\\s+", ","))
.collect(Collectors.toList());
Dataset<Row> df2 = spark.read()
.option("sep", ",")
.option("header", true)
.csv(spark.createDataset(list1, Encoders.STRING()));
df2.show();
df2.printSchema();
/**
* +---+-----+-----+-----+-----+-----+
* | id|Col_1|Col_2|Col_3|Col_4|Col_5|
* +---+-----+-----+-----+-----+-----+
* | 1| A| B6| C| D| E|
* | 2| X| Y| Z8| P| Q3|
* +---+-----+-----+-----+-----+-----+
*
* root
* |-- id: string (nullable = true)
* |-- Col_1: string (nullable = true)
* |-- Col_2: string (nullable = true)
* |-- Col_3: string (nullable = true)
* |-- Col_4: string (nullable = true)
* |-- Col_5: string (nullable = true)
*
* */
List<Column> cols = Arrays.stream(df1.columns())
.map(c -> {
if (c.equalsIgnoreCase("id"))
return col("a.id");
else
return array(toScalaSeq(Arrays.asList(col("a."+c), col("b."+c))).toBuffer()).as(c);
}).collect(Collectors.toList());
Dataset<Row> processedDf = df1.as("a").join(df2.as("b"), df1.col("id").equalTo(df2.col("id")))
.select(toScalaSeq(cols).toBuffer());
List<Column> cols1 = Arrays.stream(df1.columns())
.map(f -> {
if (f.equalsIgnoreCase("id"))
return expr(f);
else
return expr("if(size(array_distinct(" + f + "))==1, NULL, " + f + " ) as " + f);
}).collect(Collectors.toList());
processedDf.select(toScalaSeq(cols1).toBuffer())
.show(false);
/**
* +---+-----+-------+-------+-----+-------+
* |id |Col_1|Col_2 |Col_3 |Col_4|Col_5 |
* +---+-----+-------+-------+-----+-------+
* |1 |null |[B, B6]|null |null |null |
* |2 |null |null |[Z, Z8]|null |[Q, Q3]|
* +---+-----+-------+-------+-----+-------+
*/
}
// ############################################################################################################
@Test
public void test62066377() {
String data = "id Col_1 Col_2 Col_3 Col_4 Col_5\n" +
"1 A B C D E\n" +
"2 X Y Z P \"\"";
List<String> list = Arrays.stream(data.split(System.lineSeparator()))
.map(s -> s.replaceAll("\\s+", ","))
.collect(Collectors.toList());
List<StructField> fields = Arrays.stream("id Col_1 Col_2 Col_3 Col_4 Col_5".split("\\s+"))
.map(s -> new StructField(s, DataTypes.StringType, true, Metadata.empty()))
.collect(Collectors.toList());
Dataset<Row> df1 = spark.read()
.schema(new StructType(fields.toArray(new StructField[fields.size()])))
.option("header", true)
.option("sep", ",")
.csv(spark.createDataset(list, Encoders.STRING()));
df1.show();
df1.printSchema();
String data1 = "id Col_1 Col_2 Col_3 Col_4 Col_5\n" +
"1 A B C D E\n" +
"2 X Y Z P F";
List<String> list1 = Arrays.stream(data1.split(System.lineSeparator()))
.map(s -> s.replaceAll("\\s+", ","))
.collect(Collectors.toList());
List<StructField> fields1 = Arrays.stream("id Col_1 Col_2 Col_3 Col_4 Col_5".split("\\s+"))
.map(s -> new StructField(s, DataTypes.StringType, true, Metadata.empty()))
.collect(Collectors.toList());
Dataset<Row> df2 = spark.read()
.schema(new StructType(fields1.toArray(new StructField[fields1.size()])))
.option("header", true)
.option("sep", ",")
.csv(spark.createDataset(list1, Encoders.STRING()));
df2.show();
df2.printSchema();
/**
* Why does the output report the fields as nullable = true if they are not?
* Why doesn't printSchema() write "... : string (nullable = false)" when the fields are not nullable?
*/
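// Answer (hedged): Spark's file-based sources, including the CSV reader used here,
// force every field to nullable = true on read, ignoring the nullability flags of a
// user-supplied schema. A minimal workaround sketch, assuming a non-nullable schema
// is actually wanted: rebuild the DataFrame from its RDD, where the schema is taken as-is.
StructType nonNullableSchema = new StructType(fields.stream()
.map(f -> new StructField(f.name(), f.dataType(), false, f.metadata()))
.toArray(StructField[]::new));
Dataset<Row> df3 = spark.createDataFrame(df1.javaRDD(), nonNullableSchema);
df3.printSchema(); // now reports nullable = false for every column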
}
// ############################################################################################################
@Test
public void test62091589() {
B instanceB = new B();
instanceB.setA(1);
instanceB.setB(2);
C instanceC = new C();
instanceC.setA(instanceB);
Encoder<? extends C> encoder = Encoders.bean(instanceC.getClass());
Function<C, String> toJson = c -> {
try {
return new ObjectMapper().writeValueAsString(c);
} catch (JsonProcessingException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
};
// Dataset<Row> json = spark.read().json(spark.createDataset(Collections.singletonList(toJson.apply(instanceC)), Encoders.STRING()));
//
// json.as()
// Dataset<C> ds = spark.createDataFrame(Collections.singletonList(RowFactory.create(instanceC)), RowEncoder.apply(encoder.schema()));
// ds.printSchema();
// ds.show(false);
// StructType schema = new StructType().add(
// new StructField("c", ScalaReflection.schemaFor(instanceB).dataType(), true, Metadata.empty())
// );
// Dataset<Row> ds2 = spark.createDataset(Collections.singletonList(instanceC), ExpressionEncoder.apply());
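// A minimal working sketch (hedged), assuming B and C are public JavaBean classes
// with no-arg constructors and getters/setters, as Encoders.bean requires:
// Dataset<C> ds = spark.createDataset(Collections.singletonList(instanceC), Encoders.bean(C.class));
// ds.printSchema();
// ds.show(false);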
}
// ############################################################################################################
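// Note: calling broadCastMap.value() in the constructor resolves the broadcast on
// the driver and captures the plain Map in the UDF's closure, so it is serialized
// with every task. To ship it via the broadcast mechanism instead, store the
// Broadcast<Map<Long, String>> itself as the field and call value() inside call().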
class MyUDF implements UDF1<Long, String> {
private Map<Long, String> broadCastMap;
public MyUDF(Broadcast<Map<Long, String>> broadCastMap) {
this.broadCastMap = broadCastMap.value();
}
public String call(Long id) {
return id +" -> " + broadCastMap.getOrDefault(id, "No mapping");
}
}
@Test
public void test62121715() {
Dataset<Row> inputDf = spark.range(1, 5).withColumn("col1", lit("a"));
inputDf.show(false);
inputDf.printSchema();
/**
* +---+----+
* |id |col1|
* +---+----+
* |1 |a |
* |2 |a |
* |3 |a |
* |4 |a |
* +---+----+
*
* root
* |-- id: long (nullable = false)
* |-- col1: string (nullable = false)
*/
// Create broadcast
Map<Long, String> map = new HashMap<>();
map.put(1L, "b");
map.put(2L, "c");
Broadcast<Map<Long, String>> broadCastMap = new JavaSparkContext(spark.sparkContext()).broadcast(map);
UserDefinedFunction myUdf = udf(new MyUDF(broadCastMap), DataTypes.StringType);
spark.sqlContext().udf().register("myUdf", myUdf);
inputDf.withColumn("new_col", callUDF("myUdf",
JavaConverters.asScalaBufferConverter(Collections.singletonList(col("id"))).asScala()))
.show();
/**
* +---+----+---------------+
* | id|col1| new_col|
* +---+----+---------------+
* | 1| a| 1 -> b|
* | 2| a| 2 -> c|
* | 3| a|3 -> No mapping|
* | 4| a|4 -> No mapping|
* +---+----+---------------+
*/
inputDf.withColumn("new_col", myUdf.apply(col("id")))
.show();
/**
* +---+----+---------------+
* | id|col1| new_col|
* +---+----+---------------+
* | 1| a| 1 -> b|
* | 2| a| 2 -> c|
* | 3| a|3 -> No mapping|
* | 4| a|4 -> No mapping|
* +---+----+---------------+
*/
}
// ############################################################################################################
@Test
public void test62166849() {
List<String> new_lst = new ArrayList<>();
new_lst.add("value_1");
new_lst.add("value_2");
Dataset<Row> df = spark.range(1).withColumn("col_1", lit("A"))
.withColumn("col_2", lit("value_2"))
.withColumn("col_3", lit("C"));
Dataset<Row> df_new = df.withColumn("new_column",functions.when(functions.col("col_1").equalTo("A")
.and(functions.col("col_2").isInCollection(new_lst)), functions.col("col_3"))
.otherwise(functions.col("col_1"))
);
df_new.show(false);
/**
* +---+-----+-------+-----+----------+
* |id |col_1|col_2 |col_3|new_column|
* +---+-----+-------+-----+----------+
* |0 |A |value_2|C |C |
* +---+-----+-------+-----+----------+
*/
df.withColumn("new_column",functions.when(functions.col("col_1").equalTo("A")
.and(functions.col("col_2").isin((Object[]) new_lst.toArray())),functions.col("col_3"))
.otherwise(functions.col("col_1"))).show(false);
/**
* +---+-----+-------+-----+----------+
* |id |col_1|col_2 |col_3|new_column|
* +---+-----+-------+-----+----------+
* |0 |A |value_2|C |C |
* +---+-----+-------+-----+----------+
*/
}
// ############################################################################################################
@Test
public void test62206832() {
String data = " firstname| lastname| age\n" +
" John | Doe | 21\n" +
" John. | Doe. | 21\n" +
" Mary. | William. | 30";
List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
.map(s -> Arrays.stream(s.split("\\|"))
.map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
.collect(Collectors.joining(","))
)
.collect(Collectors.toList());
Dataset<Row> df2 = spark.read()
.option("header", true)
.option("inferSchema", true)
.option("sep", ",")
.csv(spark.createDataset(list1, Encoders.STRING()));
df2.show(false);
df2.printSchema();
/**
* +---------+--------+---+
* |firstname|lastname|age|
* +---------+--------+---+
* |John |Doe |21 |
* |John. |Doe. |21 |
* |Mary. |William.|30 |
* +---------+--------+---+
*
* root
* |-- firstname: string (nullable = true)
* |-- lastname: string (nullable = true)
* |-- age: integer (nullable = true)
*/
List<Column> allCols = Arrays.stream(df2.columns()).map(functions::col).collect(Collectors.toList());
// using sha2
// The Wikipedia page on SHA-2 gives an estimate of the likelihood of a collision.
// If you run the numbers, you'll see that all hard disks ever produced on Earth
// can't hold enough 1MB files to reach a collision likelihood of even 0.01% for SHA-256.
// Basically, you can simply ignore the possibility.
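// Caveat (not from the original post): concat_ws skips nulls, so rows that differ
// only in which column is null can concatenate to the same string and therefore
// the same hash; substitute an explicit placeholder for nulls if that matters.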
df2.withColumn("stringId", sha2(concat_ws(":", toScalaSeq(allCols)), 256))
.show(false);
/**
* run-1
* +---------+--------+---+----------------------------------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+----------------------------------------------------------------+
* |John |Doe |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
* |John. |Doe. |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
* |Mary. |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
* +---------+--------+---+----------------------------------------------------------------+
* run-2
* +---------+--------+---+----------------------------------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+----------------------------------------------------------------+
* |John |Doe |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
* |John. |Doe. |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
* |Mary. |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
* +---------+--------+---+----------------------------------------------------------------+
* run-3
* +---------+--------+---+----------------------------------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+----------------------------------------------------------------+
* |John |Doe |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
* |John. |Doe. |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
* |Mary. |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
* +---------+--------+---+----------------------------------------------------------------+
*/
// using row_number ordered by all columns
// performance will degrade on big datasets since there is no partitionBy clause:
// the global sort pulls all rows through a single partition
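// (hedged alternative when uniqueness rather than a dense sequence is enough:
// monotonically_increasing_id() assigns unique, though non-consecutive, ids
// without a global sort, e.g. df2.withColumn("number", monotonically_increasing_id()))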
df2.withColumn("number", row_number().over(Window.orderBy(toScalaSeq(allCols))))
.show(false);
/**
* run-1
* +---------+--------+---+------+
* |firstname|lastname|age|number|
* +---------+--------+---+------+
* |John |Doe |21 |1 |
* |John. |Doe. |21 |2 |
* |Mary. |William.|30 |3 |
* +---------+--------+---+------+
* run-2
* +---------+--------+---+------+
* |firstname|lastname|age|number|
* +---------+--------+---+------+
* |John |Doe |21 |1 |
* |John. |Doe. |21 |2 |
* |Mary. |William.|30 |3 |
* +---------+--------+---+------+
* run-3
* +---------+--------+---+------+
* |firstname|lastname|age|number|
* +---------+--------+---+------+
* |John |Doe |21 |1 |
* |John. |Doe. |21 |2 |
* |Mary. |William.|30 |3 |
* +---------+--------+---+------+
*/
// using UUID.nameUUIDFromBytes
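// nameUUIDFromBytes produces a deterministic type-3 (MD5-based) UUID, so the same
// concatenated row yields the same id on every run, as the outputs below show.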
UserDefinedFunction id_udf = udf( (String s) ->
UUID.nameUUIDFromBytes(
s.getBytes(StandardCharsets.UTF_8)
).toString(), DataTypes.StringType);
df2.withColumn("stringId", id_udf.apply(concat_ws(":", toScalaSeq(allCols))))
.show(false);
/**
* run-1
* +---------+--------+---+------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+------------------------------------+
* |John |Doe |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
* |John. |Doe. |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
* |Mary. |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
* +---------+--------+---+------------------------------------+
* run-2
* +---------+--------+---+------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+------------------------------------+
* |John |Doe |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
* |John. |Doe. |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
* |Mary. |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
* +---------+--------+---+------------------------------------+
* run-3
* +---------+--------+---+------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+------------------------------------+
* |John |Doe |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
* |John. |Doe. |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
* |Mary. |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
* +---------+--------+---+------------------------------------+
*/
}
// ############################################################################################################
@Test
public void test62221075() {
String data = "customerid| customername| contactname| address| city|postalcode|country\n" +
" 1| Alfreds Futterkiste| Maria Anders| Obere Str. 57| Berlin| 12209|Germany\n" +
" 2|Ana Trujillo Empa...| Ana Trujillo|Avda. de la Const...|M�xico D.F.| 5021| Mexico\n" +
" 3|Antonio Moreno Ta...| Antonio Moreno| Mataderos 2312|M�xico D.F.| 5023| Mexico\n" +
" 4| Around the Horn| Thomas Hardy| 120 Hanover Sq.| London| WA1 1DP| UK\n" +
" 5| Berglunds snabbk�p|Christina Berglund| Berguvsv�gen 8| Lule�| S-958 22| Sweden";
List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
.map(s -> Arrays.stream(s.split("\\|"))
.map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
.collect(Collectors.joining(","))
)
.collect(Collectors.toList());
Dataset<Row> dataset = spark.read()
.option("header", true)
.option("inferSchema", true)
.option("sep", ",")
.csv(spark.createDataset(list1, Encoders.STRING()));
dataset.show(false);
dataset.printSchema();
dataset.createOrReplaceTempView("customers");
final Dataset<Row> dataset1 = spark.sql("SELECT count(customerid) as count, customerid, country FROM " +
"customers" +
" GROUP BY country, customerid HAVING count > 5 ORDER BY count DESC");
dataset1.show();
}
}
package com.som.spark.learning
import java.lang.reflect.Method
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkFiles
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType
import org.testng.annotations.{BeforeClass, BeforeMethod, Test}
import org.apache.spark.ml.linalg.{Matrices, Matrix, SparseVector, Vectors}
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.catalog.{Database, Table}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{struct, _}
import org.apache.spark.sql.types._
import scala.collection.mutable
class ProblemSolverTestJun2020 extends Serializable {
private val spark: SparkSession = SparkSession.builder().master("local[2]")
.appName("TestSuite")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
@BeforeClass
def setupBeforeAllTests(): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
}
@BeforeMethod
def nameBefore(method: Method): Unit = {
println("\n==========================================================================")
println("Test name: " + method.getName)
println(s"Stack Overflow Link: https://stackoverflow.com/questions/${
method.getName
.replaceFirst("test", "")
}")
println("===========================================================================\n")
}
// ############################################################################################################
@Test
def test62274300(): Unit = {
val data =
"""
|Key
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo
""".stripMargin
val stringDS1 = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
df1.select($"key", split($"key", "\\.").as("x"))
.withColumn("bankInfo",
expr(
"""
|named_struct('name', element_at(x, 2), 'cust_id', element_at(x, 3),
| 'branch', element_at(x, 4), 'dist', element_at(x, 5)))
""".stripMargin))
.select($"key", concat_ws(".", $"bankInfo.*").as("local_address"))
.show(false)
}
}
package com.som.spark.learning
import java.lang.reflect.Method
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkFiles
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.StructType
import org.testng.annotations.{BeforeClass, BeforeMethod, Test}
import org.apache.spark.ml.linalg.{Matrices, Matrix, SparseVector, Vectors}
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.catalog.{Database, Table}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{struct, _}
import org.apache.spark.sql.types._
import scala.collection.mutable
class ProblemSolverTestMay2020 extends Serializable {
private val spark: SparkSession = SparkSession.builder().master("local[2]")
.appName("TestSuite")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
@BeforeClass
def setupBeforeAllTests(): Unit = {
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
}
@BeforeMethod
def nameBefore(method: Method): Unit = {
println("\n==========================================================================")
println("Test name: " + method.getName)
println(s"Stack Overflow Link: https://stackoverflow.com/questions/${
method.getName
.replaceFirst("test", "")
}")
println("===========================================================================\n")
}
// ############################################################################################################
case class Sales(Name: String, Platform: String, Year: Int, Genre: String, Publisher: String,
NA_Sales: Double, EU_Sales: Double, JP_Sales: Double, Other_Sales: Double)
@Test
def test61929924(): Unit = {
import org.apache.spark.sql.catalyst.ScalaReflection
val data =
"""
|Gran Turismo 3: A-Spec;PS2;2001;Racing;Sony Computer Entertainment;6.85;5.09;1.87;1.16
|Call of Duty: Modern Warfare 3;X360;2011;Shooter;Activision;9.03;4.28;0.13;1.32
|Pokemon Yellow: Special Pikachu Edition;GB;1998;Role-Playing;Nintendo;5.89;5.04;3.12;0.59
|Call of Duty: Black Ops;X360;2010;Shooter;Activision;9.67;3.73;0.11;1.13
|Pokemon HeartGold/Pokemon SoulSilver;DS;2009;Action;Nintendo;4.4;2.77;3.96;0.77
|High Heat Major League Baseball 2003;PS2;2002;Sports;3DO;0.18;0.14;0;0.05
|Panzer Dragoon;SAT;1995;Shooter;Sega;0;0;0.37;0
|Corvette;GBA;2003;Racing;TDK Mediactive;0.2;0.07;0;0.01
""".stripMargin
val ds = spark.read
.schema(ScalaReflection.schemaFor[Sales].dataType.asInstanceOf[StructType])
.option("sep", ";")
.csv(data.split("\n").toSeq.toDS())
ds.show(false)
ds.printSchema()
// global sales
val processedDF = ds.withColumn("global_sale", col("NA_Sales") + col("EU_Sales") + col("JP_Sales"))
.groupBy("Genre")
.agg(sum("global_sale").as("global_sale_by_genre"))
println("Lowest selling :: " + processedDF.orderBy(col("global_sale_by_genre").asc).head()
.getValuesMap(Seq("Genre", "global_sale_by_genre")).mkString(", "))
println("Highest selling :: " + processedDF.orderBy(col("global_sale_by_genre").desc).head()
.getValuesMap(Seq("Genre", "global_sale_by_genre")).mkString(", "))
}
// ############################################################################################################
@Test
def test61995414(): Unit = {
val data =
"""Gran Turismo 3: A-Spec;PS2;2001;Racing;Sony Computer Entertainment;6.85;5.09;1.87;1.16
|Call of Duty: Modern Warfare 3;X360;2011;Shooter;Activision;9.03;4.28;0.13;1.32
|Pokemon Yellow: Special Pikachu Edition;GB;1998;Role-Playing;Nintendo;5.89;5.04;3.12;0.59
|Call of Duty: Black Ops;X360;2010;Shooter;Activision;9.67;3.73;0.11;1.13
|Pokemon HeartGold/Pokemon SoulSilver;DS;2009;Action;Nintendo;4.4;2.77;3.96;0.77
|High Heat Major League Baseball 2003;PS2;2002;Sports;3DO;0.18;0.14;0;0.05
|Panzer Dragoon;SAT;1995;Shooter;Sega;0;0;0.37;0
|Corvette;GBA;2003;Racing;TDK Mediactive;0.2;0.07;0;0.01""".stripMargin
val vgdataLines = spark.sparkContext.makeRDD(data.split("\n").toSeq)
val vgdata = vgdataLines.map(_.split(";"))
val GlobalSales = vgdata.map(r => (r(3), r(5).toDouble + r(6).toDouble + r(7).toDouble)).reduceByKey(_ + _)
GlobalSales.foreach(println)
// (Shooter,27.32)
// (Role-Playing,14.05)
// (Sports,0.32)
// (Action,11.129999999999999)
// (Racing,14.079999999999998)
println("### min-max ###")
val minSale = GlobalSales.min()(Ordering.by(_._2))
val maxSale = GlobalSales.max()(Ordering.by(_._2))
println(s"Highest selling Genre: '${maxSale._1}' Global Sale (in millions): '${maxSale._2}'.")
println(s"Lowest selling Genre: '${minSale._1}' Global Sale (in millions): '${minSale._2}'.")
// ### min-max ###
// Highest selling Genre: 'Shooter' Global Sale (in millions): '27.32'.
// Lowest selling Genre: 'Sports' Global Sale (in millions): '0.32'.
}
// ############################################################################################################
case class Data(matrix: Matrix)
@Test
def test61994423(): Unit = {
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.linalg.{Matrices, Matrix}
def save(matrix: Matrix, path: String): Unit = {
val data = Data(matrix)
val df = spark.createDataFrame(Seq(data))
val dataPath = new Path(path, "data").toString
df.repartition(1).write.mode("overwrite").parquet(dataPath)
}
def load(path: String): Matrix = {
val dataPath = new Path(path, "data").toString
val df = spark.read.parquet(dataPath)
val Row(matrix: Matrix) = df.select("matrix").head()
matrix
}
println("### input matrix ###")
val matrixToSave = Matrices.eye(3)
println(matrixToSave)
save(matrixToSave, "/Users/sokale/models/matrix")
val matrixLoaded = load("/Users/sokale/models/matrix")
println("### Loaded matrix ###")
println(matrixLoaded)
// ### input matrix ###
// 1.0 0.0 0.0
// 0.0 1.0 0.0
// 0.0 0.0 1.0
// ### Loaded matrix ###
// 1.0 0.0 0.0
// 0.0 1.0 0.0
// 0.0 0.0 1.0
}
// ############################################################################################################
@Test
def test61993247(): Unit = {
val data1 =
"""
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|United Kingdom
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|United Kingdom
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom
| 536365| 22752|SET 7 BABUSHKA NE...| -2|12/1/2010 8:26| 7.65| 17850|United Kingdom
""".stripMargin
val stringDS = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
df.filter("Quantity>=0").show(false)
}
// ############################################################################################################
def mlOnIrisData(): (PipelineModel, Pipeline, CrossValidatorModel, DataFrame) = {
// Flag to switch between StringIndexer and CustomStringIndexer
val USE_CUSTOM_STRING_INDEXER = false
val irisDataURL = getClass.getResource("/data/classification/irisData.csv")
val irisDatasetDF = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv(irisDataURL.getPath)
irisDatasetDF.show(5)
val featurePreprocessingPipeline = new Pipeline()
val cat_columns = Array("sepal_length", "sepal_width", "petal_length", "petal_width")
val cat_columns_idx = cat_columns.map(col => col + "_Idx")
val out_columns = cat_columns.map(col => col + "_Ohe")
val oneHotEncoder = new OneHotEncoderEstimator()
.setDropLast(false)
.setInputCols(cat_columns_idx)
.setOutputCols(out_columns)
val vectorAssembler = new VectorAssembler()
.setInputCols(out_columns)
.setOutputCol("features")
val inputColsOutputCols = cat_columns.zip(cat_columns_idx)
val index_transformers = inputColsOutputCols.map(inputColOutputCol => {
new StringIndexer()
.setInputCol(inputColOutputCol._1)
.setOutputCol(inputColOutputCol._2)
})
featurePreprocessingPipeline.setStages(index_transformers ++ Array(oneHotEncoder) ++ Array(vectorAssembler))
val transformModel = featurePreprocessingPipeline.fit(irisDatasetDF)
var ds_enc = transformModel.transform(irisDatasetDF)
ds_enc = ds_enc.drop(cat_columns_idx: _*).drop(out_columns: _*).drop(cat_columns: _*)
ds_enc.printSchema()
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(ds_enc)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(ds_enc)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = ds_enc.randomSplit(Array(0.7, 0.3), seed = 123)
// Train a RandomForest model.
val rf = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
.setNumTrees(10)
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(10))
.build()
// Convert indexed labels back to original labels. Testing IndexToString
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)
// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(new MulticlassClassificationEvaluator().setLabelCol("indexedLabel"))
.setNumFolds(3)
// Train model. This also runs the indexers.
val model = cv.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Accuracy = " + accuracy)
// val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
// println("Learned classification forest model:\n" + rfModel.toDebugString)
(transformModel, pipeline, model, predictions)
}
// ############################################################################################################
@Test
def test61981478(): Unit = {
// // Chain indexers and forest in a Pipeline.
// val pipeline = new Pipeline()
// .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
val (featurePipelineModel, pipeline, crossValidatorModel, predictions) = mlOnIrisData()
val labelIndexer = pipeline.getStages(0).asInstanceOf[StringIndexerModel]
// in my case, StringIndexerModel is referenced as labelIndexer
// map each class index back to its original label string
val indexToLabel = labelIndexer.labels.zipWithIndex.map(_.swap).toMap
println(indexToLabel)
import org.apache.spark.ml.linalg.Vector
val mapToLabel = udf((vector: Vector) => vector.toArray.zipWithIndex.toMap.map{
case(prob, index) => indexToLabel(index) -> prob
})
predictions.select(
col("features"),
col("probability"),
to_json(mapToLabel(col("probability"))).as("probability_json"),
col("prediction"),
col("predictedLabel"))
.show(5,false)
// +-------------------------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+----------+--------------+
// |features |probability |probability_json |prediction|predictedLabel|
// +-------------------------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+----------+--------------+
// |(123,[0,37,82,101],[1.0,1.0,1.0,1.0])|[0.7094347002635046,0.174338768115942,0.11622653162055337] |{"Iris-setosa":0.7094347002635046,"Iris-versicolor":0.174338768115942,"Iris-virginica":0.11622653162055337} |0.0 |Iris-setosa |
// |(123,[0,39,58,101],[1.0,1.0,1.0,1.0])|[0.7867074275362319,0.12433876811594202,0.0889538043478261] |{"Iris-setosa":0.7867074275362319,"Iris-versicolor":0.12433876811594202,"Iris-virginica":0.0889538043478261} |0.0 |Iris-setosa |
// |(123,[0,39,62,107],[1.0,1.0,1.0,1.0])|[0.5159492704509036,0.2794443583750028,0.2046063711740936] |{"Iris-setosa":0.5159492704509036,"Iris-versicolor":0.2794443583750028,"Iris-virginica":0.2046063711740936} |0.0 |Iris-setosa |
// |(123,[2,39,58,101],[1.0,1.0,1.0,1.0])|[0.7822379507920459,0.12164981462756994,0.09611223458038423]|{"Iris-setosa":0.7822379507920459,"Iris-versicolor":0.12164981462756994,"Iris-virginica":0.09611223458038423}|0.0 |Iris-setosa |
// |(123,[2,43,62,101],[1.0,1.0,1.0,1.0])|[0.7049652235193186,0.17164981462756992,0.1233849618531115] |{"Iris-setosa":0.7049652235193186,"Iris-versicolor":0.17164981462756992,"Iris-virginica":0.1233849618531115} |0.0 |Iris-setosa |
// +-------------------------------------+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+----------+--------------+
// only showing top 5 rows
}
// ############################################################################################################
@Test
def test61896971(): Unit = {
// spark-test-data.json
// --------------------
// {"id":1,"name":"abc1"}
// {"id":2,"name":"abc2"}
// {"id":3,"name":"abc3"}
import scala.io.Source
val path = "spark-test-data.json"
val fileContent = Source.fromFile(getClass.getResource("/" + path).getPath).getLines()
val df = spark.read
.json(fileContent.toSeq.toDS())
df.show(false)
df.printSchema()
spark.sparkContext.addFile(getClass.getResource("/" + path).getPath)
val absolutePathOfFile = SparkFiles.get(path)
println(s"Absolute path added via sparkcontext.addfile(filePath): $absolutePathOfFile")
val df1 = spark.read
.json(absolutePathOfFile)
df1.show(false)
// Absolute path added via sparkcontext.addfile(filePath): /private/var/folders/mg/lcv5jvyd6dx2vtr7zpmg48s80000gn/T
// /spark-77290ab6-21ef-4960-8a11-91812675d759/userFiles-681fa1fd-9f09-4d9a-a083-6f3c24f9ab8e/spark-test-data.json
// +---+----+
// |id |name|
// +---+----+
// |1 |abc1|
// |2 |abc2|
// |3 |abc3|
// +---+----+
}
// ############################################################################################################
@Test
def test61955248(): Unit = {
val data =
"""
|2018-04-07 07:07:17
|2018-04-07 07:32:27
|2018-04-07 08:36:44
|2018-04-07 08:38:00
|2018-04-07 08:39:29
|2018-04-08 01:43:08
|2018-04-08 01:43:55
|2018-04-09 07:52:31
|2018-04-09 07:52:42
|2019-01-24 11:52:31
|2019-01-24 12:52:42
|2019-01-25 12:52:42
""".stripMargin
val df = spark.read
.schema(StructType(Array(StructField("date_time", DataTypes.TimestampType))))
.csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
// Bucketize the data and find the count for each hour
val hour = 60 * 60
// convert the time into unix epoch
val processedDF = df.withColumn("unix_epoch", unix_timestamp(col("date_time")))
.withColumn("hour_bucket", floor(col("unix_epoch")/hour))
.groupBy("hour_bucket")
.count()
processedDF.show(false)
// find hourly average count
processedDF.agg(avg("count")).show(false)
}
// ############################################################################################################
@Test
def test61970371(): Unit = {
val schema = StructType(
"ID|LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4"
.split("\\|")
.map(f => StructField(f.trim, DataTypes.IntegerType))
)
val data =
"""
| 1| 100| 1| 35| | | |444| | |
| 2| 200| 3| 30| 15| 18| |111|222|333|
| 3| 300| 2| 18| 20| | |555|666| |
| 4| 400| 4| 28| 60| 80| 90|777|888|123|456
| 5| 500| 1| 45| | | |245| | |
""".stripMargin
val df = spark.read
.schema(schema)
.option("sep", "|")
.csv(data.split(System.lineSeparator()).map(_.replaceAll("\\s*", "")).toSeq.toDS())
df.show(false)
df.printSchema()
// +---+----+-----+---+----+----+----+---+----+----+----+
// |ID |LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4 |
// +---+----+-----+---+----+----+----+---+----+----+----+
// |1 |100 |1 |35 |null|null|null|444|null|null|null|
// |2 |200 |3 |30 |15 |18 |null|111|222 |333 |null|
// |3 |300 |2 |18 |20 |null|null|555|666 |null|null|
// |4 |400 |4 |28 |60 |80 |90 |777|888 |123 |456 |
// |5 |500 |1 |45 |null|null|null|245|null|null|null|
// +---+----+-----+---+----+----+----+---+----+----+----+
// unpivot the table and remove null entry
df.selectExpr(
"ID",
"LOAN",
"stack(4, A1, B1, A2, B2, A3, B3, A4, B4) as (A, B)"
).where("A is not null and B is not null").show(false)
// +---+----+---+---+
// |ID |LOAN|A |B |
// +---+----+---+---+
// |1 |100 |35 |444|
// |2 |200 |30 |111|
// |2 |200 |15 |222|
// |2 |200 |18 |333|
// |3 |300 |18 |555|
// |3 |300 |20 |666|
// |4 |400 |28 |777|
// |4 |400 |60 |888|
// |4 |400 |80 |123|
// |4 |400 |90 |456|
// |5 |500 |45 |245|
// +---+----+---+---+
}
// ############################################################################################################
@Test
def test62016466(): Unit = {
val table_0 = spark.range(1, 5)
.withColumn("Array_0",
array(struct(lit("a").cast(StringType).as("f1"), lit(2).as("f2"))))
.withColumn("Array_1", array(lit(null))) // .cast("array<struct<f1:string, f2:int>>")
table_0.show(false)
table_0.printSchema()
// +---+--------+-------+
// |id |Array_0 |Array_1|
// +---+--------+-------+
// |1  |[[a, 2]]|[]     |
// |2  |[[a, 2]]|[]     |
// |3  |[[a, 2]]|[]     |
// |4  |[[a, 2]]|[]     |
// +---+--------+-------+
//
// root
// |-- id: long (nullable = false)
// |-- Array_0: array (nullable = false)
// |    |-- element: struct (containsNull = false)
// |    |    |-- f1: string (nullable = false)
// |    |    |-- f2: integer (nullable = false)
// |-- Array_1: array (nullable = false)
// |    |-- element: null (containsNull = true)
table_0.createOrReplaceTempView("table_0")
spark.sql(
"""
|SELECT exploded_b_values.*, table_0.id
|FROM table_0
| LATERAL VIEW explode(table_0.Array_1) exploded_b_values AS B
""".stripMargin).printSchema()
// root
// |-- B: null (nullable = true)
// |-- id: long (nullable = false)
// +----+---+
// |B |id |
// +----+---+
// |null|1 |
// |null|2 |
// |null|3 |
// |null|4 |
// +----+---+
spark.sql(
"""
|SELECT exploded_a_values.*, table_0.id
|FROM table_0
| LATERAL VIEW explode(table_0.Array_0) exploded_a_values AS A
""".stripMargin).printSchema()
// root
// |-- A: struct (nullable = false)
// | |-- f1: string (nullable = false)
// | |-- f2: integer (nullable = false)
// |-- id: long (nullable = false)
val processed = spark.sql(
"""
|SELECT exploded_a_values.*, table_0.id
|FROM table_0
| LATERAL VIEW explode(table_0.Array_0) exploded_a_values AS A
|UNION
|SELECT exploded_b_values.*, table_0.id
|FROM table_0
| LATERAL VIEW explode(table_0.Array_1) exploded_b_values AS B
""".stripMargin)
processed.show(false)
processed.printSchema()
// +------+---+
// |A |id |
// +------+---+
// |[a, 2]|1 |
// |[a, 2]|2 |
// |[a, 2]|4 |
// |null |2 |
// |null |4 |
// |[a, 2]|3 |
// |null |1 |
// |null |3 |
// +------+---+
//
// root
// |-- A: struct (nullable = true)
// | |-- f1: string (nullable = false)
// | |-- f2: integer (nullable = false)
// |-- id: long (nullable = false)
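// Note: the UNION promotes B's NullType column to A's struct type, which is why
// column A becomes nullable in the merged schema above.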
}
// ############################################################################################################
@Test
def test61961840(): Unit = {
val data =
"""
|{"person":[{"name":"david", "email": "david@gmail.com"}, {"name":"steve", "email": "steve@gmail.com"}]}
""".stripMargin
val df = spark.read
.json(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
//
// +----------------------------------------------------+
// |person |
// +----------------------------------------------------+
// |[[david@gmail.com, david], [steve@gmail.com, steve]]|
// +----------------------------------------------------+
//
// root
// |-- person: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- email: string (nullable = true)
// | | |-- name: string (nullable = true)
//
// Spark > 2.3
val answer1 = df.withColumn("person_processed",
expr("transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))"))
answer1.show(false)
answer1.printSchema()
//
// +----------------------------------------------------+----------------------------------------------------+
// |person |person_processed |
// +----------------------------------------------------+----------------------------------------------------+
// |[[david@gmail.com, david], [steve@gmail.com, steve]]|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
// +----------------------------------------------------+----------------------------------------------------+
//
// root
// |-- person: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- email: string (nullable = true)
// | | |-- name: string (nullable = true)
// |-- person_processed: array (nullable = true)
// | |-- element: struct (containsNull = false)
// | | |-- email: string (nullable = true)
// | | |-- name: string (nullable = true)
//
// spark < 2.3
case class InfoData(name: String, email: String)
val infoDataSchema =
ArrayType(StructType(Array(StructField("name", StringType), StructField("email", StringType))))
val reverseEmailUDF = udf((arr1: mutable.WrappedArray[String], arr2: mutable.WrappedArray[String]) => {
if (arr1.length != arr2.length) null
else arr1.zipWithIndex.map(t => InfoData(t._1, arr2(t._2).reverse))
}, infoDataSchema)
val spark2_3Processed = df
.withColumn("person_processed",
reverseEmailUDF(
col("person.name").cast("array<string>"),
col("person.email").cast("array<string>")
)
)
spark2_3Processed.show(false)
spark2_3Processed.printSchema()
// +----------------------------------------------------+----------------------------------------------------+
// |person |person_processed |
// +----------------------------------------------------+----------------------------------------------------+
// |[[david@gmail.com, david], [steve@gmail.com, steve]]|[[david, moc.liamg@divad], [steve, moc.liamg@evets]]|
// +----------------------------------------------------+----------------------------------------------------+
//
// root
// |-- person: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- email: string (nullable = true)
// | | |-- name: string (nullable = true)
// |-- person_processed: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- name: string (nullable = true)
// | | |-- email: string (nullable = true)
// spark < 2.3
// if you can't create case class
println(df.schema("person").dataType)
val subSchema = df.schema("person").dataType
val reverseEmailUDF_withoutCaseClass = //udf((nameArrayRow: Row, emailArrayRow: Row) => {
udf((nameArray: mutable.WrappedArray[String], emailArray: mutable.WrappedArray[String]) => {
if (nameArray.length != emailArray.length) null
else nameArray.zipWithIndex.map(t => (t._1, emailArray(t._2).reverse))
}, subSchema)
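// Caveat: subSchema's field order is (email, name) while the tuple above is
// (name, reversedEmail); Row fields are matched by position, not name, which is
// why selecting "person_processed.email" below surfaces the names instead.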
val withoutCaseClasDF = df
.withColumn("person_processed",
reverseEmailUDF_withoutCaseClass(
col("person.name").cast("array<string>"),
col("person.email").cast("array<string>")
)
)
withoutCaseClasDF.show(false)
withoutCaseClasDF.printSchema()
withoutCaseClasDF.select("person_processed.email").show(false)
//
// ArrayType(StructType(StructField(email,StringType,true), StructField(name,StringType,true)),true)
// +----------------------------------------------------+----------------------------------------------------+
// |person |person_processed |
// +----------------------------------------------------+----------------------------------------------------+
// |[[david@gmail.com, david], [steve@gmail.com, steve]]|[[david, moc.liamg@divad], [steve, moc.liamg@evets]]|
// +----------------------------------------------------+----------------------------------------------------+
//
// root
// |-- person: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- email: string (nullable = true)
// | | |-- name: string (nullable = true)
// |-- person_processed: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- email: string (nullable = true)
// | | |-- name: string (nullable = true)
//
// +--------------+
// |email |
// +--------------+
// |[david, steve]|
// +--------------+
//
}
// ############################################################################################################
class FlatMapTransformer(override val uid: String)
extends Transformer {
/**
* Param for input column name.
*
* @group param
*/
final val inputCol = new Param[String](this, "inputCol", "The input column")
final def getInputCol: String = $(inputCol)
/**
* Param for output column name.
*
* @group param
*/
final val outputCol = new Param[String](this, "outputCol", "The output column")
final def getOutputCol: String = $(outputCol)
def setInputCol(value: String): this.type = set(inputCol, value)
def setOutputCol(value: String): this.type = set(outputCol, value)
def this() = this(Identifiable.randomUID("FlatMapTransformer"))
private val flatMap: String => Seq[String] = { input: String =>
input.split(",")
}
def copy(extra: ParamMap): FlatMapTransformer = defaultCopy(extra)
override def transform(dataset: Dataset[_]): DataFrame = {
val flatMapUdf = udf(flatMap, ArrayType(StringType))
dataset.withColumn($(outputCol), flatMapUdf(col($(inputCol))))
}
override def transformSchema(schema: StructType): StructType = {
val dataType = schema($(inputCol)).dataType
require(
dataType.isInstanceOf[StringType],
s"Input column must be of type StringType but got ${dataType}")
val inputFields = schema.fields
require(
!inputFields.exists(_.name == $(outputCol)),
s"Output column ${$(outputCol)} already exists.")
schema.add($(outputCol), ArrayType(StringType))
}
}
@Test
def test62018875(): Unit = {
val train = spark.range(10)
.withColumn("cat", rand())
.withColumn("cat_ohe", lit("a,b"))
val num_features = Array("id")
val cat_ohe_features = Array("cat_ohe")
val cat_features_string_index = Array("cat")
val catIndexer = cat_features_string_index.map {
feature =>
new StringIndexer()
.setInputCol(feature)
.setOutputCol(feature + "_index")
.setHandleInvalid("keep")
}
val flatMapper = cat_ohe_features.map {
feature =>
new FlatMapTransformer()
.setInputCol(feature)
.setOutputCol(feature + "_transformed")
}
val countVectorizer = cat_ohe_features.map {
feature =>
new CountVectorizer()
.setInputCol(feature + "_transformed")
.setOutputCol(feature + "_vectorized")
.setVocabSize(10)
}
// val countVectorizer = cat_ohe_features.map {
// feature =>
//
// val flatMapper = new FlatMapTransformer()
// .setInputCol(feature)
// .setOutputCol(feature + "_transformed")
//
// new CountVectorizer()
// .setInputCol(flatMapper.getOutputCol)
// .setOutputCol(feature + "_vectorized")
// .setVocabSize(10)
// }
val cat_features_index = cat_features_string_index.map {
(feature: String) => feature + "_index"
}
val count_vectorized_index = cat_ohe_features.map {
(feature: String) => feature + "_vectorized"
}
val catFeatureAssembler = new VectorAssembler()
.setInputCols(cat_features_index)
.setOutputCol("cat_features")
val oheFeatureAssembler = new VectorAssembler()
.setInputCols(count_vectorized_index)
.setOutputCol("cat_ohe_features")
val numFeatureAssembler = new VectorAssembler()
.setInputCols(num_features)
.setOutputCol("num_features")
val featureAssembler = new VectorAssembler()
.setInputCols(Array("cat_features", "num_features", "cat_ohe_features"))
.setOutputCol("features")
val pipelineStages = catIndexer ++ flatMapper ++ countVectorizer ++
Array(
catFeatureAssembler,
oheFeatureAssembler,
numFeatureAssembler,
featureAssembler)
val pipeline = new Pipeline().setStages(pipelineStages)
val model = pipeline.fit(dataset = train)
model.transform(train).show(false)
/**
* +---+-------------------+-------+---------+-------------------+-------------------+------------+----------------+------------+-----------------+
* |id |cat |cat_ohe|cat_index|cat_ohe_transformed|cat_ohe_vectorized |cat_features|cat_ohe_features|num_features|features |
* +---+-------------------+-------+---------+-------------------+-------------------+------------+----------------+------------+-----------------+
* |0 |0.5090906225798505 |a,b |2.0 |[a, b] |(2,[0,1],[1.0,1.0])|[2.0] |[1.0,1.0] |[0.0] |[2.0,0.0,1.0,1.0]|
* |1 |0.8019883419510832 |a,b |7.0 |[a, b] |(2,[0,1],[1.0,1.0])|[7.0] |[1.0,1.0] |[1.0] |[7.0,1.0,1.0,1.0]|
* |2 |0.6189101074687529 |a,b |5.0 |[a, b] |(2,[0,1],[1.0,1.0])|[5.0] |[1.0,1.0] |[2.0] |[5.0,2.0,1.0,1.0]|
* |3 |0.1855605832809084 |a,b |6.0 |[a, b] |(2,[0,1],[1.0,1.0])|[6.0] |[1.0,1.0] |[3.0] |[6.0,3.0,1.0,1.0]|
* |4 |0.23381846247134597|a,b |8.0 |[a, b] |(2,[0,1],[1.0,1.0])|[8.0] |[1.0,1.0] |[4.0] |[8.0,4.0,1.0,1.0]|
* |5 |0.47886431990303546|a,b |3.0 |[a, b] |(2,[0,1],[1.0,1.0])|[3.0] |[1.0,1.0] |[5.0] |[3.0,5.0,1.0,1.0]|
* |6 |0.8733308393998128 |a,b |9.0 |[a, b] |(2,[0,1],[1.0,1.0])|[9.0] |[1.0,1.0] |[6.0] |[9.0,6.0,1.0,1.0]|
* |7 |0.8250921802204912 |a,b |1.0 |[a, b] |(2,[0,1],[1.0,1.0])|[1.0] |[1.0,1.0] |[7.0] |[1.0,7.0,1.0,1.0]|
* |8 |0.8698673151005127 |a,b |4.0 |[a, b] |(2,[0,1],[1.0,1.0])|[4.0] |[1.0,1.0] |[8.0] |[4.0,8.0,1.0,1.0]|
* |9 |0.9832602298773477 |a,b |0.0 |[a, b] |(2,[0,1],[1.0,1.0])|[0.0] |[1.0,1.0] |[9.0] |[0.0,9.0,1.0,1.0]|
* +---+-------------------+-------+---------+-------------------+-------------------+------------+----------------+------------+-----------------+
*/
}
// ############################################################################################################
@Test
def test62025380(): Unit = {
val dfInput = spark.range(1).withColumn("Qty", col("id").cast(StringType))
val processDF = dfInput.withColumn("QtyOut",dfInput.col("Qty").cast("decimal(32,9)"))
processDF.show(false)
processDF.printSchema()
processDF.withColumn("NewQtyOut",format_number(processDF.col("QtyOut"),9)).show()
processDF.withColumn("NewQtyOut",format_number(processDF.col("QtyOut"),9)).printSchema()
processDF
.withColumn("isTrue", when(col("QtyOut").equalTo(0), true).otherwise(false))
.show(false)
def bigDecimalFormatter(x: Double, y: Int): Double =
BigDecimal(x).setScale(y, BigDecimal.RoundingMode.HALF_UP).toDouble
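// e.g. bigDecimalFormatter(11000.000453, 5) == 11000.00045 (HALF_UP rounding, see output below)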
val decimalFormatter = udf((decimal: Double, scale: Int) => bigDecimalFormatter(decimal, scale))
processDF.select(decimalFormatter(col("QtyOut"), lit(9)),
decimalFormatter(lit(1.1000000453E4), lit(5)))
.show(false)
/**
* +--------------+--------------------+
* |UDF(QtyOut, 9)|UDF(11000.000453, 5)|
* +--------------+--------------------+
* |0.0 |11000.00045 |
* +--------------+--------------------+
*/
/**
* +---+---+------+
* |id |Qty|QtyOut|
* +---+---+------+
* |0 |0 |0E-9 |
* +---+---+------+
*
* root
* |-- id: long (nullable = false)
* |-- Qty: string (nullable = false)
* |-- QtyOut: decimal(32,9) (nullable = true)
*
* +---+---+------+-----------+
* | id|Qty|QtyOut| NewQtyOut|
* +---+---+------+-----------+
* | 0| 0| 0E-9|0.000000000|
* +---+---+------+-----------+
*
* root
* |-- id: long (nullable = false)
* |-- Qty: string (nullable = false)
* |-- QtyOut: decimal(32,9) (nullable = true)
* |-- NewQtyOut: string (nullable = true)
*
* +---+---+------+------+
* |id |Qty|QtyOut|isTrue|
* +---+---+------+------+
* |0 |0 |0E-9 |true |
* +---+---+------+------+
*/
}
// ############################################################################################################
@Test
def test62045116(): Unit = {
val data =
"""
|2017-04-07 07:07:17
|2017-04-07 07:32:27
|2017-04-07 08:36:44
|2017-04-07 08:38:00
|2017-04-07 08:39:29
|2017-04-07 07:07:17
|2018-04-07 07:32:27
|2018-04-07 08:36:44
|2018-04-07 08:38:00
|2018-04-07 08:39:29
|2018-04-08 01:43:08
|2018-04-08 01:43:55
|2018-04-09 07:52:31
|2018-04-09 07:52:42
|2019-01-24 11:52:31
|2019-01-24 12:52:42
|2019-01-25 12:52:42
""".stripMargin
val df = spark.read
.schema(StructType(Array(StructField("startDate", DataTypes.TimestampType))))
.csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
/**
* +-------------------+
* |startDate |
* +-------------------+
* |2018-04-07 07:07:17|
* |2018-04-07 07:32:27|
* |2018-04-07 08:36:44|
* |2018-04-07 08:38:00|
* |2018-04-07 08:39:29|
* |2018-04-08 01:43:08|
* |2018-04-08 01:43:55|
* |2018-04-09 07:52:31|
* |2018-04-09 07:52:42|
* |2019-01-24 11:52:31|
* |2019-01-24 12:52:42|
* |2019-01-25 12:52:42|
* +-------------------+
*
* root
* |-- startDate: timestamp (nullable = true)
*/
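// month-to-date filter: if currentDate falls on the 1st of the month, fall back to the whole previous month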
val filterCol = (currentDate: String) => when(datediff(date_format(lit(currentDate), "yyyy-MM-dd")
,date_format(lit(currentDate), "yyyy-MM-01"))===lit(0),
date_format(col("startDate"), "yyyy-MM") ===
date_format(concat_ws("-",year(lit(currentDate)), month(lit(currentDate)) -1), "yyyy-MM")
).otherwise(to_date(col("startDate"))
.between(date_format(lit(currentDate), "yyyy-MM-01"), lit(currentDate)))
// currentDate mid-month -> rows from the 1st of the month up to currentDate
var currentDateStr = "2018-04-08"
df.filter(filterCol(currentDateStr)).show(false)
/**
* +-------------------+
* |startDate |
* +-------------------+
* |2018-04-07 07:07:17|
* |2018-04-07 07:32:27|
* |2018-04-07 08:36:44|
* |2018-04-07 08:38:00|
* |2018-04-07 08:39:29|
* |2018-04-08 01:43:08|
* |2018-04-08 01:43:55|
* +-------------------+
*/
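// currentDate on the 1st of the month -> the previous month's rows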
currentDateStr = "2018-05-01"
df.filter(filterCol(currentDateStr)).show(false)
/**
* +-------------------+
* |startDate |
* +-------------------+
* |2018-04-07 07:07:17|
* |2018-04-07 07:32:27|
* |2018-04-07 08:36:44|
* |2018-04-07 08:38:00|
* |2018-04-07 08:39:29|
* |2018-04-08 01:43:08|
* |2018-04-08 01:43:55|
* |2018-04-09 07:52:31|
* |2018-04-09 07:52:42|
* +-------------------+
*/
}
// ############################################################################################################
@Test
def test62044544(): Unit = {
val data =
"""
|{
| "id": "1",
| "type": "arr1",
| "address": [
| {
| "id": "1",
| "street": "abc",
| "city": "NY",
| "order": "Primary"
| },
| {
| "id": "2",
| "street": "xyz",
| "city": "SA",
| "order": "Secondary"
| },
| {
| "id": "1",
| "street": "abc",
| "city": "NY",
| "order": "Secondary"
| }
| ]
|}
""".stripMargin
val df = spark.read
.option("multiline", true)
.json(Seq(data).toDS())
df.show(false)
df.printSchema()
println(df.schema("address").dataType)
val subSchema = df.schema("address").dataType
// keep only the first occurrence of each (street, city) pair
val distinctAddress =
udf((addressArray: mutable.WrappedArray[Row]) => {
val seen = mutable.HashSet.empty[(String, String)]
addressArray.filter { row =>
// HashSet.add returns true only for a key not seen before
seen.add(row.getAs[String]("street") -> row.getAs[String]("city"))
}
}, subSchema)
val distinctAddressDF = df
.withColumn("address_ab", distinctAddress(col("address")))
distinctAddressDF.show(false)
distinctAddressDF.printSchema()
}
// ############################################################################################################
@Test
def test62054332(): Unit = {
val data = """[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]"""
// case-1 if this is input data
val df = spark.read.json(Seq(data).toDS())
df.printSchema()
df.show(false)
/**
* root
* |-- A: double (nullable = true)
* |-- B: string (nullable = true)
*
* +-----+--------+
* |A |B |
* +-----+--------+
* |120.0|0005236 |
* |10.0 |0005200 |
* |12.0 |00042276|
* |20.0 |00052000|
* +-----+--------+
*/
// case-2 if this is one of the column
val df2 = Seq(data).toDF("gtins")
df2.show(false)
df2.printSchema()
/**
* +--------------------------------------------------------------------------------------------------------+
* |gtins |
* +--------------------------------------------------------------------------------------------------------+
* |[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]|
* +--------------------------------------------------------------------------------------------------------+
*
* root
* |-- gtins: string (nullable = true)
*/
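// from_json parses the string into array<struct<A,B>>; inline_outer explodes each struct into its own row and columns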
df2.selectExpr("inline_outer(from_json(gtins, 'array<struct<A:double, B:string>>')) as (packQty, gtin)")
.show(false)
/**
* +-------+--------+
* |packQty|gtin |
* +-------+--------+
* |120.0 |0005236 |
* |10.0 |0005200 |
* |12.0 |00042276|
* |20.0 |00052000|
* +-------+--------+
*/
}
// ############################################################################################################
@Test
def test62058267(): Unit = {
val df1 = Seq(
("a", 2, "c"),
("a", 2, "c"),
("a", 2, "c"),
("b", 2, "d"),
("b", 2, "d")
).toDF("col1", "col2", "col3").groupBy("col2").agg(
collect_list("col1").as("col1"),
collect_list("col3").as("col3")
)
df1.show(false)
df1.printSchema()
/**
* +----+---------------+---------------+
* |col2|col1 |col3 |
* +----+---------------+---------------+
* |2 |[a, a, a, b, b]|[c, c, c, d, d]|
* +----+---------------+---------------+
*
* root
* |-- col2: integer (nullable = false)
* |-- col1: array (nullable = true)
* | |-- element: string (containsNull = true)
* |-- col3: array (nullable = true)
* | |-- element: string (containsNull = true)
*/
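// TRANSFORM(arr, x -> ...) is a Spark SQL higher-order function (2.4+); here it prefixes every array element with its column name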
val transform = (str: String) => expr(s"TRANSFORM($str, x -> concat('$str-', x)) as $str")
val cols = df1.schema.map(f => if (f.dataType.isInstanceOf[ArrayType]) {
transform(f.name)
} else expr(f.name))
df1.select(cols: _*).show(false)
/**
* +----+----------------------------------------+----------------------------------------+
* |col2|col1 |col3 |
* +----+----------------------------------------+----------------------------------------+
* |2 |[col1-a, col1-a, col1-a, col1-b, col1-b]|[col3-c, col3-c, col3-c, col3-d, col3-d]|
* +----+----------------------------------------+----------------------------------------+
*/
}
// ############################################################################################################
@Test
def test62060242(): Unit = {
val df = Seq(
("a", 2, "c"),
("a", 2, "c"),
("a", 2, "c"),
("b", 2, "d"),
("b", 2, "d")
).toDF("col1", "col2", "col3")
df.repartition(5).map((row)=>row.toString())
.write.mode(SaveMode.Append)
.text("/Users/sokale/models/x")
/**
* [a,2,c]
* [b,2,d]
*/
df.repartition(5).select(concat_ws(",", df.columns.map(col): _*))
.write.mode(SaveMode.Append)
.text("/Users/sokale/models/x1")
// for a control-A (Ctrl+A / U+0001) separator, pass the actual control character, e.g.:
// df.repartition(5).select(concat_ws("\u0001", df.columns.map(col): _*))
//   .write.mode(SaveMode.Append)
//   .text("/Users/sokale/models/x2")
//
// df.repartition(5)
//   .write
//   .mode(SaveMode.Append)
//   .option("header", true)
//   .option("sep", "\u0001")
//   .csv("/Users/sokale/models/csv")
/**
* a,2,c
* b,2,d
*/
}
// ############################################################################################################
@Test
def test62054357(): Unit = {
val data = """{"Id":"31279605299","Type":"12121212","client":"Checklist _API","eventTime":"2020-03-17T15:50:30.640Z","eventType":"Event","payload":{"sourceApp":"ios","questionnaire":{"version":"1.0","question":"How to resolve ? ","fb":"Na"}}}"""
val df = Seq(data).toDF("jsonCol")
df.show(false)
df.printSchema()
/**
* +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |jsonCol |
* +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |{"Id":"31279605299","Type":"12121212","client":"Checklist _API","eventTime":"2020-03-17T15:50:30.640Z","eventType":"Event","payload":{"sourceApp":"ios","questionnaire":{"version":"1.0","question":"How to resolve ? ","fb":"Na"}}} |
* +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*
* root
* |-- jsonCol: string (nullable = true)
*/
df.select(json_tuple(col("jsonCol"), "Id", "Type", "client", "eventTime", "eventType", "payload"))
.show(false)
/**
* +-----------+--------+--------------+------------------------+-----+----------------------------------------------------------------------------------------------+
* |c0 |c1 |c2 |c3 |c4 |c5 |
* +-----------+--------+--------------+------------------------+-----+----------------------------------------------------------------------------------------------+
* |31279605299|12121212|Checklist _API|2020-03-17T15:50:30.640Z|Event|{"sourceApp":"ios","questionnaire":{"version":"1.0","question":"How to resolve ? ","fb":"Na"}}|
* +-----------+--------+--------------+------------------------+-----+----------------------------------------------------------------------------------------------+
*/
df.select(schema_of_json(data).as("schema")).show(false)
/**
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |schema |
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |struct<Id:string,Type:string,client:string,eventTime:string,eventType:string,payload:struct<questionnaire:struct<fb:string,question:string,version:string>,sourceApp:string>>|
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*/
val processed = df.select(
expr("from_json(jsonCol, 'struct<Id:string,Type:string,client:string,eventTime:string, eventType:string," +
"payload:struct<questionnaire:struct<fb:string,question:string,version:string>,sourceApp:string>>')")
.as("json_converted"))
processed.show(false)
processed.printSchema()
//
// +-------------------------------------------------------------------------------------------------------------+
// |json_converted |
// +-------------------------------------------------------------------------------------------------------------+
// |[31279605299, 12121212, Checklist _API, 2020-03-17T15:50:30.640Z, Event, [[Na, How to resolve ? , 1.0], ios]]|
// +-------------------------------------------------------------------------------------------------------------+
//
// root
// |-- json_converted: struct (nullable = true)
// | |-- Id: string (nullable = true)
// | |-- Type: string (nullable = true)
// | |-- client: string (nullable = true)
// | |-- eventTime: string (nullable = true)
// | |-- eventType: string (nullable = true)
// | |-- payload: struct (nullable = true)
// | | |-- questionnaire: struct (nullable = true)
// | | | |-- fb: string (nullable = true)
// | | | |-- question: string (nullable = true)
// | | | |-- version: string (nullable = true)
// | | |-- sourceApp: string (nullable = true)
//
}
// ############################################################################################################
@Test
def test62050145(): Unit = {
val data =
"""
| {
| "student": {
| "name": "kaleem",
| "rollno": "12"
| }
|}
""".stripMargin
val df = spark.read.json(Seq(data).toDS())
df.show(false)
println(df.schema("student"))
/**
* +------------+
* |student |
* +------------+
* |[kaleem, 12]|
* +------------+
*
* StructField(student,StructType(StructField(name,StringType,true), StructField(rollno,StringType,true)),true)
*/
val processedDf = df.withColumn("student",
expr("named_struct('student_details', student)")
)
processedDf.show(false)
println(processedDf.schema("student"))
//
// +--------------+
// |student |
// +--------------+
// |[[kaleem, 12]]|
// +--------------+
//
// StructField(student,StructType(StructField(student_details,StructType(StructField(name,StringType,true), StructField(rollno,StringType,true)),true)),false)
//
}
// ############################################################################################################
@Test
def test62094520(): Unit = {
val mapConcat = udf((map1: Map[String, Int], map2: Map[String, Int]) => {
// collect every value per key from both maps, then keep the max value per key
val finalMap = mutable.Map.empty[String, mutable.ArrayBuffer[Int]]
(map1.toSeq ++ map2.toSeq).foreach { case (key: String, value: Int) =>
if (finalMap.contains(key))
finalMap(key) += value
else finalMap.put(key, mutable.ArrayBuffer(value))
}
finalMap.mapValues(_.max).toMap
})
spark.udf.register("my_map_concat", mapConcat)
spark.range(2).selectExpr("map('a',1,'b',0)","map('a',0,'c',1)",
"my_map_concat(map('a',1,'b',0),map('a',0,'c',1))")
.show(false)
/**
* +----------------+----------------+-------------------------------------+
* |map(a, 1, b, 0) |map(a, 0, c, 1) |UDF(map(a, 1, b, 0), map(a, 0, c, 1))|
* +----------------+----------------+-------------------------------------+
* |[a -> 1, b -> 0]|[a -> 0, c -> 1]|[b -> 0, a -> 1, c -> 1] |
* |[a -> 1, b -> 0]|[a -> 0, c -> 1]|[b -> 0, a -> 1, c -> 1] |
* +----------------+----------------+-------------------------------------+
*/
}
// ############################################################################################################
@Test
def test62108794(): Unit = {
val data=
"""
|{
| "goods": [{
| "brand_id": ["brand1", "brand2", "brand3"],
| "product_id": "product1"
| }]
|}
""".stripMargin
val df = spark.read.json(Seq(data).toDS())
df.show(false)
df.printSchema()
df.createOrReplaceTempView("goodsInfo")
/**
* +--------------------------------------+
* |goods |
* +--------------------------------------+
* |[[[brand1, brand2, brand3], product1]]|
* +--------------------------------------+
*
* root
* |-- goods: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- brand_id: array (nullable = true)
* | | | |-- element: string (containsNull = true)
* | | |-- product_id: string (nullable = true)
*/
// filter Dataframe by product_id
spark.sql("select * from goodsInfo where array_contains(goods.product_id, 'product1')").show(false)
/**
* +--------------------------------------+
* |goods |
* +--------------------------------------+
* |[[[brand1, brand2, brand3], product1]]|
* +--------------------------------------+
*/
// filter Dataframe by brand_id which is an array within array..
// positive case
spark.sql("select * from goodsInfo where array_contains(flatten(goods.brand_id), 'brand3')")
.show(false)
/**
* +--------------------------------------+
* |goods |
* +--------------------------------------+
* |[[[brand1, brand2, brand3], product1]]|
* +--------------------------------------+
*/
// negative case
spark.sql("select * from goodsInfo where array_contains(flatten(goods.brand_id), 'brand4')")
.show(false)
/**
* +-----+
* |goods|
* +-----+
* +-----+
*/
}
// ############################################################################################################
@Test
def test62107880(): Unit = {
val data =
"""
|id| tim| price|qty|qtyChg
| 1|31951.509| 0.370| 1| 1
| 2|31951.515|145.380|100| 100
| 3|31951.519|149.370|100| 100
| 4|31951.520|144.370|100| 100
| 5|31951.520|149.370|300| 200
| 6|31951.520|119.370| 5| 5
| 7|31951.521|149.370|400| 100
| 8|31951.522|109.370| 50| 50
| 9|31951.522|149.370|410| 10
|10|31951.522|144.370|400| 300
|11|31951.522|149.870| 50| 50
|12|31951.524|149.370|610| 200
|13|31951.526|135.130| 22| 22
|14|31951.527|149.370|750| 140
|15|31951.528| 89.370|100| 100
|16|31951.528|145.870| 50| 50
|17|31951.528|139.370|100| 100
|18|31951.531|149.370|769| 19
|19|31951.531|144.370|410| 10
|20|31951.538|149.370|869| 100
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +---+---------+------+---+------+
* |id |tim |price |qty|qtyChg|
* +---+---------+------+---+------+
* |1 |31951.509|0.37 |1 |1 |
* |2 |31951.515|145.38|100|100 |
* |3 |31951.519|149.37|100|100 |
* |4 |31951.52 |144.37|100|100 |
* |5 |31951.52 |149.37|300|200 |
* |6 |31951.52 |119.37|5 |5 |
* |7 |31951.521|149.37|400|100 |
* |8 |31951.522|109.37|50 |50 |
* |9 |31951.522|149.37|410|10 |
* |10 |31951.522|144.37|400|300 |
* |11 |31951.522|149.87|50 |50 |
* |12 |31951.524|149.37|610|200 |
* |13 |31951.526|135.13|22 |22 |
* |14 |31951.527|149.37|750|140 |
* |15 |31951.528|89.37 |100|100 |
* |16 |31951.528|145.87|50 |50 |
* |17 |31951.528|139.37|100|100 |
* |18 |31951.531|149.37|769|19 |
* |19 |31951.531|144.37|410|10 |
* |20 |31951.538|149.37|869|100 |
* +---+---------+------+---+------+
*
* root
* |-- id: integer (nullable = true)
* |-- tim: double (nullable = true)
* |-- price: double (nullable = true)
* |-- qty: integer (nullable = true)
* |-- qtyChg: integer (nullable = true)
*/
// what is the highest price so far at every moment.
val w = Window.orderBy("tim").rangeBetween(Window.unboundedPreceding, Window.currentRow)
val w1 = Window.orderBy("tim")
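// max over struct(price, qty) compares field-by-field (price first, then qty),
// so the qty paired with the running max price is carried along with it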
val processedDF = df.withColumn("maxPriceQty", max(struct(col("price"), col("qty"))).over(w))
.withColumn("secondMaxPriceQty", lag(col("maxPriceQty"), 1).over(w1))
.withColumn("top1price", col("maxPriceQty.price"))
.withColumn("top1priceQty", col("maxPriceQty.qty"))
.withColumn("top2price", col("secondMaxPriceQty.price"))
.withColumn("top2priceQty", col("secondMaxPriceQty.qty"))
processedDF.show(false)
/**
* +---+---------+------+---+------+-------------+-----------------+---------+------------+---------+------------+
* |id |tim |price |qty|qtyChg|maxPriceQty |secondMaxPriceQty|top1price|top1priceQty|top2price|top2priceQty|
* +---+---------+------+---+------+-------------+-----------------+---------+------------+---------+------------+
* |1 |31951.509|0.37 |1 |1 |[0.37, 1] |null |0.37 |1 |null |null |
* |2 |31951.515|145.38|100|100 |[145.38, 100]|[0.37, 1] |145.38 |100 |0.37 |1 |
* |3 |31951.519|149.37|100|100 |[149.37, 100]|[145.38, 100] |149.37 |100 |145.38 |100 |
* |4 |31951.52 |144.37|100|100 |[149.37, 300]|[149.37, 100] |149.37 |300 |149.37 |100 |
* |5 |31951.52 |149.37|300|200 |[149.37, 300]|[149.37, 300] |149.37 |300 |149.37 |300 |
* |6 |31951.52 |119.37|5 |5 |[149.37, 300]|[149.37, 300] |149.37 |300 |149.37 |300 |
* |7 |31951.521|149.37|400|100 |[149.37, 400]|[149.37, 300] |149.37 |400 |149.37 |300 |
* |8 |31951.522|109.37|50 |50 |[149.87, 50] |[149.37, 400] |149.87 |50 |149.37 |400 |
* |9 |31951.522|149.37|410|10 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |10 |31951.522|144.37|400|300 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |11 |31951.522|149.87|50 |50 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |12 |31951.524|149.37|610|200 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |13 |31951.526|135.13|22 |22 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |14 |31951.527|149.37|750|140 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |15 |31951.528|89.37 |100|100 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |16 |31951.528|145.87|50 |50 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |17 |31951.528|139.37|100|100 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |18 |31951.531|149.37|769|19 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |19 |31951.531|144.37|410|10 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* |20 |31951.538|149.37|869|100 |[149.87, 50] |[149.87, 50] |149.87 |50 |149.87 |50 |
* +---+---------+------+---+------+-------------+-----------------+---------+------------+---------+------------+
*/
}
// ############################################################################################################
@Test
def test62104470(): Unit = {
val df = Seq(("2020-05-21","x",1),("2020-05-21","y",2),("2020-05-22","x",3),("2020-05-22","y",4),("2020-05-23","x",
5), ("2020-05-23","y",6)).toDF("dt","A","B")
df.show(false)
df.printSchema()
/**
* +----------+---+---+
* |dt |A |B |
* +----------+---+---+
* |2020-05-21|x |1 |
* |2020-05-21|y |2 |
* |2020-05-22|x |3 |
* |2020-05-22|y |4 |
* |2020-05-23|x |5 |
* |2020-05-23|y |6 |
* +----------+---+---+
*
* root
* |-- dt: string (nullable = true)
* |-- A: string (nullable = true)
* |-- B: integer (nullable = false)
*/
val w = Window.partitionBy("A").orderBy("dt")
df.withColumn("previusB", lag("B", 1, 0).over(w))
.withColumn("sum", col("B") + col("previusB"))
.orderBy("dt")
.show(false)
/**
* +----------+---+---+---------+---+
* |dt        |A  |B  |previousB|sum|
* +----------+---+---+---------+---+
* |2020-05-21|x  |1  |0        |1  |
* |2020-05-21|y  |2  |0        |2  |
* |2020-05-22|x  |3  |1        |4  |
* |2020-05-22|y  |4  |2        |6  |
* |2020-05-23|x  |5  |3        |8  |
* |2020-05-23|y  |6  |4        |10 |
* +----------+---+---+---------+---+
*/
}
// ############################################################################################################
@Test
def test62130128(): Unit = {
val data =
"""
|1234_4567_DigitalDoc_XRay-01.pdf
|2345_5678_DigitalDoc_CTC-03.png
|1234_5684_DigitalDoc_XRay-05.pdf
|1234_3345_DigitalDoc_XRay-02.pdf
""".stripMargin
val customSchema = StructType(
StructField("catg", StringType, true)
:: StructField("sub_catg", StringType, true)
:: StructField("doc_name", StringType, true)
:: StructField("revision_label", StringType, true)
:: StructField("extension", StringType, true)
:: Nil
)
val df = spark.read.schema(customSchema)
.option("sep", "_")
.csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
/**
* +----+--------+----------+--------------+---------+
* |catg|sub_catg|doc_name |revision_label|extension|
* +----+--------+----------+--------------+---------+
* |1234|4567 |DigitalDoc|XRay-01.pdf |null |
* |2345|5678 |DigitalDoc|CTC-03.png |null |
* |1234|5684 |DigitalDoc|XRay-05.pdf |null |
* |1234|3345 |DigitalDoc|XRay-02.pdf |null |
* +----+--------+----------+--------------+---------+
*
* root
* |-- catg: string (nullable = true)
* |-- sub_catg: string (nullable = true)
* |-- doc_name: string (nullable = true)
* |-- revision_label: string (nullable = true)
* |-- extension: string (nullable = true)
*/
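// substring_index(col, ".", -1) keeps the text after the last dot; regexp_extract pulls the first run of digits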
df.withColumn("doc_name", concat_ws("_", col("doc_name"), col("revision_label")))
.withColumn("extension", substring_index(col("revision_label"), ".", -1))
.withColumn("revision_label", regexp_extract(col("revision_label"),"""\d+""", 0))
.show(false)
/**
* +----+--------+----------------------+--------------+---------+
* |catg|sub_catg|doc_name |revision_label|extension|
* +----+--------+----------------------+--------------+---------+
* |1234|4567 |DigitalDoc_XRay-01.pdf|01 |pdf |
* |2345|5678 |DigitalDoc_CTC-03.png |03 |png |
* |1234|5684 |DigitalDoc_XRay-05.pdf|05 |pdf |
* |1234|3345 |DigitalDoc_XRay-02.pdf|02 |pdf |
* +----+--------+----------------------+--------------+---------+
*/
}
// ############################################################################################################
@Test
def test62133517(): Unit = {
val data =
"""
|id | name | disconnect_dt_time
|1 | "a" | 2020-05-10 00:00:00
|2 | "b" | 2020-05-20 00:00:00
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +---+----+-------------------+
* |id |name|disconnect_dt_time |
* +---+----+-------------------+
* |1 |a |2020-05-10 00:00:00|
* |2 |b |2020-05-20 00:00:00|
* +---+----+-------------------+
*
* root
* |-- id: integer (nullable = true)
* |-- name: string (nullable = true)
* |-- disconnect_dt_time: timestamp (nullable = true)
*/
df.createOrReplaceTempView("df1")
val analysisStartDate = "20200515T00:00:00+0000"
val analysisEndDate = "20200530T00:00:00+0000"
val fmt = "yyyyMMdd'T'HH:mm:ssZ"
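// 'Z' in the pattern matches the RFC 822 zone offset, e.g. +0000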
val processedDF = spark.table("df1")
.filter(col("disconnect_dt_time").cast("timestamp")
.between(to_timestamp(lit(analysisStartDate), fmt) , to_timestamp(lit(analysisEndDate), fmt)) )
processedDF.show(false)
/**
* +---+----+-------------------+
* |id |name|disconnect_dt_time |
* +---+----+-------------------+
* |2 |b |2020-05-20 00:00:00|
* +---+----+-------------------+
*/
}
// ############################################################################################################
@Test
def test62134528(): Unit = {
val data =
"""
|Name1 | Name2
|RR Industries |
|RR Industries | RR Industries
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +-------------+-------------+
* |Name1 |Name2 |
* +-------------+-------------+
* |RR Industries|null |
* |RR Industries|RR Industries|
* +-------------+-------------+
*
* root
* |-- Name1: string (nullable = true)
* |-- Name2: string (nullable = true)
*/
df.withColumn("Name3(Expected)", concat_ws("", df.columns.map(col).map(c => coalesce(c, lit(""))): _*))
.show(false)
/**
* +-------------+-------------+--------------------------+
* |Name1 |Name2 |Name3(Expected) |
* +-------------+-------------+--------------------------+
* |RR Industries|null |RR Industries |
* |RR Industries|RR Industries|RR IndustriesRR Industries|
* +-------------+-------------+--------------------------+
*/
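// note: concat_ws already skips null values, so the coalesce above is optional - both variants give the same result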
df.withColumn("Name3(Expected)", concat_ws("", df.columns.map(col): _*))
.show(false)
/**
* +-------------+-------------+--------------------------+
* |Name1 |Name2 |Name3(Expected) |
* +-------------+-------------+--------------------------+
* |RR Industries|null |RR Industries |
* |RR Industries|RR Industries|RR IndustriesRR Industries|
* +-------------+-------------+--------------------------+
*/
}
// ############################################################################################################
@Test
def test62119385(): Unit = {
val df = spark.range(1, 5)
df.createOrReplaceTempView("df_view")
println(spark.catalog.currentDatabase)
val db: Database = spark.catalog.getDatabase(spark.catalog.currentDatabase)
val tables: Dataset[Table] = spark.catalog.listTables(db.name)
tables.show(false)
/**
* default
* +-------+--------+-----------+---------+-----------+
* |name |database|description|tableType|isTemporary|
* +-------+--------+-----------+---------+-----------+
* |df_view|null |null |TEMPORARY|true |
* +-------+--------+-----------+---------+-----------+
*/
}
// ############################################################################################################
@Test
def test62147049(): Unit = {
val df = spark.range(1,5)
.withColumn("batch_id", lit(70) + col("id"))
df.show(false)
df.printSchema()
/**
* +---+--------+
* |id |batch_id|
* +---+--------+
* |1 |71 |
* |2 |72 |
* |3 |73 |
* |4 |74 |
* +---+--------+
*
* root
* |-- id: long (nullable = false)
* |-- batch_id: long (nullable = false)
*/
df.write.partitionBy("batch_id")
.mode(SaveMode.Overwrite)
.parquet("/Users/sokale/models/run_1")
/**
* $ cd run_1/
* $ ls -l
* total 0
* ............ _SUCCESS
* ............ batch_id=71
* ............ batch_id=72
* ............ batch_id=73
* ............ batch_id=74
*/
// read back only the batch_id=73 partition - the filter on the partition column is resolved via partition pruning
// (spark.sql.parquet.filterPushdown, true by default, additionally enables Parquet filter push-down on data columns)
spark.read.parquet("/Users/sokale/models/run_1").where(col("batch_id").equalTo(73))
.show(false)
/**
* +---+--------+
* |id |batch_id|
* +---+--------+
* |3 |73 |
* +---+--------+
*/
// read all partitions
val readDF = spark.read.parquet("/Users/sokale/models/run_1")
readDF.show(false)
readDF.printSchema()
/**
* +---+--------+
* |id |batch_id|
* +---+--------+
* |3 |73 |
* |2 |72 |
* |1 |71 |
* |4 |74 |
* +---+--------+
*
* root
* |-- id: long (nullable = true)
* |-- batch_id: integer (nullable = true)
*/
}
// ############################################################################################################
@Test
def test62144985(): Unit = {
val data =
"""
|a | b | c
|cat | 3-3 | 78-b
|cat | 3-3 | 89-0
|cat | 4-4 | 78-n
|dog | 4-4 | 89-b
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +---+---+----+
* |a |b |c |
* +---+---+----+
* |cat|3-3|78-b|
* |cat|3-3|89-0|
* |cat|4-4|78-n|
* |dog|4-4|89-b|
* +---+---+----+
*
* root
* |-- a: string (nullable = true)
* |-- b: string (nullable = true)
* |-- c: string (nullable = true)
*/
val processedDF = df
.groupBy("a")
.agg(
collect_list(struct(col("b"), col("c"))).as("value"),
collect_list(col("b")).as("key")
)
.withColumn("map", map_from_arrays(col("key"), col("value")))
processedDF.show(false)
processedDF.printSchema()
/**
* +---+---------------------------------------+---------------+------------------------------------------------------------+
* |a |value |key |map |
* +---+---------------------------------------+---------------+------------------------------------------------------------+
* |cat|[[3-3, 78-b], [3-3, 89-0], [4-4, 78-n]]|[3-3, 3-3, 4-4]|[3-3 -> [3-3, 78-b], 3-3 -> [3-3, 89-0], 4-4 -> [4-4, 78-n]]|
* |dog|[[4-4, 89-b]] |[4-4] |[4-4 -> [4-4, 89-b]] |
* +---+---------------------------------------+---------------+------------------------------------------------------------+
*
* root
* |-- a: string (nullable = true)
* |-- value: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- b: string (nullable = true)
* | | |-- c: string (nullable = true)
* |-- key: array (nullable = true)
* | |-- element: string (containsNull = true)
* |-- map: map (nullable = true)
* | |-- key: string
* | |-- value: struct (valueContainsNull = true)
* | | |-- b: string (nullable = true)
* | | |-- c: string (nullable = true)
*/
processedDF.select(col("a"), to_json(col("map"))).write
.mode(SaveMode.Overwrite)
.partitionBy("a")
.text("/Users/sokale/models/run_2")
/**
* File directory and content of file
* a=cat
* |- {"3-3":{"b":"3-3","c":"78-b"},"3-3":{"b":"3-3","c":"89-0"},"4-4":{"b":"4-4","c":"78-n"}}
* a=dog
* |- {"4-4":{"b":"4-4","c":"89-b"}}
*/
}
// ############################################################################################################
@Test
def test62148704(): Unit = {
val data =
"""
| id | age | dob
| 1 | 24 |
| 2 | 25 |
| 3 | | 1/1/1973
| 4 | | 6/6/1980
| 5 | 46 |
| 6 | | 1/1/1971
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
/**
* +---+----+--------+
* |id |age |dob |
* +---+----+--------+
* |1 |24 |null |
* |2 |25 |null |
* |3 |null|1/1/1973|
* |4 |null|6/6/1980|
* |5 |46 |null |
* |6 |null|1/1/1971|
* +---+----+--------+
*
* root
* |-- id: integer (nullable = true)
* |-- age: integer (nullable = true)
* |-- dob: string (nullable = true)
*/
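// age in years = months_between(current_date, dob) / 12; coalesce keeps the given age when it is present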
df.withColumn("diff",
coalesce(col("age"),
round(months_between(current_date(),to_date(col("dob"), "d/M/yyyy"),true).divide(12),2)
)
).show()
/**
* +---+----+--------+-----+
* | id| age| dob| diff|
* +---+----+--------+-----+
* | 1| 24| null| 24.0|
* | 2| 25| null| 25.0|
* | 3|null|1/1/1973|47.42|
* | 4|null|6/6/1980|39.99|
* | 5| 46| null| 46.0|
* | 6|null|1/1/1971|49.42|
* +---+----+--------+-----+
*/
}
// ############################################################################################################
@Test
def test6214678(): Unit = {
val startTimeStamp = "02-05-2020 01:00"
val endTimeStamp = "03-05-2020 02:00"
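// sequence(start, stop, step) (Spark 2.4+) generates the hourly timestamps; explode turns the array into rows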
spark.range(1).selectExpr(
s"""
|explode(sequence(
| to_timestamp('$startTimeStamp', 'dd-MM-yyyy HH:mm'),
| to_timestamp('$endTimeStamp', 'dd-MM-yyyy HH:mm'),
| interval 1 hour
| )) as generated_timestamp
""".stripMargin
).show(false)
/**
* +-------------------+
* |generated_timestamp|
* +-------------------+
* |2020-05-02 01:00:00|
* |2020-05-02 02:00:00|
* |2020-05-02 03:00:00|
* |2020-05-02 04:00:00|
* |2020-05-02 05:00:00|
* |2020-05-02 06:00:00|
* |2020-05-02 07:00:00|
* |2020-05-02 08:00:00|
* |2020-05-02 09:00:00|
* |2020-05-02 10:00:00|
* |2020-05-02 11:00:00|
* |2020-05-02 12:00:00|
* |2020-05-02 13:00:00|
* |2020-05-02 14:00:00|
* |2020-05-02 15:00:00|
* |2020-05-02 16:00:00|
* |2020-05-02 17:00:00|
* |2020-05-02 18:00:00|
* |2020-05-02 19:00:00|
* |2020-05-02 20:00:00|
* +-------------------+
* only showing top 20 rows
*/
}
// ############################################################################################################
@Test
def test62166453(): Unit = {
val data =
"""
|id | user_id | date | expense
|1 | 1 | 20200521 | 200
|2 | 2 | 20200601 | 100
|3 | 1 | 20200603 | 90
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
val data1 =
"""
|id | user_id | date | expense
|1 | 3 | 20200521 | 200
|2 | 4 | 20200601 | 100
|3 | 5 | 20200603 | 90
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
df.createOrReplaceTempView("user_1")
df1.createOrReplaceTempView("user_2")
// spark.catalog.createTable()
// a permanent view cannot be created over temporary views, so make the union view temporary as well
spark.sql(
"""
|CREATE TEMPORARY VIEW users
| AS SELECT * from user_1
| UNION ALL SELECT * from user_2
""".stripMargin).show(false)
spark.sql(
"""
|select user_id, sum(expense) from users group by user_id
""".stripMargin).show()
}
// ############################################################################################################
@Test
def test62177090(): Unit = {
val data =
"""
| Date|Mode
|2020-05-10| A
|2020-05-10| B
|2020-05-10| A
|2020-05-11| C
|2020-05-11| C
|2020-05-12| B
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df = spark.read
.option("sep", ",")
// .option("inferSchema", "true")
.option("header", "true")
.csv(stringDS)
df.show(false)
df.printSchema()
val w = Window.partitionBy(to_date(col("Date")))
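// collect_set over the per-date window attaches the distinct modes of each day to every row of that day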
df.withColumn("set(Mode)",
collect_set("mode").over(w)
).show(false)
/**
* +----------+----+
* |Date |Mode|
* +----------+----+
* |2020-05-10|A |
* |2020-05-10|B |
* |2020-05-10|A |
* |2020-05-11|C |
* |2020-05-11|C |
* |2020-05-12|B |
* +----------+----+
*
* root
* |-- Date: string (nullable = true)
* |-- Mode: string (nullable = true)
*
* +----------+----+---------+
* |Date |Mode|set(Mode)|
* +----------+----+---------+
* |2020-05-10|A |[B, A] |
* |2020-05-10|B |[B, A] |
* |2020-05-10|A |[B, A] |
* |2020-05-11|C |[C] |
* |2020-05-11|C |[C] |
* |2020-05-12|B |[B] |
* +----------+----+---------+
*/
}
// ############################################################################################################
@Test
def test62174098(): Unit = {
/**
* file content
* spark-test-data.json
* --------------------
* {"id":1,"name":"abc1"}
* {"id":2,"name":"abc2"}
* {"id":3,"name":"abc3"}
*/
val fileName = "spark-test-data.json"
val path = getClass.getResource("/" + fileName).getPath
spark.catalog.createTable("df", path, "json")
.show(false)
/**
* +---+----+
* |id |name|
* +---+----+
* |1 |abc1|
* |2 |abc2|
* |3 |abc3|
* +---+----+
*/
// Collect only statistics that do not require scanning the whole table (that is, size in bytes).
spark.sql("ANALYZE TABLE df COMPUTE STATISTICS NOSCAN")
spark.sql("DESCRIBE EXTENDED df ").filter(col("col_name") === "Statistics").show(false)
/**
* +----------+---------+-------+
* |col_name |data_type|comment|
* +----------+---------+-------+
* |Statistics|68 bytes | |
* +----------+---------+-------+
*/
spark.sql("ANALYZE TABLE df COMPUTE STATISTICS")
spark.sql("DESCRIBE EXTENDED df ").filter(col("col_name") === "Statistics").show(false)
/**
* +----------+----------------+-------+
* |col_name |data_type |comment|
* +----------+----------------+-------+
* |Statistics|68 bytes, 3 rows| |
* +----------+----------------+-------+
*/
}
// ############################################################################################################
@Test
def test62187307(): Unit = {
val data =
"""
|emp_id|emp_site |emp_name
|1 |Washigton | Will
|2 |null | null
|3 |New York | Norman
|4 |Iowa | Ian
""".stripMargin
val stringDS = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS)
df1.show(false)
df1.printSchema()
/**
* +------+---------+--------+
* |emp_id|emp_site |emp_name|
* +------+---------+--------+
* |1 |Washigton|Will |
* |2 |null |null |
* |3 |New York |Norman |
* |4 |Iowa |Ian |
* +------+---------+--------+
*
* root
* |-- emp_id: integer (nullable = true)
* |-- emp_site: string (nullable = true)
* |-- emp_name: string (nullable = true)
*/
val data1 =
"""
|emp_id|emp_site |emp_name
|1 |Washigton | Watson
|2 |Wisconsin | Sam
|3 |New York | null
|4 |Illinois | Ican
|5 |Pennsylvania | Patrick
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df2 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df2.show(false)
df2.printSchema()
/**
* +------+------------+--------+
* |emp_id|emp_site |emp_name|
* +------+------------+--------+
* |1 |Washigton |Watson |
* |2 |Wisconsin |Sam |
* |3 |New York |null |
* |4 |Illinois |Ican |
* |5 |Pennsylvania|Patrick |
* +------+------------+--------+
*
* root
* |-- emp_id: integer (nullable = true)
* |-- emp_site: string (nullable = true)
* |-- emp_name: string (nullable = true)
*/
val joiningKey = "emp_id"
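// build a change-log column per non-key field: insert / delete / update, null when unchanged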
val cols =
df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
.map { c =>
val (df1Col, df2Col) = df1.col(c._1) -> df2.col(c._2)
when(df1Col.isNull && df2Col.isNotNull,
array(map(lit("to"), df2Col), map(lit("change"), lit("insert"))))
.when(df1Col.isNotNull && df2Col.isNull,
array(map(lit("from"), df1Col), map(lit("change"), lit("delete"))))
.when(df1Col.isNotNull && df2Col.isNotNull && df1Col === df2Col,
lit(null))
.when(df1Col.isNull && df2Col.isNull,
lit(null))
.otherwise(array(map(lit("from"), df1Col), map(lit("to"), df2Col), map(lit("change"), lit("update"))))
.as(c._1)
}
df1.join(df2, Seq(joiningKey), "outer")
.select(cols ++ Seq(col(joiningKey)): _*)
.orderBy(joiningKey)
.show(false)
//
// +------------------------------------------------------+----------------------------------------------------+------+
// |emp_site |emp_name |emp_id|
// +------------------------------------------------------+----------------------------------------------------+------+
// |null |[[from -> Will], [to -> Watson], [change -> update]]|1 |
// |[[to -> Wisconsin], [change -> insert]] |[[to -> Sam], [change -> insert]] |2 |
// |null |[[from -> Norman], [change -> delete]] |3 |
// |[[from -> Iowa], [to -> Illinois], [change -> update]]|[[from -> Ian], [to -> Ican], [change -> update]] |4 |
// |[[to -> Pennsylvania], [change -> insert]] |[[to -> Patrick], [change -> insert]] |5 |
// +------------------------------------------------------+----------------------------------------------------+------+
//
// in case column is not of type string
val getExpr = (fromExpr: String, toExpr: String, changeExpr: String) =>
s"named_struct('from', $fromExpr, 'to', $toExpr, 'change', '$changeExpr')"
val cols1 =
df1.columns.filterNot(_.equals(joiningKey)).zip(df2.columns.filterNot(_.equals(joiningKey)))
.map { c =>
val (c1, c2) = s"df1.${c._1}" -> s"df2.${c._2}"
when(expr(s"$c1 is null and $c2 is not null"), expr(getExpr("null", c2, "insert")))
.when(expr(s"$c1 is not null and $c2 is null"), expr(getExpr(c1, "null", "delete")))
.when(expr(s"$c1 is not null and $c2 is not null and $c1=$c2"), expr(getExpr("null", "null", "null")))
.when(expr(s"$c1 is null and $c2 is null"), expr(getExpr("null", "null", "null")))
.otherwise(expr(getExpr(c1, c2, "update")))
.as(c._1)
}
val processedDF = df1.as("df1").join(df2.as("df2"), Seq(joiningKey), "outer")
.select(cols1 ++ Seq(col(joiningKey)): _*)
.orderBy(joiningKey)
processedDF.show(false)
processedDF.printSchema()
/**
* +------------------------+----------------------+------+
* |emp_site |emp_name |emp_id|
* +------------------------+----------------------+------+
* |[,, null] |[Will, Watson, update]|1 |
* |[, Wisconsin, insert] |[, Sam, insert] |2 |
* |[,, null] |[Norman,, delete] |3 |
* |[Iowa, Illinois, update]|[Ian, Ican, update] |4 |
* |[, Pennsylvania, insert]|[, Patrick, insert] |5 |
* +------------------------+----------------------+------+
*
* root
* |-- emp_site: struct (nullable = false)
* | |-- from: string (nullable = true)
* | |-- to: string (nullable = true)
* | |-- change: string (nullable = false)
* |-- emp_name: struct (nullable = false)
* | |-- from: string (nullable = true)
* | |-- to: string (nullable = true)
* | |-- change: string (nullable = false)
* |-- emp_id: integer (nullable = true)
*/
}
// ############################################################################################################
@Test
def test62187989(): Unit = {
val df = spark.range(2).withColumn("webhooks",
array(
struct(lit("index1").as("index"), lit("failed_at1").as("failed_at"),
lit("status1").as("status"), lit("updated_at1").as("updated_at")),
struct(lit("index2").as("index"), lit("failed_at2").as("failed_at"),
lit("status2").as("status"), lit("updated_at2").as("updated_at"))
)
)
df.show(false)
df.printSchema()
//
// +---+----------------------------------------------------------------------------------------+
// |id |webhooks |
// +---+----------------------------------------------------------------------------------------+
// |0 |[[index1, failed_at1, status1, updated_at1], [index2, failed_at2, status2, updated_at2]]|
// |1 |[[index1, failed_at1, status1, updated_at1], [index2, failed_at2, status2, updated_at2]]|
// +---+----------------------------------------------------------------------------------------+
//
val filterList: List[String]= List("index1","status1")
val (index, status) = filterList.head -> filterList.last
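// filter(arr, x -> ...) (Spark 2.4+) keeps only the structs whose (index, status) pair matches the filter list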
df.selectExpr( "webhooks",
s"filter(webhooks, x -> array(x.index, x.status)=array('$index', '$status')) as processed")
.show(false)
//
// +----------------------------------------------------------------------------------------+--------------------------------------------+
// |webhooks |processed |
// +----------------------------------------------------------------------------------------+--------------------------------------------+
// |[[index1, failed_at1, status1, updated_at1], [index2, failed_at2, status2, updated_at2]]|[[index1, failed_at1, status1, updated_at1]]|
// |[[index1, failed_at1, status1, updated_at1], [index2, failed_at2, status2, updated_at2]]|[[index1, failed_at1, status1, updated_at1]]|
// +----------------------------------------------------------------------------------------+--------------------------------------------+
//
}
// ############################################################################################################
@Test
def test62188447(): Unit = {
val data1 =
"""
|Class
|A
|AA
|BB
|AAAA
|ABA
|AAAAA
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +-----+
* |Class|
* +-----+
* |A |
* |AA |
* |BB |
* |AAAA |
* |ABA |
* |AAAAA|
* +-----+
*
* root
* |-- Class: string (nullable = true)
*/
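// ^A+$ keeps rows whose Class consists only of the letter A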
df1.filter(col("Class").rlike("""^A+$"""))
.show(false)
/**
* +-----+
* |Class|
* +-----+
* |A |
* |AA |
* |AAAA |
* |AAAAA|
* +-----+
*/
}
// ############################################################################################################
@Test
def test62188667(): Unit = {
val data =
"""
|[{
| "ItemType": "CONSTRUCTION",
| "ItemId": "9169-bd62eac18e73",
| "Content": {
| "MetadataSetList": [
| {
| "SetId": "privacy-metadata-set",
| "MetadataList": [
| {
| "MetadataValue": "true",
| "MetadataId": "Public"
| }
| ]
| },
| {
| "SetId": "asset-metadata-set",
| "MetadataList": [
| {
| "MetadataValue": "new upload & edit test",
| "MetadataId": "title"
| },
| {
| "MetadataValue": "someone",
| "MetadataId": "uploader"
| },
| {
| "MetadataValue": "One,Five",
| "MetadataId": "Families"
| },
| {
| "MetadataValue": "@xyzzzzz",
| "MetadataId": "creator"
| }
| ]
| }
| ],
| "MetadataType": "UNDER CONSTRUCTION",
| "Tenant": "8ef4-0e976f342606"
| },
| "Version":"1.0",
| "IsActive":false,
| "Status":"DEPRECATED"
|}]
""".stripMargin
val df = spark.read
.option("multiline", true)
.json(Seq(data).toDS())
df.show(false)
df.printSchema()
/**
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------+------------+----------+-------+
* |Content |IsActive|ItemId |ItemType |Status |Version|
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------+------------+----------+-------+
* |[[[[[Public, true]], privacy-metadata-set], [[[title, new upload & edit test], [uploader, someone], [Families, One,Five], [creator, @xyzzzzz]], asset-metadata-set]], UNDER CONSTRUCTION, 8ef4-0e976f342606]|false |9169-bd62eac18e73|CONSTRUCTION|DEPRECATED|1.0 |
* +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------+------------+----------+-------+
*
* root
* |-- Content: struct (nullable = true)
* | |-- MetadataSetList: array (nullable = true)
* | | |-- element: struct (containsNull = true)
* | | | |-- MetadataList: array (nullable = true)
* | | | | |-- element: struct (containsNull = true)
* | | | | | |-- MetadataId: string (nullable = true)
* | | | | | |-- MetadataValue: string (nullable = true)
* | | | |-- SetId: string (nullable = true)
* | |-- MetadataType: string (nullable = true)
* | |-- Tenant: string (nullable = true)
* |-- IsActive: boolean (nullable = true)
* |-- ItemId: string (nullable = true)
* |-- ItemType: string (nullable = true)
* |-- Status: string (nullable = true)
* |-- Version: string (nullable = true)
*/
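// plan: union the two MetadataLists, map each struct to a single-entry map, then fold the maps into one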
val mergeMap = udf((arr: mutable.WrappedArray[Map[String, String]]) => {
// merge the single-entry maps into one map (later keys win on key conflicts)
arr.foldLeft(Map.empty[String, String])(_ ++ _)
})
val processedDF = df.select(col("IsActive").as("is_active"),
col("ItemId").as("item_id"),
col("ItemType").as("item_type"),
col("Status").as("status"),
col("Version").as("version"),
col("Content.MetadataType").as("metadata_type"),
col("Content.Tenant").as("tenant"),
col("Content.MetadataSetList").getItem(0).getField("MetadataList").as("content1"),
col("Content.MetadataSetList").getItem(1).getField("MetadataList").as("content2")
).withColumn("content",
array_union(
col("content1"),
col("content2")
)
)
.withColumn("content", expr("TRANSFORM(content, x -> map(x.MetadataId, x.MetadataValue))"))
.withColumn("content", mergeMap(col("content")))
.drop("content1", "content2")
processedDF.show(false)
processedDF.printSchema()
/**
* +---------+-----------------+------------+----------+-------+------------------+-----------------+-----------------------------------------------------------------------------------------------------------------+
* |is_active|item_id |item_type |status |version|metadata_type |tenant |content |
* +---------+-----------------+------------+----------+-------+------------------+-----------------+-----------------------------------------------------------------------------------------------------------------+
* |false |9169-bd62eac18e73|CONSTRUCTION|DEPRECATED|1.0 |UNDER CONSTRUCTION|8ef4-0e976f342606|[Families -> One,Five, Public -> true, creator -> @xyzzzzz, title -> new upload & edit test, uploader -> someone]|
* +---------+-----------------+------------+----------+-------+------------------+-----------------+-----------------------------------------------------------------------------------------------------------------+
*
* root
* |-- is_active: boolean (nullable = true)
* |-- item_id: string (nullable = true)
* |-- item_type: string (nullable = true)
* |-- status: string (nullable = true)
* |-- version: string (nullable = true)
* |-- metadata_type: string (nullable = true)
* |-- tenant: string (nullable = true)
* |-- content: map (nullable = true)
* | |-- key: string
* | |-- value: string (valueContainsNull = true)
*/
processedDF.toJSON
.show(false)
// {
// "is_active": false,
// "item_id": "9169-bd62eac18e73",
// "item_type": "CONSTRUCTION",
// "status": "DEPRECATED",
// "version": "1.0",
// "metadata_type": "UNDER CONSTRUCTION",
// "tenant": "8ef4-0e976f342606",
// "content": {
// "Public": "true",
// "Families": "One,Five",
// "creator": "@xyzzzzz",
// "uploader": "someone",
// "title": "new upload & edit test"
// }
// }
}
// ############################################################################################################
@Test
def test62204815(): Unit = {
val data = """109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] GET /administrator/ HTTP/1.1 200 4263 - Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0 -"""
val df = Seq(data).toDF("header")
df.show(false)
df.printSchema()
val timestamp_pattern= """\[\d{2}\/\w{3}\/\d{4}\:\d{2}\:\d{2}\:\d{2}\s\S+\d{4}]"""
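// matches e.g. [12/Dec/2015:18:25:11 +0100]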
df.select(regexp_extract(col("header"),timestamp_pattern,0).alias("timestamp"))
.show(false)
}
// ############################################################################################################
@Test
def test62210282(): Unit = {
val data1 =
"""
| Timestamp | RowType | Value
| 2020. 6. 5. 8:12 | X | Null
| 2020. 6. 5. 8:13 | Y | Null
| 2020. 6. 5. 8:14 | Y | Null
| 2020. 6. 5. 8:15 | A | SomeValue
| 2020. 6. 5. 8:16 | Y | Null
| 2020. 6. 5. 8:17 | Y | Null
| 2020. 6. 5. 8:18 | X | Null
| 2020. 6. 5. 8:19 | Y | Null
| 2020. 6. 5. 8:20 | Y | Null
| 2020. 6. 6. 8:21 | A | SomeValue2
| 2020. 6. 7. 8:22 | Y | Null
| 2020. 6. 8. 8:23 | Y | Null
| 2020. 6. 9. 8:24 | X | Null
""".stripMargin
val stringDS1 = data1.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "Null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
// keep only X and A rows, then pull the next row's Value (the A value) onto the preceding X row
df1.filter(col("RowType").isInCollection(Seq("X", "A")))
.withColumn("Value1", lead("Value", 1).over(Window.orderBy('Timestamp)))
.show(false)
/**
* +----------------+-------+----------+
* |Timestamp |RowType|Value |
* +----------------+-------+----------+
* |2020. 6. 5. 8:12|X |null |
* |2020. 6. 5. 8:13|Y |null |
* |2020. 6. 5. 8:14|Y |null |
* |2020. 6. 5. 8:15|A |SomeValue |
* |2020. 6. 5. 8:16|Y |null |
* |2020. 6. 5. 8:17|Y |null |
* |2020. 6. 5. 8:18|X |null |
* |2020. 6. 5. 8:19|Y |null |
* |2020. 6. 5. 8:20|Y |null |
* |2020. 6. 6. 8:21|A |SomeValue2|
* |2020. 6. 7. 8:22|Y |null |
* |2020. 6. 8. 8:23|Y |null |
* |2020. 6. 9. 8:24|X |null |
* +----------------+-------+----------+
*
* root
* |-- Timestamp: string (nullable = true)
* |-- RowType: string (nullable = true)
* |-- Value: string (nullable = true)
*
* +----------------+-------+----------+----------+
* |Timestamp |RowType|Value |Value1 |
* +----------------+-------+----------+----------+
* |2020. 6. 5. 8:12|X |null |SomeValue |
* |2020. 6. 5. 8:15|A |SomeValue |null |
* |2020. 6. 5. 8:18|X |null |SomeValue2|
* |2020. 6. 6. 8:21|A |SomeValue2|null |
* |2020. 6. 9. 8:24|X |null |null |
* +----------------+-------+----------+----------+
*/
}
// ############################################################################################################
@Test
def test62211108(): Unit = {
val df1=Seq((1,10.0),(1,20.0),(1,40.6),(1,15.6),(1,17.6),(1,25.6),(1,39.6),(2,20.5),
(2,70.3),(2,69.4),(2,74.4),(2,45.4),(3,60.6),(3,80.6),(4,30.6),(4,90.6)).toDF("ID","Count")
val idBucketMapping=Seq((1,4),(2,3),(3,2),(4,2)).toDF("ID","Bucket")
def doBucketing(bucket_size : Int) =
(1 until bucket_size).scanLeft(0d)((a, _) => a + (1 / bucket_size.toDouble))
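// e.g. doBucketing(4) == List(0.0, 0.25, 0.5, 0.75)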
var res = df1.withColumn("percentile",
expr(s"percentile_approx(count, array(${doBucketing(3).mkString(", ")})) over(partition by ID)"))
println(doBucketing(2))
res.show(false)
/**
* +---+-----+------------------+
* |ID |Count|percentile |
* +---+-----+------------------+
* |2 |20.5 |[20.5, 45.4, 70.3]|
* |2 |70.3 |[20.5, 45.4, 70.3]|
* |2 |69.4 |[20.5, 45.4, 70.3]|
* |2 |74.4 |[20.5, 45.4, 70.3]|
* |2 |45.4 |[20.5, 45.4, 70.3]|
* |4 |30.6 |[30.6, 30.6, 90.6]|
* |4 |90.6 |[30.6, 30.6, 90.6]|
* |1 |10.0 |[10.0, 17.6, 25.6]|
* |1 |20.0 |[10.0, 17.6, 25.6]|
* |1 |40.6 |[10.0, 17.6, 25.6]|
* |1 |15.6 |[10.0, 17.6, 25.6]|
* |1 |17.6 |[10.0, 17.6, 25.6]|
* |1 |25.6 |[10.0, 17.6, 25.6]|
* |1 |39.6 |[10.0, 17.6, 25.6]|
* |3 |60.6 |[60.6, 60.6, 80.6]|
* |3 |80.6 |[60.6, 60.6, 80.6]|
* +---+-----+------------------+
*/
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
val getPercentage = udf((bucket_size: Int) => doBucketing(bucket_size))
// spark.udf.register("get_percentage_array", getPercentage)
val processedDF = df1.join(idBucketMapping, "ID")
.withColumn("percentage", getPercentage(col("Bucket")))
// .withColumn("percentile", expr(s"TRANSFORM(percentage," +
// s" x -> percentile_approx(count, 0.5, ${ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY}))"))
.withColumn("percentile",
expr(s"percentile_approx(count, percentage, ${ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY})" +
s" over(partition by ID)"))
processedDF.show(false)
processedDF.printSchema()
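// Caveat: stock Spark requires the percentage argument of percentile_approx to be a
// constant literal, so feeding it the derived `percentage` column may fail analysis.
// A sketch that stays within that constraint (assumes few distinct bucket sizes): run
// one literal-array query per bucket size and union the results.
// val byBucket = idBucketMapping.select("Bucket").distinct().as[Int].collect()
//   .map { b =>
//     df1.join(idBucketMapping.filter('Bucket === b), "ID")
//       .withColumn("percentile",
//         expr(s"percentile_approx(count, array(${doBucketing(b).mkString(", ")})) over(partition by ID)"))
//   }.reduce(_ union _)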
}
// ############################################################################################################
@Test
def test62212504(): Unit = {
val data =
"""
|{
| "group": "1",
| "name": "badboi",
| "rank": "3",
| "fellows": [
| {
| "name": "David",
| "age": "25",
| "hobby": "code"
| },
| {
| "name": "John",
| "age": "27",
| "hobby": "tennis"
| },
| {
| "name": "Anata",
| "age": "23",
| "hobby": "dance"
| }
| ]
|}
""".stripMargin
val df = spark.read.option("multiLine", "true").json(Seq(data).toDS())
df.show(false)
df.printSchema()
/**
* +-----------------------------------------------------------+-----+------+----+
* |fellows |group|name |rank|
* +-----------------------------------------------------------+-----+------+----+
* |[[25, code, David], [27, tennis, John], [23, dance, Anata]]|1 |badboi|3 |
* +-----------------------------------------------------------+-----+------+----+
*
* root
* |-- fellows: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- age: string (nullable = true)
* | | |-- hobby: string (nullable = true)
* | | |-- name: string (nullable = true)
* |-- group: string (nullable = true)
* |-- name: string (nullable = true)
* |-- rank: string (nullable = true)
*/
val processedDF = df.withColumn("fellows",
expr("TRANSFORM(fellows, x -> named_struct('ID', md5(x.name), 'NAME', x.name, 'AGE', x.age, 'HOBBY', x.hobby))"))
processedDF.show(false)
processedDF.printSchema()
/**
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
* |fellows |group|name |rank|
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
* |[[464e07afc9e46359fb480839150595c5, David, 25, code], [61409aa1fd47d4a5332de23cbf59a36f, John, 27, tennis], [540356fa1779480b07d0743763c78159, Anata, 23, dance]]|1 |badboi|3 |
* +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+----+
*
* root
* |-- fellows: array (nullable = true)
* | |-- element: struct (containsNull = false)
* | | |-- ID: string (nullable = true)
* | | |-- NAME: string (nullable = true)
* | | |-- AGE: string (nullable = true)
* | | |-- HOBBY: string (nullable = true)
* |-- group: string (nullable = true)
* |-- name: string (nullable = true)
* |-- rank: string (nullable = true)
*/
processedDF.toJSON.show(false)
// {
// "fellows": [{
// "ID": "464e07afc9e46359fb480839150595c5",
// "NAME": "David",
// "AGE": "25",
// "HOBBY": "code"
// }, {
// "ID": "61409aa1fd47d4a5332de23cbf59a36f",
// "NAME": "John",
// "AGE": "27",
// "HOBBY": "tennis"
// }, {
// "ID": "540356fa1779480b07d0743763c78159",
// "NAME": "Anata",
// "AGE": "23",
// "HOBBY": "dance"
// }],
// "group": "1",
// "name": "badboi",
// "rank": "3"
// }
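// A related sketch, if flat rows are needed instead of the nested array:
// processedDF.select($"group", $"rank", explode($"fellows").as("f"))
//   .select("group", "rank", "f.ID", "f.NAME", "f.AGE", "f.HOBBY")
//   .show(false)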
}
// ############################################################################################################
@Test
def test62222257(): Unit = {
spark.range(3).createOrReplaceTempView("df1")
println(spark.catalog.listTables().map(_.name).collect()
.map(table => table -> spark.table(table).count()).mkString(", "))
/**
* (df1,3)
*/
println(spark.catalog.listTables(spark.catalog.currentDatabase).map(_.name).collect()
.map(table => table -> spark.table(table).count()).mkString(", "))
/**
* (df1,3)
*/
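// Note: listTables also returns local temp views, which is why df1 shows up here
// without ever being saved as a table.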
}
// ############################################################################################################
@Test
def test62222717(): Unit = {
val data =
"""
|dept_id|user_id|entry_date
| 3| 1|2020-06-03
| 3| 2|2020-06-03
| 3| 3|2020-06-03
| 3| 4|2020-06-03
| 3| 1|2020-06-04
| 3| 1|2020-06-05
""".stripMargin
val stringDS1 = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
// .option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +-------+-------+----------+
* |dept_id|user_id|entry_date|
* +-------+-------+----------+
* |3 |1 |2020-06-03|
* |3 |2 |2020-06-03|
* |3 |3 |2020-06-03|
* |3 |4 |2020-06-03|
* |3 |1 |2020-06-04|
* |3 |1 |2020-06-05|
* +-------+-------+----------+
*
* root
* |-- dept_id: string (nullable = true)
* |-- user_id: string (nullable = true)
* |-- entry_date: string (nullable = true)
*/
val w = Window.partitionBy("dept_id", "user_id")
val latestRec = when(datediff(max(to_date($"entry_date")).over(w), to_date($"entry_date")) =!= lit(0), 0)
.otherwise(1)
df1.withColumn("latest_rec", latestRec)
.orderBy("dept_id", "user_id", "entry_date")
.show(false)
/**
* +-------+-------+----------+----------+
* |dept_id|user_id|entry_date|latest_rec|
* +-------+-------+----------+----------+
* |3 |1 |2020-06-03|0 |
* |3 |1 |2020-06-04|0 |
* |3 |1 |2020-06-05|1 |
* |3 |2 |2020-06-03|1 |
* |3 |3 |2020-06-03|1 |
* |3 |4 |2020-06-03|1 |
* +-------+-------+----------+----------+
*/
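// An equivalent sketch using row_number (assumes at most one row per user per date):
// val w2 = Window.partitionBy("dept_id", "user_id").orderBy(to_date($"entry_date").desc)
// df1.withColumn("latest_rec", when(row_number().over(w2) === 1, 1).otherwise(0))
//   .orderBy("dept_id", "user_id", "entry_date")
//   .show(false)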
}
// ############################################################################################################
@Test
def test62224198(): Unit = {
val df = spark.range(1).withColumn("memberurn", lit("urn:li:member:10000012"))
df.withColumn("member_id",
expr("""CAST(regexp_extract(memberurn, 'urn:li:member:(\\d+)', 1) AS BIGINT)"""))
.show(false)
/**
* +---+----------------------+---------+
* |id |memberurn |member_id|
* +---+----------------------+---------+
* |0 |urn:li:member:10000012|10000012 |
* +---+----------------------+---------+
*/
df.withColumn("member_id",
substring_index($"memberurn", ":", -1).cast("bigint"))
.show(false)
/**
* +---+----------------------+---------+
* |id |memberurn |member_id|
* +---+----------------------+---------+
* |0 |urn:li:member:10000012|10000012 |
* +---+----------------------+---------+
*/
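// Both variants agree here: substring_index simply takes everything after the last ':',
// so it assumes the id is always the final segment, while regexp_extract also validates
// the 'urn:li:member:' prefix.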
}
// ############################################################################################################
@Test
def test62228733(): Unit = {
/**
* test/File1.json
* -----
* {
* "Value": 123
* }
*/
/**
* test/File2.json
* ---------
* {
* "Value": {
* "Value": "On",
* "ValueType": "State",
* "IsSystemValue": true
* }
* }
*/
val path = getClass.getResource("/test").getPath
val df = spark.read
.option("multiLine", true)
.json(path)
df.show(false)
df.printSchema()
/**
* +-------------------------------------------------------+
* |Value |
* +-------------------------------------------------------+
* |{"Value":"On","ValueType":"State","IsSystemValue":true}|
* |123 |
* +-------------------------------------------------------+
*
* root
* |-- Value: string (nullable = true)
*/
df.withColumn("File", substring_index(input_file_name(),"/", -1))
.withColumn("ValueType", get_json_object(col("Value"), "$.ValueType"))
.withColumn("IsSystemValue", get_json_object(col("Value"), "$.IsSystemValue"))
.withColumn("Value", coalesce(get_json_object(col("Value"), "$.Value"), col("Value")))
.show(false)
/**
* +-----+----------+---------+-------------+
* |Value|File |ValueType|IsSystemValue|
* +-----+----------+---------+-------------+
* |On |File2.json|State |true |
* |123 |File1.json|null |null |
* +-----+----------+---------+-------------+
*/
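// An equivalent sketch for the JSON-parsing part using an explicit schema instead of
// get_json_object (assumes the nested form always carries exactly these three fields;
// needs import org.apache.spark.sql.types._):
// val nested = new StructType().add("Value", StringType)
//   .add("ValueType", StringType).add("IsSystemValue", BooleanType)
// df.withColumn("parsed", from_json($"Value", nested))
//   .selectExpr("coalesce(parsed.Value, Value) AS Value", "parsed.ValueType", "parsed.IsSystemValue")
//   .show(false)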
}
// ############################################################################################################
@Test
def test62229725(): Unit = {
val df = spark.range(1)
.withColumn("Description", lit("{0} is the 4th biggest"))
.withColumn("States", lit("Andhra Pradesh"))
df.show(false)
df.printSchema()
/**
* +---+----------------------+--------------+
* |id |Description |States |
* +---+----------------------+--------------+
* |0 |{0} is the 4th biggest|Andhra Pradesh|
* +---+----------------------+--------------+
*
* root
* |-- id: long (nullable = false)
* |-- Description: string (nullable = false)
* |-- States: string (nullable = false)
*/
val replace1 = udf((s: String, replace: String) => java.text.MessageFormat.format(s, replace))
df.withColumn("Description", replace1($"Description", $"States"))
.show(false)
/**
* +---+---------------------------------+--------------+
* |id |Description |States |
* +---+---------------------------------+--------------+
* |0 |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
* +---+---------------------------------+--------------+
*/
df.withColumn("Description", expr("case when States is null then Description else replace(Description, '{0}', States) end"))
.show(false)
/**
* +---+---------------------------------+--------------+
* |id |Description |States |
* +---+---------------------------------+--------------+
* |0 |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
* +---+---------------------------------+--------------+
*/
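// Minor caveat: java.text.MessageFormat treats a single quote as an escape character,
// so the SQL replace() variant is the safer choice when Description may contain quotes.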
}
// ############################################################################################################
@Test
def test62239923(): Unit = {
val data =
"""
|id | id1 | seq_nbr | id2 |orig_id1 | orig_id2
|1 | 740 | 2 | 217 | 740 | 217
|1 | 740 | 3 | 216 | 740 | 216
|1 | 740 | 4 | 216 | 740 | 216
|1 | 740 | 5 | 217 | 740 | 217
|1 | 367 | 1 | 217 | 740 | 217
""".stripMargin
val stringDS1 = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +---+---+-------+---+--------+--------+
* |id |id1|seq_nbr|id2|orig_id1|orig_id2|
* +---+---+-------+---+--------+--------+
* |1 |740|2 |217|740 |217 |
* |1 |740|3 |216|740 |216 |
* |1 |740|4 |216|740 |216 |
* |1 |740|5 |217|740 |217 |
* |1 |367|1 |217|740 |217 |
* +---+---+-------+---+--------+--------+
*
* root
* |-- id: integer (nullable = true)
* |-- id1: integer (nullable = true)
* |-- seq_nbr: integer (nullable = true)
* |-- id2: integer (nullable = true)
* |-- orig_id1: integer (nullable = true)
* |-- orig_id2: integer (nullable = true)
*/
val win = Window.partitionBy("orig_id1", "orig_id2")
df1.withColumn("orig_seq_nbr",
min(when($"orig_id1" === $"id1" && $"orig_id2" === $"id2", $"seq_nbr").otherwise(null))
.over(win)
).show()
/**
* +---+---+-------+---+--------+--------+------------+
* | id|id1|seq_nbr|id2|orig_id1|orig_id2|orig_seq_nbr|
* +---+---+-------+---+--------+--------+------------+
* | 1|740| 3|216| 740| 216| 3|
* | 1|740| 4|216| 740| 216| 3|
* | 1|740| 2|217| 740| 217| 2|
* | 1|740| 5|217| 740| 217| 2|
* | 1|367| 1|217| 740| 217| 2|
* +---+---+-------+---+--------+--------+------------+
*/
df1.withColumn("orig_seq_nbr",
expr("min(case when orig_id1=id1 and orig_id2=id2 then seq_nbr else NULL end) " +
"OVER (PARTITION BY orig_id1, orig_id2) ")
).show()
/**
* +---+---+-------+---+--------+--------+------------+
* | id|id1|seq_nbr|id2|orig_id1|orig_id2|orig_seq_nbr|
* +---+---+-------+---+--------+--------+------------+
* | 1|740| 3|216| 740| 216| 3|
* | 1|740| 4|216| 740| 216| 3|
* | 1|740| 2|217| 740| 217| 2|
* | 1|740| 5|217| 740| 217| 2|
* | 1|367| 1|217| 740| 217| 2|
* +---+---+-------+---+--------+--------+------------+
*/
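// Note: when(...) without an otherwise() already evaluates to null for non-matching rows,
// so the .otherwise(null) above is optional.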
}
// ############################################################################################################
@Test
def test62240275(): Unit = {
val data =
"""
|date|value
| 1|19.75
| 2|15.51
| 3|20.66
""".stripMargin
val stringDS1 = data.split(System.lineSeparator())
.map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
.toSeq.toDS()
val df1 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS1)
df1.show(false)
df1.printSchema()
/**
* +----+-----+
* |date|value|
* +----+-----+
* |1 |19.75|
* |2 |15.51|
* |3 |20.66|
* +----+-----+
*
* root
* |-- date: integer (nullable = true)
* |-- value: double (nullable = true)
*/
df1.selectExpr(
s"element_at(array('None','Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec'), date) as date",
"value"
).show(false)
/**
* +----+-----+
* |date|value|
* +----+-----+
* |None|19.75|
* |Jan |15.51|
* |Feb |20.66|
* +----+-----+
*/
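// element_at on arrays is 1-based, hence the leading 'None' placeholder: date=1 maps to
// 'None' and date=2 to 'Jan', matching the expected output above.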
}
// ############################################################################################################
trait R {
def run(): Unit
}
object A extends R {
def run() = println("Class A")
}
object B extends R {
def run() = println("Class B")
}
object C extends R {
def run() = println("class C")
}
@Test
def test62246488(): Unit = {
def runUtil(r: Seq[R] = Seq(A, B, C)) = r.foreach(_.run())
println("Run-1")
runUtil(Seq(A))
println("Run-2")
runUtil()
/**
* Run-1
* Class A
* Run-2
* Class A
* Class B
* class C
*/
}
// ############################################################################################################
@Test
def test62264106(): Unit = {
val table = spark.sql("select name, age from values ('bob', 1), ('sam', 2), ('bob', 1) T(name,age)")
table.show(false)
table.printSchema()
/**
* +----+---+
* |name|age|
* +----+---+
* |bob |1 |
* |sam |2 |
* |bob |1 |
* +----+---+
*
* root
* |-- name: string (nullable = false)
* |-- age: integer (nullable = false)
*/
val rowArray = table.select($"name", $"age").collect()
val nameList = rowArray.map(_(0)).toList.distinct
val ageList = rowArray.map(_(1)).toList.distinct
println(nameList.mkString(", "))
println(ageList.mkString(", "))
/**
* bob, sam
* 1, 2
*/
val row = table.select(
collect_set($"name").as("name"),
collect_set($"age").as("age")
).head
val nameSet = row.getAs[Seq[String]]("name")
val ageSet = row.getAs[Seq[Int]]("age")
println(nameSet.mkString(", "))
println(ageSet.mkString(", "))
/**
* bob, sam
* 1, 2
*/
}
// ############################################################################################################
@Test
def test62262873(): Unit = {
val workers: Dataset[Worker] = Seq(
Worker("Bob", id = 1, skills = Array("communication", "teamwork")),
Worker("Sam", id = 2, skills = Array("self-motivation"))
).toDS
import scala.reflect.runtime.universe._
def doWork[T : TypeTag](persons: Dataset[T]): Unit = {
typeOf[T] match {
case t if t =:= typeOf[Worker] => println("I'm worker")
persons.as[Worker].filter(_.id == 2).show(false)
case t if t =:= typeOf[Customer] => println("I'm Customer")
persons.as[Customer].filter(_.name.contains("B")).show(false)
}
}
doWork(workers)
/**
* I'm worker
* +----+---+-----------------+
* |name|id |skills |
* +----+---+-----------------+
* |Sam |2 |[self-motivation]|
* +----+---+-----------------+
*/
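// Hypothetical usage of the other branch (Customer is defined at the bottom of this file):
// doWork(Seq(Customer("Bob", "bob@example.com")).toDS)
// // prints "I'm Customer" and shows the rows whose name contains "B"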
}
// ############################################################################################################
@Test
def test62272099(): Unit = {
val df = spark.range(1).withColumn("date",
explode(sequence(to_date(lit("2020-06-09")), to_date(lit("2020-06-20")), expr("interval 1 day")))
).withColumn("year", year($"date"))
.withColumn("month", month($"date"))
.withColumn("day", dayofmonth($"date"))
df.show(false)
df.printSchema()
/**
* +---+----------+----+-----+---+
* |id |date |year|month|day|
* +---+----------+----+-----+---+
* |0 |2020-06-09|2020|6 |9 |
* |0 |2020-06-10|2020|6 |10 |
* |0 |2020-06-11|2020|6 |11 |
* |0 |2020-06-12|2020|6 |12 |
* |0 |2020-06-13|2020|6 |13 |
* |0 |2020-06-14|2020|6 |14 |
* |0 |2020-06-15|2020|6 |15 |
* |0 |2020-06-16|2020|6 |16 |
* |0 |2020-06-17|2020|6 |17 |
* |0 |2020-06-18|2020|6 |18 |
* |0 |2020-06-19|2020|6 |19 |
* |0 |2020-06-20|2020|6 |20 |
* +---+----------+----+-----+---+
*
* root
* |-- id: long (nullable = false)
* |-- date: date (nullable = false)
* |-- year: integer (nullable = false)
* |-- month: integer (nullable = false)
* |-- day: integer (nullable = false)
*/
df.repartition(2).write.partitionBy("year", "month", "day")
.option("header", true)
.mode(SaveMode.Overwrite)
.csv("/Users/sokale/models/hive_table")
/**
* File structure
* ---------------
* year=2020
* year=2020/month=6
* year=2020/month=6/day=10
* |- part...csv files (same part files for all the below directories)
* year=2020/month=6/day=11
* year=2020/month=6/day=12
* year=2020/month=6/day=13
* year=2020/month=6/day=14
* year=2020/month=6/day=15
* year=2020/month=6/day=16
* year=2020/month=6/day=17
* year=2020/month=6/day=18
* year=2020/month=6/day=19
* year=2020/month=6/day=20
* year=2020/month=6/day=9
*/
val csvDF = spark.read.option("header", true)
.csv("/Users/sokale/models/hive_table")
// val readDF = spark.catalog.createTable("my_table", "csv", csvDF.schema, Map.empty[String, String])
// spark.catalog.recoverPartitions("my_table")
// val readDF = spark.sql(
// """
// |CREATE EXTERNAL TABLE my_table (date1 String)
// | PARTITIONED BY (year INT, month INT, day INT)
// | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
// | STORED AS TEXTFILE LOCATION '/Users/sokale/models/hive_table'
// | TBLPROPERTIES ('skip.header.line.count' = '1')
// """.stripMargin)
csvDF.show(false)
csvDF.printSchema()
/**
* +---+----------+----+-----+---+
* |id |date |year|month|day|
* +---+----------+----+-----+---+
* |0 |2020-06-20|2020|6 |20 |
* |0 |2020-06-19|2020|6 |19 |
* |0 |2020-06-09|2020|6 |9 |
* |0 |2020-06-12|2020|6 |12 |
* |0 |2020-06-10|2020|6 |10 |
* |0 |2020-06-15|2020|6 |15 |
* |0 |2020-06-16|2020|6 |16 |
* |0 |2020-06-17|2020|6 |17 |
* |0 |2020-06-13|2020|6 |13 |
* |0 |2020-06-18|2020|6 |18 |
* |0 |2020-06-14|2020|6 |14 |
* |0 |2020-06-11|2020|6 |11 |
* +---+----------+----+-----+---+
*
* root
* |-- id: string (nullable = true)
* |-- date: string (nullable = true)
* |-- year: integer (nullable = true)
* |-- month: integer (nullable = true)
* |-- day: integer (nullable = true)
*/
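// Note: on read-back, year/month/day come back as integers via partition-column
// discovery, while id/date stay strings because inferSchema was not enabled.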
}
}
sealed trait Person {
def name: String
}
final case class Customer(override val name: String, email: String) extends Person
final case class Worker(override val name: String, id: Int, skills: Array[String]) extends Person