@choult
Last active December 21, 2022 01:06
Generic Benthos HTTP Server-Sent-Events ingest and parser
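input:
  broker:
    copies: 1
    inputs:
      - http_client:
          url: "https://mas.to/api/v1/streaming/public"
          stream:
            enabled: true
            reconnect: true
            # SSE events are separated by a blank line, so split the stream on "\n\n".
            codec: "delim:\n\n"
pipeline:
  processors:
    # Break each event into its individual lines.
    - unarchive:
        format: lines
    # Drop SSE comment lines (those starting with ":"); parse the rest as "field: value" YAML.
    - mapping: |
        root = if content().string().has_prefix(":") { deleted() } else { content().string().parse_yaml() }
    # Recombine the parsed lines of one event into a single JSON array message.
    - archive:
        format: json_array
    # Merge the per-line objects into one object, keep only the standard SSE fields,
    # and make sure "data" is always an array.
    - mapping: |
        root = this.squash().with("id","data","event","retry")
        root.data = if root.exists("data") && root.data.type() != "array" { [root.data] }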
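
To make the effect of the pipeline easier to picture, here is a rough Python sketch of the same per-event transformation. It is an approximation and not part of the gist: the function name `parse_sse_event` and the sample event are made up for illustration, and every value is kept as a plain string, whereas `parse_yaml()` in the config above would turn values such as numbers or inline JSON into typed data.

```python
# Standard SSE field names that the final mapping keeps.
SSE_FIELDS = ("id", "data", "event", "retry")


def parse_sse_event(raw_event: str) -> dict:
    """Approximate the pipeline for one event (text between blank lines)."""
    merged = {}
    for line in raw_event.split("\n"):
        # Skip empty lines and SSE comments such as ": keep-alive".
        if not line or line.startswith(":"):
            continue
        field, _, value = line.partition(":")
        value = value.strip()
        if field in merged:
            # Repeated fields are collected into a list, similar to squash().
            existing = merged[field]
            if isinstance(existing, list):
                existing.append(value)
            else:
                merged[field] = [existing, value]
        else:
            merged[field] = value
    # Keep only the standard fields, like .with("id","data","event","retry").
    result = {key: merged[key] for key in SSE_FIELDS if key in merged}
    # Normalise "data" so that it is always a list.
    if "data" in result and not isinstance(result["data"], list):
        result["data"] = [result["data"]]
    return result


# Hypothetical sample event: one comment line, one event name, two data lines.
sample = ": keep-alive\nevent: update\ndata: first line\ndata: second line"
print(parse_sse_event(sample))
```

A single `data` line and several `data` lines both end up as a list, so downstream consumers only need to handle one shape.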