@bepcyc
Last active January 25, 2019 13:14
Jacek Laskowski's code does not compile
scala> println("This answer actually got some points on SO https://stackoverflow.com/a/53981675/918211")
This answer actually got some points on SO https://stackoverflow.com/a/53981675/918211
scala> println(spark.version)
2.4.0
scala> val sq = spark.readStream.format("rate").load
sq: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
scala> :type sq
org.apache.spark.sql.DataFrame
scala> assert(sq.isStreaming)
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
// do whatever you want with your input DataFrame
// incl. writing to Hive
// I simply decided to print out the rows to the console
ds.show
}.start
// Exiting paste mode, now interpreting.
<pastie>:28: warning: non-variable type argument org.apache.spark.sql.Row in type pattern org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] (the underlying of org.apache.spark.sql.DataFrame) is unchecked since it is eliminated by erasure
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
^
<pastie>:28: error: missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: ?
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
^
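The error comes from overloading. In Spark 2.4.0, `DataStreamWriter.foreachBatch` has two alternatives: one takes a Scala `(Dataset[T], Long) => Unit` and the other takes the Java `VoidFunction2[Dataset[T], java.lang.Long]`. Because the method is overloaded, the compiler types the argument without a single expected type, and a pattern-matching anonymous function (`{ case ... => }`) cannot infer its parameter types without one. That produces "missing parameter type for expanded function". The Spark-free sketch below reproduces that overload shape with hypothetical stand-ins (`JVoidFunction2`, `Batch` and `FakeWriter` are not Spark classes). It shows two workarounds that, assuming the real API follows the same overload rules, should select the Scala overload: a plain function literal with explicit parameter types, and a value declared with an explicit function type.

```scala
// Spark-free sketch of the overload shape behind the
// "missing parameter type for expanded function" error.
// JVoidFunction2, Batch and FakeWriter are HYPOTHETICAL stand-ins for
// VoidFunction2, DataFrame and DataStreamWriter; they are not Spark classes.
object ForeachBatchOverloadDemo {

  // Single-abstract-method trait shaped like Spark's Java VoidFunction2.
  trait JVoidFunction2[A, B] { def call(a: A, b: B): Unit }

  // Stand-in for one micro-batch DataFrame.
  final case class Batch(rows: Seq[String])

  final class FakeWriter {
    // Records which overload the compiler selected.
    var chosen: String = ""

    // Mirrors foreachBatch(function: (Dataset[T], Long) => Unit).
    def foreachBatch(f: (Batch, Long) => Unit): FakeWriter = {
      chosen = "scala-function"
      f(Batch(Seq("a", "b")), 0L) // invoke once with a fake batch 0
      this
    }

    // Mirrors foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]).
    def foreachBatch(f: JVoidFunction2[Batch, java.lang.Long]): FakeWriter = {
      chosen = "java-function"
      f.call(Batch(Seq("a", "b")), java.lang.Long.valueOf(0L))
      this
    }
  }

  def run(): Seq[String] = {
    val seen = scala.collection.mutable.ArrayBuffer.empty[String]

    // Fails to compile with Scala 2.11/2.12, as in the transcript: the method
    // is overloaded, so the `case` literal has no expected type to infer from.
    //   new FakeWriter().foreachBatch { case (b: Batch, id: Long) => () }

    // Workaround 1: a plain function literal with explicit parameter types.
    // The body ends in () so the literal is typed as (Batch, Long) => Unit.
    val w1 = new FakeWriter().foreachBatch { (b: Batch, id: Long) =>
      seen += s"batch $id: ${b.rows.mkString(",")}"
      ()
    }

    // Workaround 2: a value with an explicit Scala function type. A value that
    // is not a function literal is not SAM-converted, so only the Scala
    // overload applies.
    val handler: (Batch, Long) => Unit = (b, id) => {
      seen += s"batch $id: ${b.rows.size} rows"
      ()
    }
    val w2 = new FakeWriter().foreachBatch(handler)

    // Which overloads were chosen, followed by what each handler recorded.
    Seq(w1.chosen, w2.chosen) ++ seen
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Applied to the session above, the first workaround would read `sq.writeStream.foreachBatch { (ds: DataFrame, batchId: Long) => ds.show() }.start()`, which is the same explicitly typed form the Structured Streaming Programming Guide uses for `foreachBatch`. Dropping `case` should also remove the erasure warning, because no runtime type test against `Dataset[Row]` is performed.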