@squito
Last active February 5, 2020 14:15
Spark SQL timestamp semantics, and how they changed from 2.0.0 to 2.0.1 via SPARK-16216 (see query_output_2_0_0.txt vs query_output_2_0_1.txt)

Spark "Timestamp" Behavior

Reading data in different timezones

Note that the ANSI SQL standard defines "timestamp" as equivalent to "timestamp without time zone". However, Spark's behavior depends on both the Spark version and the file format:

| format \ spark version | <= 2.0.0         | >= 2.0.1         |
|-------------------------|------------------|------------------|
| hive table, textfile    | without timezone | without timezone |
| hive table, parquet     | with timezone    | with timezone    |
| json                    | without timezone | with timezone    |
| csv                     | NA               | with timezone    |
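
To make the two semantics concrete, here is a minimal plain-JVM sketch (no Spark involved, and not part of the gist's scripts; it just reuses the first timestamp string from below). java.sql.Timestamp.valueOf parses a wall-clock string with the JVM's default time zone, so the same display string maps to different instants in LA vs. New York:

// Illustration only: "without time zone" semantics preserve the display string across
// sessions, while "with time zone" (instant) semantics preserve the underlying epoch value.
import java.sql.Timestamp
import java.util.TimeZone

val wallClock = "2015-12-31 23:50:59.123"

TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
val tsLa = Timestamp.valueOf(wallClock)   // instant = 2016-01-01 07:50:59.123 UTC

TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"))
val tsNy = Timestamp.valueOf(wallClock)   // instant = 2016-01-01 04:50:59.123 UTC

println(tsLa.getTime - tsNy.getTime)      // 10800000 ms -- the 3 hour LA/NY offset
println(tsLa)                             // prints 2016-01-01 02:50:59.123 in the New York session,
                                          // the same shifted display the parquet table shows later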
// This script creates tables with timestamps in a few different file formats.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
java.util.TimeZone.getDefault().getID()   // confirm which time zone this shell is running in
val tblPrefix = "la"
val schema = new StructType().add("ts", TimestampType)
val tsStrings = Seq(
"2015-12-31 23:50:59.123",
"2015-12-31 22:49:59.123",
"2016-01-01 00:39:59.123",
"2016-01-01 01:29:59.123"
)
val rows = sc.parallelize(tsStrings).map { x => Row(java.sql.Timestamp.valueOf(x)) }
val rawData = spark.createDataFrame(rows, schema).toDF()
rawData.show(truncate=false)
Seq("parquet", "textfile").foreach { format =>
val tblName = s"${tblPrefix}_$format"
spark.sql(s"DROP TABLE IF EXISTS $tblName")
spark.sql(
raw"""CREATE TABLE $tblName (
| ts timestamp
| )
| STORED AS $format
""".stripMargin)
rawData.write.insertInto(tblName)
}
rawData.write.json(s"${tblPrefix}_json")
rawData.write.csv(s"${tblPrefix}_csv")
val tsAndString = sc.parallelize(tsStrings).map { x => Row(x, java.sql.Timestamp.valueOf(x)) }
val tsAndStringSchema = new StructType().add("display", StringType).add("ts", TimestampType)
val tsAndStringDf = spark.createDataFrame(tsAndString, tsAndStringSchema).toDF()
// partitioning by timestamp is a spark-only thing -- and it behaves inconsistently as well
// tsAndStringDf.write.partitionBy("ts").json(s"${tblPrefix}_partitioned_json")
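
One way to see what that inconsistency looks like (a hedged sketch, assuming output goes to the local filesystem as in these shells) is to actually write the partitioned dataset and inspect the directory names partitionBy generates for the timestamp column:

// Sketch: the partition directory names show how the timestamp was rendered as a string,
// which is where the version-to-version inconsistency shows up.
tsAndStringDf.write.partitionBy("ts").json(s"${tblPrefix}_partitioned_json")
val partDirs = new java.io.File(s"${tblPrefix}_partitioned_json").listFiles().filter(_.isDirectory)
partDirs.foreach(d => println(d.getName))   // ts=... directories, one per distinct timestamp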
// These are queries on the tables created by create_tables.scala. The point is to run these queries in a *different timezone*
// than the timezone you used to create the tables.
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("ts", TimestampType, true)))
spark.sql("select * from la_parquet").show(truncate=false)
spark.sql("select * from la_textfile").show(truncate=false)
spark.read.schema(schema).json("la_json").show(truncate=false)
spark.read.schema(schema).csv("la_csv").show(truncate=false)
spark.read.schema(schema).json("la_json").join(spark.sql("select * from la_textfile"), "ts").show(truncate=false)
spark.read.schema(schema).json("la_json").join(spark.sql("select * from la_parquet"), "ts").show(truncate=false)
val tsAndStringSchema = new StructType().add("display", StringType).add("ts", TimestampType)
spark.read.schema(tsAndStringSchema).json("la_partitioned_json").join(spark.sql("select * from la_parquet"), "ts").show(truncate=false)

Spark 2.0.0

This is what happens when you run query.scala in a Spark 2.0.0 shell, in the New York timezone, when you've created the tables in the LA timezone. *** DIFFERENT BEHAVIOR *** from 2.0.1. It is compliant with the SQL standard (except for parquet). Notice that, other than parquet, all the formats show the exact same time -- the same "display time" we used when inserting the data, even though we're now in a different JVM timezone.

(CSV is relatively broken here, but I suppose it's closer to "timestamp WITH time zone" behavior.)
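
Since parquet stores an instant rather than a display string, you can (as a rough check that is not part of the original scripts) recover the original LA display from this same 2.0.0 New York session by formatting the returned timestamps with an explicit LA time zone:

// Hedged sketch: format the parquet timestamps (shown shifted in the transcript below)
// with an explicit America/Los_Angeles zone to get back the display used at insert time.
import java.text.SimpleDateFormat
import java.util.TimeZone

val laFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
laFmt.setTimeZone(TimeZone.getTimeZone("America/Los_Angeles"))

spark.sql("select * from la_parquet").collect().foreach { row =>
  println(laFmt.format(row.getTimestamp(0)))   // 2015-12-31 23:50:59.123, etc.
}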

spark-2.0.0-bin-hadoop2.7> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/New_York"
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
         
...

scala> spark.sql("select * from la_parquet").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+


scala> spark.sql("select * from la_textfile").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+


scala> spark.read.schema(schema).json("la_json").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+


scala> spark.read.schema(schema).json("la_csv").show(truncate=false)
+----+
|ts  |
+----+
|null|
|null|
|null|
|null|
+----+


scala> spark.read.schema(schema).json("la_json").join(spark.sql("select * from la_textfile"), "ts").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+


scala> spark.read.schema(schema).json("la_json").join(spark.sql("select * from la_parquet"), "ts").show(truncate=false)
+---+
|ts |
+---+
+---+

Spark 2.0.1

This is what happens when you run query.scala in a Spark 2.0.1 shell, in the New York timezone, when you've created the tables in the LA timezone. BEHAVIOR CHANGE from Spark 2.0.0 and from the SQL spec, caused by https://issues.apache.org/jira/browse/SPARK-16216.

Notice that JSON and a Hive textfile table now behave differently. The Hive textfile table shows the same "display time" as it did when we inserted the data. JSON and CSV show a different "display time", but one that represents the same instant in time as the original insert, given that we've changed timezones. Though both were created from the same original dataframe, if we now try to join them to each other on ts, we get an empty result.
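
A rough way to see why that join comes back empty (a hedged sketch run in the same 2.0.1 New York session, with schema defined as in query.scala above): the "same" original row now maps to two instants three hours apart, so an equality join on ts can never match.

// Sketch: compare the underlying epoch values of the row written as "2015-12-31 22:49:59.123".
val fromJson = spark.read.schema(schema).json("la_json").orderBy("ts").first().getTimestamp(0)
val fromTextfile = spark.sql("select * from la_textfile order by ts").first().getTimestamp(0)
println(fromJson.getTime - fromTextfile.getTime)   // 10800000 ms = 3 hours, so the ts values never line up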

spark-2.0.1-bin-hadoop2.7> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/New_York"
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/
...
scala> spark.sql("select * from la_parquet").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+


scala> spark.sql("select * from la_textfile").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+


scala> spark.read.schema(schema).json("la_json").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+


scala> spark.read.schema(schema).csv("la_csv").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+


scala> spark.read.schema(schema).json("la_json").join(spark.sql("select * from la_textfile"), "ts").show(truncate=false)
+---+
|ts |
+---+
+---+


scala> spark.read.schema(schema).json("la_json").join(spark.sql("select * from la_parquet"), "ts").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+

Spark 2.1.1

This is what happens when you run query.scala in a Spark 2.1.1 shell, in the New York timezone, when you've created the tables in the LA timezone. ***BEHAVIOR CHANGE*** from Spark 2.0.0 and from the SQL spec (but the same as Spark 2.0.1).

> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/New_York"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/26 12:49:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 12:49:07 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://172.30.32.36:4040
Spark context available as 'sc' (master = local[*], app id = local-1495817343586).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("select * from la_parquet").show(truncate=false)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+
scala> spark.sql("select * from la_textfile").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala> spark.read.json("la_json").show(truncate=false)
+-----------------------------+
|ts                           |
+-----------------------------+
|2015-12-31T23:50:59.123-08:00|
|2015-12-31T22:49:59.123-08:00|
|2016-01-01T00:39:59.123-08:00|
|2016-01-01T01:29:59.123-08:00|
+-----------------------------+
scala> spark.read.json("la_json").join(spark.sql("select * from la_textfile"), "ts").show(truncate=false)
+---+
|ts |
+---+
+---+
scala> spark.read.json("la_json").join(spark.sql("select * from la_parquet"), "ts").show(truncate=false)
+-----------------------------+
|ts                           |
+-----------------------------+
|2015-12-31T23:50:59.123-08:00|
|2015-12-31T22:49:59.123-08:00|
|2016-01-01T00:39:59.123-08:00|
|2016-01-01T01:29:59.123-08:00|
+-----------------------------+
# Here's the output from running create_tables.scala in a spark 2.0.0 shell, with the LA timezone (not very interesting, just here for completeness)
> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/Los_Angeles"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/05/26 09:47:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 09:47:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/05/26 09:47:19 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.30.32.36:4041
Spark context available as 'sc' (master = local[*], app id = local-1495817238902).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala>
scala> java.util.TimeZone.getDefault().getID()
res0: String = America/Los_Angeles
scala>
scala> val tblPrefix = "la"
tblPrefix: String = la
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))
scala> val rows = sc.parallelize(Seq(
| "2015-12-31 23:50:59.123",
| "2015-12-31 22:49:59.123",
| "2016-01-01 00:39:59.123",
| "2016-01-01 01:29:59.123"
| ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:28
scala> val rawData = spark.createDataFrame(rows, schema).toDF()
rawData: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala>
scala> rawData.show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala>
scala> Seq("parquet", "textfile").foreach { format =>
| val tblName = s"${tblPrefix}_$format"
| spark.sql(s"DROP TABLE IF EXISTS $tblName")
| spark.sql(
| raw"""CREATE TABLE $tblName (
| | ts timestamp
| | )
| | STORED AS $format
| """.stripMargin)
| rawData.write.insertInto(tblName)
| }
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
...snip...
scala>
scala> rawData.write.json(s"${tblPrefix}_json")
scala>
scala> rawData.write.csv(s"${tblPrefix}_csv")
scala> :quit
# Here's the output from running create_tables.scala in a spark 2.0.1 shell with the LA timezone
# (not very interesting, just here for completeness)
> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/Los_Angeles"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/05/26 12:57:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 12:57:31 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://172.30.32.36:4040
Spark context available as 'sc' (master = local[*], app id = local-1495828651614).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
scala> // This script creates tables with timestamps in a few different file formats.
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala>
scala> java.util.TimeZone.getDefault().getID()
res0: String = America/Los_Angeles
scala>
scala> val tblPrefix = "la"
tblPrefix: String = la
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))
scala> val rows = sc.parallelize(Seq(
| "2015-12-31 23:50:59.123",
| "2015-12-31 22:49:59.123",
| "2016-01-01 00:39:59.123",
| "2016-01-01 01:29:59.123"
| ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:28
scala> val rawData = spark.createDataFrame(rows, schema).toDF()
17/05/26 12:57:43 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/26 12:57:43 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
rawData: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala>
scala> rawData.show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala>
scala> Seq("parquet", "textfile").foreach { format =>
| val tblName = s"${tblPrefix}_$format"
| spark.sql(s"DROP TABLE IF EXISTS $tblName")
| spark.sql(
| raw"""CREATE TABLE $tblName (
| | ts timestamp
| | )
| | STORED AS $format
| """.stripMargin)
| rawData.write.insertInto(tblName)
| }
... snip ...
scala>
scala> rawData.write.json(s"${tblPrefix}_json")
scala>
scala> rawData.write.csv(s"${tblPrefix}_csv")
# Here's the output from running create_tables.scala in a spark 2.1.1 shell with the LA timezone (not very interesting, just here for completeness)
> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/Los_Angeles"
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/26 09:41:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/26 09:41:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://172.30.32.36:4040
Spark context available as 'sc' (master = local[*], app id = local-1495816869835).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala>
scala> java.util.TimeZone.getDefault().getID()
res0: String = America/Los_Angeles
scala>
scala> val tblPrefix = "la"
tblPrefix: String = la
scala> val schema = new StructType().add("ts", TimestampType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(ts,TimestampType,true))
scala> val rows = sc.parallelize(Seq(
| "2015-12-31 23:50:59.123",
| "2015-12-31 22:49:59.123",
| "2016-01-01 00:39:59.123",
| "2016-01-01 01:29:59.123"
| ).map { x => Row(java.sql.Timestamp.valueOf(x)) })
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at <console>:28
scala> val rawData = spark.createDataFrame(rows, schema).toDF()
rawData: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala>
scala> rawData.show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala>
scala> Seq("parquet", "textfile").foreach { format =>
| val tblName = s"${tblPrefix}_$format"
| spark.sql(s"DROP TABLE IF EXISTS $tblName")
| spark.sql(
| raw"""CREATE TABLE $tblName (
| | ts timestamp
| | )
| | STORED AS $format
| """.stripMargin)
| rawData.write.insertInto(tblName)
| }
17/05/26 09:41:34 WARN HiveMetaStore: Location: file:/Users/irashid/spark-releases/spark-2.1.1-bin-hadoop2.7/spark-warehouse/la_parquet specified for non-external table:la_parquet
17/05/26 09:41:34 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,000 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
17/05/26 09:41:34 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,000 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17/05/26 09:41:34 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,000 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
17/05/26 09:41:35 WARN HiveMetaStore: Location: file:/Users/irashid/spark-releases/spark-2.1.1-bin-hadoop2.7/spark-warehouse/la_textfile specified for non-external table:la_textfile
scala>
scala> rawData.write.json(s"${tblPrefix}_json")
scala>
scala> rawData.write.csv(s"${tblPrefix}_csv")
// This script creates tables with timestamps in a few different file formats.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
java.util.TimeZone.getDefault().getID()
val tblPrefix = "la"
val schema = new StructType().add("ts", TimestampType)
val rows = sc.parallelize(Seq(
"2015-12-31 23:50:59.123",
"2015-12-31 22:49:59.123",
"2016-01-01 00:39:59.123",
"2016-01-01 01:29:59.123"
).map { x => Row(java.sql.Timestamp.valueOf(x)) })
val rawData = sqlContext.createDataFrame(rows, schema).toDF()
rawData.show(truncate=false)
Seq("parquet", "textfile").foreach { format =>
val tblName = s"${tblPrefix}_$format"
sqlContext.sql(s"DROP TABLE IF EXISTS $tblName")
sqlContext.sql(
raw"""CREATE TABLE $tblName (
| ts timestamp
| )
| STORED AS $format
""".stripMargin)
rawData.write.insertInto(tblName)
}
rawData.write.json(s"${tblPrefix}_json")
// These are queries on the tables created by create_tables.scala. The point is to run these queries in a *different timezone*
// than the timezone you used to create the tables.
sqlContext.sql("select * from la_parquet").show(truncate=false)
sqlContext.sql("select * from la_textfile").show(truncate=false)
sqlContext.read.json("la_json").show(truncate=false)
sqlContext.read.json("la_json").join(sqlContext.sql("select * from la_textfile"), "ts").show(truncate=false)
sqlContext.read.json("la_json").join(sqlContext.sql("select * from la_parquet"), "ts").show(truncate=false)
# This is what happens when you run query.scala on a spark 1.6.1 shell, in the New York timezone (when
# you've created the tables in the LA timezone)
# basically the same as query_output_2_0_0.txt, just here for completeness
> bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=America/New_York"
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc
...snip...
SQL context available as sqlContext.
scala> // these are queries on tables created by create_tables.scala. The point is to run these queries in a *different timezone*
scala> // than the timezone you used to create the tables.
scala> sqlContext.sql("select * from la_parquet").show(truncate=false)
[Stage 0:> (0 + 0) / 8]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
...snip...
+-----------------------+
|ts                     |
+-----------------------+
|2016-01-01 02:50:59.123|
|2016-01-01 01:49:59.123|
|2016-01-01 03:39:59.123|
|2016-01-01 04:29:59.123|
+-----------------------+
scala> sqlContext.sql("select * from la_textfile").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala> sqlContext.read.json("la_json").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala> sqlContext.read.json("la_json").join(sqlContext.sql("select * from la_textfile"), "ts").show(truncate=false)
+-----------------------+
|ts                     |
+-----------------------+
|2015-12-31 23:50:59.123|
|2015-12-31 22:49:59.123|
|2016-01-01 00:39:59.123|
|2016-01-01 01:29:59.123|
+-----------------------+
scala> sqlContext.read.json("la_json").join(sqlContext.sql("select * from la_parquet"), "ts").show(truncate=false)
...snip...
+---+
|ts |
+---+
+---+