|
# Output from running create_tables.scala in a Spark 2.1.1 shell with the America/Los_Angeles timezone (not very interesting; included for completeness)
|
|
|
```
> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/Los_Angeles"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/26 09:41:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 09:41:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://172.30.32.36:4040
Spark context available as 'sc' (master = local[*], app id = local-1495816869835).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> java.util.TimeZone.getDefault().getID()
res0: String = America/Los_Angeles

scala> val tblPrefix = "la"
tblPrefix: String = la

scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))

scala> val rows = sc.parallelize(Seq(
     |   "2015-12-31 23:50:59.123",
     |   "2015-12-31 22:49:59.123",
     |   "2016-01-01 00:39:59.123",
     |   "2016-01-01 01:29:59.123"
     | ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:28

scala> val rawData = spark.createDataFrame(rows, schema).toDF()
rawData: org.apache.spark.sql.DataFrame = [ts: timestamp]
```
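A note on this step: `java.sql.Timestamp.valueOf` interprets its argument as a wall-clock time in the JVM default timezone, which is exactly what the `-Duser.timezone` flag controls. A minimal sketch of the consequence (this check is not part of the original session):

```scala
// Not from the original session: Timestamp.valueOf parses the string as local time
// in the JVM default timezone (America/Los_Angeles here, per -Duser.timezone).
val ts = java.sql.Timestamp.valueOf("2015-12-31 23:50:59.123")
// The stored instant is 2016-01-01 07:50:59.123 UTC (PST is UTC-8 in December);
// the same string parsed under -Duser.timezone=UTC would give different epoch millis.
println(ts.getTime)
```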
|
|
|
```
scala> rawData.show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
```
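Because `show()` also renders timestamps in the JVM default timezone in Spark 2.1, the displayed values round-trip to exactly the input strings. To see the timezone-independent instants underneath, one could cast to epoch seconds (a sketch, not part of the original session):

```scala
import org.apache.spark.sql.functions.col

// Not from the original session: casting a timestamp to long gives seconds since
// the epoch, independent of any display timezone (milliseconds are truncated).
rawData.select(col("ts"), col("ts").cast("long").as("epoch_seconds")).show(truncate = false)
```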
|
|
|
|
|
```
scala> Seq("parquet", "textfile").foreach { format =>
     |   val tblName = s"${tblPrefix}_$format"
     |   spark.sql(s"DROP TABLE IF EXISTS $tblName")
     |   spark.sql(
     |     raw"""CREATE TABLE $tblName (
     |       | ts timestamp
     |       | )
     |       | STORED AS $format
     |     """.stripMargin)
     |   rawData.write.insertInto(tblName)
     | }
17/05/26 09:41:34 WARN HiveMetaStore: Location: file:/Users/irashid/spark-releases/spark-2.1.1-bin-hadoop2.7/spark-warehouse/la_parquet specified for non-external table:la_parquet
17/05/26 09:41:34 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,000 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
17/05/26 09:41:34 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,000 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17/05/26 09:41:34 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,000 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
17/05/26 09:41:35 WARN HiveMetaStore: Location: file:/Users/irashid/spark-releases/spark-2.1.1-bin-hadoop2.7/spark-warehouse/la_textfile specified for non-external table:la_textfile
```
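The `HiveMetaStore` and `MemoryManager` warnings are incidental. One detail worth flagging: `insertInto` resolves columns by position rather than by name, though with a single `ts` column there is nothing to misalign. A quick read-back through the metastore would confirm the inserts (not part of the original session):

```scala
// Not from the original session: read both tables back and display them.
spark.table(s"${tblPrefix}_parquet").show(truncate = false)
spark.table(s"${tblPrefix}_textfile").show(truncate = false)
```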
|
|
|
```
scala> rawData.write.json(s"${tblPrefix}_json")

scala> rawData.write.csv(s"${tblPrefix}_csv")
```
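With the local master used here, these relative paths become directories `la_json` and `la_csv` in the working directory. Since the interesting question is how each format serializes the timestamps, reading the output files back as plain text shows the literal rendering (a sketch, not part of the original session):

```scala
// Not from the original session: inspect the raw serialized form of the timestamps.
spark.read.text(s"${tblPrefix}_json").show(truncate = false)
spark.read.text(s"${tblPrefix}_csv").show(truncate = false)
```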