@brianmhess
Created July 21, 2016 21:19
A. DSEFS
--------
0.1. Enable
vi /etc/default/dse
Edit to enable Spark (set SPARK_ENABLED=1)
vi /etc/dse/dse.yaml
Edit to enable dsefs:
dsefs_options:
    enabled: true
Look at work_dir (not much data is stored here):
    work_dir: /var/lib/dsefs/data
Look at data_directories:
    data_directories:
        - dir: /var/lib/dsefs/data
          storage_weight: 1.0
JBOD is supported: each dir gets its own storage_weight, which is its relative usage preference (example below).
The advanced settings can be ignored for now.
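For illustration, a hypothetical JBOD layout might look like this (the mount points are made up; only enabled, dir, and storage_weight come from the settings above):
dsefs_options:
    enabled: true
    data_directories:
        - dir: /mnt/disk1/dsefs/data
          storage_weight: 1.0
        - dir: /mnt/disk2/dsefs/data
          storage_weight: 2.0
Here disk2 would receive roughly twice as much data as disk1.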
0.2. Restart dse
service dse stop
service dse start
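Optionally, confirm the node came back up with the Analytics workload (dsetool ships with DSE; the output layout may vary by version):
dsetool status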
1. Start dse fs
dse fs
help
see help
2. Explore
ls
see /
df
see storage
3. Create directory
mkdir /test
ls
see /test
cd /test
ls
see nothing
4. Put file
put /usr/share/dse/demos/spark-mllib/iris.csv /test/iris.csv
ls
see iris.csv
ls -l
see details
cat iris.csv
see file
5. Copy file - sadly there is no cp in 'dse fs' :(
exit
dse hadoop fs -cp dsefs:///test/iris.csv dsefs:///test/iris2.csv
dse hadoop fs -ls dsefs:///test
see that 'dse hadoop fs' works, too
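Other standard hadoop fs subcommands should work against dsefs:// as well, e.g. disk usage (a non-destructive example, since iris2.csv is needed in the next step):
dse hadoop fs -du dsefs:///test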
6. Remove file
dse fs
cd test
ls -l
see files
rm iris2.csv
ls -l
see fewer files
7. Stat file
stat iris.csv
see info
8. Get file
get /test/iris.csv /tmp/foo.csv
exit
ls -l /tmp/foo.csv
see the size is still 4700 bytes, matching the original
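To check the round trip beyond the byte count, you can compare checksums; this just pipes the standard hadoop fs -cat through md5sum:
dse hadoop fs -cat dsefs:///test/iris.csv | md5sum
md5sum /tmp/foo.csv
The two digests should match.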
B. Use in Spark
---------------
1. Create KS and Table:
cqlsh
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE test.iris (id int primary key, sepal_l double, sepal_w double, petal_l double, petal_w double, species text);
2. Load data:
dse spark
case class Iris(id: Long, sepal_l: Double, sepal_w: Double, petal_l: Double, petal_w: Double, species: String);
val rdd = sc.textFile("dsefs:///test/iris.csv").map(_.split(",")).map{x => (x(0).toDouble, x(1).toDouble, x(2).toDouble, x(3).toDouble, x(4))}.zipWithUniqueId().map(x => Iris(x._2, x._1._1,x._1._2, x._1._3, x._1._4, x._1._5));
rdd.saveToCassandra("test", "iris");
val df = rdd.toDF;
df.write.json("dsefs:///test/irisWithId.json");
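A quick sanity check in the same shell (sc.cassandraTable comes from the connector implicits the dse spark shell pre-imports; both counts should match the number of lines in iris.csv):
sc.cassandraTable("test", "iris").count;
sqlContext.read.json("dsefs:///test/irisWithId.json").count;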
3. Unload data:
val df2 = sqlContext.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace"->"test", "table"->"iris")).load();
df2.write.json("dsefs:///test/irisWithId2.json");
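To verify the unload, read the JSON back from DSEFS; its count should equal df2.count:
sqlContext.read.json("dsefs:///test/irisWithId2.json").count;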
4. Do stuff:
rdd.joinWithCassandraTable[Iris]("test", "iris").on(SomeColumns("id")).take(3);
df.join(df2, df("id") === df2("id")).show(3);
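A couple more things to try in the same session; this is plain Spark DataFrame API, nothing DSE-specific:
import org.apache.spark.sql.functions.avg;
df2.groupBy("species").count.show;
df2.groupBy("species").agg(avg("petal_l"), avg("petal_w")).show;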