Skip to content

Instantly share code, notes, and snippets.

@PeterCorless
Last active October 22, 2018 23:07
Show Gist options
Save PeterCorless/f467a40abc8595a4367b4ea6ce238319 to your computer and use it in GitHub Desktop.
Hooking up Spark and ScyllaDB: Part 4
-- Base table of quote observations: partition key `symbol`,
-- clustering column `timestamp` within each symbol partition.
-- `day` is a regular (non-key) column.
CREATE TABLE quotes.quotes (
    symbol          TEXT,
    timestamp       TIMESTAMP,
    day             TIMESTAMP,
    latest_price    DOUBLE,
    previous_close  DOUBLE,
    latest_volume   BIGINT,
    PRIMARY KEY ((symbol), timestamp)
);
# Helper scripts shipped with the example project (their contents are not shown here).
# NOTE(review): presumably create-tables.sh applies the CQL schema and run.sh starts
# the data producer — verify against the scripts themselves.
./create-tables.sh
./run.sh
# Open an interactive spark-shell inside the `spark-master` container.
# The driver advertises itself as host `spark-master`, the Spark Cassandra
# connector is pointed at host `scylla`, and the connector (plus
# commons-configuration) is fetched via --packages.
# NOTE(review): the streaming snippets use format("kafka"); unless the image already
# provides spark-sql-kafka-0-10, that package must be added to --packages — verify.
docker-compose exec spark-master spark-shell \
--conf spark.driver.host=spark-master \
--conf spark.cassandra.connection.host=scylla \
--packages datastax:spark-cassandra-connector:2.3.0-s_2.11,commons-configuration:commons-configuration:1.10
{
"AAPL": {
"quote": {
"latestPrice": 221.43,
"latestSource": "IEX real time price",
"latestUpdate": 1537455071032,
"latestVolume": 18919004,
"previousClose": 218.37,
"symbol": "AAPL"
}
},
"FB": {
"quote": {
"latestPrice": 165.55,
"latestSource": "IEX real time price",
"latestUpdate": 1537455069589,
"latestVolume": 14295872,
"previousClose": 163.06,
"symbol": "FB"
}
}
}
import org.apache.spark.sql.{functions => f}
// Decode the Kafka key/value bytes to strings, then parse the JSON text in
// `data` into a struct described by `schema`.
val query = {
  val decoded = queryDefn.selectExpr(
    "CAST(key AS STRING) AS symbol",
    "CAST(value AS STRING) AS data",
    "timestamp")
  val parsedData = f.from_json(decoded("data"), schema).as("data")
  decoded.select(decoded("symbol"), decoded("timestamp"), parsedData)
}
// query: org.apache.spark.sql.DataFrame = [symbol: string, timestamp: timestamp ... 1 more field]
import org.apache.spark.sql.types._
// JSON schema for each Kafka message value: a single `quote` object with the
// three numeric fields the job consumes. Other fields present in the payload
// are not listed and are therefore ignored by from_json.
val schema = {
  val quoteFields = new StructType()
    .add("latestPrice", DoubleType)
    .add("previousClose", DoubleType)
    .add("latestVolume", LongType)
  new StructType().add("quote", quoteFields)
}
+-------------+--------------------+------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+-------------+--------------------+------+---------+------+--------------------+-------------+
|[41 41 50 4C]|[7B 22 71 75 6F 7...|quotes| 0| 0|2018-09-20 14:58:...| 0|
...
| [46 42]|[7B 22 71 75 6F 7...|quotes| 0| 16|2018-09-20 14:58:...| 0|
|[53 4E 41 50]|[7B 22 71 75 6F 7...|quotes| 0| 17|2018-09-20 14:58:...| 0|
|[54 53 4C 41]|[7B 22 71 75 6F 7...|quotes| 0| 18|2018-09-20 14:58:...| 0|
|[41 4D 5A 4E]|[7B 22 71 75 6F 7...|quotes| 0| 19|2018-09-20 14:56:...| 0|
+-------------+--------------------+------+---------+------+--------------------+-------------+
$ http localhost:3000/stats/2018/09/04
HTTP/1.1 200 OK
Content-Length: 295
Content-Type: text/plain; charset=UTF-8
Date: Tue, 04 Sep 2018 20:19:15 GMT
Server: akka-http/10.1.4
Symbol: AAPL, max difference: -0.33%, min difference: -0.32%
Symbol: TSLA, max difference: 4.21%, min difference: 4.23%
Symbol: FB, max difference: 2.6%, min difference: 2.61%
Symbol: SNAP, max difference: 2.84%, min difference: 2.94%
Symbol: AMZN, max difference: -1.35%, min difference: -1.33%
-- View of quotes.quotes re-keyed by trading day, so one day's quotes for all
-- symbols live in a single partition (`day`), clustered by symbol and timestamp.
-- Every view primary-key column must be restricted with IS NOT NULL. That includes
-- `day`, which is only a regular column in the base table; without it the
-- statement is rejected.
CREATE MATERIALIZED VIEW quotes.quotes_by_day AS
SELECT * FROM quotes.quotes
WHERE day IS NOT NULL AND
      symbol IS NOT NULL AND
      timestamp IS NOT NULL
PRIMARY KEY ((day), symbol, timestamp);
// Inspect the current status of the running streaming query `query`;
// the REPL output captured below shows it idle, waiting for new data.
query.status
// res9: org.apache.spark.sql.streaming.StreamingQueryStatus =
// {
// "message" : "Waiting for data to arrive",
// "isDataAvailable" : false,
// "isTriggerActive" : false
// }
// Decode the Kafka key and value bytes to strings (the JSON payload stays as
// text in `data`) and print each micro-batch with the console sink.
val query = {
  val decoded = queryDefn.selectExpr(
    "CAST(key AS STRING) AS symbol",
    "CAST(value AS STRING) AS data",
    "timestamp")
  decoded.writeStream.format("console").start()
}
/** Structured Streaming sink that appends every micro-batch to a Scylla table
  * through the Spark Cassandra connector.
  *
  * @param parameters sink options; `table`, `keyspace` and `cluster` are read for
  *                   each batch (a missing key fails with NoSuchElementException).
  */
class ScyllaSink(parameters: Map[String, String]) extends Sink {
  // `batchId` is ignored: a replayed batch is appended again.
  // NOTE(review): Spark may re-deliver a batchId after a failure; this relies on
  // CQL writes being upserts keyed by primary key, so a replay rewrites the same rows.
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val writer = data.write.cassandraFormat(
      parameters("table"),
      parameters("keyspace"),
      parameters("cluster"))
    writer.mode(SaveMode.Append).save()
  }
}
// Batch report: for each symbol, the max and min price on one trading day,
// expressed as a percentage relative to that symbol's previous close.

// Trading day to report on, in yyyy-MM-dd form.
val reportDay = "2018-09-20"

// Read the base table via the Spark Cassandra connector
// (table "quotes", keyspace "quotes", cluster "scylla").
// NOTE(review): `day` is not part of quotes.quotes' primary key ((symbol), timestamp),
// so the filter below likely cannot be pushed down and may scan the whole table.
// Reading the quotes_by_day view (partitioned by day) should avoid that — verify.
val quotes = spark.read
  .cassandraFormat("quotes", "quotes", "scylla")
  .load()

// (previous_close - price) / previous_close * 100, rounded to 2 decimal places.
// NOTE(review): a positive value means the price is *below* the previous close,
// the opposite sign of a conventional percent change — confirm this is intended.
def pctBelowPreviousClose(priceColumn: String) =
  f.round(
    (f.col("previous_close") - f.col(priceColumn)) / f.col("previous_close") * 100,
    2)

val result = quotes
  // `day` holds midnight of the trading day. A four-digit-year pattern is required:
  // "yy-MM-dd" only matched "2018" through lenient legacy parsing.
  .where($"day" === f.to_timestamp(f.lit(reportDay), "yyyy-MM-dd"))
  .groupBy($"symbol")
  .agg(
    f.max("latest_price").as("max_price"),
    f.min("latest_price").as("min_price"),
    // previous_close is presumably constant per symbol and day; min() just selects it.
    f.min("previous_close").as("previous_close")
  )
  .select(
    $"symbol",
    pctBelowPreviousClose("max_price").as("max_difference"),
    pctBelowPreviousClose("min_price").as("min_difference")
  )
  .collect()
// result: Array[org.apache.spark.sql.Row] =
// Array(
// [AAPL,-1.42,-1.2],
// [TSLA,0.26,0.72],
// [FB,-1.53,-1.22],
// [SNAP,0.6,0.93],
// [AMZN,-1.17,-0.93]
// )
//
// Streaming source: every record on the Kafka topic "quotes" from broker
// kafka:9092, replayed from the earliest available offset.
val queryDefn = spark.readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "kafka:9092",
    "subscribe" -> "quotes",
    "startingOffsets" -> "earliest"))
  .load()
// Print the raw Kafka records (binary key/value plus topic/partition/offset
// metadata) to the console sink.
val query = queryDefn.writeStream.format("console").start()
# From the example repository, enter the streaming example directory and start
# its docker-compose services in the background (-d = detached).
cd scylla-and-spark/streaming-into-scylla
docker-compose up -d
+------+--------------------+--------------------+
|symbol| data| timestamp|
+------+--------------------+--------------------+
| AAPL|{"quote":{"symbol...|2018-09-20 14:58:...|
| FB|{"quote":{"symbol...|2018-09-20 14:58:...|
...
| TSLA|{"quote":{"symbol...|2018-09-20 14:58:...|
| AMZN|{"quote":{"symbol...|2018-09-20 14:56:...|
+------+--------------------+--------------------+
+------+--------------------+-------------------+------------+--------------+-------------+
|symbol| timestamp| day|latest_price|previous_close|latest_volume|
+------+--------------------+-------------------+------------+--------------+-------------+
| AAPL|2018-09-20 14:58:...|2018-09-20 00:00:00| 221.02| 218.37| 9931987|
| FB|2018-09-20 14:58:...|2018-09-20 00:00:00| 165.17| 163.06| 7241683|
...
| TSLA|2018-09-20 14:58:...|2018-09-20 00:00:00| 298.25| 299.02| 2977981|
| AMZN|2018-09-20 14:56:...|2018-09-20 00:00:00| 1944.415| 1926.42| 1248901|
+------+--------------------+-------------------+------------+--------------+-------------+
// Decode and parse each Kafka record, flatten the quote into the column layout
// of quotes.quotes, and print the rows with the console sink.
val query = {
  val parsed = queryDefn
    .selectExpr("CAST(key AS STRING) AS symbol",
      "CAST(value AS STRING) AS data",
      "timestamp")
    .select($"symbol", $"timestamp", f.from_json($"data", schema).as("data"))

  // Midnight of the record's date, kept as a timestamp to match the `day` column.
  val dayOfRecord = f.col("timestamp").cast(DateType).cast(TimestampType).as("day")

  val rows = parsed.select(
    $"symbol",
    $"timestamp",
    dayOfRecord,
    $"data.quote.latestPrice".as("latest_price"),
    $"data.quote.previousClose".as("previous_close"),
    $"data.quote.latestVolume".as("latest_volume"))

  rows.writeStream.format("console").start()
}
// Same flattening as the console version, but written to the quotes.quotes table
// through the custom ScyllaSinkProvider in append mode, with progress
// checkpointed under /tmp/checkpoints.
val query = {
  val sinkOptions = Map(
    "cluster" -> "scylla",
    "keyspace" -> "quotes",
    "table" -> "quotes",
    "checkpointLocation" -> "/tmp/checkpoints"
  )

  val parsed = queryDefn
    .selectExpr("CAST(key AS STRING) AS symbol",
      "CAST(value AS STRING) AS data",
      "timestamp")
    .select($"symbol", $"timestamp", f.from_json($"data", schema).as("data"))

  // Midnight of the record's date, kept as a timestamp to match the `day` column.
  val dayOfRecord = f.col("timestamp").cast(DateType).cast(TimestampType).as("day")

  val rows = parsed.select(
    $"symbol",
    $"timestamp",
    dayOfRecord,
    $"data.quote.latestPrice".as("latest_price"),
    $"data.quote.previousClose".as("previous_close"),
    $"data.quote.latestVolume".as("latest_volume"))

  rows.writeStream
    .format("com.scylladb.streaming.ScyllaSinkProvider")
    .outputMode(OutputMode.Append)
    .options(sinkOptions)
    .start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment