@xerial
Last active September 20, 2022 11:19

td-spark usage notes

What You Can Do With td-spark

  • Reading and writing TD tables through Spark DataFrames.
  • Running Spark SQL queries against DataFrames.
  • Submitting Presto SQL queries to TD and reading the query results as a DataFrame.
  • If you use PySpark, you can use both Spark DataFrames and Pandas DataFrames interchangeably.
  • Using any hosted Spark service, such as Amazon EMR or Databricks Cloud.
    • For the best performance, we recommend using the us-east region in AWS.
    • It's also possible to use Google Colaboratory to run PySpark. See the demos for details.

Downloads

  • td-spark 1.1.0 (For Spark 2.4.0 + Scala 2.11): td-spark-assembly_2.11-1.1.0.jar

    • Reduced the td-spark assembly jar file size from 151MB to 22MB.
    • Add new methods:
      • td.presto(sql) 
        • Improved the query performance by using the api-presto gateway.
        • If you need to run a query that has large results, use td.prestoJob(sql).df.
      • td.executePresto(sql)
        • Run non-query Presto statements, e.g., CREATE TABLE, DROP TABLE, etc.
      • td.prestoJob(sql)
        • Run a Presto query as a regular TD job.
      • td.hiveJob(sql)
        • Run a Hive query as a regular TD job.
    • New Database/Table methods:
      • td.table(...).exists
      • td.table(...).dropIfExists
      • td.table(...).createIfNotExists
      • td.database(...).exists
      • td.database(...).dropIfExists
      • td.database(...).createIfNotExists
    • Add methods for creating new UDP tables:
      • td.createLongPartitionedTable(table_name, long type column name)
      • td.createStringPartitionedTable(table_name, string type column name)
    • Support df.createOrReplaceTD(...) for UDP tables 
    • Add spark.td.site configuration for multiple regions.
      • spark.td.site=us (default)
        • For US customers. Using us-east region provides the best performance.
      • spark.td.site=jp
        • For Tokyo region customers. Using ap-northeast region provides the best performance.
      • spark.td.site=eu01
        • For EU region customers. Using eu-central region provides the best performance.
    • Enabled predicate pushdown for UDP table queries
      • Queries with exact match conditions for UDP keys can be accelerated.
    • Bug fixes:
      • Fixed a bug when using createOrReplaceTempView in multiple notebooks on Databricks Cloud.
      • Fixed a NoSuchMethod error when using the td.presto command.
  • td-spark 1.0.0

  • td-spark 0.4.2

Demos

Usage

Launching a local Spark Shell Using Docker

If Docker is already installed on your machine, the td-spark Docker images are the most convenient way to try td-spark.

  • If you don't have Docker, install Docker on your machine.

  • Running Spark shell using your API Key

$ export TD_API_KEY=(your TD API key)
$ docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-shell:latest
  • Running PySpark shell
$ docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-pyspark:latest

Testing td-spark with your local Spark

It is also possible to use Spark binaries to test td-spark:

  • Download the Spark 2.4.0 (or higher) package and extract it to any folder you like.
  • Download td-spark_2.11_1.0.0.jar
  • Prepare a td-spark.conf configuration file:
spark.td.apikey=(YOUR TD API KEY)
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled=true
  • Launch Spark Shell:
$ ./bin/spark-shell --jars (path to td-spark-assembly_2.11_1.0.0.jar) --properties-file (path to td-spark.conf)
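
For accounts outside the US region, the spark.td.site setting listed in the 1.1.0 release notes would presumably go in the same td-spark.conf file. The fragment below is a sketch for a Tokyo-region account. It assumes td-spark 1.1.0 or later, and the API key is a placeholder:

```properties
# td-spark.conf for a Tokyo-region account (sketch; requires td-spark 1.1.0 or later)
spark.td.apikey=(YOUR TD API KEY)
# One of: us (default), jp, eu01
spark.td.site=jp
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled=true
```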

Import td-spark package

To use td-spark, import the com.treasuredata.spark._ package:

import com.treasuredata.spark._; val td = spark.td

Selecting Tables

To select a time range of a table, use td.table("table name") together with a filtering method such as within. After applying the filters, create a DataFrame with the .df method:

val df = 
  td.table("(database).(table)")
    .within("-1d")
    .df

Here are examples of selecting time ranges:

import java.time.ZoneId // required by the time zone examples below

val t = td.table("sample_datasets.www_access")
  
// Using duration strings as in TD_INTERVAL  
t.within("-1d") // last day
t.within("-1m/2014-10-03 09:13:00") // last 1 minute from a given offset
t.within("-1d PST") // specifying a time zone. (e.g., JST, PST, UTC etc.)

// Specifying unix time ranges
t.withinUnixTimeRange(from = 1412320845, until = 1412321000) // [from, until) unix time range

t.fromUnixTime(1412320845) // [from, ...)
t.untilUnixTime(1412321000) // [... , until)

// Using time strings in yyyy-MM-dd HH:mm:ss format (the time zone defaults to UTC)
t.fromTime("2014-10-03 09:12:00") // [from, ...)
t.untilTime("2014-10-03 09:13:00") // [..., until)

t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00") // [from, until)
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo")) // [from, until) in Asia/Tokyo timezone

// Specifying timezone
t.fromTime("2014-10-03 09:12:00", ZoneId.of("Asia/Tokyo")) // [from, ...) in Asia/Tokyo timezone
t.untilTime("2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo")) // [..., until) in Asia/Tokyo timezone
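
The unix-time and time-string variants above describe the same half-open ranges. The sketch below does not call td-spark; it uses only java.time to show how a time string and zone could be turned into the unix seconds that withinUnixTimeRange expects, and what "[from, until)" means. TimeRangeSketch, toUnixTime and inRange are hypothetical helper names, and whether td-spark converts strings in exactly this way is an assumption.

```scala
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object TimeRangeSketch {
  // Same pattern as the time strings used in the examples above
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  /** Converts a "yyyy-MM-dd HH:mm:ss" string, read in the given zone, to unix seconds. */
  def toUnixTime(time: String, zone: ZoneId = ZoneId.of("UTC")): Long =
    LocalDateTime.parse(time, fmt).atZone(zone).toEpochSecond

  /** Half-open check: `from` is included, `until` is excluded. */
  def inRange(t: Long, from: Long, until: Long): Boolean =
    from <= t && t < until
}
```

For example, TimeRangeSketch.toUnixTime("2014-10-03 09:12:00") gives 1412327520 when read as UTC, and 1412295120 when read with ZoneId.of("Asia/Tokyo"), nine hours earlier.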

Run Presto Queries

val df = td.presto("select * from www_access limit 10")
df.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|  76.45.175.151|   /item/sports/4642|/search/?c=Sports...| 200|Mozilla/5.0 (Maci...| 137|   GET|1412380793|
|null|184.162.105.153|   /category/finance|                   -| 200|Mozilla/4.0 (comp...|  68|   GET|1412380784|
|null|  144.30.45.112|/item/electronics...| /item/software/4777| 200|Mozilla/5.0 (Maci...| 136|   GET|1412380775|
|null|  68.42.225.106|/category/networking|/category/electro...| 200|Mozilla/4.0 (comp...|  98|   GET|1412380766|
|null| 104.66.194.210|     /category/books|                   -| 200|Mozilla/4.0 (comp...|  43|   GET|1412380757|
|null|    64.99.74.69|  /item/finance/3775|/category/electro...| 200|Mozilla/5.0 (Wind...|  86|   GET|1412380748|
|null| 136.135.51.168|/item/networking/540|                   -| 200|Mozilla/5.0 (Wind...|  89|   GET|1412380739|
|null|   52.99.134.55|   /item/health/1326|/category/electro...| 200|Mozilla/5.0 (Maci...|  51|   GET|1412380730|
|null|  136.51.116.68|/category/finance...|                   -| 200|Mozilla/5.0 (comp...|  99|   GET|1412380721|
|null|136.141.218.177| /item/computers/959|                   -| 200|Mozilla/5.0 (Wind...| 124|   GET|1412380712|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
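
The 1.1.0 release notes also list td.prestoJob(sql) for queries with large results and td.executePresto(sql) for non-query statements. A minimal sketch of how these might be used from a Spark shell follows. It needs a valid TD API key, my_db.access_summary is a hypothetical table name, and the exact return types are not documented in these notes:

```scala
import com.treasuredata.spark._

val td = spark.td

// Large result sets: run the query as a regular TD job, then read the result as a DataFrame
val all = td.prestoJob("select * from sample_datasets.www_access").df
all.show(5)

// Non-query statement; my_db.access_summary is a hypothetical table name
td.executePresto(
  "CREATE TABLE IF NOT EXISTS my_db.access_summary (path varchar, cnt bigint)")
```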

Using SparkSQL

td.table("sample_datasets.www_access").df.createOrReplaceTempView("www_access")
val q = spark.sql("select agent, count(*) cnt from www_access group by 1")
q.show
+--------------------+---+
|               agent|cnt|
+--------------------+---+
|Mozilla/4.0 (comp...|159|
|Mozilla/5.0 (comp...|341|
|Mozilla/4.0 (comp...|140|
|Mozilla/5.0 (comp...|497|
|Mozilla/5.0 (Wind...|630|
|Mozilla/5.0 (iPad...|158|
|Mozilla/5.0 (Maci...|445|
|Mozilla/5.0 (comp...|261|
|Mozilla/5.0 (Wind...|304|
|Mozilla/4.0 (comp...|187|
|Mozilla/4.0 (comp...|150|
|Mozilla/5.0 (Wind...|291|
|Mozilla/5.0 (Wind...|297|
|Mozilla/4.0 (comp...|132|
|Mozilla/5.0 (Wind...|147|
|Mozilla/4.0 (comp...|123|
|Mozilla/5.0 (iPho...|139|
|Mozilla/5.0 (Wind...|439|
|Mozilla/4.0 (comp...|160|
+--------------------+---+

Uploading DataFrame to TD

To upload a DataFrame to TD, use df.createOrReplaceTD or df.insertIntoTD:

val df: DataFrame = ... // Prepare some DataFrame

// Writes DataFrame to a new TD table. If the table already exists, this will fail.
df.write.td("(database name).(table name)")

// Create or replace the target TD table with the contents of DataFrame
df.createOrReplaceTD("(database name).(table name)")

// Append the contents of DataFrame to the target TD table.
// If the table doesn't exist, it will create a new table
df.insertIntoTD("(database name).(table name)")

You can also use the full DataFrame.write and save syntax. Specify the format as com.treasuredata.spark, then select the target table with .option("table", "(target database).(table name)"):

df.write // df is an arbitrary DataFrame
  .mode("overwrite") // append, overwrite, error, ignore
  .format("com.treasuredata.spark")
  .option("table", "(database name).(table name)") // Specify an upload target table
  .save

Available write modes (SaveMode)

| mode | behavior |
|------|----------|
| append | Appends to the target table. Throws an error if the table doesn't exist. |
| overwrite | Performs a two-step update: first deletes the target table if it exists, then creates a new table with the new data. |
| error (default) | Throws an exception if the target table already exists. |
| ignore | Ignores the save operation if the target table exists. |
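
The write modes above can be read as a small decision table over the mode and whether the target table already exists. The sketch below encodes that table as a pure function. It does not call td-spark, WriteModeSketch and its outcome names are hypothetical, and the ignore case for a missing table is an assumption based on Spark's SaveMode.Ignore rather than on the descriptions above.

```scala
object WriteModeSketch {
  sealed trait Outcome
  case object Appended extends Outcome
  case object Replaced extends Outcome
  case object Created  extends Outcome
  case object Skipped  extends Outcome
  case object Failed   extends Outcome

  /** Expected result of a save for the given mode and target table state. */
  def outcome(mode: String, tableExists: Boolean): Outcome =
    (mode, tableExists) match {
      case ("append", true)   => Appended
      case ("append", false)  => Failed   // append throws if the table is missing
      case ("overwrite", _)   => Replaced // delete if present, then create with the new data
      case ("error", true)    => Failed   // default mode: refuse to touch an existing table
      case ("error", false)   => Created
      case ("ignore", true)   => Skipped
      case ("ignore", false)  => Created  // assumption: same as Spark's SaveMode.Ignore
      case _ => throw new IllegalArgumentException(s"unknown mode: $mode")
    }
}
```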

Writing Data Using SQL

Creating a new table with CREATE TABLE AS (SELECT ...):

spark.sql("""
CREATE TABLE my_table
USING com.treasuredata.spark
OPTIONS(table '(database name).(table name)')
AS SELECT ...
""")

Limitations

  • overwrite for UDP tables is not supported yet. Create the UDP table using Presto first, then use append mode to insert data into the table.
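
As a rough sketch of the append workflow for UDP tables, the example below creates the table with td.createLongPartitionedTable (listed in the 1.1.0 release notes as an alternative to creating it through Presto) and then writes with append mode. It is meant for a Spark shell with a valid TD API key. The table name my_db.user_events and the column user_id are hypothetical, and whether the method accepts a "(database).(table)" name is an assumption:

```scala
import com.treasuredata.spark._
import spark.implicits._

val td = spark.td

// Hypothetical target table, partitioned by a long-typed column
val target = "my_db.user_events"
td.createLongPartitionedTable(target, "user_id")

// A small DataFrame whose user_id column matches the partitioning key
val df = Seq((1L, "login"), (2L, "logout")).toDF("user_id", "event")

// Use append mode, since overwrite is not supported for UDP tables
df.write
  .mode("append")
  .format("com.treasuredata.spark")
  .option("table", target)
  .save
```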