Created May 9, 2023 14:40
Save ajantha-bhat/3401f2a29ddfa6b2b42f9168461ce98b to your computer and use it in GitHub Desktop.
Partition stats schema test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testPartitionStats() throws Exception { | |
// Create 1 million unique timestamps | |
List<Timestamp> timestamps = new ArrayList<>(); | |
for (int i = 0; i < 1000000; i++) { | |
timestamps.add(new Timestamp(System.currentTimeMillis() + i * 1000)); | |
} | |
// Create DataFrame with unique timestamps and random values for other columns | |
List<Row> rows = new ArrayList<>(); | |
for (Timestamp ts : timestamps) { | |
int int1 = (int) (Math.random() * 100); | |
long long1 = (long) (Math.random() * 100); | |
int int2 = (int) (Math.random() * 100); | |
long long2 = (long) (Math.random() * 100); | |
int int3 = (int) (Math.random() * 100); | |
long long3 = (long) (Math.random() * 100); | |
int int4 = (int) (Math.random() * 100); | |
rows.add(RowFactory.create(ts, int1, long1, int2, long2, int3, long3, int4)); | |
} | |
StructType schema = DataTypes.createStructType(new StructField[] { | |
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true), | |
DataTypes.createStructField("int1", DataTypes.IntegerType, true), | |
DataTypes.createStructField("long1", DataTypes.LongType, true), | |
DataTypes.createStructField("int2", DataTypes.IntegerType, true), | |
DataTypes.createStructField("long2", DataTypes.LongType, true), | |
DataTypes.createStructField("int3", DataTypes.IntegerType, true), | |
DataTypes.createStructField("long3", DataTypes.LongType, true), | |
DataTypes.createStructField("int4", DataTypes.IntegerType, true) | |
}); | |
Dataset<Row> data = spark.createDataFrame(rows, schema); | |
// Write data to Parquet file | |
File location = parquetPath.newFolder(); | |
data.orderBy("timestamp").repartition(1).write().mode(SaveMode.Append).option("compression", "snappy").parquet(location.toURI().toString()); | |
// Dataset<Row> rowDataset = spark.read().schema(data.schema()).parquet(location.toURI().toString()); | |
// rowDataset.show(5, false); | |
// Stop Spark session | |
spark.stop(); | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.