@sambostock
Created October 27, 2022 01:08
Faraday linewise streaming example

Overview

Say you're consuming a streaming API using Faraday, and you need to process the content line by line.

Let's explore one way of doing that.

How it works

  1. Faraday lets you set req.options.on_data to a Proc, which is called with each chunk of the response as it is streamed.
  2. We maintain a buffer for the line we're currently reading.
  3. As we receive each chunk, we scan through it looking for newlines.
  4. Every time we find a newline, we append everything up to and including it to the buffer, then flush the buffer (yield it as a String) and start a new one.
  5. When we reach the end of a chunk, we append whatever is left (a partial line) to the buffer, to be completed by the next chunk.
  6. When we've finished streaming content, we check whether we were in the middle of an incomplete line, and if so, flush the buffer one last time.
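Steps 2 to 6 don't depend on Faraday at all, so here is a minimal sketch of them as a standalone class, fed chunks by hand instead of by on_data. LineSplitter is a hypothetical name used only for illustration. Unlike client.rb, it scans raw bytes and only tags each completed line as UTF-8; that is safe because the newline byte (0x0A) never occurs inside a multibyte UTF-8 character, so a chunk boundary can split an emoji but a complete line is always whole.

```ruby
require "strscan"

# Hypothetical helper (not part of Faraday): reassembles arbitrary byte
# chunks into complete lines. Assumes the stream is UTF-8 text.
class LineSplitter
  def initialize(&on_line)
    @on_line = on_line
    @buffer = String.new # binary (ASCII-8BIT) buffer for the current partial line
  end

  # Feed one chunk: emit every completed line, keep the leftover.
  def <<(chunk)
    scanner = StringScanner.new(chunk.b) # scan raw bytes
    # Each match runs up to and including the next "\n".
    while (piece = scanner.scan_until(/\n/))
      @buffer << piece
      emit
    end
    @buffer << scanner.rest # partial line, completed by a later chunk
    self
  end

  # Flush a final line that had no trailing newline, if any.
  def finish
    emit unless @buffer.empty?
  end

  private

  # Hand the completed line to the callback as UTF-8, then start a new buffer.
  def emit
    @on_line.call(@buffer.force_encoding(Encoding::UTF_8))
    @buffer = String.new
  end
end

lines = []
splitter = LineSplitter.new { |line| lines << line }
bytes = "1️⃣🇨🇦\nab\n\nc".b
# Deliberately cut through the multibyte emoji at the chunk boundaries.
[bytes[0, 3], bytes[3, 7], bytes[10..]].each { |chunk| splitter << chunk }
splitter.finish
p lines
```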

Running it

To run the demo:

  • Make sure you have the following installed:
    • Ruby (obviously)
    • faraday (for the client)
    • sinatra (for the server)
  • Run ruby server.rb in one terminal
  • Run ruby client.rb in another terminal
  • Witness the streaming output
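If you'd rather manage the dependencies with Bundler, a minimal Gemfile might look like the following, after which you would prefix both commands with bundle exec. The puma and rackup entries are assumptions rather than part of the original demo: Sinatra needs some Rack-compatible web server to run on (Ruby 3.0 stopped bundling WEBrick), and newer Sinatra releases built on Rack 3 expect the rackup gem when launched with ruby server.rb.

```ruby
# Gemfile (a sketch; versions deliberately left unpinned)
source "https://rubygems.org"

gem "faraday" # HTTP client used by client.rb
gem "sinatra" # web framework used by server.rb
gem "puma"    # assumption: a Rack server for Sinatra to run on
gem "rackup"  # assumption: needed by newer Sinatra/Rack 3 to boot via `ruby server.rb`
```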
client.rb

```ruby
require "faraday"
require "stringio"
require "strscan"

# Example body to be echoed back to us
body = <<~BODY
  This body consists of many lines.
  It will be echoed in chunks of random sizes.
  We want to consume it as lines,
  without any issues being in the middle of characters.
  But we also don't want to buffer the entire response.
  Let's use emoji, to make it difficult: 1️⃣🇨🇦2️⃣🇨🇦3️⃣🇨🇦4️⃣🇨🇦
  And also an empty line:

  Hopefully it worked!
BODY

# Print diagnostics dimmed (ANSI faint) to stderr, so they don't mix with the echoed lines
def log(text)
  $stderr.puts "\e[2m#{text}\e[22m"
end

# Prefix every line of `string` with `by`
def indent(string, by: " ")
  string.each_line.map { |line| "#{by}#{line}" }.join
end

# POST `body` to the echo server and yield the streamed response one line at a time
def each_line(body)
  buffer = StringIO.new
  # Keep track of buffering state, to properly handle a final incomplete line (or lack thereof)
  buffering = false

  # Set a callback which will receive each chunk String,
  # the total number of bytes received so far, and the response environment.
  # The latter gives access to the response status, headers and reason, as well as the request info.
  on_data = Proc.new do |chunk, overall_received_bytes, env|
    log "Received #{overall_received_bytes} bytes so far"
    next if chunk.nil? # Skip if no data received

    buffering = true
    scanner = StringScanner.new(chunk)

    # Yield each complete line in the chunk, starting a fresh buffer after each one
    while (until_newline = scanner.scan_until(/\n/))
      buffer.write(until_newline)
      yield buffer.string
      buffer = StringIO.new
    end

    # Check for an incomplete line remaining at the end of the chunk
    if scanner.eos?
      buffering = false # the chunk ended exactly on a newline
    else
      buffer.write(scanner.rest)
    end
  end

  Faraday.post("http://127.0.0.1:4567?max_chunk_size=100&sleep_duration=0.1") do |req|
    req.body = body
    req.options.on_data = on_data
  end

  # If the content did not end in a newline, flush what's left
  yield buffer.string if buffering
end

log "Performing chunked streaming round trip of:"
puts indent(body)

each_line(body) do |line|
  puts indent(line)
end
```
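For comparison, the same buffering can be done without StringScanner or StringIO, by appending each chunk to a single String and slicing complete lines off its front with String#index and String#slice!. This is an alternative, not what client.rb does; lines_from_chunks is a hypothetical name, and its chunks argument stands in for the successive chunk values Faraday passes to on_data.

```ruby
# Hypothetical alternative to the on_data Proc's logic: `chunks` is any
# Enumerable of Strings, standing in for what Faraday streams to on_data.
def lines_from_chunks(chunks)
  return enum_for(__method__, chunks) unless block_given?

  buffer = String.new # binary buffer holding the current partial line
  chunks.each do |chunk|
    buffer << chunk.b
    # Slice off every complete line currently in the buffer.
    while (newline_at = buffer.index("\n"))
      yield buffer.slice!(0..newline_at).force_encoding(Encoding::UTF_8)
    end
  end
  # Whatever remains is an unterminated final line.
  yield buffer.force_encoding(Encoding::UTF_8) unless buffer.empty?
end

p lines_from_chunks(["a\nb", "c\n", "d"]).to_a
```

The trade-off: each new chunk triggers a search from the start of the buffer, so a very long line delivered in many small chunks gets re-scanned repeatedly, whereas the StringScanner version in client.rb scans each chunk only once.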
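server.rb

```ruby
require "sinatra"

# Echo the request body back, streamed in randomly sized chunks
post "/" do
  max_chunk_size = params.fetch(:max_chunk_size, 10).to_i
  sleep_duration = params.fetch(:sleep_duration, 0).to_f

  stream do |out|
    # Read 1..max_chunk_size bytes at a time; rand(1..n) avoids pointless zero-length reads
    while (content = request.body.read(rand(1..max_chunk_size)))
      sleep sleep_duration
      out << content
    end
  end
end
```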
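Because the server reads the body in pieces of random size, chunk boundaries routinely land inside the multibyte emoji in the example body. To see that without starting Sinatra, here is a sketch of an in-memory stand-in for the server's read loop. random_chunks is a hypothetical helper, not part of Sinatra or Rack, and it takes a seeded Random so runs are reproducible.

```ruby
require "stringio"

# Hypothetical stand-in for server.rb's loop: read `body` as raw bytes
# in pieces of 1..max_chunk_size bytes each.
def random_chunks(body, max_chunk_size:, random: Random.new)
  io = StringIO.new(body.b)
  chunks = []
  while (chunk = io.read(random.rand(1..max_chunk_size)))
    chunks << chunk
  end
  chunks
end

body = "Let's use emoji: 1️⃣🇨🇦2️⃣🇨🇦\nDone\n"
chunks = random_chunks(body, max_chunk_size: 5, random: Random.new(42))

# Individual chunks may not be valid UTF-8 on their own...
p chunks.map { |c| c.dup.force_encoding(Encoding::UTF_8).valid_encoding? }
# ...but the bytes always join back into the original text.
p chunks.join.force_encoding(Encoding::UTF_8) == body
```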