Pierce Lamb (piercelamb)
val conf = new SparkConf(true)
  .set("spark.cassandra.output.batch.size.bytes", "5120")
  .set("spark.cassandra.output.concurrent.writes", "32")
  .set("spark.cassandra.output.consistency.level", "ANY")
  .set("spark.cassandra.output.batch.grouping.key", "none")
  // …
snsc.sql("create table adImpressions(times_tamp timestamp, publisher string, " +
"advertiser string, website string, geo string, bid double, cookie string) " +
"using column options ( buckets '29', persistent 'asynchronous')")
snsc.sql("CREATE SAMPLE TABLE sampledAdImpressions" +
" OPTIONS(qcs 'geo,publisher', fraction '0.02', strataReservoirSize '50', baseTable 'adImpressions')")
snsc.getSchemaDStream("adImpressionStream").foreachDataFrame( df => {
df.write.insertInto("adImpressions")
df.write.insertInto("sampledAdImpressions")
// Cassandra: create the keyspace and table through the Spark connector
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)
CassandraConnector(conf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS adlogs " +
    "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE IF NOT EXISTS adlogs.adimpressions " +
    "(timestamp bigint, publisher text, advertiser text, " +
    "website text, geo text, bid double, cookie text, primary key (timestamp, cookie))")
}
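On the ingest side, each streaming batch could be written into this table with the DataStax connector's saveToCassandra; a minimal sketch, where the kafkaStream handle and the AdImpressionLog getter names are assumed from the surrounding snippets:

import com.datastax.spark.connector._

// Map each AdImpressionLog to a tuple in adlogs.adimpressions column order
// and bulk-write the batch to Cassandra
kafkaStream.map(_._2).foreachRDD { rdd =>
  rdd.map(log => (log.getTimestamp, log.getPublisher, log.getAdvertiser,
      log.getWebsite, log.getGeo, log.getBid, log.getCookie))
    .saveToCassandra("adlogs", "adimpressions",
      SomeColumns("timestamp", "publisher", "advertiser",
        "website", "geo", "bid", "cookie"))
}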
// MemSQL: create the database and table via the master aggregator connection
memSqlContext.getMemSQLCluster.withMasterConn(conn => {
  conn.withStatement(stmt => {
    stmt.execute("CREATE DATABASE IF NOT EXISTS adLogs")
    stmt.execute("DROP TABLE IF EXISTS adLogs.adImpressions")
    stmt.execute(
      """CREATE TABLE adLogs.adImpressions
         (timestamp bigint,
          publisher varchar(15),
          advertiser varchar(15),
          website varchar(20),
          geo varchar(20),
          bid double,
          cookie varchar(20))""") // geo/bid/cookie reconstructed from the schema above; varchar lengths are assumed
  })
})
// Convert each Kafka batch of AdImpressionLogs to Rows and bulk-write to MemSQL
kafkaStream.map(_._2).foreachRDD(rdd => {
  memSqlContext.createDataFrame(rowConverter.convert(rdd), schema)
    .saveToMemSQL("adLogs", "adImpressions")
})
where rowConverter performs the following conversion:
def convert(logRdd: RDD[AdImpressionLog]): RDD[Row]
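The gist elides the body of convert; a minimal sketch, assuming AdImpressionLog exposes simple getters for the fields in the schema above (the getter names are assumptions):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical implementation: map each log record to a Row whose field
// order matches the adImpressions schema
def convert(logRdd: RDD[AdImpressionLog]): RDD[Row] =
  logRdd.map(log => Row(log.getTimestamp, log.getPublisher, log.getAdvertiser,
    log.getWebsite, log.getGeo, log.getBid, log.getCookie))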
select count(*) AS adCount, geo from adImpressions group by geo order by adCount desc limit 20;
select sum(bid) AS totalBid, geo from adImpressions group by geo order by totalBid desc limit 20;
select sum(bid) AS totalBid, publisher from adImpressions group by publisher order by totalBid desc limit 20;
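These analytics queries can be issued through the same SnappyData context used for ingestion; for instance, a sketch reusing the snsc handle from above:

// Top 20 geos by impression count, executed against the column table
val topGeos = snsc.sql(
  "select count(*) AS adCount, geo from adImpressions " +
    "group by geo order by adCount desc limit 20")
topGeos.show()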
The examples below show the DataFrame API extensions made by SnappyData:
/**
* Insert one or more Row into an existing table
*/
def insert(tableName: String, rows: Row*): Int
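As an illustration, a single row could be inserted through this extension as follows (a sketch; snc stands for an in-scope SnappyContext, and the values follow the adImpressions schema created earlier):

import java.sql.Timestamp
import org.apache.spark.sql.Row

// Insert one impression; field order matches the adImpressions schema
snc.insert("adImpressions",
  Row(new Timestamp(System.currentTimeMillis()), "pub1", "adv1",
    "example.com", "NY", 0.25, "cookie-42"))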
/**
 * Update all rows in table that match passed filter expression
 */
def update(tableName: String, filterExpr: String, newColumnValues: Row,
    updateColumns: String*): Int
def addNewAccount(email: String, password: String, firstName: String, role: Role)
    (implicit s: DBSession = auto): Long = {
  val ac = Account.column // column alias, elided in the original snippet
  val pass = BCrypt.hashpw(password, BCrypt.gensalt())
  withSQL {
    QueryDSL.insert.into(Account).namedValues(
      ac.email -> email,
      ac.password -> pass,
      ac.name -> firstName,
      ac.role -> role.toString
    )
  }.updateAndReturnGeneratedKey.apply()
}
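A call site might look like this (a sketch; the Role value and the transaction wrapper are assumptions):

import scalikejdbc._

// Create the account inside a local transaction and get its generated id
val newId = DB.localTx { implicit session =>
  addNewAccount("ada@example.com", "s3cret", "Ada", Role.User)
}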