Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save piercelamb/949c4cd85ff1e1f2d83aa02b6be8fbde to your computer and use it in GitHub Desktop.
Save piercelamb/949c4cd85ff1e1f2d83aa02b6be8fbde to your computer and use it in GitHub Desktop.
val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)
CassandraConnector(conf).withSessionDo { session =>
session.execute(s"CREATE KEYSPACE IF NOT EXISTS adlogs " +
s"WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
session.execute(s"CREATE TABLE IF NOT EXISTS adlogs.adimpressions " +
s"(timestamp bigint, publisher text, advertiser text, " +
"website text, geo text, bid double, cookie text, primary key (timestamp, cookie))")
}
import com.datastax.spark.connector.streaming._
kafkaStream.map(_._2).map(ad =>
(ad.getTimestamp, ad.getPublisher, ad.getAdvertiser, ad.getWebsite,
ad.getGeo, ad.getBid, ad.getCookie))
.saveToCassandra("adlogs", "adimpressions",
SomeColumns("timestamp", "publisher", "advertiser", "website", "geo", "bid", "cookie"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment