Skip to content

Instantly share code, notes, and snippets.

@hooptie45
Last active December 22, 2015 21:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hooptie45/6535740 to your computer and use it in GitHub Desktop.
Demo CSV Generation Storm Topology with Storm, Tire and Redis
require 'red_storm'
require 'tire'
require 'json'
require 'csv'   # stdlib: proper field quoting for CSV generation
require 'redis' # JobsSpout calls Redis.new but the gem was never required
# rvm use jruby@csv-topology --create
# bundle install
# redstorm bundle topology
# redstorm local csv_topology.rb
# Elasticsearch-backed document model (via Tire's persistence mixin) that
# ExecuteSearchBolt queries. Tire derives the index name from the class name.
class PersistentArticle
include Tire::Model::Persistence
# Fields stored per article; `title` and `tags` are the columns exported to CSV.
property :title
property :published_on
property :tags
end
# Spout that feeds jobs into the topology. A detached background thread
# performs a blocking pop on the Redis "jobs" list and buffers raw payloads
# in an in-process Queue; on_send drains that buffer one message at a time.
class JobsSpout < RedStorm::SimpleSpout
  # Emit the next buffered job payload, or nothing when the buffer is empty
  # (Queue#pop would block, so guard first — this spout is the sole consumer).
  on_send do
    @q.pop unless @q.empty?
  end

  on_init do
    @q = Queue.new
    @redis_reader = detach_redis_reader
  end

  private

  # Spawn the long-lived reader thread that moves Redis list entries into @q.
  def detach_redis_reader
    Thread.new do
      Thread.current.abort_on_exception = true
      redis = Redis.new(:host => "localhost", :port => 6379)
      loop do
        # BLPOP (timeout 0 = wait forever) returns [list_name, value];
        # only the value is buffered.
        popped = redis.blpop("jobs", 0)
        @q << popped[1] if popped
      end
    end
  end
end
# Splits an incoming raw job document into its id and criteria, emitting
# [job_id, criteria_json] so downstream bolts can field-group by job.
class ExtractJobBolt < RedStorm::SimpleBolt
  on_receive do |tuple|
    payload = JSON.parse(tuple.getString(0))
    # Re-serialize the criteria so it travels downstream as a JSON string.
    [payload['id'], JSON(payload['criteria'])]
  end
end
# Runs the Elasticsearch query described by a job's criteria and streams the
# results downstream one page at a time as
# [page, per_page, total, job_id, rows_json]. Export is capped at 10_000
# records per job.
class ExecuteSearchBolt < RedStorm::SimpleBolt
  on_receive :emit => false do |tuple|
    criteria = JSON.parse(tuple[:criteria])
    job_id = tuple[:job_id]
    term = criteria['keywords'] || 'test'
    page = 1
    per_page = 500

    # First page. FIX: the original hard-coded `page: 1` here; use the
    # variable so the pagination logic reads consistently.
    result = PersistentArticle.search(term, per_page: per_page, page: page)
    total = result.total
    fetched = result.size
    unanchored_emit(page, per_page, total, job_id, result.to_json)

    while fetched < total && fetched < 10_000
      page += 1
      result = PersistentArticle.search(term, per_page: per_page, page: page)
      # FIX: if a later page comes back empty while fetched < total (e.g.
      # documents deleted after `total` was computed), the original loop
      # spun forever — `fetched` never advanced. Bail out instead.
      break if result.size.zero?
      fetched += result.size
      unanchored_emit(page, per_page, total, job_id, result.to_json)
    end
  end
end
# Turns one page of search hits (a JSON array of row hashes) into CSV text
# and emits [page, per_page, total, job_id, csv].
class GenerateCsvBolt < RedStorm::SimpleBolt
  # Columns exported for each search hit, in output order.
  COLUMNS = %w[title tags].freeze

  on_init do
    # NOTE(review): appears unused by on_receive; kept for compatibility.
    @csv = []
  end

  on_receive do |tuple|
    rows = JSON.parse(tuple[:rows])
    # FIX: the original `values_at(...).join(",")` produced malformed CSV
    # whenever a title or tag contained a comma or quote. CSV.generate_line
    # quotes/escapes properly and newline-terminates each row, so page
    # chunks concatenate cleanly in WriteFileBolt.
    csv = rows.map { |row| CSV.generate_line(row.values_at(*COLUMNS)) }.join
    [tuple[:page], tuple[:per_page], tuple[:total], tuple[:job_id], csv]
  end
end
# Terminal bolt: appends each CSV page to csv_<job_id>.csv, truncating any
# leftover file from a previous run when the first page of a job arrives.
# Field-grouping on :job_id upstream guarantees all pages of one job land
# on the same task, so appends are not interleaved across workers.
class WriteFileBolt < RedStorm::SimpleBolt
  on_receive :emit => false do |tuple|
    job_id = tuple[:job_id]
    csv = tuple[:csv]
    page = tuple[:page]
    filename = "csv_#{job_id}.csv"

    # Start each job with a clean file. FIX: File.exists? is deprecated and
    # removed in Ruby 3.2 — use File.exist?.
    File.delete(filename) if page == 1 && File.exist?(filename)

    File.open(filename, "a") do |f|
      # FIX: the original `f.print` glued the last row of one page to the
      # first row of the next, because page chunks carried no trailing
      # newline. IO#puts appends a newline only when the chunk lacks one,
      # so this is safe whether or not the chunk is newline-terminated.
      f.puts csv
    end
  end
end
# Wires the pipeline together:
#   JobsSpout (Redis) -> ExtractJobBolt -> ExecuteSearchBolt (x10)
#     -> GenerateCsvBolt (x2) -> WriteFileBolt
# Field-grouping on :job_id from ExtractJobBolt onward keeps every tuple of
# one job on the same downstream task, so CSV pages append to one file.
class CsvTopology < RedStorm::SimpleTopology
spout JobsSpout do
output_fields :job
end
bolt ExtractJobBolt do
# Shuffle: raw jobs can go to any extractor.
source JobsSpout, :shuffle
output_fields :job_id, :criteria
end
bolt ExecuteSearchBolt, :parallelism => 10 do
source ExtractJobBolt, :fields => :job_id
output_fields :page, :per_page, :total, :job_id, :rows
end
bolt GenerateCsvBolt, :parallelism => 2 do
source ExecuteSearchBolt, :fields => :job_id
output_fields :page, :per_page, :total, :job_id, :csv
end
bolt WriteFileBolt do
source GenerateCsvBolt, :fields => :job_id
# NOTE(review): WriteFileBolt runs with :emit => false, so these declared
# output fields are never actually emitted.
output_fields :job_id, :file
end
configure do |env|
debug false
case env
when :local
max_task_parallelism 100
num_workers 25
when :cluster
num_workers 20
# Throttle in-flight spout tuples on a real cluster.
max_spout_pending 5000
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment