Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yjshen/9403991333a5b099817b4e2f9b0dbda3 to your computer and use it in GitHub Desktop.
Save yjshen/9403991333a5b099817b4e2f9b0dbda3 to your computer and use it in GitHub Desktop.
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topicsPattern", "topic.*") // Subscribe to a pattern
// .option("topics", "topic1,topic2") // Subscribe to multiple topics
// .option("topic", "topic1"). //subscribe to a single topic
.option("startingOffsets", startingOffsets)
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment