-
-
Save gottlike/e9cfed216ea7637c1c9a4ef031eb4c9e to your computer and use it in GitHub Desktop.
Polyphony Issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
$stdout.sync = true | |
require 'bundler/setup' | |
require 'json' | |
require 'nats/io/client' | |
require 'parallel' | |
ENV['NATS_CLUSTER'] = 'nats://localhost:4222' | |
ENV['QUEUE_NAME'] = 'queue.test' | |
raise('Environment variable NATS_CLUSTER not defined') if ENV.fetch('NATS_CLUSTER', nil).nil? | |
raise('Environment variable QUEUE_NAME not defined') if ENV.fetch('QUEUE_NAME', nil).nil? | |
%w[INT TERM].each do |sig| | |
trap(sig) do | |
puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIG#{sig}, shutting down" | |
exit! | |
end | |
end | |
nats_cluster = ENV.fetch('NATS_CLUSTER', nil) | |
queue_name = ENV.fetch('QUEUE_NAME', nil) | |
cluster_opts = { | |
servers: nats_cluster.split(','), | |
reconnect_time_wait: 1, | |
max_reconnect_attempts: -1, | |
ping_interval: 10, | |
dont_randomize_servers: true, | |
connect_timeout: 2 | |
} | |
nats = NATS::IO::Client.new | |
nats.on_reconnect { puts "Reconnected to server at #{nats.connected_server}" } | |
nats.on_disconnect { puts 'NATS was disconnected' } | |
nats.on_close { puts 'NATS was closed' } | |
nats.on_error { |error| puts error.message } | |
nats.connect(cluster_opts) | |
def send_request(nats, queue_name, data) | |
response = nats.request(queue_name, data, timeout: 5) | |
if response.nil? | |
puts data | |
return response | |
end | |
response.data.to_s | |
rescue NATS::Timeout | |
puts 'Request timed out' | |
nil | |
end | |
test_data = 5000.times.collect { |i| { id: i, msg: 'Ping!' } } | |
start = Time.now | |
responses = [] | |
Parallel.map(test_data, in_threads: 2000) do |data| | |
responses << send_request(nats, queue_name, data.to_json) | |
end | |
puts "Finished #{responses.compact.length} requests in #{(Time.now - start).round(4)}s, at rate of #{(responses.compact.length / (Time.now - start)).round} rps" | |
puts responses.last | |
nats.close |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
source 'https://rubygems.org' | |
gem 'base64' | |
gem 'json' | |
gem 'nats-pure' | |
gem 'parallel' | |
gem 'polyphony' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
$stdout.sync = true | |
require 'bundler/setup' | |
require 'json' | |
require 'nats/io/client' | |
require 'parallel' | |
require 'polyphony' | |
ENV['NATS_CLUSTER'] = 'nats://localhost:4222' | |
ENV['QUEUE_NAME'] = 'queue.test' | |
raise('Environment variable NATS_CLUSTER not defined') if ENV.fetch('NATS_CLUSTER', nil).nil? | |
raise('Environment variable QUEUE_NAME not defined') if ENV.fetch('QUEUE_NAME', nil).nil? | |
%w[INT TERM].each do |sig| | |
trap(sig) do | |
puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIG#{sig}, shutting down" | |
exit! | |
end | |
end | |
nats_cluster = ENV.fetch('NATS_CLUSTER', nil) | |
queue_name = ENV.fetch('QUEUE_NAME', nil) | |
cluster_opts = { | |
servers: nats_cluster.split(','), | |
reconnect_time_wait: 1, | |
max_reconnect_attempts: -1, | |
ping_interval: 10, | |
dont_randomize_servers: true, | |
connect_timeout: 2 | |
} | |
nats = NATS::IO::Client.new | |
nats.on_reconnect { puts "Reconnected to server at #{nats.connected_server}" } | |
nats.on_disconnect { puts 'NATS was disconnected' } | |
nats.on_close { puts 'NATS was closed' } | |
nats.on_error { |error| puts error.message } | |
nats.connect(cluster_opts) | |
def send_request(nats, queue_name, data) | |
response = nats.request(queue_name, data, timeout: 5) | |
if response.nil? | |
puts data | |
return response | |
end | |
response.data.to_s | |
rescue NATS::Timeout | |
puts 'Request timed out' | |
nil | |
end | |
nats.subscribe(queue_name, queue: queue_name) do |msg, reply, _subject| | |
# raise('raise works here') | |
spin do | |
# raise('raise does not work here') | |
msg = JSON.parse(msg, symbolize_names: true) | |
response = JSON.parse(send_request(nats, "#{queue_name}.sub-worker", msg.to_json), symbolize_names: true) | |
nats.publish(reply, response.to_json) if reply | |
end | |
end | |
sleep |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
$stdout.sync = true | |
require 'bundler/setup' | |
require 'json' | |
require 'nats/io/client' | |
require 'parallel' | |
require 'polyphony' | |
ENV['NATS_CLUSTER'] = 'nats://localhost:4222' | |
ENV['QUEUE_NAME'] = 'queue.test.sub-worker' | |
raise('Environment variable NATS_CLUSTER not defined') if ENV.fetch('NATS_CLUSTER', nil).nil? | |
raise('Environment variable QUEUE_NAME not defined') if ENV.fetch('QUEUE_NAME', nil).nil? | |
%w[INT TERM].each do |sig| | |
trap(sig) do | |
puts "[#{Time.now.strftime('%Y-%m-%d %H:%M:%S')}] - Received SIG#{sig}, shutting down" | |
exit! | |
end | |
end | |
nats_cluster = ENV.fetch('NATS_CLUSTER', nil) | |
queue_name = ENV.fetch('QUEUE_NAME', nil) | |
cluster_opts = { | |
servers: nats_cluster.split(','), | |
reconnect_time_wait: 1, | |
max_reconnect_attempts: -1, | |
ping_interval: 10, | |
dont_randomize_servers: true, | |
connect_timeout: 2 | |
} | |
nats = NATS::IO::Client.new | |
nats.on_reconnect { puts "Reconnected to server at #{nats.connected_server}" } | |
nats.on_disconnect { puts 'NATS was disconnected' } | |
nats.on_close { puts 'NATS was closed' } | |
nats.on_error { |error| puts error.message } | |
nats.connect(cluster_opts) | |
nats.subscribe(queue_name, queue: queue_name) do |msg, reply, _subject| | |
# raise('raise works here') | |
spin do | |
# raise('raise does not work here') | |
sleep 3 | |
msg = JSON.parse(msg, symbolize_names: true) | |
response = { id: msg[:id], msg: 'Pong!' } | |
nats.publish(reply, response.to_json) if reply | |
end | |
end | |
sleep |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment