Skip to content

Instantly share code, notes, and snippets.

@qi-qi
Last active February 15, 2019 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qi-qi/f533b13a8e887a63ec60c71778417ab4 to your computer and use it in GitHub Desktop.
Save qi-qi/f533b13a8e887a63ec60c71778417ab4 to your computer and use it in GitHub Desktop.
java11 and spark2.4
// Run with Java 11 single-file source-code mode (no separate javac step) against Spark 2.4, e.g.:
// java -cp ".:/Users/qi/Desktop/spark-2.4.0-bin-without-hadoop/jars/*:/Users/qi/Desktop/hadoop-3.2.0/share/etc/hadoop/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/common/lib/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/common/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/hdfs/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/hdfs/lib/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/hdfs/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/yarn/lib/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/yarn/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/mapreduce/lib/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/mapreduce/*:/Users/qi/Desktop/hadoop-3.2.0/share/hadoop/tools/lib/*" Main.java
import static org.apache.spark.sql.functions.*;

import java.sql.Timestamp;
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
/**
 * Single-file Spark job: reads a JSON dataset of hosted episodes, keeps the identifying columns,
 * converts the four date columns from text to timestamps, and writes the result as Parquet.
 *
 * <p>Usage: {@code Main [inputPath [outputPath]]}. Both arguments are optional; when omitted the
 * job falls back to {@link #DEFAULT_INPUT_PATH} and {@link #DEFAULT_OUTPUT_PATH}.
 */
public class Main {
    /** Input read when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "s3a://acast-data-raw-metadata/hosted-episode/dt=2019-02-13/2019-02-13_hosted-episode.gz";

    // Other sample inputs that were tried during development:
    //   s3a://acast-data-raw-metadata/hosted-show/dt=2019-01-27/2019-01-27_hosted-show.gz
    //   s3a://acast-data-dev/airflow.cfg
    //   s3a://acast-data-raw/requests-batch-all/2018/12/31/00/

    /** Parquet output directory used when no path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH = "./result";

    /** Local-mode Spark configuration; S3 credentials are resolved from the AWS profile file. */
    private static final SparkConf CONF = new SparkConf()
            .setMaster("local[*]")
            .setAppName("test")
            // NOTE(review): this key appears to be deprecated since Spark 2.3 in favour of
            // spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS. It is kept unchanged to
            // preserve behaviour; confirm against the Spark docs before migrating.
            .set("spark.sql.parquet.int64AsTimestampMillis", "true")
            .set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider");

    /** Expected text layout of the date columns; the trailing {@code Z} is matched literally. */
    private static final DateTimeFormatter UTC_MILLIS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    /**
     * Spark UDF wrapping {@link #parseUtcTimestamp(String)}. The lambda keeps an explicitly typed
     * parameter so the intended one-argument {@code udf} overload is selected.
     */
    private static final UserDefinedFunction TO_TIMESTAMP_UDF =
            udf((String s) -> parseUtcTimestamp(s), DataTypes.TimestampType);

    /**
     * Parses a UTC date string such as {@code 2019-02-13T10:15:30.123Z}.
     *
     * <p>The value is anchored to UTC explicitly. {@code Timestamp.valueOf(LocalDateTime)} would
     * instead interpret the wall-clock fields in the JVM default time zone, shifting every value
     * on machines that do not run in UTC.
     *
     * @param text the raw column value; may be {@code null}
     * @return the parsed instant as a {@link Timestamp}, or {@code null} when {@code text} is
     *     {@code null} or does not match the expected layout
     */
    private static Timestamp parseUtcTimestamp(String text) {
        if (text == null) {
            return null;
        }
        try {
            return Timestamp.from(
                    LocalDateTime.parse(text, UTC_MILLIS_FORMAT).toInstant(ZoneOffset.UTC));
        } catch (DateTimeException | IllegalArgumentException ignored) {
            // Best effort by design: an unparsable or out-of-range value becomes a null cell
            // instead of failing the whole job.
            return null;
        }
    }

    /**
     * Runs the conversion and prints the elapsed wall-clock time.
     *
     * @param args optional {@code [inputPath, outputPath]}; missing entries use the defaults
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_PATH;

        Instant start = Instant.now();
        try (var spark = SparkSession.builder().config(CONF).getOrCreate()) {
            spark.read().json(inputPath)
                    .select(
                            col("episode_id"),
                            col("show_id"),
                            col("episode_url"),
                            col("episode_title"),
                            TO_TIMESTAMP_UDF.apply(col("create_date")).as("create_date"),
                            TO_TIMESTAMP_UDF.apply(col("publish_date")).as("publish_date"),
                            TO_TIMESTAMP_UDF.apply(col("modified_date")).as("modified_date"),
                            TO_TIMESTAMP_UDF.apply(col("snapshot_date")).as("snapshot_date"))
                    // Collapse to one partition before writing.
                    .repartition(1)
                    .write()
                    .mode(SaveMode.Overwrite)
                    .parquet(outputPath);
        }
        System.out.println("Time: " + Duration.between(start, Instant.now()));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment