Skip to content

Instantly share code, notes, and snippets.

@mackuba
Created January 4, 2024 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mackuba/c33c9b623590cf5a713dd86f6459ae58 to your computer and use it in GitHub Desktop.
Save mackuba/c33c9b623590cf5a713dd86f6459ae58 to your computer and use it in GitHub Desktop.
Downloading bsky firehose events to a file and streaming them from a mock server
require 'skyfall'
# Rough number of firehose events per second. Used to convert a number of hours
# into a sequence-number offset from the current head.
AVG_EVENTS_PER_SEC = 50

# Environment variables read by the task:
#   FIREHOSE - hostname of the firehose to connect to (default: 'bsky.network')
#   HOURS    - approximate number of hours of history to download; when unset or
#              empty, a fixed cursor of 100_000_000 is used instead
#   FILE     - path of the output file (default: 'firehose.bin')
desc "Download a part of the firehose cache to a file"
task :fetch do
  host = ENV['FIREHOSE'] || 'bsky.network'

  # Pass 1: connect without a cursor, grab the seq of the first message received
  # and disconnect again. That seq marks where the download should stop.
  current_head = nil
  sky = Skyfall::Stream.new(host, :subscribe_repos)
  sky.on_message do |m|
    current_head = m.seq
    sky.disconnect
  end
  sky.connect

  # Without a head there is nothing to count towards; fail with a readable
  # message instead of a NoMethodError on nil further down.
  abort "Could not read the current sequence number from #{host}" unless current_head

  hours = ENV['HOURS'].to_s
  cursor =
    if hours.empty?
      100_000_000
    else
      current_head - (AVG_EVENTS_PER_SEC * 3600 * hours.to_f).to_i
    end

  # Pass 2: stream raw messages starting at the cursor and collect them.
  start = nil
  seq = nil
  messages = []
  sky = Skyfall::Stream.new(host, :subscribe_repos, cursor)
  sky.on_error { |e| puts e }
  sky.on_raw_message do |m|
    if start
      # Only the first message is decoded; for the rest the position is advanced
      # by one per message, which assumes consecutive sequence numbers.
      seq += 1
    else
      atp_message = Skyfall::WebsocketMessage.new(m)
      puts atp_message.time.getlocal
      start = atp_message.seq
      seq = start
    end

    print "Fetching messages [#{seq - start}/#{current_head - start}]\r"
    messages << m

    # >= rather than ==: if the first message is already past the recorded head
    # (e.g. HOURS=0), an equality check would never match and the stream would
    # keep running indefinitely.
    sky.disconnect if seq >= current_head
  end
  sky.connect

  # Marshal output is binary data, so write it in binary mode.
  File.binwrite(ENV['FILE'] || 'firehose.bin', Marshal.dump(messages))
end
#!/usr/bin/env ruby
require 'bundler/setup'
require 'faye/websocket'
require 'rack'
# The first command-line argument is the path of a file containing a
# Marshal-dumped array of raw messages.
filename = ARGV[0]

if filename.to_s.empty?
  puts "Usage: #{$PROGRAM_NAME} <filename>"
  exit 1
end

# The dump is binary, so it is read in binary mode: a text-mode read can alter
# the bytes on platforms that translate line endings.
# NOTE: Marshal.load can instantiate arbitrary objects - only use this with
# files you created yourself, never with untrusted input.
messages = Marshal.load(File.binread(filename))
# Rack application: every websocket client gets all loaded messages sent to it
# in order and is then disconnected; any other request is answered with a 400.
App = lambda do |env|
  unless Faye::WebSocket.websocket?(env)
    puts "Bad request"
    next [400, { 'Content-Type' => 'text/plain' }, ['Bad Request']]
  end

  socket = Faye::WebSocket.new(env)

  socket.on(:open) do
    puts "Opened connection"

    # Each message is passed as an array of bytes rather than as a string
    # (faye-websocket sends byte arrays as binary frames).
    messages.each { |payload| socket.send(payload.bytes) }

    # TODO: don't close before the client finishes processing events in the buffer
    socket.close
  end

  socket.on(:close) do
    puts "Closed connection"
    socket = nil
  end

  socket.rack_response
end

# Start the application on Rack's default handler, with faye-websocket's
# adapter for Thin loaded first.
Faye::WebSocket.load_adapter('thin')
Rack::Handler.default.run(App)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment