Using DSEFS for storing Parquet Files.

DSEFS (DataStax Enterprise file system) is a fault-tolerant, general-purpose, distributed file system within DataStax Enterprise. It is designed for use cases that need to leverage a distributed file system for data ingestion, data staging, and state management for Spark Streaming applications (such as checkpointing or write-ahead logging). DSEFS is similar to HDFS, but avoids the deployment complexity and single point of failure typical of HDFS. DSEFS is HDFS-compatible and is designed to work in place of HDFS in Spark and other systems.
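For instance, a Spark Streaming job can point its checkpoint directory at a dsefs:// path. A minimal sketch from the spark-shell, assuming the default DSEFS port and an illustrative /checkpoints directory:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// dsefs:// paths can be used anywhere an HDFS path would be; the directory below is hypothetical
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("dsefs://127.0.0.1:5598/checkpoints/online_banking")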

When an application (online banking in this example) needs to archive data, it can simply select the data it needs from the DSE tables and write it out in a suitable file format; here we will use Parquet.

This example is based on the sample application - https://github.com/PatrickCallaghan/datastax-banking-iot

To follow this example, DSE Analytics and DSEFS need to be enabled on your cluster: https://docs.datastax.com/en/dse/6.0/dse-admin/datastax_enterprise/analytics/enablingDsefs.html

Using the spark-shell (started with dse spark on an analytics node)

import org.apache.spark.sql.cassandra._ // brings cassandraFormat into scope, if not already imported in your shell

val table = spark.read.cassandraFormat("latest_transactions", "datastax_banking_iot").load().where("transaction_time < cast('2018-04-20 00:00:00' as timestamp) and transaction_time >= cast('2018-04-19 00:00:00' as timestamp)")

table.write.parquet("dsefs://127.0.0.1:5598/online_banking/2018-04-19/transactions.pq")
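Since DSEFS is HDFS-compatible, we can check that the archive landed using the standard Hadoop FileSystem API from the same session (a sketch, assuming the default DSEFS port used above):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// List the archived Parquet output; this prints the dsefs:// URI of each part file
val fs = FileSystem.get(new URI("dsefs://127.0.0.1:5598/"), sc.hadoopConfiguration)
fs.listStatus(new Path("/online_banking/2018-04-19/transactions.pq")).foreach(s => println(s.getPath))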

To query or analyse this data, we can read the file back from DSEFS and process it as required.

val transactions = spark.read.format("parquet").load("dsefs://127.0.0.1:5598/online_banking/2018-04-19/transactions.pq")

val rows = transactions.filter("location = 'London'")

rows.count

If we want Parquet to create the partitions automatically, say by day, then we can do something like this:

// Add a day column formatted as yyyyMMdd so partition values look like 20180322
val table = spark.sql("select *, date_format(transaction_time, 'yyyyMMdd') as day from datastax_banking_iot.latest_transactions")

table.write.partitionBy("day").mode("append").parquet("dsefs://127.0.0.1:5598/online_banking/transactions.pq")

This allows Spark to push a predicate such as day='20180322' down to the partition directories, so queries only scan the data for the days they need.
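For example, reading the partitioned archive back and filtering on the day partition column means only the matching day's directory is scanned (a sketch using the path written above):

// Only the day=20180322 partition directory is read, thanks to partition pruning
val archive = spark.read.parquet("dsefs://127.0.0.1:5598/online_banking/transactions.pq")
archive.filter("day = '20180322'").count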
